Using Apache Flink to process data from Kafka and MySQL database

We need to read data from a MySQL database, perform aggregations on the data and save the result sets in Kafka.

Approach

I have followed the same basic setup as described in my article – https://broadoakdata.uk/using-apache-flink-to-process-apache-web-log-files/.

For this use case, a few additional libraries are needed – the MySQL JDBC driver plus the Flink JDBC and Kafka connectors:

  • flink-connector-jdbc-3.1.1-1.17.jar
  • mysql-connector-java-8.0.30.jar
  • flink-sql-connector-kafka-3.0.2-1.18.jar

The steps of a Flink program are (see the sketch after this list):

  • Set up the execution environment (in Spark we would use SparkContext)
  • Create Source and Sink – the input and output data sources
  • Specify the transformation/business logic on your datasets
  • Specify where to store the results of the computation (Sink)
  • Trigger the computation – Flink uses lazy evaluation

Prerequisites:

  • Flink is available (I am using PyFlink – apache-flink)
  • Access to MySQL Server and Kafka
  • A platform to carry out your development – I am using Google Colab
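
A minimal outline of those steps in PyFlink – a sketch only, with placeholder table names; the concrete code for this use case follows in the rest of this post:

# 1. set up the execution environment
from pyflink.table import EnvironmentSettings, TableEnvironment
t_env = TableEnvironment.create(EnvironmentSettings.in_batch_mode())

# 2. create source and sink tables (DDL shown later in this post)
# t_env.execute_sql("CREATE TABLE my_source (...) WITH (...)")
# t_env.execute_sql("CREATE TABLE my_sink (...) WITH (...)")

# 3. specify the transformation/business logic
# result = t_env.sql_query("SELECT ... FROM my_source GROUP BY ...")

# 4./5. write to the sink – execute_insert also triggers the (lazy) computation
# result.execute_insert("my_sink").wait()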

%%bash
# get flink and mysql connectors
pip install apache-flink kafka-python

wget https://repo1.maven.org/maven2/mysql/mysql-connector-java/8.0.30/mysql-connector-java-8.0.30.jar
wget https://repo1.maven.org/maven2/org/apache/flink/flink-connector-jdbc/3.1.1-1.17/flink-connector-jdbc-3.1.1-1.17.jar
wget https://repo1.maven.org/maven2/org/apache/flink/flink-sql-connector-kafka/3.0.2-1.18/flink-sql-connector-kafka-3.0.2-1.18.jar
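
An optional sanity check – confirm the jars landed in Colab's working directory (/content) before wiring them into pipeline.jars below:

# confirm the connector jars downloaded successfully
import os

jars = [
    "flink-connector-jdbc-3.1.1-1.17.jar",
    "mysql-connector-java-8.0.30.jar",
    "flink-sql-connector-kafka-3.0.2-1.18.jar",
]
for jar in jars:
    print(jar, "found" if os.path.exists(f"/content/{jar}") else "MISSING")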

# create execution environment
from pyflink.table import EnvironmentSettings, TableEnvironment

# create a batch TableEnvironment
env_settings = EnvironmentSettings.in_batch_mode()
tbl_env = TableEnvironment.create(env_settings)

# register the connector jars – the paths are separated by semicolons
# and must not contain stray whitespace
tbl_env.get_config().get_configuration().set_string(
    "pipeline.jars",
    "file:///content/flink-sql-connector-kafka-3.0.2-1.18.jar;"
    "file:///content/mysql-connector-java-8.0.30.jar;"
    "file:///content/flink-connector-jdbc-3.1.1-1.17.jar",
)

stmt = """
CREATE TABLE invoice_report (
	id INT,
  headquarter STRING,
  invoice_month STRING,
  invoice_year INT,
  customer STRING,
  invoice_no INT,
  invoice_value FLOAT
  ) WITH (
    'connector' = 'kafka',
    'topic'     = 'invoice-report',
    'properties.group.id' = 'report-group',   
    'properties.bootstrap.servers' = 'kafka:9092',
    'format'    = 'json'
  );
 """

mysql_stmt = """
 CREATE TABLE invoice (
	id INT,
  headquarter STRING,
  invoice_month STRING,
  invoice_year INT,
  customer STRING,
  invoice_no INT,
  invoice_value FLOAT
  ) WITH (
    'connector'  = 'jdbc',
    'url'        = 'jdbc:mysql://db4free.net:3306/dfatabase',
    'table-name' = 'invoice',
    'driver'     = 'com.mysql.jdbc.Driver',
    'username'   = 'userid',
    'password'   = 'password'
  );
"""
# register the sink and source tables (execute_sql returns a TableResult)
invoice = tbl_env.execute_sql(stmt)
mysql_invoice = tbl_env.execute_sql(mysql_stmt)
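
An optional check that the JDBC source is reachable – pull a few rows and print them (in batch mode the results are collected to the client):

# verify the MySQL connection by fetching a small sample
tbl_env.execute_sql("SELECT * FROM invoice LIMIT 5").print()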

# copy the raw rows from the MySQL source into the Kafka sink;
# execute_insert submits the job, wait() blocks until it finishes
result = tbl_env.from_path("invoice")
result.execute_insert("invoice_report").wait()
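
To confirm the rows arrived in the topic, you can peek at it with kafka-python (installed earlier); the bootstrap address matches the DDL above:

# read back the sink topic to confirm the rows landed
from kafka import KafkaConsumer

consumer = KafkaConsumer(
    "invoice-report",
    bootstrap_servers="kafka:9092",
    auto_offset_reset="earliest",
    consumer_timeout_ms=5000,  # stop iterating after 5s of silence
)
for msg in consumer:
    print(msg.value)
consumer.close()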

# aggregate data and create a PyFlink table
out = tbl_env.sql_query("""
SELECT headquarter,
       invoice_year,
       invoice_month,
       SUM(invoice_value) AS invoice_sum
FROM invoice
GROUP BY headquarter,
         invoice_year,
         invoice_month
""")

# convert the PyFlink Table to a pandas DataFrame
# Note: GROUP BY produces an updating (changelog) result, which cannot be
# inserted directly into the append-only Kafka table defined above
df = out.to_pandas()

# workaround – convert the pandas DataFrame back into a PyFlink table
out_t = tbl_env.from_pandas(df)
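
The insert below targets a sink table called invoice_sum, whose DDL is not shown above – presumably a second Kafka table with the aggregated schema. A minimal sketch of what it might look like; the topic name is a guess:

# Assumed sink for the aggregated result – this DDL is not shown in the
# original post; the topic name 'invoice-sum' is a guess, and the
# invoice_sum type may need adjusting to match the round-tripped schema
sum_stmt = """
CREATE TABLE invoice_sum (
  headquarter STRING,
  invoice_year INT,
  invoice_month STRING,
  invoice_sum FLOAT
) WITH (
  'connector' = 'kafka',
  'topic'     = 'invoice-sum',
  'properties.bootstrap.servers' = 'kafka:9092',
  'format'    = 'json'
)
"""
tbl_env.execute_sql(sum_stmt)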

# insert the aggregated data into the sink (kafka)
out_t.execute_insert("invoice_sum").wait()
[Output: sum of invoice per headquarter, year and month]