We will use Apache Kafka and Apache Flink to process data from the Companies House Stream API. First, we will set up Apache Kafka and Flink on the Google Colab platform. You can download the script that installs both from here – https://gitlab.com/akkasali910/companieshouse-data.
Alternatively, you can use the following bash commands:
##bash
# get latest flink
wget https://dlcdn.apache.org/flink/flink-1.18.0/flink-1.18.0-bin-scala_2.12.tgz
tar -xzf flink-1.18.0-bin-scala_2.12.tgz
# kafka
wget https://downloads.apache.org/kafka/3.6.0/kafka_2.13-3.6.0.tgz
tar -xzf kafka_2.13-3.6.0.tgz
# start kafka
./kafka_2.13-3.6.0/bin/zookeeper-server-start.sh -daemon ./kafka_2.13-3.6.0/config/zookeeper.properties
./kafka_2.13-3.6.0/bin/kafka-server-start.sh -daemon ./kafka_2.13-3.6.0/config/server.properties
echo "Waiting for 10 secs until kafka and zookeeper services are up and running"
sleep 10
# start Flink cluster
flink-1.18.0/bin/start-cluster.sh &
# create two topics
./kafka_2.13-3.6.0/bin/kafka-topics.sh --create --bootstrap-server 127.0.0.1:9092 --replication-factor 1 --partitions 1 --topic ello1
./kafka_2.13-3.6.0/bin/kafka-topics.sh --create --bootstrap-server 127.0.0.1:9092 --replication-factor 1 --partitions 2 --topic ello2
# get Flink SQL connector
wget https://repo.maven.apache.org/maven2/org/apache/flink/flink-sql-connector-kafka/3.0.2-1.18/flink-sql-connector-kafka-3.0.2-1.18.jar
# python packages
pip -q install apache_beam apache_flink kafka-python
Running the above commands installs both Apache Kafka and Apache Flink and starts them in order (Kafka first, then Flink). It also installs the following Python packages (a quick import check follows the list):
- apache_beam
- apache_flink
- kafka-python
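To confirm the installation, a minimal check (assuming the packages above installed without errors) is to import each one and print its version; note that apache_flink is imported as pyflink and kafka-python as kafka:
# sanity check: import the installed packages
import apache_beam
import pyflink  # provided by the apache_flink package
import kafka    # provided by the kafka-python package
print("apache_beam:", apache_beam.__version__)
print("kafka-python:", kafka.__version__)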
Writing events to a Kafka topic
The code below generates a few thousand records and publishes them to a Kafka topic.
# send data to Kafka
from kafka import KafkaProducer
import json

def producer(topic):
    # producer that serialises each record as JSON
    producer = KafkaProducer(
        bootstrap_servers=['localhost:9092'],
        value_serializer=lambda x: json.dumps(x).encode('utf-8'),
    )
    for i in range(5, 7000):
        num = {"number": i}
        producer.send(topic, num)
    # make sure all buffered records are actually delivered
    producer.flush()

# send data
topic_name = 'ello1'
producer(topic_name)
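To verify that the records actually reached the topic, one quick check (a sketch using kafka-python's end_offsets, assuming the broker is still on localhost:9092) is:
# check how many records the topic now holds
from kafka import KafkaConsumer, TopicPartition
check = KafkaConsumer(bootstrap_servers=['localhost:9092'])
partitions = [TopicPartition('ello1', p) for p in check.partitions_for_topic('ello1')]
print(check.end_offsets(partitions))
check.close()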
Reading data from a Kafka topic
Use the Python KafkaConsumer class to read data from the Kafka topic and save the output as a Pandas DataFrame.
# read_from_kafka(topic_name)
from kafka import KafkaConsumer
import json
import pandas as pd

topic_name = 'ello1'
consumer = KafkaConsumer(topic_name,
                         auto_offset_reset='earliest',
                         bootstrap_servers=['localhost:9092'],
                         api_version=(0, 10),
                         consumer_timeout_ms=1000,
                         # decode the JSON payload written by the producer above
                         value_deserializer=lambda x: json.loads(x.decode('utf-8')))
records = []
for msg in consumer:
    records.append(msg.value)
consumer.close()

df = pd.DataFrame(records)
df.head()
Using Flink SQL and the Kafka Connector
Use Flink SQL to create a table with Kafka as the data source. You need to download Flink and the Flink SQL Kafka connector JAR file before running the SQL script.
%%writefile k.sql
SET 'sql-client.execution.result-mode' = 'tableau';
CREATE TABLE KafkaTable (
`number` INT,
`ts` TIMESTAMP(3) METADATA FROM 'timestamp'
) WITH (
'connector' = 'kafka',
'topic' = 'ello1',
'properties.bootstrap.servers' = 'localhost:9092',
'scan.startup.mode' = 'earliest-offset',
'format' = 'json'
);
SELECT * FROM KafkaTable LIMIT 10;
# invoke Flink SQL shell with input file
/content/flink-1.18.0/bin/sql-client.sh -j flink-sql-connector-kafka-3.0.2-1.18.jar -f k.sql
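Since the apache_flink (PyFlink) package was installed earlier, the same table can also be defined from Python. Below is a minimal sketch of a Table API equivalent of k.sql; it assumes the connector JAR downloaded above sits in /content:
# minimal PyFlink equivalent of the k.sql script (assumes the connector JAR is in /content)
from pyflink.table import EnvironmentSettings, TableEnvironment

t_env = TableEnvironment.create(EnvironmentSettings.in_streaming_mode())
t_env.get_config().set("pipeline.jars",
                       "file:///content/flink-sql-connector-kafka-3.0.2-1.18.jar")

t_env.execute_sql("""
    CREATE TABLE KafkaTable (
        `number` INT,
        `ts` TIMESTAMP(3) METADATA FROM 'timestamp'
    ) WITH (
        'connector' = 'kafka',
        'topic' = 'ello1',
        'properties.bootstrap.servers' = 'localhost:9092',
        'scan.startup.mode' = 'earliest-offset',
        'format' = 'json'
    )
""")
t_env.execute_sql("SELECT * FROM KafkaTable LIMIT 10").print()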
Installing Apache Kafka on Raspberry Pi
Follow these steps:
- Download the latest version from the Apache Kafka website (see the sketch after this list)
- Edit the server.properties and zookeeper.properties files and set the directories for Kafka logs and ZooKeeper snapshot data
- Open port 9092 if access is needed from outside the local environment
- Create a few topics for testing your deployment
- Update the software on the Raspberry Pi – sudo apt update
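A minimal download-and-extract sketch for the first step, assuming the same Kafka 3.6.0 release used in the Colab setup above:
# download and unpack Kafka on the Raspberry Pi
cd /home/pi
wget https://downloads.apache.org/kafka/3.6.0/kafka_2.13-3.6.0.tgz
tar -xzf kafka_2.13-3.6.0.tgz
# directories for Kafka logs and ZooKeeper snapshots (referenced in the config edits below)
mkdir -p /home/pi/kafka_2.13-3.6.0/data/kafka-logs /home/pi/kafka_2.13-3.6.0/data/zookeeper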
Setting up Kafka
Edit the server.properties file:
# listeners = PLAINTEXT://your.host.name:9092
# listeners=PLAINTEXT://broadoakdata.uk:9092
# Listener name, hostname and port the broker will advertise to clients.
# If not set, it uses the value for "listeners".
advertised.listeners=PLAINTEXT://localhost:9092
# A comma separated list of directories under which to store log files
log.dirs=/home/pi/kafka_2.13-3.6.0/data/kafka-logs
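If the broker needs to be reachable from outside the local environment (as noted in the steps above), the listener lines can be uncommented and pointed at the Pi's address; the hostname below is only a placeholder:
# example: expose the broker beyond localhost (replace the hostname with your Pi's address)
listeners=PLAINTEXT://0.0.0.0:9092
advertised.listeners=PLAINTEXT://your.pi.hostname:9092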
Edit the zookeeper.properties file:
# the directory where the snapshot is stored.
dataDir=/home/pi/kafka_2.13-3.6.0/data/zookeeper
Start ZooKeeper and then the Kafka server:
#!/bin/bash
./kafka_2.13-3.6.0/bin/zookeeper-server-start.sh -daemon ./kafka_2.13-3.6.0/config/zookeeper.properties
./kafka_2.13-3.6.0/bin/kafka-server-start.sh -daemon ./kafka_2.13-3.6.0/config/server.properties
echo "Waiting for 10 secs until kafka and zookeeper services are up and running"
sleep 10
./kafka_2.13-3.6.0/bin/kafka-topics.sh --create --bootstrap-server 127.0.0.1:9092 --replication-factor 1 --partitions 1 --topic ello
./kafka_2.13-3.6.0/bin/kafka-topics.sh --create --bootstrap-server 127.0.0.1:9092 --replication-factor 1 --partitions 2 --topic ello-test
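To confirm the topics were created, the bundled kafka-topics.sh script can list and describe them:
# verify the topics exist
./kafka_2.13-3.6.0/bin/kafka-topics.sh --list --bootstrap-server 127.0.0.1:9092
./kafka_2.13-3.6.0/bin/kafka-topics.sh --describe --bootstrap-server 127.0.0.1:9092 --topic ello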
Reading a CSV file in chunks and sending rows to Kafka
Initialise Kafka Producer
# Import packages
import pandas as pd
import json
import datetime as dt
from time import sleep
from kafka import KafkaProducer
# Initialize Kafka Producer Client
producer = KafkaProducer(bootstrap_servers=['localhost:9092'])
print(f'Initialized Kafka producer at {dt.datetime.utcnow()}')
Send data to Kafka
# Set a basic message counter and define the file path
counter = 0
file = "/content/sample_data/california_housing_train.csv"
for chunk in pd.read_csv(file, encoding='unicode_escape', chunksize=100):
    # Set the counter as the message key
    key = str(counter).encode()
    # Convert the data frame chunk into a dictionary
    chunkd = chunk.to_dict()
    # Encode the dictionary into a JSON Byte Array
    data = json.dumps(chunkd, default=str).encode('utf-8')
    # Send the data to Kafka
    producer.send(topic="ello", key=key, value=data)
    # Sleep to simulate a real-world interval
    sleep(0.5)
    # Increment the message counter for the message key
    counter = counter + 1
    print(f'Sent record to topic at time {dt.datetime.utcnow()}')
Getting data from Kafka
from kafka import KafkaConsumer

# read back from the 'ello' topic that the CSV producer wrote to
consumer = KafkaConsumer('ello',
                         auto_offset_reset='earliest',
                         bootstrap_servers=['localhost:9092'],
                         consumer_timeout_ms=1000)
for msg in consumer:
    print(msg)
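Since each message value is a JSON-encoded chunk dictionary (produced via chunk.to_dict() above), the consumed messages can be turned back into Pandas DataFrames. A rough sketch, with chunk_consumer and frames as illustrative names, and noting that the row index keys come back as strings after the JSON round trip:
# rebuild the consumed CSV chunks as Pandas DataFrames
import json
import pandas as pd
from kafka import KafkaConsumer

chunk_consumer = KafkaConsumer('ello',
                               auto_offset_reset='earliest',
                               bootstrap_servers=['localhost:9092'],
                               consumer_timeout_ms=1000)
frames = [pd.DataFrame(json.loads(msg.value.decode('utf-8'))) for msg in chunk_consumer]
chunk_consumer.close()

if frames:
    df = pd.concat(frames)
    print(df.shape)
    print(df.head())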