UPDATE – 07/07/2022
It can be achieved with a few lines of PySpark code. See below:
from pyspark.sql import functions as F

cols_to_merge = ['transaction_type', 'correction_marker', etc...]
exprs = [F.concat_ws(" ", F.collect_set(col)).alias(col) for col in cols_to_merge]
df = df.groupBy('company_number').agg(*exprs)
In this article, we will show how to merge multiple rows sharing an id into one row using PySpark. We will use the Companies House dataset for this article. You may find the previous articles about how to get Companies House data using the REST API useful.
Getting data
Let's get some real data for this exercise. We will use the Companies House REST API to get the filing history data for a company.
We will follow the steps below to achieve our goal (a sketch of the first three steps follows the list):
- get data from Companies House using the Python requests library
- extract the data from the response
- save the JSON output into a dataframe
- create a temporary table to transform the data and merge multiple rows sharing an id into one row
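Here is a minimal sketch of the first three steps. The API key is a placeholder, the endpoint and basic-auth scheme follow the public Companies House REST API, and the company number is the one used in the examples below.
import json
import requests
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

API_KEY = "<your-api-key>"      # placeholder: use your own Companies House API key
COMPANY_NUMBER = "06048488"     # the company used in the examples below

# Get the filing history for the company; the API key is passed as the
# basic-auth username with an empty password.
url = f"https://api.company-information.service.gov.uk/company/{COMPANY_NUMBER}/filing-history"
response = requests.get(url, auth=(API_KEY, ""), params={"items_per_page": 100})
response.raise_for_status()

# Each filing is one element of the "items" array in the JSON response.
items = response.json().get("items", [])

# Load the JSON records into a dataframe and let Spark infer the schema.
df = spark.read.json(spark.sparkContext.parallelize([json.dumps(item) for item in items]))
df.printSchema()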
Algorithm for merging rows into a single row
Let's look at a few records. We want to merge records with the same date into one row.
+--------+----------+------------------------+----+--------+-----------+
|crn |date |transaction_id |type|category|action_date|
+--------+----------+------------------------+----+--------+-----------+
|06048488|2018-05-21|MzIwNTM3OTkwNmFkaXF6a2N4|TM01|officers|2018-04-30 |
|06048488|2018-05-21|MzIwNTM3OTg5NWFkaXF6a2N4|TM01|officers|2018-04-30 |
|06048488|2018-05-21|MzIwNTM3OTgyN2FkaXF6a2N4|TM01|officers|2018-04-30 |
|06048488|2018-05-21|MzIwNTM3OTgyNWFkaXF6a2N4|AP01|officers|2018-04-30 |
|06048488|2018-05-21|MzIwNTM3OTY5M2FkaXF6a2N4|AP01|officers|2018-04-30 |
|06048488|2018-05-21|MzIwNTM3OTUwMWFkaXF6a2N4|AP01|officers|2018-04-30 |
+--------+----------+------------------------+----+--------+-----------+
There are a number of ways this can be achieved. Here we use a temporary SQL table, group the rows by date, and collect each column's values into a list with collect_list. See the result below.
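The SQL below assumes the filing-history dataframe has already been registered as a temporary view named transtbl, for example:
# Register the dataframe as a temporary view so it can be queried with SQL.
df.createOrReplaceTempView("transtbl")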
df4 = spark.sql("""
select
date,
collect_list(type) as type_list,
collect_list(category) as category_list,
collect_list(action_date) as action_date
from transtbl
group by date
having count(*) > 1
order by date
""")
+----------+------------------------------------+------------------------------------------------------------+------------------------------------------------------------------------+
|date |type_list |category_list |action_date |
+----------+------------------------------------+------------------------------------------------------------+------------------------------------------------------------------------+
|2007-01-11|[288b, NEWINC] |[officers, incorporation] |[] |
|2010-04-07|[CH01, CH04] |[officers, officers] |[2009-10-15, 2010-01-01] |
|2014-07-22|[CERTNM, CONNOT] |[change-of-name, change-of-name] |[] |
|2015-02-08|[AR01, CH01] |[annual-return, officers] |[2015-01-11, 2014-12-01] |
|2015-07-10|[MR04, MR04, MR04] |[mortgage, mortgage, mortgage] |[] |
|2015-09-16|[TM02, TM01, AP01, AP01, AP01] |[officers, officers, officers, officers, officers] |[2015-07-17, 2015-07-17, 2015-07-17, 2015-07-17, 2015-07-17] |
|2015-09-22|[MA, RESOLUTIONS] |[incorporation, resolution] |[] |
|2017-02-07|[TM01, CS01, TM01] |[officers, confirmation-statement, officers] |[2017-02-07, 2017-01-11, 2017-02-07] |
|2018-02-15|[CS01, CH01] |[confirmation-statement, officers] |[2018-01-11, 2018-02-14] |
|2018-05-21|[TM01, TM01, TM01, AP01, AP01, AP01]|[officers, officers, officers, officers, officers, officers]|[2018-04-30, 2018-04-30, 2018-04-30, 2018-04-30, 2018-04-30, 2018-04-30]|
|2018-12-18|[SH20, SH19, CAP-SS, RESOLUTIONS] |[capital, capital, insolvency, resolution] |[2018-12-18] |
|2019-01-28|[PSC02, SH01] |[persons-with-significant-control, capital] |[2018-12-17, 2013-06-01] |
+----------+------------------------------------+------------------------------------------------------------+------------------------------------------------------------------------+
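If you prefer the flattened, space-separated form used in the update at the top of this article, the same grouping can be combined with concat_ws around collect_list. A sketch (the column aliases are just illustrative):
df5 = spark.sql("""
select
    date,
    concat_ws(' ', collect_list(type)) as types,
    concat_ws(' ', collect_list(category)) as categories,
    concat_ws(' ', collect_list(action_date)) as action_dates
from transtbl
group by date
order by date
""")
df5.show(truncate=False)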
Example using the DataFrame API
from pyspark.sql import functions as F
from pyspark.sql.window import Window

column_names = ['username', 'start_range', 'end_range', 'user_type']
data = [
('a', None, 99, 'admin'),
('a', 140, 200, 'admin'),
('a', 150, 100, 'admin'),
('b', 100, 150, 'admin'),
('a', 300, 350, 'reader'),
('b', 200, 300, 'admin'),
]
df = spark.createDataFrame(data=data, schema=column_names)
df.show()
w = (
    Window
    .partitionBy("username", "user_type")
    .orderBy(F.col("username"), F.col("user_type").asc())
)
# Extend the window to the whole partition so the aggregates see every row.
w_unbounded = (
    w.rangeBetween(Window.unboundedPreceding, Window.unboundedFollowing)
)
cols_to_merge = [col for col in df.columns if col not in ["username", "user_type"]]
# Take the maximum of each remaining column across the whole group; every row
# in a group therefore carries the same merged values.
merged_cols = [F.max(col).over(w_unbounded).alias(col) for col in cols_to_merge]
# merged_cols = [F.greatest(col, F.last(col).over(w_unbounded)).alias(col) for col in cols_to_merge]
df_merged = (
df
.select([F.col("username"), F.col("user_type")] + merged_cols)
.withColumn("rn", F.row_number().over(w))
.filter(F.col("rn") == 1)
.drop("rn")
)
df_merged.show()
Output of df.show() (the original data):
+--------+-----------+---------+---------+
|username|start_range|end_range|user_type|
+--------+-----------+---------+---------+
| a| null| 99| admin|
| a| 140| 200| admin|
| a| 150| 100| admin|
| b| 100| 150| admin|
| a| 300| 350| reader|
| b| 200| 300| admin|
+--------+-----------+---------+---------+
Output of df_merged.show() (one row per username and user_type):
+--------+---------+-----------+---------+
|username|user_type|start_range|end_range|
+--------+---------+-----------+---------+
| a| admin| 150| 200|
| a| reader| 300| 350|
| b| admin| 200| 300|
+--------+---------+-----------+---------+
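Since each (username, user_type) group collapses to a single row anyway, the same result can also be obtained without a window, using a plain groupBy. A sketch:
# Aggregate directly per group instead of using a window plus row_number.
df_merged_alt = (
    df
    .groupBy("username", "user_type")
    .agg(*[F.max(col).alias(col) for col in cols_to_merge])
)
df_merged_alt.show()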