Tag: PySpark

  • Synthetic data creation and linking records

    In this article we will show how to generate synthetic data and link records on a particular field.

    Approach

    We will use three dataframes and link records by matching on the value of a variable.

    • variables: id, name and legalstatus
    • dataframes – df_c, df_p and df_v

    See previous article https://broadoakdata.uk/generating-synthetic-data-using-hive-sql/ for reference.
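    As a minimal sketch of the linking idea (toy data only, not part of the pipeline below; it assumes an active SparkSession named spark), two dataframes can be joined on a shared name value:

    left = spark.createDataFrame([(1, 'V_C5'), (2, 'C2')], ['id', 'name'])
    right = spark.createDataFrame([(10, 'V_C5')], ['id', 'name'])
    left.join(right.withColumnRenamed('id', 'right_id'), on='name', how='full').show()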

    Code snippets

    # generate three dataframes of synthetic records (assumes an active SparkSession named spark);
    # the name prefixes (V, P, C, V_P, V_C, P_C, V_P_C) create overlapping names that can be linked later
    df_c = spark.sql("""
    select
    lpad(cast(floor(rand() * 8000000) as varchar(7)),8,'7') as ch_no,
    concat(Array("C1","C2","F1","V_C","V_P_C","C_P","F3")[floor(rand()*6+0)],floor(rand() * 20)) as ch_name,
    floor(rand() * 3 + 1) as ch_legalstatus
    from (select 100 as start_r, 1 as end_r) r 
    lateral view posexplode(split(space(start_r - end_r),' ')) pe as i,s
    """)
    df_v = spark.sql("""
    select
    lpad(cast(floor(rand() * 7000000) as varchar(7)),12,'8') as v_no,
    concat(Array("V1","V2","VF1","V_C","V_P_C","V_P","F3")[floor(rand()*6+0)],floor(rand() * 20)) as v_name,
    floor(rand() * 7 + 1) as v_legalstatus
    from (select 100 as start_r, 1 as end_r) r 
    lateral view posexplode(split(space(start_r - end_r),' ')) pe as i,s
    """)
    df_p = spark.sql("""
    select
    lpad(cast(floor(rand() * 6000000) as varchar(7)),13,'9') as p_no,
    concat(Array("P1","P2","PF1","V_P","V_P_C","P_C","F4")[floor(rand()*6+0)],floor(rand() * 20)) as p_name,
    floor(rand() * 7 + 1) as p_legalstatus
    from (select 100 as start_r, 1 as end_r) r 
    lateral view posexplode(split(space(start_r - end_r),' ')) pe as i,s
    """)
    
    # imports needed for the DataFrame API calls below
    from pyspark.sql import functions as F

    # group them by name
    df_cgrp = df_c.groupBy('ch_name').agg(F.collect_set('ch_no').alias('c_no'))
    df_vgrp = df_v.groupBy('v_name').agg(F.collect_set('v_no').alias('v_no'))
    df_pgrp = df_p.groupBy('p_name').agg(F.collect_set('p_no').alias('p_no'))
    
    # keep only names that map to a single distinct number
    df_cgrp = df_cgrp.filter(F.size(df_cgrp.c_no)==1)
    df_vgrp = df_vgrp.filter(F.size(df_vgrp.v_no)==1)
    df_pgrp = df_pgrp.filter(F.size(df_pgrp.p_no)==1)
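    # optional check (assumed, not in the original article): how many uniquely-named units survive the filter
    print(df_cgrp.count(), df_vgrp.count(), df_pgrp.count())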
    
    # join dataframes on name; coalesce handles names that appear in df_c but not in df_v
    df_lu = df_cgrp.join(df_vgrp,on=[df_vgrp.v_name == df_cgrp.ch_name],how='full')
    df_lu = df_lu.join(df_pgrp,on=[F.coalesce(df_lu.v_name, df_lu.ch_name) == df_pgrp.p_name],how='full')
    
    # add additional variables including link id (lu)
    from pyspark.sql import Window
    w = Window.orderBy(F.col('c_no'),F.col('v_no'),F.col('p_no'))
    df_lu = (df_lu.withColumn('lu',F.row_number().over(w))
                   .withColumn('lu',F.col('lu') + 200000000000000)
                   .withColumn('load_timestamp',F.current_timestamp())
                   .withColumn('created_at',F.current_timestamp())
                   .withColumn('load_date',F.current_date().cast('varchar(10)'))
    )
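    # optional check (assumed): each row should now carry a unique 15-digit link id
    df_lu.select('lu','ch_name','c_no','v_no','p_no').show(5, truncate=False)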
    # select relevant variables
    df_fact = df_lu.select('lu','c_no','v_no','p_no','load_timestamp','created_at','load_date')
    
    # flatten fact dataframe
    df_fact.createOrReplaceTempView('bi_fact')
    df1 = spark.sql("""
    select
    lu,crn as unitref,
    'c' as unittype
    from bi_fact
    lateral view explode(c_no) crefs as crn
    union
    select
    lu,vat as unitref,
    'v' as unittype
    from bi_fact
    lateral view explode(v_no) vrefs as vat
    union
    select
    lu,paye as unitref,
    'p' as unittype
    from bi_fact
    lateral view explode(p_no) prefs as paye
    """)
    
    # save as table
    df1.write.mode('overwrite').saveAsTable('fact_tbl')
    
    # save df_c as table
    df_c.write.mode('overwrite').saveAsTable('crn_tbl')
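    # optional verification (assumed): re-read the saved table and count links per unit type
    spark.table('fact_tbl').groupBy('unittype').count().show()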
    
    # join crn_tbl and fact_tbl on unitref
    df2 = spark.sql("""
    select 
    distinct f.lu,c.*
    from crn_tbl c
    inner join fact_tbl f on c.ch_no = f.unitref and unittype='c'
    """)
    Sample data and the resulting link table are shown as screenshots in the original article.

    Generating synthetic data using PostgreSQL

    create table ch_test as
    select
    lpad(cast(floor(random() * 8000000) as varchar(7)),8,'7') as ch_no,
    concat((array['C1','C2','F1','V_C','V_P_C','C_P','F3'])[floor(random()*6+1)::int],floor(random() * 20)) as ch_name,
    floor(random() * 3 + 1) as ch_legalstatus,
    (enddate - '300 day'::interval)::date ch_registration_date,
    case
    when random() > 0.30 then null
    else enddate 
    end ch_deregistration_date
    from (
    SELECT date(current_date - trunc(random() * 365) * '1 day'::interval) as enddate, generate_series(1,1000)
    ) r
    ;
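    A quick sanity check (an assumed follow-up query, not part of the original snippet) confirms the spread of legal statuses and how many rows kept a deregistration date:

    -- counts per legal status and number of rows with a deregistration date
    select ch_legalstatus,
           count(*) as n,
           count(ch_deregistration_date) as n_deregistered
    from ch_test
    group by ch_legalstatus
    order by ch_legalstatus;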