Change data capture ETL pipelines

In this article we look we will look into change data capture ETL pipelines. How do you implement change data capture (CDC)?

We will use PySpark to implement CDC using data from Companies House . ETL pipelines have been deployed and tested in Google Cloud Platform.

Outline of ETL pipelines for change data capture:

  1. From historical table (target) identify
    1. current records
    2. historical records
  2. Join snapshot (source data) with current records and identify the following:
    1. records exists in both datasets
    2. records not changed in target data
    3. new records
    4. records not exists in the source but exists in the target
  3. Identify data changes and save them into data change history table
  4. Join inactive and all the current records and update the target table

Code snippets of change data capture

# identify and save changes - dealing with delta records
def process_save_changes(spark,logging,df_source,df_target,cols,targettable=None,pchtable=None):
  """
  identify changes in two dataframes
  """
  logging.info('START - identify changes: {}'.format(datetime.now().strftime("%Y%m%d%H%M")))
  print('START - identify changes: {}'.format(datetime.now().strftime("%Y%m%d%H%M")))
  change_col = "data_changes"
  id_col = df_target.columns[0]
  # separate target data by live_flag
  df_active = df_target.filter(df_target['live_flag'] == True) 
  df_inactive = df_target.filter(df_target['live_flag'] == False) 
  # record already exist
  df_source_exists = df_source.join(df_active,df_source[id_col] == df_active[id_col],'inner')\
                           .select(df_active['*'])\
                           .dropDuplicates([id_col]) 
  df_source_exists = df_source_exists.withColumn('live_flag',F.lit(False))
  
  #  no changes
  df_no_changes = df_active.join(df_source,df_active[id_col] == df_source[id_col],"left")\
                .where(df_source[id_col].isNull())\
                .select(df_active['*'])
  # join dataframes to create an updated target dataframe
  df_target_updated = df_no_changes.union(df_source_exists)
  df_target_updated = df_target_updated.union(df_inactive)

  # new records
  df_new = df_source.join(df_active, on=[id_col], how='leftanti')
  df_new = df_new.withColumn(change_col,F.lit(2))
  # identify deleted records
  df_deleted = df_active.join(df_source, on=[id_col], how='leftanti')
  df_deleted = df_deleted.withColumn(change_col,F.lit("D"))

  # identify updated records
  df_changes = find_changes(spark,logging,df_source,df_source_exists,cols)
  
  # join new and updated source records
  df_new_changes = df_new.union(df_changes)
  df_new_changes_deleted = df_new_changes.union(df_deleted)
    
  # save change history
  if pchtable is not None:
    save_change_history(spark,df_new_changes_deleted,pchtable)
      
  # join source and target
  df_new_changes = df_new_changes.drop(change_col)
  df_source_target = df_new_changes.union(df_target_updated)
  
  # save data
  if targettable is not None:
    save_target_data(spark,df_source_target,targettable)

  logging.info('END - identify changes: {}'.format(datetime.now().strftime("%Y%m%d%H%M")))
  print('END - identify changes: {}'.format(datetime.now().strftime("%Y%m%d%H%M")))
                             
  return df_source_target

See code can be found at: https://gitlab.com/akkasali910/companieshouse-data

Change data capture ETL pipelines

Identifying changes

How do you identify changes: We need source and target data frames together with columns needed for capturing changes.

# process change history
def find_changes(spark,logging,df_source,df_source_exists,cols):
  change_col = "data_changes"
  id_col = df_source.columns[0]
  logging.info('Using ID column: ' + id_col)  
  # identify updated records
  df_t = df_source_exists.select(*(F.col(x).alias(x + '_t') for x in df_source_exists.columns))
  df_changes = df_source.join(df_t,df_source[id_col] == df_t[id_col+'_t'],"inner")\
                .select('*')
  df_changes = df_changes.withColumn(change_col,F.lit(""))
    
  logging.info('Tracking changes for:') 
  i = 0
  for col in cols:
    logging.info(col) 
    print(col) 
    i=i+1
    df_changes = df_changes.withColumn(change_col, add_to_string_col(
      df_changes[change_col],f"{col}",
      check_changes(df_changes[col],df_changes[col+'_t'])))
    # incase if you want to avoid over writing previous non null values
    # coalesce does not work with no value. i.e. value is space
    df_changes = df_changes.withColumn(col,F.when(F.length(F.trim(F.col(col)))==0,df_changes[col+'_t']) \
         .otherwise(F.coalesce(df_changes[col],df_changes[col+'_t'])))    

    #df_changes = df_changes.withColumn(col,F.coalesce(df_changes[col],df_changes[col+'_t']))
    df_changes.cache().count()

  # drop columns
  df_changes = df_changes.drop(*df_t.columns)
  
  return df_changes

Update – 10/06/2023

Using different approach to CDC (capture data change). Let the HIVE SQL do the most of the work.

# implementation of CDC using dataframes
def change_data_capture(df_source,df_target,id_col=None,change_cols=None):
  if id_col is None:
    id_col = df_target_columns[:1]
  if change_cols is None:
    change_cols = df_target_columns    
  # 0=no change, 2=New, 3=change and 99=delete  
  conditions = [F.when(df_source[c] != df_target[c], F.lit(c)).otherwise("") for c in change_cols if c not in [id_col]]
  change_type = (F.when(df_target[id_col].isNull(), F.lit(2))
          .when(df_source[id_col].isNull(), F.lit(99))
          .when(F.size(F.array_remove(F.array(*conditions), "")) > 0, F.lit(3))
          .otherwise(0)
         )
  select_expr =[ 
                F.col(id_col),
                *[F.coalesce(df_source[c], df_target[c]).alias(c) for c in change_cols if c not in [id_col]],                
                F.array_remove(F.array(*conditions), "").alias("data_changes"),
                change_type.alias("change_type"),
  ]
  df_out = df_source.join(df_target, [id_col],"outer").select(*select_expr)
  return df_out

Comments

Leave a Reply

Your email address will not be published. Required fields are marked *