Tag: kafka

  • Read data from and write data to Kafka using PySpark

    Spark has extensive documentation on its website under the heading Structured Streaming + Kafka Integration Guide.

    Read data from Kafka

    Use spark.readStream.format("kafka") for a streaming query and spark.read.format("kafka") for a batch query.

    # Subscribe to 1 topic, with headers
    df = spark \
      .readStream \
      .format("kafka") \
      .option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
      .option("subscribe", "topic1") \
      .option("includeHeaders", "true") \
      .load()
    df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)", "headers")

    Use case – Kafka batch queries for storing and handling web server access log files

    Steps:

    • ingest server log files and create a dataframe
    • send data in chunks to Kafka
    • verify data is stored in Kafka as expected

    Prerequisites:

    • a Kafka server is up and running and a topic is available for storing and reading data (a topic-creation sketch follows this list)
    • a dataframe containing web server access log data
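
    For the first prerequisite, a minimal sketch of creating the topic with the confluent-kafka Python client; this client and the topic settings are assumptions, not part of the original post (the broker address and topic name match the examples below):

    # Create the target topic (sketch, using confluent-kafka's admin API)
    from confluent_kafka.admin import AdminClient, NewTopic

    admin = AdminClient({"bootstrap.servers": "broadoakdata.uk:9092"})
    futures = admin.create_topics([NewTopic("apache2", num_partitions=1, replication_factor=1)])
    for topic_name, future in futures.items():
        future.result()  # raises an exception if topic creation failed
        print(f"created topic {topic_name}")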

    Ingest server log files and create a dataframe
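
    A minimal sketch of this step, assuming Apache-style access logs under a hypothetical path /data/logs/access.log*; the regular expressions and the host, method, protocol and status columns are illustrative, chosen so that the datetime and endpoint columns used later are available.

    # Parse raw Apache access log lines into a dataframe (sketch)
    from pyspark.sql import SparkSession
    from pyspark.sql import functions as F

    spark = SparkSession.builder.appName("access-logs").getOrCreate()

    # hypothetical location of the raw log files
    raw_df = spark.read.text("/data/logs/access.log*")

    df = raw_df.select(
        F.regexp_extract('value', r'^(\S+)', 1).alias('host'),
        F.regexp_extract('value', r'\[([^\]]+)\]', 1).alias('datetime'),
        F.regexp_extract('value', r'"(\S+)\s+(\S+)\s+(\S+)"', 1).alias('method'),
        F.regexp_extract('value', r'"(\S+)\s+(\S+)\s+(\S+)"', 2).alias('endpoint'),
        F.regexp_extract('value', r'"(\S+)\s+(\S+)\s+(\S+)"', 3).alias('protocol'),
        F.regexp_extract('value', r'\s(\d{3})\s', 1).cast('integer').alias('status'),
    )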

    Send data in chunks to Kafka

    # function to write data to Kafka
    # key: a generated id, value: the whole row serialised as JSON
    from pyspark.sql import functions as F

    def kafka_write_data(df, hosts, topic):
      (df
        .select(
          F.monotonically_increasing_id().cast('string').alias('key'),
          F.to_json(F.struct([F.col(c).alias(c) for c in df.columns])).alias("value"))
        .write
        .format("kafka")
        .option("kafka.bootstrap.servers", hosts)
        .option("topic", topic)
        .save()
        )
      return True
    # save data in chunks
    # Define the number of splits you want
    n_splits = 4

    # Number of rows in each chunk (integer division)
    each_len = df.count() // n_splits

    # Create a copy of the original dataframe
    copy_df = df

    # Iterate over the chunks
    i = 0
    while i < n_splits:
      if i == n_splits - 1:
        # Last chunk takes everything that is left, so any remainder
        # rows from the integer division are not lost
        temp_df = copy_df
      else:
        # Get the top `each_len` rows
        temp_df = copy_df.limit(each_len)

        # Remove the rows fetched into `temp_df` from `copy_df`
        # (subtract behaves like EXCEPT DISTINCT, so exact duplicate
        # rows would also be dropped)
        copy_df = copy_df.subtract(temp_df)

      # View the dataframe and write the chunk to Kafka
      temp_df.show(10, truncate=False)
      kafka_write_data(temp_df, "broadoakdata.uk:9092", "apache2")

      # Increment the split number
      i += 1
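
    If exact chunk sizes are not required, a simpler alternative (a sketch, not from the original post) is randomSplit, which divides the dataframe into roughly equal parts in one pass and avoids the repeated subtract:

    # Alternative: split into n roughly equal chunks and write each one
    for chunk_df in df.randomSplit([1.0] * n_splits, seed=42):
      kafka_write_data(chunk_df, "broadoakdata.uk:9092", "apache2")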

    Verify data in Kafka

    # batch read from a topic with a single partition (partition 0)
    def kafka_read_data(spark, hosts, topic, topic_start, topic_end):
      df = (spark
        .read
        .format("kafka")
        .option("kafka.bootstrap.servers", hosts)
        .option("subscribe", topic)
        .option("startingOffsets", topic_start)
        .option("endingOffsets", topic_end)
        .load()
        )
      return df
    # set hosts and topic 
    hosts = "broadoakdata.uk:9092"
    topic = "apache2"
    topic_start = '{"'+topic+'":{"0":81147}}'
    topic_end = '{"'+topic+'":{"0":91147}}'
    # get data
    df_read = kafka_read_data(spark,hosts,topic,topic_start,topic_end)
    df_read = df_read.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
    df_read.show(20,truncate=False)
    [Output: reading data from Kafka]
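
    To confirm that the JSON payload written earlier round-trips correctly, the value column can be parsed back into columns with from_json. A minimal sketch; the schema fields are illustrative and should match the columns of the original dataframe:

    # Parse the JSON value back into columns for verification
    from pyspark.sql import functions as F
    from pyspark.sql.types import StructType, StructField, StringType

    value_schema = StructType([
        StructField("host", StringType()),
        StructField("datetime", StringType()),
        StructField("endpoint", StringType()),
    ])

    parsed_df = df_read.withColumn("value", F.from_json("value", value_schema))
    parsed_df.select("key", "value.*").show(10, truncate=False)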

    It seems my web server was getting unwanted requests from a number of clients. In particular, they were calling ‘xmlrpc.php’ via the POST method. I needed to find out how many of these calls were made per day.

    # count calls to endpoint '/xmlrpc.php' per day of the month
    codes_by_date_sorted_df = (
        df.filter(df.endpoint == '/xmlrpc.php')
          .groupBy(
              F.dayofmonth(
                  F.to_date(F.substring(F.col('datetime'), 1, 11), 'dd/MMM/yyyy')
              ).alias('day'))
          .count()
          .sort('day'))
    
    # convert to pandas for display and plotting
    codes_by_date_sorted_pd_df = codes_by_date_sorted_df.toPandas()
    codes_by_date_sorted_pd_df

    # plot the number of calls per day with Plotly
    import plotly.express as px
    px.bar(
      codes_by_date_sorted_pd_df,
      x="day",
      color="day",
      y="count",
      barmode='group'
    )