Synthetic data creation and linking records

In this article we will how to generate synthetic data and linking records on particular field.

Approach

We will use three dataframes and will link records using value my matching a variable.

  • variables: id, name and legalstatus
  • dataframe – df_c, df_p and df_v

See previous article https://broadoakdata.uk/generating-synthetic-data-using-hive-sql/ for reference.

Code snippets

# V P C V_P V_C P_C V_P_C
df_c = spark.sql("""
select
lpad(cast(floor(rand() * 8000000) as varchar(7)),8,'7') as ch_no,
concat(Array("C1","C2","F1","V_C","V_P_C","C_P","F3")[floor(rand()*6+0)],floor(rand() * 20)) as ch_name,
floor(rand() * 3 + 1) as ch_legalstatus
from (select 100 as start_r, 1 as end_r) r 
lateral view posexplode(split(space(start_r - end_r),' ')) pe as i,s
""")
df_v = spark.sql("""
select
lpad(cast(floor(rand() * 7000000) as varchar(7)),12,'8') as v_no,
concat(Array("V1","V2","VF1","V_C","V_P_C","V_P","F3")[floor(rand()*6+0)],floor(rand() * 20)) as v_name,
floor(rand() * 7 + 1) as v_legalstatus
from (select 100 as start_r, 1 as end_r) r 
lateral view posexplode(split(space(start_r - end_r),' ')) pe as i,s
""")
df_p = spark.sql("""
select
lpad(cast(floor(rand() * 6000000) as varchar(7)),13,'9') as p_no,
concat(Array("P1","P2","PF1","V_P","V_P_C","P_C","F4")[floor(rand()*6+0)],floor(rand() * 20)) as p_name,
floor(rand() * 7 + 1) as p_legalstatus
from (select 100 as start_r, 1 as end_r) r 
lateral view posexplode(split(space(start_r - end_r),' ')) pe as i,s
""")

# group them by name
df_cgrp = df_c.groupBy('ch_name').agg(F.collect_set('ch_no').alias('c_no'))
df_vgrp = df_v.groupBy('v_name').agg(F.collect_set('v_no').alias('v_no'))
df_pgrp = df_p.groupBy('p_name').agg(F.collect_set('p_no').alias('p_no'))

# distinct number
df_cgrp = df_cgrp.filter(F.size(df_cgrp.c_no)==1)
df_vgrp = df_vgrp.filter(F.size(df_vgrp.v_no)==1)
df_pgrp = df_pgrp.filter(F.size(df_pgrp.p_no)==1)

# join dataframes on name
df_lu = df_cgrp.join(df_vgrp,on=[df_vgrp.v_name == df_cgrp.ch_name],how='full')
df_lu = df_lu.join(df_pgrp,on=[df_lu.v_name == df_pgrp.p_name],how='full')

# add additional variables including link id (lu)
from pyspark.sql import functions as F
from pyspark.sql import Window
w = Window.orderBy(F.col('c_no'),F.col('v_no'),F.col('p_no'))
df_lu = (df_lu.withColumn('lu',F.row_number().over(w))
               .withColumn('lu',F.col('lu') + 200000000000000)
               .withColumn('load_timestamp',F.current_timestamp())
               .withColumn('created_at',F.current_timestamp())
               .withColumn('load_date',F.current_date().cast('varchar(10)'))
)
# select relevant variables
df_fact = df_lu.select('lu','c_no','v_no','p_no','load_timestamp','created_at','load_date')

# flatten fact dataframe
df_fact.createOrReplaceTempView('bi_fact')
df1 = spark.sql("""
select
lu,crn as unitref,
'c' as unittype
from bi_fact
lateral view explode(c_no) crefs as crn
union
select
lu,vat as unitref,
'v' as unittype
from bi_fact
lateral view explode(v_no) vrefs as vat
union
select
lu,paye as unitref,
'p' as unittype
from bi_fact
lateral view explode(p_no) prefs as paye
""")

# save as table
df1.write.mode('overwrite').saveAsTable('fact_tbl')

# save df_c as table
df_c.write.saveAsTable('crn_tbl')

# join crn_tbl and fact_tbl on unitref
df2 = spark.sql("""
select 
distinct f.lu,c.*
from crn_tbl c
inner join fact_tbl f on c.ch_no = f.unitref and unittype='c'
""")
sample data
link table

Generating synthetic data using PostgreSQL

create table as ch_test
select
lpad(cast(floor(random() * 8000000) as varchar(7)),8,'7') as ch_no,
concat((array['C1','C2','F1','V_C','V_P_C','C_P','F3'])[floor(random()*6+0)],floor(random() * 20)) as ch_name,
floor(random() * 3 + 1) as ch_legalstatus,
(enddate - '300 day'::interval)::date ch_registration_date,
case
when random() > 0.30 then null
else enddate 
end ch_deregisration_date
from (
SELECT date(current_date - trunc(random() * 365) * '1 day'::interval) as enddate, generate_series(1,1000)
) r
;