Tag: PySpark

  • Merge multiple rows sharing id into one row

    UPDATE – 07/07/2022

    It can be achieved with a few lines of PySpark: collect_set gathers the distinct values of each column per company_number, and concat_ws joins them into a single space-separated string. See below:

    from pyspark.sql import functions as F

    cols_to_merge = ['transaction_type', 'correction_marker']  # etc... list every column you want merged
    exprs = [F.concat_ws(" ", F.collect_set(col)).alias(col) for col in cols_to_merge]
    df = df.groupBy('company_number').agg(*exprs)
    

    In this article, we will show how to merge multiple rows sharing an id into one row using PySpark. We will use the Companies House dataset for this article. You may find the previous articles on how to get Companies House data using the REST API useful.

    Getting data

    Let's get some real data for this exercise. We will use the Companies House REST API to get the filing history for a company.

    We will follow the steps below to achieve our goal; a rough sketch of these steps is shown after the list:

    • get data from Companies House using Python requests library
    • extract data from response
    • save json output into a dataframe
    • create a temporary table to transform data and merge multiple rows sharing id into one row
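
    As a rough sketch, the steps above could look like the code below. The endpoint, the JSON field names and the YOUR_API_KEY placeholder are assumptions based on the Companies House REST API and should be adapted to your own setup:

    import requests
    from pyspark.sql import SparkSession

    spark = SparkSession.builder.getOrCreate()

    API_KEY = "YOUR_API_KEY"   # Companies House API key (placeholder)
    crn = "06048488"           # company registration number used in this article

    # 1. get data from Companies House using the requests library
    url = f"https://api.company-information.service.gov.uk/company/{crn}/filing-history"
    response = requests.get(url, auth=(API_KEY, ""), params={"items_per_page": 100})
    response.raise_for_status()

    # 2. extract the list of filings from the JSON response
    items = response.json().get("items", [])

    # 3. save the JSON output into a DataFrame, keeping only the columns we need
    rows = [
        (crn, item.get("date"), item.get("transaction_id"),
         item.get("type"), item.get("category"), item.get("action_date"))
        for item in items
    ]
    schema = "crn string, date string, transaction_id string, type string, category string, action_date string"
    df = spark.createDataFrame(rows, schema)

    # 4. create a temporary table so the data can be transformed with SQL
    df.createOrReplaceTempView("transtbl")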

    Algorithm for merging rows into a single row

    Let's look at a few records (a query to pull this sample out of the temporary table is shown after it). We want to merge records with the same date into one row.

    +--------+----------+------------------------+----+--------+-----------+
    |crn     |date      |transaction_id          |type|category|action_date|
    +--------+----------+------------------------+----+--------+-----------+
    |06048488|2018-05-21|MzIwNTM3OTkwNmFkaXF6a2N4|TM01|officers|2018-04-30 |
    |06048488|2018-05-21|MzIwNTM3OTg5NWFkaXF6a2N4|TM01|officers|2018-04-30 |
    |06048488|2018-05-21|MzIwNTM3OTgyN2FkaXF6a2N4|TM01|officers|2018-04-30 |
    |06048488|2018-05-21|MzIwNTM3OTgyNWFkaXF6a2N4|AP01|officers|2018-04-30 |
    |06048488|2018-05-21|MzIwNTM3OTY5M2FkaXF6a2N4|AP01|officers|2018-04-30 |
    |06048488|2018-05-21|MzIwNTM3OTUwMWFkaXF6a2N4|AP01|officers|2018-04-30 |
    +--------+----------+------------------------+----+--------+-----------+
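
    A sample like this can be pulled straight from the temporary table registered earlier, for example:

    spark.sql("""
    select crn, date, transaction_id, type, category, action_date
    from transtbl
    where date = '2018-05-21'
    """).show(truncate=False)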
    

    There are a number of ways to achieve this. Here we register a temporary SQL table, group the rows by date, and collect the values of each column into a list with collect_list. See the result below, followed by the equivalent DataFrame API version.

    df4 = spark.sql("""
    select
        date,
        collect_list(type) as type_list,
        collect_list(category) as category_list,
        collect_list(action_date) as action_date
    from transtbl
    group by date
    having count(*) > 1
    order by date
    """)
    df4.show(truncate=False)
    
    +----------+------------------------------------+------------------------------------------------------------+------------------------------------------------------------------------+
    |date      |type_list                           |category_list                                               |action_date                                                             |
    +----------+------------------------------------+------------------------------------------------------------+------------------------------------------------------------------------+
    |2007-01-11|[288b, NEWINC]                      |[officers, incorporation]                                   |[]                                                                      |
    |2010-04-07|[CH01, CH04]                        |[officers, officers]                                        |[2009-10-15, 2010-01-01]                                                |
    |2014-07-22|[CERTNM, CONNOT]                    |[change-of-name, change-of-name]                            |[]                                                                      |
    |2015-02-08|[AR01, CH01]                        |[annual-return, officers]                                   |[2015-01-11, 2014-12-01]                                                |
    |2015-07-10|[MR04, MR04, MR04]                  |[mortgage, mortgage, mortgage]                              |[]                                                                      |
    |2015-09-16|[TM02, TM01, AP01, AP01, AP01]      |[officers, officers, officers, officers, officers]          |[2015-07-17, 2015-07-17, 2015-07-17, 2015-07-17, 2015-07-17]            |
    |2015-09-22|[MA, RESOLUTIONS]                   |[incorporation, resolution]                                 |[]                                                                      |
    |2017-02-07|[TM01, CS01, TM01]                  |[officers, confirmation-statement, officers]                |[2017-02-07, 2017-01-11, 2017-02-07]                                    |
    |2018-02-15|[CS01, CH01]                        |[confirmation-statement, officers]                          |[2018-01-11, 2018-02-14]                                                |
    |2018-05-21|[TM01, TM01, TM01, AP01, AP01, AP01]|[officers, officers, officers, officers, officers, officers]|[2018-04-30, 2018-04-30, 2018-04-30, 2018-04-30, 2018-04-30, 2018-04-30]|
    |2018-12-18|[SH20, SH19, CAP-SS, RESOLUTIONS]   |[capital, capital, insolvency, resolution]                  |[2018-12-18]                                                            |
    |2019-01-28|[PSC02, SH01]                       |[persons-with-significant-control, capital]                 |[2018-12-17, 2013-06-01]                                                |
    +----------+------------------------------------+------------------------------------------------------------+------------------------------------------------------------------------+
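
    For completeness, the same grouping can be written with the DataFrame API instead of spark.sql. This is a sketch assuming df is the filing-history DataFrame registered as transtbl above:

    from pyspark.sql import functions as F

    df4 = (
        df.groupBy("date")
          .agg(
              F.collect_list("type").alias("type_list"),
              F.collect_list("category").alias("category_list"),
              F.collect_list("action_date").alias("action_date"),
              F.count("*").alias("cnt"),
          )
          .filter(F.col("cnt") > 1)   # keep only dates with more than one filing
          .drop("cnt")
          .orderBy("date")
    )
    df4.show(truncate=False)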

    Example using the DataFrame API

    from pyspark.sql import SparkSession, functions as F
    from pyspark.sql.window import Window

    spark = SparkSession.builder.getOrCreate()

    column_names = ['username', 'start_range', 'end_range', 'user_type']
    data = [
        ('a', None, 99, 'admin'),
        ('a', 140, 200, 'admin'),
        ('a', 150, 100, 'admin'),
        ('b', 100, 150, 'admin'),
        ('a', 300, 350, 'reader'),
        ('b', 200, 300, 'admin'),
    ]
    df = spark.createDataFrame(data=data, schema=column_names)
    df.show()
    
    # window over each (username, user_type) group
    w = (
        Window
        .partitionBy("username", "user_type")
        .orderBy(F.col("username"), F.col("user_type").asc())
    )
    # same window, but spanning the whole partition so aggregates see every row
    w_unbounded = (
        w.rangeBetween(Window.unboundedPreceding, Window.unboundedFollowing)
    )

    # for every non-key column, take the maximum value across the partition
    # (max ignores nulls, so the null start_range is replaced by a real value)
    cols_to_merge = [col for col in df.columns if col not in ["username", "user_type"]]
    merged_cols = [F.max(col).over(w_unbounded).alias(col) for col in cols_to_merge]
    #merged_cols = [F.greatest(col, F.last(col).over(w_unbounded)).alias(col) for col in cols_to_merge]

    # every row in a partition now carries identical merged values,
    # so keep just the first row per partition
    df_merged = (
        df
        .select([F.col("username"), F.col("user_type")] + merged_cols)
        .withColumn("rn", F.row_number().over(w))
        .filter(F.col("rn") == 1)
        .drop("rn")
    )
    df_merged.show()
    +--------+-----------+---------+---------+
    |username|start_range|end_range|user_type|
    +--------+-----------+---------+---------+
    |       a|       null|       99|    admin|
    |       a|        140|      200|    admin|
    |       a|        150|      100|    admin|
    |       b|        100|      150|    admin|
    |       a|        300|      350|   reader|
    |       b|        200|      300|    admin|
    +--------+-----------+---------+---------+
    
    +--------+---------+-----------+---------+
    |username|user_type|start_range|end_range|
    +--------+---------+-----------+---------+
    |       a|    admin|        150|      200|
    |       a|   reader|        300|      350|
    |       b|    admin|        200|      300|
    +--------+---------+-----------+---------+