Change Data Capture Implementation using PySpark

In this article, we describe an approach to Change Data Capture (CDC) implementation using PySpark. All the functions are included in the example, together with test data. This is an updated version of Change data capture ETL pipelines.

UPDATE – 10/06/2023: using Hive SQL to implement find_changes takes less time than processing the dataframes with PySpark.
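
As a rough illustration (not the exact query from the repository), the Hive SQL variant of find_changes could look like the sketch below; the view names "source" and "target" and the columns "id", "name" and "status" are placeholders.

# rough sketch only - register the dataframes as temporary views and let
# Spark SQL (with Hive support) do the comparison; names are placeholders
df_source.createOrReplaceTempView("source")
df_target.createOrReplaceTempView("target")

df_changed = spark.sql("""
    SELECT s.*
    FROM source s
    INNER JOIN target t ON s.id = t.id
    WHERE s.name <> t.name OR s.status <> t.status
""")

The dataframe-based implementation of change_data_capture is shown next.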

from pyspark.sql import functions as F

# implementation of CDC by comparing the source and target dataframes
def change_data_capture(df_source,df_target,id_col=None,change_cols=None):
  if id_col is None:
    id_col = df_target.columns[0]
  if change_cols is None:
    change_cols = df_target.columns
  # 0=no change, 2=New, 3=change and 99=delete  
  conditions = [F.when(df_source[c] != df_target[c], F.lit(c)).otherwise("") for c in change_cols if c not in [id_col]]
  change_type = (F.when(df_target[id_col].isNull(), F.lit(2))
          .when(df_source[id_col].isNull(), F.lit(99))
          .when(F.size(F.array_remove(F.array(*conditions), "")) > 0, F.lit(3))
          .otherwise(0)
         )
  select_expr =[ 
                F.col(id_col),
                *[F.coalesce(df_source[c], df_target[c]).alias(c) for c in change_cols if c not in [id_col]],                
                F.array_remove(F.array(*conditions), "").alias("data_changes"),
                change_type.alias("change_type"),
  ]
  df_out = df_source.join(df_target, [id_col],"outer").select(*select_expr)
  return df_out
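
A minimal usage sketch of change_data_capture is shown below; the rows and column names are made up for illustration and are not the Companies House test data shipped with the repository.

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# illustrative test data only
df_target = spark.createDataFrame(
    [(1, "Acme Ltd", "active"), (2, "Bravo Ltd", "active")],
    ["id", "name", "status"])
df_source = spark.createDataFrame(
    [(1, "Acme Limited", "active"), (3, "Cobra Ltd", "active")],
    ["id", "name", "status"])

df_cdc = change_data_capture(df_source, df_target,
                             id_col="id", change_cols=["name", "status"])
df_cdc.show()
# expected change_type: id 1 -> 3 (change), id 2 -> 99 (delete), id 3 -> 2 (new)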

Change Data Capture Implementation

  • target dataframe (table containing historical and current data)
  • source dataframe (incoming file)
  • identify changes in variables and update the target table
    • separate target data by live_flag
    • records already exist
    • no changes
    • join dataframes to create an updated target dataframe with history
    • dataframe to hold records that exist in the incoming file
    • dataframe to hold records that do not exist in the incoming file (present only in the target table)
    • dataframe storing updated records
    • save change history
    • join dataframes and update target table

The code snippet below shows the implementation of CDC.

from datetime import datetime
from pyspark.sql import functions as F

# identify and save changes
def process_save_changes(spark,logging,df_source,df_target,change_cols,targettable=None,pchtable=None):
  """
  identify changes between two dataframes

  parameters:
    df_source - dataframe containing the new dataset
    df_target - existing dataset (the target dataframe will be updated accordingly)
    change_cols - list of variables for capturing changes
    targettable - optional table name for saving the updated target data
    pchtable - optional table name for saving the change history
  """
  logging.info('START - identify changes: {}'.format(datetime.now().strftime("%Y%m%d%H%M")))
  print('START - identify changes: {}'.format(datetime.now().strftime("%Y%m%d%H%M")))
  change_col = "data_changes"
  col = df_source.columns[0]  # the first column is used as the record identifier / join key
  
  # separate target data by live_flag
  df_active = df_target.filter(df_target['live_flag'] == True) 
  df_inactive = df_target.filter(df_target['live_flag'] == False) 
  df_active.cache().count()
  df_inactive.cache().count()
  df_source.cache().count()

  # records already exist
  df_t = df_active.select(*(F.col(x).alias(x + '_t') for x in df_active.columns))
  df_amends = df_source.join(df_t,df_source[col] == df_t[col+'_t'],'inner')\
                           .select('*')\
                           .dropDuplicates([col]) 
  df_amends.cache().count()
  
  # no changes - active records with no matching record in the incoming file
  df_no_changes = df_active.join(df_source,df_active[col] == df_source[col],"left")\
                .where(df_source[col].isNull())\
                .select(df_active['*'])

  # to keep history
  df_amends_prev = df_amends.select(df_t['*'])
  df_amends_prev = rename_cols(df_amends_prev,'_t','')
  df_amends_prev = df_amends_prev.withColumn("live_flag",F.lit(False))
  df_target_updated = df_no_changes.union(df_amends_prev)
  df_target_updated = df_target_updated.union(df_inactive)

  # join dataframes to create an updated target dataframe  without history
  #df_target_updated = df_no_changes.union(df_inactive)
  #df_target_updated.cache().count()

  # birth - new records: source rows with no matching key in the target
  df_birth = df_source.join(df_target, on=[col], how='leftanti')
  df_birth = df_birth.withColumn(change_col,F.lit(2))
  df_birth.cache().count()
  
  # death - deleted records: target rows with no matching key in the incoming file
  df_death = df_target.join(df_source, on=[col], how='leftanti')
  df_death = df_death.withColumn(change_col,F.lit(99))
  df_death.cache().count()
  
  # identify updated records
  df_changes = find_changes(spark,logging,df_amends,change_cols)
  df_changes = df_changes.drop(*df_t.columns)
  
  # join new and updated source records
  df_all_changes = df_birth.union(df_changes)
  df_all_changes = df_all_changes.union(df_death)
  
  # save change history
  if pchtable is not None:
    save_change_history(spark,df_all_changes,pchtable)
      
  # join source and target
  df_changes = df_all_changes.drop(change_col)
  df_source_target = df_changes.union(df_target_updated)
  
  # save data
  if targettable is not None:
    save_target_data(spark,df_source_target,targettable)

  logging.info('END - identify changes: {}'.format(datetime.now().strftime("%Y%m%d%H%M")))
  print('END - identify changes: {}'.format(datetime.now().strftime("%Y%m%d%H%M")))
                             
  return df_source_target, df_all_changes
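
The snippet above calls a few helpers (rename_cols, find_changes, save_change_history, save_target_data) that are defined in the full script. As an example of their shape, a minimal sketch of rename_cols, assuming it simply replaces a suffix in every column name, could be:

# hypothetical sketch - the real rename_cols is in the repository linked below
def rename_cols(df, old, new):
  for c in df.columns:
    if c.endswith(old):
      df = df.withColumnRenamed(c, c[:-len(old)] + new)
  return df

A call such as process_save_changes(spark, logging, df_source, df_target, change_cols=["name", "status"], targettable="curated.companies", pchtable="curated.companies_changes") would then return the updated target dataframe and the full change history; the column and table names here are illustrative only.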

A working Python script and Jupyter notebook can be found at https://gitlab.com/akkasali910/companieshouse-data.