Tag: kafka

  • Streaming data processing using Apache Kafka and Flink

    We will use Apache Kafka and Apache Flink to process data from the Companies House Stream API. First, we will set up Apache Kafka and Flink on the Google Colab platform. You can download the script that installs both packages from here – https://gitlab.com/akkasali910/companieshouse-data .

    Or you can use the following bash script:

    #!/bin/bash
    # get latest flink
    wget https://dlcdn.apache.org/flink/flink-1.18.0/flink-1.18.0-bin-scala_2.12.tgz
    tar -xzf flink-1.18.0-bin-scala_2.12.tgz
    
    # kafka
    wget https://downloads.apache.org/kafka/3.6.0/kafka_2.13-3.6.0.tgz
    tar -xzf kafka_2.13-3.6.0.tgz
    
    # start kafka
    ./kafka_2.13-3.6.0/bin/zookeeper-server-start.sh -daemon ./kafka_2.13-3.6.0/config/zookeeper.properties
    ./kafka_2.13-3.6.0/bin/kafka-server-start.sh -daemon ./kafka_2.13-3.6.0/config/server.properties
    echo "Waiting for 10 secs until kafka and zookeeper services are up and running"
    sleep 10
    
    # start flink cluster
    flink-1.18.0/bin/start-cluster.sh &
    
    # create two topics
    ./kafka_2.13-3.6.0/bin/kafka-topics.sh --create --bootstrap-server 127.0.0.1:9092 --replication-factor 1 --partitions 1 --topic ello1
    ./kafka_2.13-3.6.0/bin/kafka-topics.sh --create --bootstrap-server 127.0.0.1:9092 --replication-factor 1 --partitions 2 --topic ello2
    
    # get Flink SQL connector
    wget https://repo.maven.apache.org/maven2/org/apache/flink/flink-sql-connector-kafka/3.0.2-1.18/flink-sql-connector-kafka-3.0.2-1.18.jar
    
    # python packages
    pip -q install apache_beam apache_flink kafka-python
    

    Running the script above installs both Apache Kafka and Flink and starts them in order (Kafka first, then Flink). It also installs the following Python packages:

    • apache_beam
    • apache_flink
    • kafka-python
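
    To confirm that the broker is up before moving on, you can list its topics from Python. This is a minimal check using kafka-python's admin client, assuming the broker started by the script above is listening on localhost:9092:

    from kafka import KafkaAdminClient

    # list the topics on the local broker; 'ello1' and 'ello2' should appear
    admin = KafkaAdminClient(bootstrap_servers=['localhost:9092'])
    print(admin.list_topics())
    admin.close()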

    Writing events to a Kafka topic

    The code below generates a few thousand records and publishes them to a Kafka topic.

    # send data to Kafka
    from kafka import KafkaProducer
    import json

    def producer(topic):
      producer = KafkaProducer(
          bootstrap_servers=['localhost:9092'],
          value_serializer=lambda x: json.dumps(x).encode('utf-8'),
      )
      # publish a few thousand small JSON records
      for i in range(5, 7000):
        num = {"number": i}
        producer.send(topic, num)
      # make sure buffered records are delivered before returning
      producer.flush()

    # send data
    topic_name = 'ello1'
    producer(topic_name)

    Reading data from a Kafka topic

    Use the Python KafkaConsumer class to read data from the Kafka topic and save the output as a Pandas DataFrame.

    # read messages from the topic into a list, then build a DataFrame
    from kafka import KafkaConsumer
    import json
    import pandas as pd

    topic_name = 'ello1'
    consumer = KafkaConsumer(topic_name,
                            auto_offset_reset='earliest',
                            bootstrap_servers=['localhost:9092'],
                            api_version=(0, 10),
                            value_deserializer=lambda x: json.loads(x.decode('utf-8')),
                            consumer_timeout_ms=1000)
    l = []
    for msg in consumer:
      # msg.value is the decoded {"number": ...} dictionary
      l.append(msg.value)
    consumer.close()

    df = pd.DataFrame(l)
    df.head()

    Use of Flink SQL and Kafka Connector

    Use Flink SQL to create a table with Kafka as the data source. You need to download Flink and the Flink SQL Kafka connector JAR file before running the SQL script.

    %%writefile k.sql
    SET 'sql-client.execution.result-mode' = 'tableau';

    CREATE TABLE KafkaTable (
      `number` INT,
      `ts` TIMESTAMP(3) METADATA FROM 'timestamp'
    ) WITH (
      'connector' = 'kafka',
      'topic' = 'ello1',
      'properties.bootstrap.servers' = 'localhost:9092',
      'scan.startup.mode' = 'earliest-offset',
      'format' = 'json'
    );

    SELECT * FROM KafkaTable LIMIT 10;
    
    # invoke Flink SQL shell with input file
    /content/flink-1.18.0/bin/sql-client.sh -j flink-sql-connector-kafka-3.0.2-1.18.jar -f k.sql
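
    Since the apache_flink (PyFlink) package was installed above, the same Kafka-backed table can also be defined from Python instead of the SQL client. This is a minimal sketch mirroring the SQL script, assuming the connector JAR downloaded earlier sits in /content:

    from pyflink.table import EnvironmentSettings, TableEnvironment

    # streaming TableEnvironment with the Kafka connector JAR on the classpath
    t_env = TableEnvironment.create(EnvironmentSettings.in_streaming_mode())
    t_env.get_config().set(
        "pipeline.jars",
        "file:///content/flink-sql-connector-kafka-3.0.2-1.18.jar")

    # same table definition as in k.sql
    t_env.execute_sql("""
        CREATE TABLE KafkaTable (
          `number` INT,
          `ts` TIMESTAMP(3) METADATA FROM 'timestamp'
        ) WITH (
          'connector' = 'kafka',
          'topic' = 'ello1',
          'properties.bootstrap.servers' = 'localhost:9092',
          'scan.startup.mode' = 'earliest-offset',
          'format' = 'json'
        )
    """)

    # print the first few rows read from the topic
    t_env.execute_sql("SELECT * FROM KafkaTable LIMIT 10").print()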
    Apache Kafka and Flink in Action

    Installing Apache Kafka on Raspberry Pi

    Follow these steps:

    • Download the latest version from the Apache Kafka website
    • Edit the server.properties and zookeeper.properties files and set the directories for Kafka logs and ZooKeeper snapshot data
    • Open port 9092 if you need access from outside the local environment
    • Create a few topics for testing your deployment (see the sketch after this list)
    • Update the software on the Raspberry Pi – sudo apt update
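
    For the testing step, you can check the deployment from another machine on the network. This is a minimal sketch using kafka-python, assuming it is installed on the client and that raspberrypi.local resolves to your Pi (replace it with the real hostname or IP):

    from kafka import KafkaProducer, KafkaConsumer

    # publish a test message to the 'ello' topic on the Pi
    producer = KafkaProducer(bootstrap_servers=['raspberrypi.local:9092'])
    producer.send('ello', b'hello from the client')
    producer.flush()

    # read it back to confirm the broker is reachable on port 9092
    consumer = KafkaConsumer('ello',
                             bootstrap_servers=['raspberrypi.local:9092'],
                             auto_offset_reset='earliest',
                             consumer_timeout_ms=5000)
    for msg in consumer:
      print(msg.value)
    consumer.close()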

    Setting up Kafka

    Edit the server.properties file

    #     listeners = PLAINTEXT://your.host.name:9092
    # listeners=PLAINTEXT://broadoakdata.uk:9092
    
    # Listener name, hostname and port the broker will advertise to clients.
    # If not set, it uses the value for "listeners".
    advertised.listeners=PLAINTEXT://localhost:9092
    
    # A comma separated list of directories under which to store log files
    log.dirs=/home/pi/kafka_2.13-3.6.0/data/kafka-logs
    
    Edit the zookeeper.properties file
    
    # the directory where the snapshot is stored.
    dataDir=/home/pi/kafka_2.13-3.6.0/data/zookeeper

    Start ZooKeeper and then the Kafka server

    #!/bin/bash
    
    ./kafka_2.13-3.6.0/bin/zookeeper-server-start.sh -daemon ./kafka_2.13-3.6.0/config/zookeeper.properties
    ./kafka_2.13-3.6.0/bin/kafka-server-start.sh -daemon ./kafka_2.13-3.6.0/config/server.properties
    echo "Waiting for 10 secs until kafka and zookeeper services are up and running"
    sleep 10
    
    ./kafka_2.13-3.6.0/bin/kafka-topics.sh --create --bootstrap-server 127.0.0.1:9092 --replication-factor 1 --partitions 1 --topic ello
    ./kafka_2.13-3.6.0/bin/kafka-topics.sh --create --bootstrap-server 127.0.0.1:9092 --replication-factor 1 --partitions 2 --topic ello-test

    Reading a CSV file in chunks and sending rows to Kafka

    Initialise Kafka Producer

    # Import packages
    import pandas as pd
    import json
    import datetime as dt
    from time import sleep
    from kafka import KafkaProducer
    
    # Initialize Kafka Producer Client
    producer = KafkaProducer(bootstrap_servers=['localhost:9092'])
    print(f'Initialized Kafka producer at {dt.datetime.utcnow()}')

    Send data to Kafka

    # Set a basic message counter and define the file path
    counter = 0
    file = "/content/sample_data/california_housing_train.csv"
    
    for chunk in pd.read_csv(file,encoding='unicode_escape',chunksize=100):
      # Set the counter as the message key
      key = str(counter).encode()
      # Convert the data frame chunk into a dictionary
      chunkd = chunk.to_dict()
      # Encode the dictionary into a JSON Byte Array
      data = json.dumps(chunkd, default=str).encode('utf-8')
      # Send the data to Kafka
      producer.send(topic="ello", key=key, value=data)
      # Sleep to simulate a real-world interval
      sleep(0.5)
      # Increment the message counter for the message key
      counter = counter + 1
      print(f'Sent record to topic at time {dt.datetime.utcnow()}')
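
    Once the loop finishes, it is worth flushing the producer so any records still buffered client-side are actually delivered before the cell ends:

    # deliver any buffered records and release the producer's resources
    producer.flush()
    producer.close()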

    Getting data from Kafka

    from kafka import KafkaConsumer

    # read back the records published to the 'ello' topic
    consumer = KafkaConsumer('ello',
                             auto_offset_reset='earliest',
                             bootstrap_servers=['localhost:9092'],
                             consumer_timeout_ms=1000)
    for msg in consumer:
      print(msg)
    consumer.close()
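
    Each message value is the JSON-encoded dictionary produced by chunk.to_dict(), so it can be turned back into a DataFrame. This is a minimal sketch, assuming at least one message was consumed in the loop above:

    import json
    import pandas as pd

    # rebuild a DataFrame chunk from the last consumed message
    chunk_dict = json.loads(msg.value.decode('utf-8'))
    df = pd.DataFrame(chunk_dict)
    print(df.head())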