Streaming data processing using Apache Kafka and Flink

We will use Apache Kafka and Apache Flink to process data from the Companies House Streaming API. First, we will set up Apache Kafka and Flink on the Google Colab platform. You can download the script that installs both from here – https://gitlab.com/akkasali910/companieshouse-data.

Alternatively, you can use the following bash commands:

##bash
# get latest flink
wget  https://dlcdn.apache.org/flink/flink-1.18.0/flink-1.18.0-bin-scala_2.12.tgz
tar -xzf flink-1.18.0-bin-scala_2.12.tgz

# kafka
wget https://downloads.apache.org/kafka/3.6.0/kafka_2.13-3.6.0.tgz
tar -xzf kafka_2.13-3.6.0.tgz

# start kafka
./kafka_2.13-3.6.0/bin/zookeeper-server-start.sh -daemon ./kafka_2.13-3.6.0/config/zookeeper.properties
./kafka_2.13-3.6.0/bin/kafka-server-start.sh -daemon ./kafka_2.13-3.6.0/config/server.properties
echo "Waiting for 10 secs until kafka and zookeeper services are up and running"
sleep 10

# start Flink cluster
flink-1.18.0/bin/start-cluster.sh &

# create two topics
./kafka_2.13-3.6.0/bin/kafka-topics.sh --create --bootstrap-server 127.0.0.1:9092 --replication-factor 1 --partitions 1 --topic ello1
./kafka_2.13-3.6.0/bin/kafka-topics.sh --create --bootstrap-server 127.0.0.1:9092 --replication-factor 1 --partitions 2 --topic ello2

# get Flink SQL connector
wget https://repo.maven.apache.org/maven2/org/apache/flink/flink-sql-connector-kafka/3.0.2-1.18/flink-sql-connector-kafka-3.0.2-1.18.jar

# python packages
pip -q install apache_beam apache_flink kafka-python

Running the above commands installs both Apache Kafka and Flink and starts them in order (Kafka first, then Flink). It also installs the following Python packages (a quick connectivity check is sketched after the list):

  • apache_beam
  • apache_flink
  • kafka-python
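
To confirm the broker is actually up before moving on, you can list the topics it knows about with kafka-python. This is a small sanity check of my own, assuming Kafka is listening on localhost:9092 and the two topics above were created:

from kafka import KafkaConsumer

# connect to the local broker and list the topics it knows about
consumer = KafkaConsumer(bootstrap_servers=['localhost:9092'])
print(consumer.topics())   # expect 'ello1' and 'ello2' among the results
consumer.close()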

Writing events to a Kafka topic

The code below generates a few thousand records and publishes them to a Kafka topic.

from kafka import KafkaProducer
import json

def producer(topic):
  # serialise each record as a JSON-encoded byte string
  producer = KafkaProducer(
      bootstrap_servers=['localhost:9092'],
      value_serializer=lambda x: json.dumps(x).encode('utf-8'),
  )
  for i in range(5, 7000):
    num = {"number": i}
    producer.send(topic, num)
  # make sure buffered records are delivered
  producer.flush()

# send data to Kafka
topic_name = 'ello1'
producer(topic_name)
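
The numbers above are synthetic test data; events from the Companies House Streaming API mentioned in the introduction can be forwarded to the same topic in the same way. The sketch below is a minimal illustration, assuming your API key is in a CH_API_KEY environment variable; the endpoint path and the handling of each event line are my own assumptions, not a verified client.

import os, json
import requests
from kafka import KafkaProducer

# assumed endpoint for the company-profile stream; check the Companies House docs
STREAM_URL = 'https://stream.companieshouse.gov.uk/companies'

producer = KafkaProducer(
    bootstrap_servers=['localhost:9092'],
    value_serializer=lambda x: json.dumps(x).encode('utf-8'),
)

# the Streaming API uses HTTP basic auth with the API key as the username
with requests.get(STREAM_URL, auth=(os.environ['CH_API_KEY'], ''), stream=True) as resp:
    resp.raise_for_status()
    for line in resp.iter_lines():
        if line:  # skip empty keep-alive lines
            producer.send('ello1', json.loads(line))
producer.flush()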

Reading data from a Kafka topic

Use the Python KafkaConsumer class to read data from the Kafka topic and save the output as a Pandas DataFrame.

# read all available messages from the topic into a list
from kafka import KafkaConsumer
from time import sleep
import pandas as pd
import json

topic_name = 'ello1'
consumer = KafkaConsumer(topic_name,
                        auto_offset_reset='earliest',
                        bootstrap_servers=['localhost:9092'],
                        api_version=(0, 10),
                        consumer_timeout_ms=1000,
                        value_deserializer=lambda v: json.loads(v.decode('utf-8')))
l = []
for msg in consumer:
  #print(msg.value)
  l.append(msg.value)
consumer.close()
sleep(5)
df = pd.DataFrame(l)
df.head()

Use of Flink SQL and Kafka Connector

Use Flink SQL to create a table with Kafka as the data source. You need to download Flink and the Flink SQL Kafka connector JAR file (as in the setup script above) before running the SQL script.

%%writefile k.sql
SET 'sql-client.execution.result-mode' = 'tableau';

CREATE TABLE KafkaTable (
  `number` INT,
  `ts` TIMESTAMP(3) METADATA FROM 'timestamp'
) WITH (
  'connector' = 'kafka',
  'topic' = 'ello1',
  'properties.bootstrap.servers' = 'localhost:9092',
  'scan.startup.mode' = 'earliest-offset',
  'format' = 'json'
);

SELECT * FROM KafkaTable LIMIT 10;

# invoke Flink SQL shell with input file
/content/flink-1.18.0/bin/sql-client.sh -j flink-sql-connector-kafka-3.0.2-1.18.jar -f k.sql
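
The same table can also be defined from Python using PyFlink (the apache_flink package installed earlier). This is a rough equivalent of the SQL-client script above, assuming the connector JAR was downloaded to /content; the JAR path and result handling are my own assumptions.

from pyflink.table import EnvironmentSettings, TableEnvironment

# create a streaming TableEnvironment and register the Kafka connector JAR
t_env = TableEnvironment.create(EnvironmentSettings.in_streaming_mode())
t_env.get_config().set("pipeline.jars",
                       "file:///content/flink-sql-connector-kafka-3.0.2-1.18.jar")

# same table definition as in k.sql
t_env.execute_sql("""
CREATE TABLE KafkaTable (
  `number` INT,
  `ts` TIMESTAMP(3) METADATA FROM 'timestamp'
) WITH (
  'connector' = 'kafka',
  'topic' = 'ello1',
  'properties.bootstrap.servers' = 'localhost:9092',
  'scan.startup.mode' = 'earliest-offset',
  'format' = 'json'
)""")

# run the query and print the results to stdout
t_env.execute_sql("SELECT * FROM KafkaTable LIMIT 10").print()
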
Apache Kafka and Flink in Action

Installing Apache Kafka on a Raspberry Pi

Follow these steps:

  • Download the latest version from the Apache Kafka website
  • Edit the server.properties and zookeeper.properties files and set the directories for Kafka logs and ZooKeeper snapshot data
  • Open port 9092 if you need access from outside the local environment (a quick reachability check is sketched after this list)
  • Create a few topics to test your deployment
  • Update the software on your Raspberry Pi – sudo apt update
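
To check that port 9092 is reachable from another machine on your network, a simple socket test is enough. This is a convenience sketch of my own; the hostname is hypothetical, so substitute your Pi's address:

import socket

# hypothetical hostname for the Raspberry Pi – replace with your own
host, port = 'raspberrypi.local', 9092
try:
    with socket.create_connection((host, port), timeout=5):
        print(f'{host}:{port} is reachable')
except OSError as err:
    print(f'Cannot reach {host}:{port} – {err}')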

Setting up Kafka

Edit the server.properties file

#     listeners = PLAINTEXT://your.host.name:9092
# listeners=PLAINTEXT://broadoakdata.uk:9092

# Listener name, hostname and port the broker will advertise to clients.
# If not set, it uses the value for "listeners".
advertised.listeners=PLAINTEXT://localhost:9092

# A comma separated list of directories under which to store log files
log.dirs=/home/pi/kafka_2.13-3.6.0/data/kafka-logs

Edit the zookeeper.properties file

# the directory where the snapshot is stored.
dataDir=/home/pi/kafka_2.13-3.6.0/data/zookeeper

Start ZooKeeper and then the Kafka server

#!/bin/bash

./kafka_2.13-3.6.0/bin/zookeeper-server-start.sh -daemon ./kafka_2.13-3.6.0/config/zookeeper.properties
./kafka_2.13-3.6.0/bin/kafka-server-start.sh -daemon ./kafka_2.13-3.6.0/config/server.properties
echo "Waiting for 10 secs until kafka and zookeeper services are up and running"
sleep 10

./kafka_2.13-3.6.0/bin/kafka-topics.sh --create --bootstrap-server 127.0.0.1:9092 --replication-factor 1 --partitions 1 --topic ello
./kafka_2.13-3.6.0/bin/kafka-topics.sh --create --bootstrap-server 127.0.0.1:9092 --replication-factor 1 --partitions 2 --topic ello-test
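
Topics can also be created from Python instead of the shell commands, using kafka-python's admin client. A minimal sketch, assuming the broker from the previous step is running on localhost:

from kafka.admin import KafkaAdminClient, NewTopic

admin = KafkaAdminClient(bootstrap_servers=['localhost:9092'])
# mirror the two topics created by the shell commands above
admin.create_topics([
    NewTopic(name='ello', num_partitions=1, replication_factor=1),
    NewTopic(name='ello-test', num_partitions=2, replication_factor=1),
])
admin.close()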

Reading a CSV file in chunks and sending rows to Kafka

Initialise Kafka Producer

# Import packages
import pandas as pd
import json
import datetime as dt
from time import sleep
from kafka import KafkaProducer

# Initialize Kafka Producer Client
producer = KafkaProducer(bootstrap_servers=['localhost:9092'])
print(f'Initialized Kafka producer at {dt.datetime.utcnow()}')

Send data to Kafka

# Set a basic message counter and define the file path
counter = 0
file = "/content/sample_data/california_housing_train.csv"

for chunk in pd.read_csv(file,encoding='unicode_escape',chunksize=100):
  # Set the counter as the message key
  key = str(counter).encode()
  # Convert the data frame chunk into a dictionary
  chunkd = chunk.to_dict()
  # Encode the dictionary into a JSON Byte Array
  data = json.dumps(chunkd, default=str).encode('utf-8')
  # Send the data to Kafka
  producer.send(topic="ello", key=key, value=data)
  # Sleep to simulate a real-world interval
  sleep(0.5)
  # Increment the message counter for the message key
  counter = counter + 1
  print(f'Sent record to topic at time {dt.datetime.utcnow()}')

# flush any buffered messages so nothing is lost when the script ends
producer.flush()

Getting data from Kafka

from kafka import KafkaConsumer

# read back the records published to the 'ello' topic above
consumer = KafkaConsumer('ello',
                         auto_offset_reset='earliest',
                         bootstrap_servers=['localhost:9092'],
                         consumer_timeout_ms=1000)
for msg in consumer:
  print(msg)
consumer.close()
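
Each message holds one 100-row chunk serialised with DataFrame.to_dict(), so the original table can be rebuilt on the consumer side. A minimal sketch, assuming the rows were sent to the 'ello' topic as above:

import json
import pandas as pd
from kafka import KafkaConsumer

consumer = KafkaConsumer('ello',
                         auto_offset_reset='earliest',
                         bootstrap_servers=['localhost:9092'],
                         consumer_timeout_ms=1000,
                         value_deserializer=lambda v: json.loads(v.decode('utf-8')))

# each value is a dict of columns for one chunk; turn it back into a DataFrame
frames = [pd.DataFrame(msg.value) for msg in consumer]
consumer.close()

df = pd.concat(frames, ignore_index=True) if frames else pd.DataFrame()
print(df.shape)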