Read data from and write data to Kafka using PySpark

Spark provides extensive documentation on its website under the heading Structured Streaming + Kafka Integration Guide.

Read data from Kafka – Batch

Use spark.read.format("kafka") for a batch query and spark.readStream.format("kafka") for a streaming query. The streaming form looks like this:

# Subscribe to 1 topic, with headers
df = spark \
  .readStream \
  .format("kafka") \
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
  .option("subscribe", "topic1") \
  .option("includeHeaders", "true") \
  .load()
df = df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)", "headers")
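The same options work for a one-off batch query; a minimal sketch, using the same placeholder brokers and topic as above but with spark.read instead of spark.readStream:

# Subscribe to 1 topic, batch query
df = spark \
  .read \
  .format("kafka") \
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
  .option("subscribe", "topic1") \
  .load()
df = df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")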

Use case – Kafka batch queries for storing and handling web server access log files

The use case assumes that a Kafka server is up and running, with a topic available for storing and reading the data, and that a dataframe containing web server access log data can be built. The steps are:

  • ingest the server log files and create a dataframe
  • send the data in chunks to Kafka
  • verify the data is stored in Kafka as expected

Ingest server log files and create a dataframe
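A minimal sketch of this step is shown below; the file path logs/access*.log is a placeholder, and the regular expressions and column names (host, datetime, method, endpoint, protocol, status) are illustrative for the common Apache access log layout, not prescriptive:

from pyspark.sql import functions as F

# read the raw log lines into a single-column dataframe (path is a placeholder)
raw_df = spark.read.text("logs/access*.log")

# pull out the usual Apache access-log fields with regular expressions
df = raw_df.select(
  F.regexp_extract('value', r'^(\S+)', 1).alias('host'),
  F.regexp_extract('value', r'\[([^\]]+)\]', 1).alias('datetime'),
  F.regexp_extract('value', r'"(\S+)\s(\S+)\s*(\S*)"', 1).alias('method'),
  F.regexp_extract('value', r'"(\S+)\s(\S+)\s*(\S*)"', 2).alias('endpoint'),
  F.regexp_extract('value', r'"(\S+)\s(\S+)\s*(\S*)"', 3).alias('protocol'),
  F.regexp_extract('value', r'\s(\d{3})\s', 1).alias('status')
)
df.show(5, truncate=False)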

Send data in chunks to Kafka

from pyspark.sql import functions as F

# function to write a dataframe to a Kafka topic as JSON values with a generated string key
def kafka_write_data(df, hosts, topic):
  (df
    .select(
      F.monotonically_increasing_id().cast('string').alias('key'),
      F.to_json(F.struct([F.col(c).alias(c) for c in df.columns])).alias('value'))
    .write
    .format("kafka")
    .option("kafka.bootstrap.servers", hosts)
    .option("topic", topic)
    .save()
    )
  return True
# save data in chunks
# Define the number of splits you want
n_splits = 4

# Calculate the number of rows in each chunk
each_len = df.count() // n_splits

# Keep a working reference to the original dataframe
copy_df = df

# Iterate over the chunks
i = 0
while i < n_splits:
  # Get the top `each_len` number of rows
  temp_df = copy_df.limit(each_len)

  # Truncate the `copy_df` to remove
  # the contents fetched for `temp_df`
  copy_df = copy_df.subtract(temp_df)

  # Preview the chunk, then write it to Kafka
  temp_df.show(10,truncate=False)
  kafka_write_data(temp_df,"broadoakdata.uk:9092","apache2")

  # Increment the split number
  i += 1
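The limit/subtract loop recomputes the remaining rows on every pass and assumes the rows are distinct. An alternative sketch, reusing the kafka_write_data helper, broker address and topic from above, is to split the dataframe once with randomSplit; the chunk sizes are then approximately, rather than exactly, equal:

# split into roughly equal chunks in a single pass (equal weights, fixed seed)
chunks = df.randomSplit([1.0] * n_splits, seed=42)
for chunk_df in chunks:
  kafka_write_data(chunk_df, "broadoakdata.uk:9092", "apache2")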

Verify data in Kafka

# batch read from Kafka (the topic has only 1 partition)
def kafka_read_data(spark,hosts,topic,topic_start,topic_end):
  df = (spark
    .read
    .format("kafka")
    .option("kafka.bootstrap.servers", hosts)
    .option("subscribe", topic)
    .option("startingOffsets", topic_start)
    .option("endingOffsets", topic_end)
    .load()
    )
  return df
# set hosts and topic 
hosts = "broadoakdata.uk:9092"
topic = "apache2"
# starting/ending offsets as JSON: {"topic": {"partition": offset}}
topic_start = '{"'+topic+'":{"0":81147}}'
topic_end = '{"'+topic+'":{"0":91147}}'
# get data
df_read = kafka_read_data(spark,hosts,topic,topic_start,topic_end)
df_read = df_read.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
df_read.show(20,truncate=False)
Output: data read back from Kafka.
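To confirm the payload survives the round trip, the JSON held in the value column can be parsed back into columns; a minimal sketch, assuming the dataframe df that was written is still in scope so its schema can be reused, and that pyspark.sql.functions is imported as F:

# parse the JSON payload back into columns using the schema of the original dataframe
parsed_df = (df_read
  .select(F.from_json(F.col("value"), df.schema).alias("data"))
  .select("data.*"))
parsed_df.show(5, truncate=False)
print(parsed_df.count())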

It turned out my web server was receiving unwanted requests from a number of clients. In particular, they were calling ‘xmlrpc.php’ via the POST method, and I needed to find out how many calls were being made per day.

# filter only endpoint=='/xmlrpc.php'
codes_by_date_sorted_df = (df.filter(df.endpoint=='/xmlrpc.php')
                                .groupBy(F.dayofmonth(F.to_date(F.substring(F.col('datetime'),0,11),'dd/MMM/yyyy')).alias('day'))
                                .count()
                                .sort("day"))

codes_by_date_sorted_pd_df = codes_by_date_sorted_df.toPandas()
codes_by_date_sorted_pd_df
import plotly.express as px
px.bar(
  codes_by_date_sorted_pd_df,
  x="day",
  color="day",
  y="count",
  barmode='group'
)