Using Apache Flink to process data from Kafka and MySQL database

The goal is to read data from a MySQL database, aggregate it, and save the result sets to Kafka.

Approach

I have followed the same approach as described in my article – https://broadoakdata.uk/using-apache-flink-to-process-apache-web-log-files/ for basic setup.

For this use case, a few additional libraries are needed – the MySQL driver and the Flink JDBC and Kafka connectors:

  • flink-connector-jdbc-3.1.1-1.17.jar
  • mysql-connector-java-8.0.30.jar
  • flink-sql-connector-kafka-3.0.2-1.18.jar

The steps are:

  • Set up the execution environment (in Spark we use SparkContext)
  • Create the Source and Sink – input and output data sources
  • Specify the transformation/business logic on your datasets
  • Specify where to store the results of the computations (Sink)
  • Trigger the computation – Flink uses lazy evaluation

Prerequisites:

  • Flink is available (I am using PyFlink – apache-flink)
  • Access to a MySQL server and Kafka
  • A platform to carry out your development – I am using Google Colab
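
Before running anything, it can help to confirm that the MySQL server and the Kafka broker are reachable from the notebook. The following is a minimal sketch using only the Python standard library. The helper names `is_reachable` and `check_endpoints` are hypothetical, and the host/port pairs are simply the ones used later in this article, so replace them with your own.

```python
import socket


def is_reachable(host: str, port: int, timeout: float = 3.0) -> bool:
    """Return True if a TCP connection to host:port can be opened."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # covers refused connections, timeouts and DNS failures
        return False


# Endpoints taken from the table definitions later in this article (assumptions).
ENDPOINTS = {
    "kafka": ("kafka", 9092),
    "mysql": ("db4free.net", 3306),
}


def check_endpoints(endpoints=ENDPOINTS):
    """Return a dict such as {'kafka': True, 'mysql': False}."""
    return {name: is_reachable(host, port) for name, (host, port) in endpoints.items()}
```

Calling `check_endpoints()` in a notebook cell returns one boolean per service. An open TCP port only shows that something is listening, not that the credentials or topic are valid.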

%%bash
# get PyFlink, the Kafka client, and the MySQL and Flink connector jars
pip install apache-flink kafka-python

wget https://repo1.maven.org/maven2/mysql/mysql-connector-java/8.0.30/mysql-connector-java-8.0.30.jar
wget https://repo1.maven.org/maven2/org/apache/flink/flink-connector-jdbc/3.1.1-1.17/flink-connector-jdbc-3.1.1-1.17.jar
wget https://repo1.maven.org/maven2/org/apache/flink/flink-sql-connector-kafka/3.0.2-1.18/flink-sql-connector-kafka-3.0.2-1.18.jar
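
The Flink connector jars carry the Flink version they were built for as the last part of their version string (`<connector version>-<Flink version>`). The two connectors downloaded here target different Flink releases (1.17 and 1.18), and `pip install apache-flink` without a pin installs whatever release is current. Whether such a mismatch causes problems depends on the connector, so it may be worth checking. A small sketch of such a check follows; the helper name `flink_version_of` is hypothetical.

```python
import re

# Matches "...-<connector version>-<Flink major.minor>.jar" at the end of a name.
_VERSION_SUFFIX = re.compile(r"-(\d+(?:\.\d+)*)-(\d+\.\d+)\.jar$")


def flink_version_of(jar_name):
    """Return the Flink version a connector jar targets, or None if not encoded."""
    match = _VERSION_SUFFIX.search(jar_name)
    return match.group(2) if match else None


jars = [
    "flink-connector-jdbc-3.1.1-1.17.jar",
    "mysql-connector-java-8.0.30.jar",  # plain JDBC driver, no Flink version
    "flink-sql-connector-kafka-3.0.2-1.18.jar",
]

versions = {jar: flink_version_of(jar) for jar in jars}
targets = {v for v in versions.values() if v is not None}

if len(targets) > 1:
    print("Connectors target different Flink versions:", sorted(targets))
```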

# create execution environment
from pyflink.table import EnvironmentSettings, TableEnvironment

# create a batch TableEnvironment
env_settings = EnvironmentSettings.in_batch_mode()
tbl_env = TableEnvironment.create(env_settings)

# register the connector jars; pipeline.jars expects a semicolon-separated list of URLs
jars = [
    "file:///content/flink-sql-connector-kafka-3.0.2-1.18.jar",
    "file:///content/mysql-connector-java-8.0.30.jar",
    "file:///content/flink-connector-jdbc-3.1.1-1.17.jar",
]
tbl_env.get_config().get_configuration().set_string("pipeline.jars", ";".join(jars))
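
The `pipeline.jars` option takes a semicolon-separated list of jar URLs, and any stray whitespace inside the string becomes part of a path. To avoid typing the list by hand, the value can be built from whatever jars sit in the download directory. This is a sketch only: the helper name `pipeline_jars_value` is hypothetical, and `/content` is the Colab working directory assumed in this article.

```python
from pathlib import Path


def pipeline_jars_value(jar_dir):
    """Return a semicolon-separated list of file:// URLs for every jar in jar_dir."""
    jars = sorted(Path(jar_dir).resolve().glob("*.jar"))
    return ";".join(jar.as_uri() for jar in jars)


# Usage in the notebook (assumes tbl_env was created as shown above):
# tbl_env.get_config().get_configuration().set_string(
#     "pipeline.jars", pipeline_jars_value("/content")
# )
```

Note that this picks up every jar in the directory, so keep only the connectors you need there.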

stmt = """
CREATE TABLE invoice_report (
  id INT,
  headquarter STRING,
  invoice_month STRING,
  invoice_year INT,
  customer STRING,
  invoice_no INT,
  invoice_value FLOAT
) WITH (
  'connector' = 'kafka',
  'topic'     = 'invoice-report',
  'properties.group.id' = 'report-group',
  'properties.bootstrap.servers' = 'kafka:9092',
  'format'    = 'json'
);
"""

mysql_stmt = """
CREATE TABLE invoice (
  id INT,
  headquarter STRING,
  invoice_month STRING,
  invoice_year INT,
  customer STRING,
  invoice_no INT,
  invoice_value FLOAT
) WITH (
  'connector'  = 'jdbc',
  'url'        = 'jdbc:mysql://db4free.net:3306/database',
  'table-name' = 'invoice',
  'driver'     = 'com.mysql.cj.jdbc.Driver',
  'username'   = 'userid',
  'password'   = 'password'
);
"""
# register the Kafka sink and the MySQL source tables in the catalog
tbl_env.execute_sql(stmt)
tbl_env.execute_sql(mysql_stmt)
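
The final step of this article inserts into a table named `invoice_sum`, for which no definition is shown. A minimal sketch of what such a Kafka sink could look like is given below. The helper `kafka_sink_ddl`, the topic name `invoice-sum` and the column types are assumptions, not taken from the original setup; compare them with the schema printed by `print_schema()` on the table you insert.

```python
def kafka_sink_ddl(table, topic, columns, bootstrap_servers="kafka:9092"):
    """Build a CREATE TABLE statement for a JSON-formatted Kafka table."""
    cols = ",\n".join(f"  {name} {sql_type}" for name, sql_type in columns)
    return (
        f"CREATE TABLE {table} (\n{cols}\n) WITH (\n"
        f"  'connector' = 'kafka',\n"
        f"  'topic' = '{topic}',\n"
        f"  'properties.bootstrap.servers' = '{bootstrap_servers}',\n"
        f"  'format' = 'json'\n"
        f")"
    )


sum_stmt = kafka_sink_ddl(
    table="invoice_sum",
    topic="invoice-sum",  # hypothetical topic name
    columns=[
        # same order as the columns selected by the aggregation query
        ("headquarter", "STRING"),
        ("invoice_year", "INT"),
        ("invoice_month", "STRING"),
        ("invoice_sum", "FLOAT"),  # assumed type
    ],
)
print(sum_stmt)

# In the notebook: tbl_env.execute_sql(sum_stmt)
```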

# read the MySQL table as a PyFlink Table and copy its rows to the Kafka sink
result = tbl_env.from_path("invoice")
result.execute_insert("invoice_report").wait()  # wait for the insert job to finish

# aggregate data and create PyFlink table
out = tbl_env.sql_query("""
select headquarter,
       invoice_year,
       invoice_month,
       sum(invoice_value) as invoice_sum
from invoice
group by headquarter, invoice_year, invoice_month
""")

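To make the aggregation concrete, here is the same grouping done by hand in plain Python. The rows are made-up sample data for illustration only, not values from the real `invoice` table.

```python
from collections import defaultdict

# Hypothetical rows: (headquarter, invoice_year, invoice_month, invoice_value)
rows = [
    ("London", 2023, "Jan", 100.0),
    ("London", 2023, "Jan", 50.5),
    ("London", 2023, "Feb", 20.0),
    ("Leeds", 2023, "Jan", 10.0),
]

# group by (headquarter, invoice_year, invoice_month) and sum invoice_value
totals = defaultdict(float)
for headquarter, year, month, value in rows:
    totals[(headquarter, year, month)] += value

for (headquarter, year, month), invoice_sum in sorted(totals.items()):
    print(headquarter, year, month, invoice_sum)
```

Each distinct combination of headquarter, year and month yields one output row, which is what the SQL query returns as `invoice_sum`.
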
# convert PyFlink Table to pandas DataFrame
# Note - GROUP BY generates an update stream, so the result cannot be written directly to the append-only Kafka table
df = out.to_pandas()

# workaround - convert the pandas DataFrame back to a PyFlink Table
out_t = tbl_env.from_pandas(df)

# insert data into sink (kafka)
out_t.execute_insert("invoice_sum").wait()  # wait for the insert job to finish
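
An alternative to the pandas round trip may be Flink's `upsert-kafka` connector, which is designed to accept update streams and requires a primary key on the table. The sketch below is untested in this setup: the table name `invoice_sum_upsert`, the topic name and the column types are assumptions, and it may only apply when the job runs in streaming mode.

```python
# Hypothetical upsert sink keyed on the grouping columns of the aggregation.
upsert_stmt = """
CREATE TABLE invoice_sum_upsert (
  headquarter STRING,
  invoice_year INT,
  invoice_month STRING,
  invoice_sum FLOAT,
  PRIMARY KEY (headquarter, invoice_year, invoice_month) NOT ENFORCED
) WITH (
  'connector' = 'upsert-kafka',
  'topic' = 'invoice-sum',
  'properties.bootstrap.servers' = 'kafka:9092',
  'key.format' = 'json',
  'value.format' = 'json'
)
"""

# In the notebook (assumes tbl_env and the aggregated table `out` exist):
# tbl_env.execute_sql(upsert_stmt)
# out.execute_insert("invoice_sum_upsert").wait()
```

With this connector, each new total for the same key replaces the previous one for consumers that read the topic as a changelog.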

Figure: sum of invoice
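
To check what actually landed in Kafka, the topic can be read back with kafka-python, which was installed at the start. This is a sketch under assumptions: the helper names `decode_record` and `read_topic` are hypothetical, and the topic name you pass must be whatever your sink table writes to.

```python
import json


def decode_record(raw: bytes) -> dict:
    """Decode one JSON-encoded Kafka message value into a dict."""
    return json.loads(raw.decode("utf-8"))


def read_topic(topic, bootstrap_servers="kafka:9092", timeout_ms=5000):
    """Read all currently available messages from a topic and return them as dicts."""
    from kafka import KafkaConsumer  # imported here so the helper above works without a broker

    consumer = KafkaConsumer(
        topic,
        bootstrap_servers=bootstrap_servers,
        auto_offset_reset="earliest",    # start from the beginning of the topic
        consumer_timeout_ms=timeout_ms,  # stop iterating when no new message arrives
        value_deserializer=decode_record,
    )
    try:
        return [message.value for message in consumer]
    finally:
        consumer.close()


# In the notebook, for example: read_topic("invoice-report")
```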