Category: ETL

  • Create CompaniesHouse index in Elasticsearch using PySpark

    We are using Spark 3.1.2 (as reported by spark._sc.version). Elasticsearch 7.9.3 is running in a Docker container with port 9200 exposed to the host.
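
    A quick way to confirm the container is reachable from the host is to hit the cluster's root endpoint; the sketch below uses only the Python standard library.

    # confirm elasticsearch is reachable from the host before running the spark job
    import json
    import urllib.request

    with urllib.request.urlopen("http://127.0.0.1:9200") as resp:
        info = json.load(resp)
    print(info["version"]["number"])  # expect 7.9.3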

    Prerequisites

    1. Get elasticsearch-spark-30_2.12-7.12.0.jar and add it to the Spark jars classpath (see the first snippet below).
    2. Read the Companies House data into a DataFrame.
    3. Write the DataFrame to Elasticsearch.

    The code snippets for each step are listed below.
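
    For step 1, a minimal sketch of starting the session with the connector jar on the classpath; the jar path and app name are illustrative, so adjust them to wherever the jar was downloaded.

    # launch a SparkSession with the elasticsearch-spark connector on the classpath
    from pyspark.sql import SparkSession

    spark = SparkSession.builder\
        .appName("companieshouse-es")\
        .config("spark.jars", "elasticsearch-spark-30_2.12-7.12.0.jar")\
        .getOrCreate()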

    # read the companieshouse parquet data into a dataframe
    df = spark.read.parquet('spark-warehouse/ch202003_parquet')
    
    # save the dataframe into elasticsearch
    # es.nodes.wan.only disables node discovery, so the connector talks only to the
    # declared node - what we want when elasticsearch runs inside a docker container
    df.write.format("org.elasticsearch.spark.sql")\
        .option("es.nodes.wan.only", "true")\
        .option("es.port", "9200")\
        .option("es.nodes", "127.0.0.1")\
        .mode("overwrite")\
        .save("companieshouse/crn")
    
    # read data from elasticsearch into a dataframe
    df = spark.read.format("org.elasticsearch.spark.sql")\
        .option("es.nodes.wan.only", "true")\
        .option("es.port", "9200")\
        .option("es.nodes", "127.0.0.1")\
        .load("companieshouse/crn")
    Showing the data read back from Elasticsearch:
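
    # display a few rows and the inferred schema
    df.show(5, truncate=False)
    df.printSchema()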