Tag: PySpark

  • Change data capture ETL pipelines

    In this article we will look into change data capture ETL pipelines. How do you implement change data capture (CDC)?

    We will use PySpark to implement CDC using data from Companies House. The ETL pipelines have been deployed and tested on Google Cloud Platform.

    Outline of ETL pipelines for change data capture:

    1. From the historical table (target), identify:
      1. current records
      2. historical records
    2. Join the snapshot (source data) with the current records and identify the following:
      1. records that exist in both datasets
      2. records unchanged in the target data
      3. new records
      4. records that exist in the target but not in the source (deleted records)
    3. Identify data changes and save them into a data change history table
    4. Join the inactive records with all the current records and update the target table
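
    To make the outline concrete, here is a minimal sketch of the two inputs. The live_flag column comes from the snippets below; the other column names and values are illustrative assumptions, not the actual Companies House schema:

    from pyspark.sql import SparkSession

    spark = SparkSession.builder.getOrCreate()

    # source: latest snapshot (illustrative columns)
    df_source = spark.createDataFrame(
      [("001", "ACME LTD", "active"),
       ("003", "NEWCO LTD", "active")],
      ["company_number", "company_name", "company_status"])

    # target: historical table; live_flag marks the current version of each record
    df_target = spark.createDataFrame(
      [("001", "ACME LIMITED", "active", True),
       ("002", "OLDCO LTD", "dissolved", True),
       ("001", "ACME PLC", "active", False)],
      ["company_number", "company_name", "company_status", "live_flag"])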

    Code snippets for change data capture:

    from datetime import datetime
    import pyspark.sql.functions as F

    # identify and save changes - dealing with delta records
    def process_save_changes(spark,logging,df_source,df_target,cols,targettable=None,pchtable=None):
      """
      identify changes in two dataframes
      """
      logging.info('START - identify changes: {}'.format(datetime.now().strftime("%Y%m%d%H%M")))
      print('START - identify changes: {}'.format(datetime.now().strftime("%Y%m%d%H%M")))
      change_col = "data_changes"
      id_col = df_target.columns[0]
      # separate target data by live_flag
      df_active = df_target.filter(df_target['live_flag'] == True) 
      df_inactive = df_target.filter(df_target['live_flag'] == False) 
      # records whose ids exist in both source and target:
      # the current target version will be superseded, so flag it as historical
      df_source_exists = df_source.join(df_active,df_source[id_col] == df_active[id_col],'inner')\
                               .select(df_active['*'])\
                               .dropDuplicates([id_col])
      df_source_exists = df_source_exists.withColumn('live_flag',F.lit(False))
      
      # active target records with no matching source record - carried over unchanged
      df_no_changes = df_active.join(df_source,df_active[id_col] == df_source[id_col],"left")\
                    .where(df_source[id_col].isNull())\
                    .select(df_active['*'])
      # join dataframes to create an updated target dataframe
      df_target_updated = df_no_changes.union(df_source_exists)
      df_target_updated = df_target_updated.union(df_inactive)
    
      # new records - ids in the source but not in the active target
      df_new = df_source.join(df_active, on=[id_col], how='leftanti')
      # use a string flag ("2" = new) so the change column type matches the
      # "D" flag below and the column-name strings produced by find_changes
      df_new = df_new.withColumn(change_col,F.lit("2"))
      # deleted records - ids in the active target but not in the source
      df_deleted = df_active.join(df_source, on=[id_col], how='leftanti')
      df_deleted = df_deleted.withColumn(change_col,F.lit("D"))
    
      # identify updated records
      df_changes = find_changes(spark,logging,df_source,df_source_exists,cols)
      
      # join new and updated source records
      df_new_changes = df_new.union(df_changes)
      df_new_changes_deleted = df_new_changes.union(df_deleted)
        
      # save change history
      if pchtable is not None:
        save_change_history(spark,df_new_changes_deleted,pchtable)
          
      # join source and target
      df_new_changes = df_new_changes.drop(change_col)
      df_source_target = df_new_changes.union(df_target_updated)
      
      # save data
      if targettable is not None:
        save_target_data(spark,df_source_target,targettable)
    
      logging.info('END - identify changes: {}'.format(datetime.now().strftime("%Y%m%d%H%M")))
      print('END - identify changes: {}'.format(datetime.now().strftime("%Y%m%d%H%M")))
                                 
      return df_source_target
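
    A hypothetical invocation of the function above, using the example dataframes from earlier (the table names and tracked columns here are assumptions, not values from the repository):

    # track changes in two columns; table names are illustrative
    df_result = process_save_changes(
      spark, logging,
      df_source, df_target,
      cols=["company_name", "company_status"],
      targettable="companies_target",
      pchtable="companies_change_history")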

    The full code can be found at: https://gitlab.com/akkasali910/companieshouse-data


    Identifying changes

    How do you identify changes? We need the source and target data frames, together with the columns needed for capturing changes.

    # process change history
    def find_changes(spark,logging,df_source,df_source_exists,cols):
      change_col = "data_changes"
      id_col = df_source.columns[0]
      logging.info('Using ID column: ' + id_col)  
      # identify updated records
      df_t = df_source_exists.select(*(F.col(x).alias(x + '_t') for x in df_source_exists.columns))
      df_changes = df_source.join(df_t,df_source[id_col] == df_t[id_col+'_t'],"inner")\
                    .select('*')
      df_changes = df_changes.withColumn(change_col,F.lit(""))
        
      logging.info('Tracking changes for:')
      for col in cols:
        logging.info(col)
        print(col)
        df_changes = df_changes.withColumn(change_col, add_to_string_col(
          df_changes[change_col],f"{col}",
          check_changes(df_changes[col],df_changes[col+'_t'])))
        # keep the previous non-null value rather than overwriting it with a blank;
        # coalesce alone is not enough when the "empty" value is a space, not null
        df_changes = df_changes.withColumn(col,F.when(F.length(F.trim(F.col(col)))==0,df_changes[col+'_t']) \
             .otherwise(F.coalesce(df_changes[col],df_changes[col+'_t'])))
    
        #df_changes = df_changes.withColumn(col,F.coalesce(df_changes[col],df_changes[col+'_t']))
        # materialise after each column so the query plan does not grow too deep
        df_changes.cache().count()
    
      # drop the aliased target (_t) columns
      df_changes = df_changes.drop(*df_t.columns)
      
      return df_changes
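
    The helpers add_to_string_col and check_changes are defined elsewhere in the repository. A minimal sketch of what they could look like as column expressions (an assumption, not the repository's actual implementation):

    import pyspark.sql.functions as F

    def check_changes(new_col, old_col):
      # null-safe comparison: True when source and target values differ
      return ~new_col.eqNullSafe(old_col)

    def add_to_string_col(acc_col, col_name, changed):
      # append the column name to the accumulated change string when it changed
      return F.when(changed, F.concat(acc_col, F.lit(col_name), F.lit(" "))).otherwise(acc_col)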

    Update – 10/06/2023

    Using a different approach to CDC (change data capture): let the SQL engine do most of the work.

    # implementation of CDC using dataframes
    import pyspark.sql.functions as F

    def change_data_capture(df_source,df_target,id_col=None,change_cols=None):
      if id_col is None:
        id_col = df_target.columns[0]
      if change_cols is None:
        change_cols = df_target.columns
      # change_type codes: 0=no change, 2=new, 3=changed, 99=deleted
      conditions = [F.when(df_source[c] != df_target[c], F.lit(c)).otherwise("") for c in change_cols if c not in [id_col]]
      change_type = (F.when(df_target[id_col].isNull(), F.lit(2))
              .when(df_source[id_col].isNull(), F.lit(99))
              .when(F.size(F.array_remove(F.array(*conditions), "")) > 0, F.lit(3))
              .otherwise(0)
             )
      select_expr =[ 
                    F.col(id_col),
                    *[F.coalesce(df_source[c], df_target[c]).alias(c) for c in change_cols if c not in [id_col]],                
                    F.array_remove(F.array(*conditions), "").alias("data_changes"),
                    change_type.alias("change_type"),
      ]
      df_out = df_source.join(df_target, [id_col],"outer").select(*select_expr)
      return df_out
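
    With the illustrative dataframes from earlier, the function can be exercised like this (only the current target rows are compared, so the live_flag column is dropped first):

    df_current = df_target.filter("live_flag = true").drop("live_flag")
    df_out = change_data_capture(df_source, df_current,
                                 id_col="company_number",
                                 change_cols=["company_name", "company_status"])
    df_out.show()
    # expected: 003 -> change_type 2 (new), 002 -> 99 (deleted),
    # 001 -> 3 (changed, data_changes = ["company_name"])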