Tag: flink

  • Using Apache Flink to process data from Kafka and MySQL database

    We need to read data from a MySQL database, perform aggregations on the data, and save the result sets in Kafka.

    Approach

    I have followed the same approach as described in my article – https://broadoakdata.uk/using-apache-flink-to-process-apache-web-log-files/ – for the basic setup.

    For this use case, a few additional libraries are needed – the MySQL JDBC driver plus the Flink JDBC and Kafka connectors:

    • flink-connector-jdbc-3.1.1-1.17.jar
    • mysql-connector-java-8.0.30.jar
    • flink-sql-connector-kafka-3.0.2-1.18.jar
    The overall steps are as follows (a minimal skeleton illustrating them appears after the prerequisites list below):

    • Set up the execution environment (the equivalent of SparkContext in Spark)
    • Create a Source and a Sink – the input and output data sources
    • Specify the transformation/business logic on your datasets
    • Specify where to store the results of the computations (the Sink)
    • Trigger the computation – Flink uses lazy evaluation

    Prerequisites:

    • Flink is available (I am using PyFlink – apache-flink)
    • Access to a MySQL server and Kafka
    • A platform to carry out your development – I am using Google Colab
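
    As a minimal, self-contained sketch of how these steps map onto the Table API (it uses the built-in datagen and print connectors purely for illustration and is not part of the pipeline built below):

    # minimal Table API skeleton - nothing runs until execute_insert() is called
    from pyflink.table import EnvironmentSettings, TableEnvironment

    t_env = TableEnvironment.create(EnvironmentSettings.in_batch_mode())       # 1. execution environment
    t_env.execute_sql("""CREATE TABLE src (x INT)
        WITH ('connector' = 'datagen', 'number-of-rows' = '10')""")            # 2. source
    t_env.execute_sql("CREATE TABLE snk (x INT) WITH ('connector' = 'print')") # 3. sink
    result = t_env.sql_query("SELECT x FROM src")                              # 4. transformation (lazy - only builds a plan)
    result.execute_insert("snk").wait()                                        # 5. trigger the computation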
    
    %%bash
    # get flink, kafka-python and the mysql/kafka connector jars
    pip install apache-flink kafka-python
    
    wget https://repo1.maven.org/maven2/mysql/mysql-connector-java/8.0.30/mysql-connector-java-8.0.30.jar
    wget https://repo1.maven.org/maven2/org/apache/flink/flink-connector-jdbc/3.1.1-1.17/flink-connector-jdbc-3.1.1-1.17.jar
    wget https://repo1.maven.org/maven2/org/apache/flink/flink-sql-connector-kafka/3.0.2-1.18/flink-sql-connector-kafka-3.0.2-1.18.jar
    
    # create execution environment
    from pyflink.table import EnvironmentSettings, TableEnvironment

    # create a batch TableEnvironment
    env_settings = EnvironmentSettings.in_batch_mode()
    tbl_env = TableEnvironment.create(env_settings)

    # register the connector jars (pipeline.jars is a semicolon-separated list)
    (tbl_env.get_config()
            .get_configuration()
            .set_string("pipeline.jars",
                        "file:///content/flink-sql-connector-kafka-3.0.2-1.18.jar;"
                        "file:///content/mysql-connector-java-8.0.30.jar;"
                        "file:///content/flink-connector-jdbc-3.1.1-1.17.jar")
    )
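
    On more recent PyFlink releases the same jar registration can be done directly on the TableConfig; a minimal equivalent sketch, assuming PyFlink 1.16 or later:

    # equivalent jar registration on newer PyFlink versions
    tbl_env.get_config().set(
        "pipeline.jars",
        "file:///content/flink-sql-connector-kafka-3.0.2-1.18.jar;"
        "file:///content/mysql-connector-java-8.0.30.jar;"
        "file:///content/flink-connector-jdbc-3.1.1-1.17.jar"
    )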
    
    stmt = """
    CREATE TABLE invoice_report (
      id INT,
      headquarter STRING,
      invoice_month STRING,
      invoice_year INT,
      customer STRING,
      invoice_no INT,
      invoice_value FLOAT
      ) WITH (
        'connector' = 'kafka',
        'topic'     = 'invoice-report',
        'properties.group.id' = 'report-group',
        'properties.bootstrap.servers' = 'kafka:9092',
        'format'    = 'json'
      );
    """
    
    mysql_stmt = """
    CREATE TABLE invoice (
      id INT,
      headquarter STRING,
      invoice_month STRING,
      invoice_year INT,
      customer STRING,
      invoice_no INT,
      invoice_value FLOAT
      ) WITH (
        'connector'  = 'jdbc',
        'url'        = 'jdbc:mysql://db4free.net:3306/dfatabase',
        'table-name' = 'invoice',
        'driver'     = 'com.mysql.cj.jdbc.Driver',
        'username'   = 'userid',
        'password'   = 'password'
      );
    """
    # register the Kafka sink and MySQL source tables in the catalog
    invoice = tbl_env.execute_sql(stmt)
    mysql_invoice = tbl_env.execute_sql(mysql_stmt)
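
    Both tables are now registered in the catalog. As a quick sanity check (not part of the original flow) you can preview a few rows from the MySQL-backed table before wiring up the sink:

    # preview a handful of rows to confirm the JDBC source is reachable
    tbl_env.sql_query("SELECT * FROM invoice LIMIT 5").execute().print()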
    
    # create a PyFlink Table from the MySQL source and copy the raw rows to the Kafka sink
    result = tbl_env.from_path("invoice")
    result.execute_insert("invoice_report").wait()   # wait() blocks until the insert job finishes
    
    # aggregate data and create a PyFlink Table
    out = tbl_env.sql_query("""
        SELECT headquarter,
               invoice_year,
               invoice_month,
               SUM(invoice_value) AS invoice_sum
        FROM invoice
        GROUP BY headquarter, invoice_year, invoice_month
    """)
    
    # convert the PyFlink Table to a pandas DataFrame
    # Note - because the GROUP BY generates an updating result, it cannot be written
    # directly to the Kafka sink table
    df = out.to_pandas()
    
    # workaround - convert the pandas DataFrame back into a PyFlink Table
    out_t = tbl_env.from_pandas(df)
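
    The aggregated rows are written to a second Kafka sink table, invoice_sum, whose DDL is not shown in the original listing; a minimal sketch of what it might look like, assuming the same broker, JSON format, and the schema produced by the aggregation query:

    # hypothetical DDL for the aggregated-results sink (schema matches the query above)
    tbl_env.execute_sql("""
    CREATE TABLE invoice_sum (
      headquarter STRING,
      invoice_year INT,
      invoice_month STRING,
      invoice_sum FLOAT
      ) WITH (
        'connector' = 'kafka',
        'topic'     = 'invoice-sum',
        'properties.bootstrap.servers' = 'kafka:9092',
        'format'    = 'json'
      )
    """)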
    
    # insert the aggregated data into the Kafka sink
    out_t.execute_insert("invoice_sum").wait()
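
    To verify that the aggregated records actually landed in Kafka, they can be read back with kafka-python; a minimal sketch, assuming the invoice-sum topic from the hypothetical sink above:

    # read back a few messages from the sink topic as a sanity check
    import json
    from kafka import KafkaConsumer

    consumer = KafkaConsumer(
        "invoice-sum",
        bootstrap_servers="kafka:9092",
        auto_offset_reset="earliest",
        consumer_timeout_ms=10000,
        value_deserializer=lambda v: json.loads(v.decode("utf-8")),
    )
    for message in consumer:
        print(message.value)
    consumer.close()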
    
    Output: sum of invoice values per headquarter, year and month.