How do you include delta records in a new synthetic dataset? In a previous article we described how to generate synthetic data, and we will use the same method here to answer this question.
Approach
- Take a snapshot of your synthetic data.
- Create a new set of synthetic data using the row count from the snapshot, adding a few more records to it.
- Select the variables that will contain changes.
- Create a data frame by joining the new dataset with the snapshot on rowno, using the coalesce function to capture the changes (a small coalesce example follows this list).
- Append the updated data frame to your existing snapshot.
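To make the coalesce step concrete: coalesce returns the first non-null argument, so listing the new frame's column first picks up the updated value for a changed variable and falls back to the snapshot value where no update exists. Below is a toy sketch with made-up column values; like the rest of the article, it assumes an active spark session.

from pyspark.sql import functions as F

# toy stand-ins for the snapshot and the new synthetic data
old = spark.createDataFrame([(1, "A"), (2, "B")], ["rowno", "ch_legalstatus"])
new = spark.createDataFrame([(1, "C"), (3, "D")], ["rowno", "ch_legalstatus"])

merged = old.join(new, old["rowno"] == new["rowno"], "outer").select(
    F.coalesce(old["rowno"], new["rowno"]).alias("rowno"),
    # the new value wins where present; otherwise keep the snapshot value
    F.coalesce(new["ch_legalstatus"], old["ch_legalstatus"]).alias("ch_legalstatus"),
)
merged.show()  # rowno 1 -> "C" (updated), 2 -> "B" (unchanged), 3 -> "D" (new record)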
Code for generating the delta records
# delta records
from pyspark.sql import functions as F

def generate_delta(df_source, df_target, id_col=None, change_cols=None):
    # df_source: new synthetic data, df_target: existing snapshot
    if id_col is None:
        id_col = df_target.columns[0]
    if change_cols is None:
        change_cols = df_target.columns
    # delta: keep the snapshot value for unchanged columns (falling back to the new
    # data for new records) and prefer the new value for the selected change columns
    select_expr = [
        *[F.coalesce(df_target[c], df_source[c]).alias(c)
          for c in df_target.columns if c not in change_cols],
        *[F.coalesce(df_source[c], df_target[c]).alias(c)
          for c in change_cols if c not in [id_col]],
    ]
    df_out = df_source.join(df_target, df_source[id_col] == df_target[id_col],
                            how="outer").select(*select_expr)
    return df_out
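The crn_data generator comes from the previous article and is not repeated here. For readers following along without it, a minimal stand-in producing the columns used below (rowno, ch_legalstatus, sic2007, loaddate) could look like the sketch that follows; the value lists are assumptions for illustration only.

import random
from datetime import date

def crn_data(spark, n_rows):
    # hypothetical stand-in for the generator from the previous article:
    # produces n_rows synthetic company records with the columns used below
    legal_statuses = ["1", "2", "3"]          # assumed legal status codes
    sic_codes = ["01110", "62012", "47110"]   # assumed SIC 2007 codes
    rows = [
        (i,
         random.choice(legal_statuses),
         random.choice(sic_codes),
         date.today().isoformat())
        for i in range(1, n_rows + 1)
    ]
    return spark.createDataFrame(rows, ["rowno", "ch_legalstatus", "sic2007", "loaddate"])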
Implementation code
# generate synthetic data: crn is the snapshot, crn_s is the new set with extra records
crn = crn_data(spark, 1000)
crn_s = crn_data(spark, 1200)
# save the snapshot as a parquet table partitioned by loaddate
crn.coalesce(1).write.partitionBy('loaddate').format("parquet").mode("overwrite").saveAsTable("companies_house")
# update the source data frame with changes on selected variables
from pyspark.sql import functions as F
id_col = 'rowno'
change_cols = [
'ch_legalstatus',
'sic2007'
]
df_out = generate_delta(crn_s, crn, id_col, change_cols)
# append the delta records to the existing table
df_out.write.partitionBy('loaddate').format("parquet").mode("append").saveAsTable("companies_house")
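# optional sanity check (not in the original article): after the append the table
# holds both the original snapshot and the delta load, so a row count per
# loaddate partition gives a quick confirmation that both writes landed
spark.table("companies_house").groupBy("loaddate").count().show()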
# verify data changes (a sketch of change_data_capture follows below)
df_changes = change_data_capture(df_out, crn, id_col, change_cols)
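The change_data_capture function is not defined in this article. A minimal sketch, assuming it simply reports the rows whose selected variables differ between the delta frame and the original snapshot, could look like this; the function body and output layout are illustrative, not the original implementation.

from functools import reduce
from pyspark.sql import functions as F

def change_data_capture(df_new, df_old, id_col, change_cols):
    # hypothetical helper: compare the delta frame against the snapshot on the id
    # column and keep the rows where at least one monitored column differs
    joined = df_new.alias("n").join(df_old.alias("o"), on=id_col, how="inner")
    diffs = [F.col(f"n.{c}") != F.col(f"o.{c}") for c in change_cols]
    condition = reduce(lambda a, b: a | b, diffs)  # null-safe comparison omitted for brevity
    return joined.filter(condition).select(
        id_col,
        *[F.col(f"o.{c}").alias(f"{c}_old") for c in change_cols],
        *[F.col(f"n.{c}").alias(f"{c}_new") for c in change_cols],
    )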