I need to use Apache Flink to process data stored in Kafka and MySQL. In my previous article I shared my notes on how to use a free MySQL server (db4free.net) instance for development work.
Apache Flink is a good processing engine with useful features for manipulating data using batch and/or streaming processing. I am not yet sure how it will perform on big datasets.
A simple use case
Read data from a MySQL database, perform aggregations on the data and save the result sets to Kafka.
Approach
I have followed the same approach for the basic setup as described in my article: https://broadoakdata.uk/using-apache-flink-to-process-apache-web-log-files/
For this use case, a few additional libraries are needed: the MySQL JDBC driver and the Flink JDBC and Kafka connectors.
- flink-connector-jdbc-3.1.1-1.17.jar
- mysql-connector-java-8.0.30.jar
- flink-sql-connector-kafka-3.0.2-1.18.jar
Structure of a Flink Application
- Set up the execution environment (the counterpart of SparkContext in Spark)
- Create the Source and Sink – the input and output data sources
- Specify the transformation/business logic on your datasets
- Specify where to store the results of the computations (Sink)
- Trigger the computation – Flink uses lazy evaluation, so nothing runs until a job is triggered (see the sketch after this list)
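A minimal sketch of this structure in PyFlink, with the built-in datagen and print connectors standing in for the real source and sink (the table and column names here are illustrative only):

from pyflink.table import EnvironmentSettings, TableEnvironment

# 1. set up the execution environment
t_env = TableEnvironment.create(EnvironmentSettings.in_batch_mode())

# 2. create Source and Sink tables (test connectors instead of MySQL/Kafka)
t_env.execute_sql("""
    CREATE TABLE demo_source (
        id INT,
        amount FLOAT
    ) WITH (
        'connector' = 'datagen',
        'number-of-rows' = '10'
    )
""")
t_env.execute_sql("""
    CREATE TABLE demo_sink (
        id INT,
        amount FLOAT
    ) WITH (
        'connector' = 'print'
    )
""")

# 3. specify the transformation/business logic
result = t_env.sql_query("SELECT id, amount FROM demo_source")

# 4./5. point the result at the sink and trigger the (lazy) computation
result.execute_insert("demo_sink").wait()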
Prerequisites
- Flink is available (I am using PyFlink, installed via the apache-flink package)
- Access to a MySQL server and Kafka
- A platform to carry out your development – I am using Google Colab
Python code snippets
%%bash
# install Flink and the Kafka client, then download the connector jars
# (the %%bash cell magic has to be the first line of the cell)
pip install apache-flink kafka-python
wget https://repo1.maven.org/maven2/mysql/mysql-connector-java/8.0.30/mysql-connector-java-8.0.30.jar
wget https://repo1.maven.org/maven2/org/apache/flink/flink-connector-jdbc/3.1.1-1.17/flink-connector-jdbc-3.1.1-1.17.jar
wget https://repo1.maven.org/maven2/org/apache/flink/flink-sql-connector-kafka/3.0.2-1.18/flink-sql-connector-kafka-3.0.2-1.18.jar
# create execution environment
from pyflink.table import EnvironmentSettings, TableEnvironment
# create a batch TableEnvironment
env_settings = EnvironmentSettings.in_batch_mode()
tbl_env = TableEnvironment.create(env_settings)
# register the connector jars (pipeline.jars is a semicolon-separated list of URLs)
(tbl_env.get_config()
    .get_configuration()
    .set_string("pipeline.jars",
                "file:///content/flink-sql-connector-kafka-3.0.2-1.18.jar;"
                "file:///content/mysql-connector-java-8.0.30.jar;"
                "file:///content/flink-connector-jdbc-3.1.1-1.17.jar"))
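# optional sanity check - confirm the jar list was registered
# (an empty string prints if the key is unset)
print(tbl_env.get_config().get_configuration().get_string("pipeline.jars", ""))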
stmt = """
CREATE TABLE invoice_report (
id INT,
headquarter STRING,
invoice_month STRING,
invoice_year INT,
customer STRING,
invoice_no INT,
invoice_value FLOAT
) WITH (
'connector' = 'kafka',
'topic' = 'invoice-report',
'properties.group.id' = 'report-group',
'properties.bootstrap.servers' = 'kafka:9092',
'format' = 'json'
);
"""
mysql_stmt = """
CREATE TABLE invoice (
id INT,
headquarter STRING,
invoice_month STRING,
invoice_year INT,
customer STRING,
invoice_no INT,
invoice_value FLOAT
) WITH (
'connector' = 'jdbc',
'url' = 'jdbc:mysql://db4free.net:3306/dfatabase',
'table-name' = 'invoice',
'driver' = 'com.mysql.cj.jdbc.Driver',
'username' = 'userid',
'password' = 'password'
);
"""
# run the DDL statements - this registers the tables in the catalog
# (execute_sql returns a TableResult, not a Table)
invoice = tbl_env.execute_sql(stmt)
mysql_invoice = tbl_env.execute_sql(mysql_stmt)
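# optional sanity check (assumes the MySQL credentials above are valid):
# count the rows visible through the JDBC source table
tbl_env.sql_query("SELECT COUNT(*) AS row_count FROM invoice").execute().print()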
# create a PyFlink Table from the MySQL source and copy the raw rows to the Kafka sink
result = tbl_env.from_path("invoice")
result.execute_insert("invoice_report").wait()
# aggregate data and create PyFlink table
out = tbl_env.sql_query("""
    SELECT headquarter,
           invoice_year,
           invoice_month,
           SUM(invoice_value) AS invoice_sum
    FROM invoice
    GROUP BY headquarter,
             invoice_year,
             invoice_month
""")
# convert the PyFlink Table to a pandas DataFrame
# Note - the GROUP BY generates an update stream, so the result cannot be written
# directly to the plain 'kafka' connector, which is append-only ('upsert-kafka' is
# the connector intended for updating results)
df = out.to_pandas()
# workaround - convert the pandas DataFrame back to a PyFlink Table
out_t = tbl_env.from_pandas(df)
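# the aggregate has a different schema from invoice_report, so it needs its own
# Kafka sink table; the DDL below is a sketch - the topic name 'invoice-sum' is
# my assumption, adjust it to your setup
tbl_env.execute_sql("""
CREATE TABLE invoice_sum (
    headquarter STRING,
    invoice_year INT,
    invoice_month STRING,
    invoice_sum FLOAT
) WITH (
    'connector' = 'kafka',
    'topic' = 'invoice-sum',
    'properties.bootstrap.servers' = 'kafka:9092',
    'format' = 'json'
)
""")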
# insert the aggregated data into the Kafka sink
out_t.execute_insert("invoice_sum").wait()
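To verify the output, the kafka-python package installed earlier can read the topic back. This is a quick sketch and assumes the broker at kafka:9092 is reachable from the notebook (the topic name matches the sink sketch above):

# read messages back from the aggregate topic to confirm the sink received data
from kafka import KafkaConsumer
consumer = KafkaConsumer(
    "invoice-sum",
    bootstrap_servers="kafka:9092",
    auto_offset_reset="earliest",
    consumer_timeout_ms=10000,  # stop iterating after 10s without new messages
)
for msg in consumer:
    print(msg.value.decode("utf-8"))
consumer.close()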