In this article we will look at change data capture (CDC) ETL pipelines: how do you implement change data capture?
We will use PySpark to implement CDC on data from Companies House. The ETL pipelines have been deployed and tested on Google Cloud Platform.
Outline of the ETL pipelines for change data capture (a condensed PySpark sketch of these steps follows the list):
- From the historical table (target), identify:
  - current records
  - historical records
- Join the snapshot (source data) with the current records and identify the following:
  - records that exist in both datasets
  - records not changed in the target data
  - new records
  - records that exist in the target but not in the source
- Identify data changes and save them into a data change history table
- Join the inactive records with all the current records and update the target table
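The joins in this outline map directly onto a handful of PySpark operations. Below is a condensed, illustrative sketch of those steps (the id and live_flag column names are assumptions for the example); the full implementation follows in the next section.

# condensed sketch of the CDC joins in the outline (assumed columns: id, live_flag)
from pyspark.sql import functions as F

def cdc_outline(df_source, df_target, id_col="id"):
    # split the target (historical) table into current and historical records
    df_current = df_target.filter(F.col("live_flag") == True)
    df_historical = df_target.filter(F.col("live_flag") == False)
    # records that exist in both the source snapshot and the current target
    df_in_both = df_source.join(df_current, on=[id_col], how="inner")
    # new records: in the source but not yet in the current target
    df_new = df_source.join(df_current, on=[id_col], how="leftanti")
    # records that exist in the target but not in the source (candidate deletes)
    df_missing = df_current.join(df_source, on=[id_col], how="leftanti")
    return df_in_both, df_new, df_missing, df_historical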
Code snippets for change data capture
# identify and save changes - dealing with delta records
from datetime import datetime
from pyspark.sql import functions as F

def process_save_changes(spark, logging, df_source, df_target, cols, targettable=None, pchtable=None):
    """
    Identify changes between the source and target dataframes.
    """
    logging.info('START - identify changes: {}'.format(datetime.now().strftime("%Y%m%d%H%M")))
    print('START - identify changes: {}'.format(datetime.now().strftime("%Y%m%d%H%M")))
    change_col = "data_changes"
    id_col = df_target.columns[0]
    # separate target data by live_flag
    df_active = df_target.filter(df_target['live_flag'] == True)
    df_inactive = df_target.filter(df_target['live_flag'] == False)
    # records that already exist in both source and target (the old versions become inactive)
    df_source_exists = df_source.join(df_active, df_source[id_col] == df_active[id_col], 'inner')\
        .select(df_active['*'])\
        .dropDuplicates([id_col])
    df_source_exists = df_source_exists.withColumn('live_flag', F.lit(False))
    # no changes: active target records with no matching record in the source
    df_no_changes = df_active.join(df_source, df_active[id_col] == df_source[id_col], "left")\
        .where(df_source[id_col].isNull())\
        .select(df_active['*'])
    # join dataframes to create an updated target dataframe
    df_target_updated = df_no_changes.union(df_source_exists)
    df_target_updated = df_target_updated.union(df_inactive)
    # new records
    df_new = df_source.join(df_active, on=[id_col], how='leftanti')
    df_new = df_new.withColumn(change_col, F.lit(2))
    # identify deleted records
    df_deleted = df_active.join(df_source, on=[id_col], how='leftanti')
    df_deleted = df_deleted.withColumn(change_col, F.lit("D"))
    # identify updated records
    df_changes = find_changes(spark, logging, df_source, df_source_exists, cols)
    # join new and updated source records
    df_new_changes = df_new.union(df_changes)
    df_new_changes_deleted = df_new_changes.union(df_deleted)
    # save change history
    if pchtable is not None:
        save_change_history(spark, df_new_changes_deleted, pchtable)
    # join source and target
    df_new_changes = df_new_changes.drop(change_col)
    df_source_target = df_new_changes.union(df_target_updated)
    # save data
    if targettable is not None:
        save_target_data(spark, df_source_target, targettable)
    logging.info('END - identify changes: {}'.format(datetime.now().strftime("%Y%m%d%H%M")))
    print('END - identify changes: {}'.format(datetime.now().strftime("%Y%m%d%H%M")))
    return df_source_target
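A call to this function might look like the one below. The table names and the list of tracked columns are illustrative assumptions, not values taken from the Companies House pipeline.

# example call (table names and tracked columns are illustrative assumptions)
import logging
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("cdc-example").getOrCreate()
df_source = spark.table("staging.company_snapshot")    # latest snapshot (source)
df_target = spark.table("warehouse.company_history")   # historical table (target)
tracked_cols = ["company_name", "company_status", "registered_office_address"]

df_updated = process_save_changes(
    spark, logging, df_source, df_target, tracked_cols,
    targettable="warehouse.company_history",
    pchtable="warehouse.company_change_history")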
The full code can be found at: https://gitlab.com/akkasali910/companieshouse-data
Identifying changes
How do you identify changes? We need the source and target data frames, together with the list of columns for which changes should be captured.
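The core trick in find_changes below is to suffix every target column with '_t', so that after the join both versions of a tracked column sit in the same row and can be compared. A minimal, self-contained illustration of that pattern (the toy data and column names are made up):

# illustration of the '_t' suffix comparison used in find_changes (toy data)
from pyspark.sql import SparkSession, functions as F

spark = SparkSession.builder.getOrCreate()
df_source = spark.createDataFrame([("001", "ACME LTD")], ["id", "name"])
df_target = spark.createDataFrame([("001", "ACME LIMITED")], ["id", "name"])
# rename target columns so both versions can live in one row after the join
df_t = df_target.select(*(F.col(c).alias(c + "_t") for c in df_target.columns))
df_joined = df_source.join(df_t, df_source["id"] == df_t["id_t"], "inner")
df_flagged = df_joined.withColumn("name_changed", df_joined["name"] != df_joined["name_t"])
df_flagged.show()   # name_changed is True because the name differs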
# process change history
def find_changes(spark, logging, df_source, df_source_exists, cols):
    change_col = "data_changes"
    id_col = df_source.columns[0]
    logging.info('Using ID column: ' + id_col)
    # identify updated records: suffix the target columns with '_t' and join on the ID
    df_t = df_source_exists.select(*(F.col(x).alias(x + '_t') for x in df_source_exists.columns))
    df_changes = df_source.join(df_t, df_source[id_col] == df_t[id_col + '_t'], "inner")\
        .select('*')
    df_changes = df_changes.withColumn(change_col, F.lit(""))
    logging.info('Tracking changes for:')
    for col in cols:
        logging.info(col)
        print(col)
        # append the column name to the change list when the value differs from the target
        df_changes = df_changes.withColumn(change_col, add_to_string_col(
            df_changes[change_col], f"{col}",
            check_changes(df_changes[col], df_changes[col + '_t'])))
        # in case you want to avoid overwriting previous non-null values
        # (coalesce does not help when the "empty" value is a space rather than null)
        df_changes = df_changes.withColumn(col, F.when(F.length(F.trim(F.col(col))) == 0, df_changes[col + '_t'])
                                           .otherwise(F.coalesce(df_changes[col], df_changes[col + '_t'])))
        # df_changes = df_changes.withColumn(col, F.coalesce(df_changes[col], df_changes[col + '_t']))
    df_changes.cache().count()
    # drop the '_t' columns
    df_changes = df_changes.drop(*df_t.columns)
    return df_changes
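check_changes and add_to_string_col are helper functions from the repository and are not reproduced in this post. As a rough sketch, and purely as an assumption inferred from how they are called above, they could be written as column expressions along these lines:

# assumed sketches of the helpers used in find_changes (not the repository versions)
from pyspark.sql import functions as F

def check_changes(src_col, tgt_col):
    # True when the source value differs from the target value (nulls treated as equal)
    return ~src_col.eqNullSafe(tgt_col)

def add_to_string_col(existing, col_name, changed):
    # append the column name to the running change list when a change was detected
    appended = F.when(existing == "", F.lit(col_name)) \
                 .otherwise(F.concat(existing, F.lit(","), F.lit(col_name)))
    return F.when(changed, appended).otherwise(existing)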
Update – 10/06/2023
Using a different approach to CDC (change data capture): let Hive SQL do most of the work.
# implementation of CDC using dataframes
def change_data_capture(df_source, df_target, id_col=None, change_cols=None):
    if id_col is None:
        id_col = df_target.columns[0]
    if change_cols is None:
        change_cols = df_target.columns
    # 0=no change, 2=new, 3=change and 99=delete
    conditions = [F.when(df_source[c] != df_target[c], F.lit(c)).otherwise("") for c in change_cols if c not in [id_col]]
    change_type = (F.when(df_target[id_col].isNull(), F.lit(2))
                   .when(df_source[id_col].isNull(), F.lit(99))
                   .when(F.size(F.array_remove(F.array(*conditions), "")) > 0, F.lit(3))
                   .otherwise(0)
                   )
    select_expr = [
        F.col(id_col),
        *[F.coalesce(df_source[c], df_target[c]).alias(c) for c in change_cols if c not in [id_col]],
        F.array_remove(F.array(*conditions), "").alias("data_changes"),
        change_type.alias("change_type"),
    ]
    df_out = df_source.join(df_target, [id_col], "outer").select(*select_expr)
    return df_out
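Running it on two toy dataframes shows the change_type codes in action (the data and column names below are made up for illustration):

# example run on toy data (values are made up for illustration)
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
df_source = spark.createDataFrame(
    [("001", "ACME LTD", "active"), ("003", "NEW CO LTD", "active")],
    ["company_number", "company_name", "company_status"])
df_target = spark.createDataFrame(
    [("001", "ACME LIMITED", "active"), ("002", "OLD CO LTD", "dissolved")],
    ["company_number", "company_name", "company_status"])

df_out = change_data_capture(df_source, df_target,
                             id_col="company_number",
                             change_cols=["company_name", "company_status"])
df_out.show(truncate=False)
# expected change_type values:
#   001 -> 3  (company_name changed)
#   002 -> 99 (present in target but missing from the source)
#   003 -> 2  (new record)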