Tag: PySpark

  • Change Data Capture Implementation using PySpark

    In this article, we describe an approach to implementing Change Data Capture (CDC) using PySpark. All the functions are included in the example, together with test data. This is an updated version of the Change data capture ETL pipelines article.

    UPDATE – 10/06/2023: using HIVE SQL to implement find_changes takes less time than processing the dataframes with the PySpark DataFrame API.
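
    The SQL version is not shown in full here; the sketch below is only a rough illustration of what a HIVE SQL based find_changes might look like, not the exact implementation from the repository. It assumes the amended-records dataframe carries the target-side columns with a '_t' suffix (as built in process_save_changes later in this article) and flags changed rows with change code 3.

    # hedged sketch only - not the repository's exact find_changes
    def find_changes(spark,logging,df_amends,change_cols):
      df_amends.createOrReplaceTempView("amends")
      # one null-safe comparison per variable of interest
      predicate = " OR ".join("NOT ({c} <=> {c}_t)".format(c=c) for c in change_cols)
      # keep only rows where at least one variable changed and flag them as updates (3)
      query = "SELECT amends.*, 3 AS data_changes FROM amends WHERE {p}".format(p=predicate)
      logging.info("find_changes query: {}".format(query))
      return spark.sql(query)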

    # implementation of CDC using the PySpark DataFrame API
    from pyspark.sql import functions as F

    def change_data_capture(df_source,df_target,id_col=None,change_cols=None):
      if id_col is None:
        id_col = df_target.columns[0]
      if change_cols is None:
        change_cols = df_target.columns
      # 0=no change, 2=New, 3=change and 99=delete  
      conditions = [F.when(df_source[c] != df_target[c], F.lit(c)).otherwise("") for c in change_cols if c not in [id_col]]
      change_type = (F.when(df_target[id_col].isNull(), F.lit(2))
              .when(df_source[id_col].isNull(), F.lit(99))
              .when(F.size(F.array_remove(F.array(*conditions), "")) > 0, F.lit(3))
              .otherwise(0)
             )
      select_expr =[ 
                    F.col(id_col),
                    *[F.coalesce(df_source[c], df_target[c]).alias(c) for c in change_cols if c not in [id_col]],                
                    F.array_remove(F.array(*conditions), "").alias("data_changes"),
                    change_type.alias("change_type"),
      ]
      df_out = df_source.join(df_target, [id_col],"outer").select(*select_expr)
      return df_out
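
    To see the output of change_data_capture, the snippet below runs it on a small set of made-up rows; the ids, names and expected change codes are purely illustrative and are not the test data referred to above.

    # illustrative usage only - the rows below are made up
    from pyspark.sql import SparkSession

    spark = SparkSession.builder.getOrCreate()
    df_target = spark.createDataFrame(
        [(1, "Alice", "London"), (2, "Bob", "Leeds"), (4, "Dan", "York")],
        ["id", "name", "city"])
    df_source = spark.createDataFrame(
        [(1, "Alice", "Bristol"), (3, "Carol", "Hull"), (4, "Dan", "York")],
        ["id", "name", "city"])

    # expected: id 1 -> change_type 3 (city changed), id 2 -> 99 (deleted),
    # id 3 -> 2 (new), id 4 -> 0 (no change)
    change_data_capture(df_source, df_target, id_col="id", change_cols=["name", "city"]).show()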

    Change Data Capture Implementation

    • target dataframe (table containing historical and current data)
    • source dataframe (incoming file)
    • identify changes in variables and update the target table
      • separate target data by live_flag
      • records already exist
      • no changes
      • join dataframes to create an updated target dataframe with history
      • dataframe to hold records that exist in the incoming file
      • dataframe to hold records that do not exist in the incoming file – looking at the target table
      • dataframe storing updated records
      • save change history
      • join dataframes and update target table

    The code snippet listed below shows the implementation of CDC.

    # identify and save changes
    from datetime import datetime

    def process_save_changes(spark,logging,df_source,df_target,change_cols,targettable=None,pchtable=None):
      """
      identify changes between two dataframes
      
      parameters:
        df_source - dataframe containing the new dataset
        df_target - existing dataset (the target dataframe will be updated accordingly)
        change_cols - list of variables to use for capturing changes
        targettable - optional table name for saving the updated target data
        pchtable - optional table name for saving the change history
      """
      logging.info('START - identify changes: {}'.format(datetime.now().strftime("%Y%m%d%H%M")))
      print('START - identify changes: {}'.format(datetime.now().strftime("%Y%m%d%H%M")))
      change_col = "data_changes"
      col = df_source.columns[0]
      
      # separate target data by live_flag
      df_active = df_target.filter(df_target['live_flag'] == True) 
      df_inactive = df_target.filter(df_target['live_flag'] == False) 
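      # count() is an action, so it materialises each cached dataframe before the joins below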
      df_active.cache().count()
      df_inactive.cache().count()
      df_source.cache().count()
    
      # records already exist
      df_t = df_active.select(*(F.col(x).alias(x + '_t') for x in df_active.columns))
      df_amends = df_source.join(df_t,df_source[col] == df_t[col+'_t'],'inner')\
                               .select('*')\
                               .dropDuplicates([col]) 
      df_amends.cache().count()
      
      #  no changes
      df_no_changes = df_active.join(df_source,df_active[col] == df_source[col],"left")\
                    .where(df_source[col].isNull())\
                    .select(df_active['*'])
    
      # to keep history
      df_amends_prev = df_amends.select(df_t['*'])
      df_amends_prev = rename_cols(df_amends_prev,'_t','')
      df_amends_prev = df_amends_prev.withColumn("live_flag",F.lit(False))
      df_target_updated = df_no_changes.union(df_amends_prev)
      df_target_updated = df_target_updated.union(df_inactive)
    
      # join dataframes to create an updated target dataframe  without history
      #df_target_updated = df_no_changes.union(df_inactive)
      #df_target_updated.cache().count()
    
      # birth
      df_birth = df_source.join(df_target, on=[col], how='leftanti')
      df_birth = df_birth.withColumn(change_col,F.lit(2))
      df_birth.cache().count()
      
      # death
      df_death = df_target.join(df_source, on=[col], how='leftanti')
      df_death = df_death.withColumn(change_col,F.lit(99))
      df_death.cache().count()
      
      # identify updated records
      df_changes = find_changes(spark,logging,df_amends,change_cols)
      df_changes = df_changes.drop(*df_t.columns)
      
      # join new and updated source records
      df_all_changes = df_birth.union(df_changes)
      df_all_changes = df_all_changes.union(df_death)
      
      # save change history
      if pchtable is not None:
        save_change_history(spark,df_all_changes,pchtable)
          
      # join source and target
      df_changes = df_all_changes.drop(change_col)
      df_source_target = df_changes.union(df_target_updated)
      
      # save data
      if targettable is not None:
        save_target_data(spark,df_source_target,targettable)
    
      logging.info('END - identify changes: {}'.format(datetime.now().strftime("%Y%m%d%H%M")))
      print('END - identify changes: {}'.format(datetime.now().strftime("%Y%m%d%H%M")))
                                 
      return df_source_target, df_all_changes
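
    The snippet above relies on a few helper functions (rename_cols, save_change_history and save_target_data; find_changes was sketched earlier) that are defined in the linked repository. The stand-ins below only illustrate the behaviour assumed here; they are not the repository's exact implementations.

    # hedged stand-ins for the helpers used above
    def rename_cols(df, suffix, replacement):
      # strip or replace a suffix such as '_t' on every column name
      for c in df.columns:
        if c.endswith(suffix):
          df = df.withColumnRenamed(c, c[:len(c) - len(suffix)] + replacement)
      return df

    def save_change_history(spark, df_changes, pchtable):
      # append the captured changes to the change history table
      df_changes.write.mode("append").saveAsTable(pchtable)

    def save_target_data(spark, df, targettable):
      # replace the contents of the target table with the updated dataset
      df.write.mode("overwrite").saveAsTable(targettable)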

    A working Python script or Jupyter notebook can be found at https://gitlab.com/akkasali910/companieshouse-data