Tag: PySpark

  • Generating delta for Synthetic data

    How do you include delta records in a new synthetic dataset? The previous article described how to generate synthetic data; the same method is used here to answer this question.

    Approach

    • take a snapshot of your synthetic data
    • create a new set of synthetic data, using the row count from the snapshot plus a few more records
    • select the variables that will contain changes
    • create a data frame by joining the new dataset with the snapshot on rowno, using the coalesce function to capture changes
    • append the updated data frame to your existing snapshot
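
    The join-and-coalesce step can be illustrated without Spark. The following is a minimal plain-Python sketch, not the article's implementation: dictionaries keyed by rowno stand in for data frames, and the helper names coalesce and merge_delta, along with the sample values, are made up for illustration. It assumes that change columns should prefer the new value, and that all other columns should keep the snapshot value where one exists.

```python
def coalesce(*values):
    """Return the first value that is not None, like SQL COALESCE."""
    for v in values:
        if v is not None:
            return v
    return None


def merge_delta(snapshot, new_rows, change_cols):
    """Full outer join on the key (rowno); change columns prefer the new value."""
    merged = {}
    for key in sorted(set(snapshot) | set(new_rows)):
        old = snapshot.get(key, {})
        new = new_rows.get(key, {})
        # union of column names, snapshot columns first
        cols = list(old) + [c for c in new if c not in old]
        merged[key] = {
            c: coalesce(new.get(c), old.get(c)) if c in change_cols
            else coalesce(old.get(c), new.get(c))
            for c in cols
        }
    return merged


# hypothetical sample rows keyed by rowno
snapshot = {
    1: {"name": "Alpha Ltd", "sic2007": "62010"},
    2: {"name": "Beta Ltd", "sic2007": "47110"},
}
new_rows = {
    1: {"name": "Alpha New", "sic2007": "62020"},
    3: {"name": "Gamma Ltd", "sic2007": "01110"},
}
delta = merge_delta(snapshot, new_rows, change_cols=["sic2007"])
```

    Here row 1 keeps its snapshot name but picks up the new sic2007 value, row 2 is carried over unchanged because it has no counterpart in the new data, and row 3 appears as a brand-new record.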

    Code for generating the delta

    See the Google Colab notebook

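    # delta records
    from pyspark.sql import functions as F

    def generate_delta(df_source, df_target, id_col=None, change_cols=None):
      # df_source: new synthetic data; df_target: existing snapshot
      if id_col is None:
        id_col = df_target.columns[0]  # default to the first column as the key
      if change_cols is None:
        change_cols = [c for c in df_target.columns if c != id_col]
      # unchanged columns keep the snapshot value; change columns take the new value
      select_expr = [
        *[F.coalesce(df_target[c], df_source[c]).alias(c) for c in df_target.columns if c not in change_cols],
        *[F.coalesce(df_source[c], df_target[c]).alias(c) for c in change_cols if c != id_col],
      ]
      # full outer join keeps rows that exist on only one side
      df_out = df_source.join(df_target, df_source[id_col] == df_target[id_col], how="outer").select(*select_expr)
      return df_out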

    Implementation code

    # generate synthetic data
    crn = crn_data(spark,1000)
    crn_s = crn_data(spark,1200)
    # save data as parquet table with partition key
    crn.coalesce(1).write.partitionBy('loaddate').format("parquet").mode("overwrite").saveAsTable("companies_house")
    # update the source data frame with changes on selected variables
    from pyspark.sql import functions as F
    id_col = 'rowno'
    change_cols = [
       'ch_legalstatus',
       'sic2007'
    ]
    df_out = generate_delta(crn_s,crn,id_col,change_cols)  
    # append data 
    df_out.write.partitionBy('loaddate').format("parquet").mode("append").saveAsTable("companies_house")
    
    # verify data changes
    df_changes = change_data_capture(df_out,crn,id_col,change_cols)
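
    The change_data_capture function is not defined in this section. As a rough idea of the kind of check such a step might perform, here is a plain-Python sketch that compares the selected columns row by row. The name changed_rows and the sample values are hypothetical, and dictionaries keyed by rowno again stand in for data frames.

```python
def changed_rows(before, after, change_cols):
    """Map each key present in both inputs to the columns whose values differ."""
    changes = {}
    for key in sorted(set(before) & set(after)):
        diffs = {
            c: (before[key].get(c), after[key].get(c))
            for c in change_cols
            if before[key].get(c) != after[key].get(c)
        }
        if diffs:
            changes[key] = diffs
    return changes


# hypothetical sample rows keyed by rowno
before = {
    1: {"ch_legalstatus": "active", "sic2007": "62010"},
    2: {"ch_legalstatus": "active", "sic2007": "47110"},
}
after = {
    1: {"ch_legalstatus": "dissolved", "sic2007": "62010"},
    2: {"ch_legalstatus": "active", "sic2007": "47110"},
    3: {"ch_legalstatus": "active", "sic2007": "01110"},
}
changes = changed_rows(before, after, ["ch_legalstatus", "sic2007"])
```

    Only row 1 is reported, with the old and new ch_legalstatus values as a pair. Row 3 is new rather than changed, so it is left out of this comparison.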
     

    Screenshots

    [Figure: synthetic delta data]
    [Figure: delta changes]