In this article we will look at how to generate synthetic data and link records on a particular field.
Approach
We will use three dataframes and link records by matching the value of a variable.
- variables: id, name and legalstatus
- dataframes: df_c, df_p and df_v
See the previous article https://broadoakdata.uk/generating-synthetic-data-using-hive-sql/ for reference.
Code snippets
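The snippets below assume an active SparkSession named spark. A minimal setup sketch (the app name is illustrative, not from the original article):
from pyspark.sql import SparkSession
# add .enableHiveSupport() if the tables created later should go to a Hive metastore
spark = SparkSession.builder.appName('synthetic-data-linking').getOrCreate()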
# unit types and combinations used in the synthetic names: V P C V_P V_C P_C V_P_C
df_c = spark.sql("""
select
-- 8-character company reference, left-padded with 7s
lpad(cast(floor(rand() * 8000000) as varchar(7)),8,'7') as ch_no,
-- name: a prefix picked from the array plus a random number between 0 and 19
concat(Array("C1","C2","F1","V_C","V_P_C","C_P","F3")[floor(rand()*6+0)],floor(rand() * 20)) as ch_name,
floor(rand() * 3 + 1) as ch_legalstatus
from (select 100 as start_r, 1 as end_r) r
-- posexplode over a string of 99 spaces generates 100 rows
lateral view posexplode(split(space(start_r - end_r),' ')) pe as i,s
""")
df_v = spark.sql("""
select
lpad(cast(floor(rand() * 7000000) as varchar(7)),12,'8') as v_no,
concat(Array("V1","V2","VF1","V_C","V_P_C","V_P","F3")[floor(rand()*6+0)],floor(rand() * 20)) as v_name,
floor(rand() * 7 + 1) as v_legalstatus
from (select 100 as start_r, 1 as end_r) r
lateral view posexplode(split(space(start_r - end_r),' ')) pe as i,s
""")
df_p = spark.sql("""
select
lpad(cast(floor(rand() * 6000000) as varchar(7)),13,'9') as p_no,
concat(Array("P1","P2","PF1","V_P","V_P_C","P_C","F4")[floor(rand()*6+0)],floor(rand() * 20)) as p_name,
floor(rand() * 7 + 1) as p_legalstatus
from (select 100 as start_r, 1 as end_r) r
lateral view posexplode(split(space(start_r - end_r),' ')) pe as i,s
""")
from pyspark.sql import functions as F
# group them by name
df_cgrp = df_c.groupBy('ch_name').agg(F.collect_set('ch_no').alias('c_no'))
df_vgrp = df_v.groupBy('v_name').agg(F.collect_set('v_no').alias('v_no'))
df_pgrp = df_p.groupBy('p_name').agg(F.collect_set('p_no').alias('p_no'))
# keep only names that resolve to exactly one id
df_cgrp = df_cgrp.filter(F.size(df_cgrp.c_no)==1)
df_vgrp = df_vgrp.filter(F.size(df_vgrp.v_no)==1)
df_pgrp = df_pgrp.filter(F.size(df_pgrp.p_no)==1)
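Before joining, it is worth checking how many uniquely named records survive the filter (counts vary per run):
# names that map to exactly one id in each dataframe
print(df_cgrp.count(), df_vgrp.count(), df_pgrp.count())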
# join dataframes on name (full outer joins keep unmatched records)
df_lu = df_cgrp.join(df_vgrp, on=[df_vgrp.v_name == df_cgrp.ch_name], how='full')
# the second join is keyed on v_name, so P records are linked via a matching V name
df_lu = df_lu.join(df_pgrp, on=[df_lu.v_name == df_pgrp.p_name], how='full')
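A quick way to see how many names were linked across all three dataframes, assuming F is imported as above:
# rows where a company, VAT and PAYE record all share the same name
linked_all = df_lu.filter(F.col('c_no').isNotNull() & F.col('v_no').isNotNull() & F.col('p_no').isNotNull())
print(linked_all.count())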
# add additional variables including link id (lu)
from pyspark.sql import Window
# note: a window without partitionBy pulls all rows into a single partition - acceptable for this small synthetic set
w = Window.orderBy(F.col('c_no'), F.col('v_no'), F.col('p_no'))
df_lu = (df_lu.withColumn('lu',F.row_number().over(w))
.withColumn('lu',F.col('lu') + 200000000000000)
.withColumn('load_timestamp',F.current_timestamp())
.withColumn('created_at',F.current_timestamp())
.withColumn('load_date',F.current_date().cast('varchar(10)'))
)
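Because lu is a row_number over the whole dataframe plus a constant offset, each row should receive a distinct link id. A quick assertion:
# every linked record gets its own lu
assert df_lu.count() == df_lu.select('lu').distinct().count()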
# select relevant variables
df_fact = df_lu.select('lu','c_no','v_no','p_no','load_timestamp','created_at','load_date')
# flatten fact dataframe
df_fact.createOrReplaceTempView('bi_fact')
df1 = spark.sql("""
select
lu,crn as unitref,
'c' as unittype
from bi_fact
lateral view explode(c_no) crefs as crn
union
select
lu,vat as unitref,
'v' as unittype
from bi_fact
lateral view explode(v_no) vrefs as vat
union
select
lu,paye as unitref,
'p' as unittype
from bi_fact
lateral view explode(p_no) prefs as paye
""")
# save as table
df1.write.mode('overwrite').saveAsTable('fact_tbl')
# save df_c as table
df_c.write.mode('overwrite').saveAsTable('crn_tbl')
# join crn_tbl and fact_tbl on unitref
df2 = spark.sql("""
select
distinct f.lu,c.*
from crn_tbl c
inner join fact_tbl f on c.ch_no = f.unitref and unittype='c'
""")
Generating synthetic data using PostgreSQL
create table ch_test as
select
lpad(cast(floor(random() * 8000000) as varchar(7)),8,'7') as ch_no,
-- PostgreSQL arrays are 1-based and the subscript must be an integer, hence the ::int and + 1
concat((array['C1','C2','F1','V_C','V_P_C','C_P','F3'])[floor(random()*6)::int + 1],floor(random() * 20)) as ch_name,
floor(random() * 3 + 1) as ch_legalstatus,
(enddate - '300 day'::interval)::date ch_registration_date,
case
when random() > 0.30 then null
else enddate
end as ch_deregistration_date
from (
-- generate_series in the FROM clause so that enddate is evaluated per row
SELECT date(current_date - trunc(random() * 365) * '1 day'::interval) as enddate
FROM generate_series(1,1000)
) r
;
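To inspect the generated PostgreSQL table from Python, a minimal sketch using psycopg2; the connection string is a placeholder for your own environment:
import psycopg2
# placeholder connection details - replace with your own
conn = psycopg2.connect('dbname=synthetic user=postgres host=localhost')
with conn, conn.cursor() as cur:
    # distribution of legal statuses in the generated data
    cur.execute('select ch_legalstatus, count(*) from ch_test group by ch_legalstatus order by 1')
    for legalstatus, n in cur.fetchall():
        print(legalstatus, n)
conn.close()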