In this article, we describe an approach to implementing Change Data Capture (CDC) using PySpark. All the functions are included in the example, together with test data. This is an updated version of the Change Data Capture ETL pipelines article.
UPDATE (10/06/2023): using Hive SQL to implement find_changes takes less time than processing the dataframes with PySpark.
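For reference, a minimal sketch of what a SQL-based find_changes could look like when run through spark.sql. The view names and the comma-separated data_changes column are assumptions for illustration, not the repository's exact implementation:

# a sketch of a SQL-based find_changes (hypothetical; not the repository's exact code)
def find_changes_sql(spark, df_source, df_target, id_col, change_cols):
    df_source.createOrReplaceTempView("source_vw")
    df_target.createOrReplaceTempView("target_vw")
    # one CASE expression per column: emit the column name when values differ
    diff_exprs = ", ".join(
        "CASE WHEN s.{c} <=> t.{c} THEN NULL ELSE '{c}' END".format(c=c)
        for c in change_cols if c != id_col)
    return spark.sql("""
        SELECT s.*, concat_ws(',', {diffs}) AS data_changes
        FROM source_vw s
        JOIN target_vw t ON s.{id} = t.{id}
        WHERE concat_ws(',', {diffs}) <> ''
    """.format(diffs=diff_exprs, id=id_col))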
from pyspark.sql import functions as F

# implementation of CDC using the PySpark DataFrame API
def change_data_capture(df_source, df_target, id_col=None, change_cols=None):
    if id_col is None:
        id_col = df_target.columns[0]
    if change_cols is None:
        change_cols = df_target.columns
    # change_type codes: 0=no change, 2=new, 3=change and 99=delete
    conditions = [F.when(df_source[c] != df_target[c], F.lit(c)).otherwise("")
                  for c in change_cols if c not in [id_col]]
    change_type = (F.when(df_target[id_col].isNull(), F.lit(2))
                    .when(df_source[id_col].isNull(), F.lit(99))
                    .when(F.size(F.array_remove(F.array(*conditions), "")) > 0, F.lit(3))
                    .otherwise(0)
                   )
    select_expr = [
        F.col(id_col),
        *[F.coalesce(df_source[c], df_target[c]).alias(c) for c in change_cols if c not in [id_col]],
        F.array_remove(F.array(*conditions), "").alias("data_changes"),
        change_type.alias("change_type"),
    ]
    df_out = df_source.join(df_target, [id_col], "outer").select(*select_expr)
    return df_out
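A quick way to exercise the function with inline test data. The id, name and town columns are made up for illustration, and spark is an existing SparkSession:

# hypothetical test data for change_data_capture
df_target = spark.createDataFrame(
    [(1, "Acme Ltd", "Leeds"), (2, "Bravo plc", "York")],
    ["id", "name", "town"])
df_source = spark.createDataFrame(
    [(1, "Acme Ltd", "London"), (3, "Cargo Ltd", "Hull")],
    ["id", "name", "town"])
df_cdc = change_data_capture(df_source, df_target, id_col="id")
df_cdc.show(truncate=False)
# expected: id=1 change_type 3 (town changed), id=2 change_type 99 (deleted), id=3 change_type 2 (new)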
Change Data Capture Implementation
- target dataframe (table containing historical and current data)
- source dataframe (incoming file)
- identify changes in variables and update the target table
- separate the target data by live_flag
- records that already exist
- no changes
- join dataframes to create an updated target dataframe with history
- dataframe holding records that exist in the incoming file
- dataframe holding records that do not exist in the incoming file (present only in the target table)
- dataframe storing updated records
- save change history
- join dataframes and update the target table
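The snippet also calls several helpers (rename_cols, find_changes, save_change_history, save_target_data) whose definitions live in the repository linked at the end. As a rough guide, here are minimal sketches of rename_cols and the two save helpers, inferred only from their call sites, so treat the details as assumptions:

# hypothetical helper: rename columns by replacing a suffix
def rename_cols(df, suffix, replacement):
    for c in df.columns:
        if c.endswith(suffix):
            df = df.withColumnRenamed(c, c[:len(c) - len(suffix)] + replacement)
    return df

# hypothetical helper: append the change set to a history table
def save_change_history(spark, df, tablename):
    df.write.mode("append").saveAsTable(tablename)

# hypothetical helper: persist the updated target snapshot
# (in practice you may need to write to a staging table first,
# since Spark cannot overwrite a table it is still reading from)
def save_target_data(spark, df, tablename):
    df.write.mode("overwrite").saveAsTable(tablename)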
The code snippet below shows the implementation of the CDC pipeline.
from datetime import datetime

# identify and save changes
def process_save_changes(spark, logging, df_source, df_target, change_cols,
                         targettable=None, pchtable=None):
    """
    Identify changes between two dataframes and save the results.
    Parameters:
        df_source   - dataframe containing the new dataset
        df_target   - existing dataset (the target dataframe will be updated accordingly)
        change_cols - list of variables to check when capturing changes
        targettable - optional table name for saving the updated target data
        pchtable    - optional table name for saving the change history
    """
    logging.info('START - identify changes: {}'.format(datetime.now().strftime("%Y%m%d%H%M")))
    print('START - identify changes: {}'.format(datetime.now().strftime("%Y%m%d%H%M")))
    change_col = "data_changes"
    col = df_source.columns[0]
    # separate target data by live_flag
    df_active = df_target.filter(df_target['live_flag'] == True)
    df_inactive = df_target.filter(df_target['live_flag'] == False)
    df_active.cache().count()
    df_inactive.cache().count()
    df_source.cache().count()
    # records that already exist (suffix target columns with '_t' to avoid name clashes)
    df_t = df_active.select(*(F.col(x).alias(x + '_t') for x in df_active.columns))
    df_amends = df_source.join(df_t, df_source[col] == df_t[col + '_t'], 'inner')\
        .select('*')\
        .dropDuplicates([col])
    df_amends.cache().count()
    # no changes: active records absent from the incoming file
    df_no_changes = df_active.join(df_source, df_active[col] == df_source[col], "left")\
        .where(df_source[col].isNull())\
        .select(df_active['*'])
    # keep history: previous versions of amended records are flagged inactive
    df_amends_prev = df_amends.select(df_t['*'])
    df_amends_prev = rename_cols(df_amends_prev, '_t', '')
    df_amends_prev = df_amends_prev.withColumn("live_flag", F.lit(False))
    df_target_updated = df_no_changes.union(df_amends_prev)
    df_target_updated = df_target_updated.union(df_inactive)
    # alternative: create an updated target dataframe without history
    #df_target_updated = df_no_changes.union(df_inactive)
    #df_target_updated.cache().count()
    # birth: records present only in the incoming file
    df_birth = df_source.join(df_target, on=[col], how='leftanti')
    df_birth = df_birth.withColumn(change_col, F.lit(2))
    df_birth.cache().count()
    # death: records present only in the target table
    df_death = df_target.join(df_source, on=[col], how='leftanti')
    df_death = df_death.withColumn(change_col, F.lit(99))
    df_death.cache().count()
    # identify updated records
    df_changes = find_changes(spark, logging, df_amends, change_cols)
    df_changes = df_changes.drop(*df_t.columns)
    # combine new, updated and deleted records
    df_all_changes = df_birth.union(df_changes)
    df_all_changes = df_all_changes.union(df_death)
    # save change history
    if pchtable is not None:
        save_change_history(spark, df_all_changes, pchtable)
    # join source and target (drop the change indicator before merging)
    df_changes = df_all_changes.drop(change_col)
    df_source_target = df_changes.union(df_target_updated)
    # save data
    if targettable is not None:
        save_target_data(spark, df_source_target, targettable)
    logging.info('END - identify changes: {}'.format(datetime.now().strftime("%Y%m%d%H%M")))
    print('END - identify changes: {}'.format(datetime.now().strftime("%Y%m%d%H%M")))
    return df_source_target, df_all_changes
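Putting it together, an end-to-end driver might look like the following sketch; the table names and logging setup are assumptions for illustration, and find_changes comes from the repository linked below:

import logging
from pyspark.sql import SparkSession

logging.basicConfig(level=logging.INFO)
spark = SparkSession.builder.appName("cdc-pipeline").getOrCreate()

# hypothetical source (incoming file) and target (history) tables
df_source = spark.table("staging.companies_incoming")
df_target = spark.table("warehouse.companies")
change_cols = ["name", "town", "status"]

df_source_target, df_all_changes = process_save_changes(
    spark, logging, df_source, df_target, change_cols,
    targettable="warehouse.companies",
    pchtable="warehouse.companies_change_history")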
A working Python script and Jupyter notebook can be found at https://gitlab.com/akkasali910/companieshouse-data