Category: ETL

  • Capturing delta data changes between two datasets

    Suppose you want to capture delta changes on key variables in the data you receive. In most cases the dataset will not provide a change history, so you have to work out for yourself which variables were updated by comparing the new values against the previously stored data.

    In this article, we will share an approach for keeping a history of data changes. You may group the changes into the following categories (a sketch of how these types might be assigned follows the list):

    1. transaction type 0 – the initial data load into your data warehouse
    2. type 2 – no changes
    3. type 3 onwards – changes. You may want to group related variables together, such as an address group (addressline1, addressline2, addressline3 and postcode).
    4. type 99 – records that are no longer present in the source data
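
    As an illustration only, these types could be derived once the change information (the data_changes field built later in this article) is available. The flag columns is_initial_load and in_source below are assumptions made for this sketch, not fields from the dataset:

    #illustrative sketch: deriving a transaction type from the change information.
    #is_initial_load and in_source are assumed boolean flags, not columns from the article.
    from pyspark.sql import functions as F

    df_typed = df_changes.withColumn(
      "transaction_type",
      F.when(F.col("is_initial_load"), F.lit(0))       #type 0: initial data load
       .when(~F.col("in_source"), F.lit(99))           #type 99: no longer present in the source
       .when(F.col("data_changes") == "", F.lit(2))    #type 2: no changes
       .otherwise(F.lit(3))                            #type 3 for any change (in practice, one type per variable group)
    )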

    Using data from Companies House

    See the previous post for the data structure and data quality checks.

    For example, suppose we need to keep a change history for the following variables:

    CompanyName
    
    RegAddressAddressLine1
    RegAddressPostCode
    
    CompanyCategory
    CompanyStatus
    
    AccountsNextDueDate
    
    SICCodeSicText_1
    
    Note: For simplicity, we record the variable name itself rather than a numeric transaction type.

    Steps for identifying only changes:

    • join the two datasets and append a suffix to the target column names
    • for key variables, compare the value in the source (new dataset) with the target (existing dataset)
    • create a new variable to keep the change history
    • drop the target columns and return the dataframe with the new field
    #process change history: flag which key variables changed between the new (source)
    #and previously stored (target) datasets
    from pyspark.sql import functions as F
    # dc: the helper module providing add_to_string_col and check_changes (not shown here)

    def find_changes(spark, df_source, df_source_exists, cols):
      change_col = "data_changes"
      key_col = df_source.columns[0]  #join on the first column (the record key)
      #alias the existing (target) columns with a _t suffix so they can be compared
      df_t = df_source_exists.select(*(F.col(x).alias(x + '_t') for x in df_source_exists.columns))
      #identify records present in both datasets
      df_changes = df_source.join(df_t, df_source[key_col] == df_t[key_col + '_t'], "inner")
      df_changes = df_changes.withColumn(change_col, F.lit(""))
      for col in cols:
        #append the variable name to data_changes when the source and target values differ
        df_changes = df_changes.withColumn(change_col, dc.add_to_string_col(
          df_changes[change_col], f"{col}",
          dc.check_changes(df_changes[col], df_changes[col + '_t'])))
        #in case you want to avoid overwriting previous non-null values
        #df_changes = df_changes.withColumn(col, F.coalesce(df_changes[col], df_changes[col + '_t']))
      df_changes.cache()
      #drop the suffixed target columns and return the dataframe with the new field
      df_changes = df_changes.drop(*df_t.columns)

      return df_changes
    cols = [
      'CompanyName',
      'RegAddressAddressLine1',
      'RegAddressPostCode',
      'CompanyCategory',
      'CompanyStatus',
      'AccountsNextDueDate',
      'SICCodeSicText_1'
    ]
    df_changes = find_changes(spark,df_source,df_target,cols)
    #show changes
    df_changes.filter(df_changes.data_changes != '').groupby('data_changes').count().show(truncate=False)
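
    The function above leans on two small helpers from the dc module, whose implementations are not shown here. A minimal sketch of what they might look like (the signatures and the comma separator are assumptions for this sketch, not the author's code):

    #hypothetical sketch of the dc helpers used by find_changes; the real versions may differ
    from pyspark.sql import functions as F
    from pyspark.sql.column import Column

    def check_changes(source_col: Column, target_col: Column) -> Column:
      #True when the source value differs from the stored target value (null-safe comparison)
      return ~source_col.eqNullSafe(target_col)

    def add_to_string_col(current: Column, name: str, changed: Column) -> Column:
      #append the variable name to the running change string when a change was detected
      appended = F.when(current == "", F.lit(name))\
                  .otherwise(F.concat(current, F.lit(","), F.lit(name)))
      return F.when(changed, appended).otherwise(current)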
    
    Capturing changes

    It appears that April’s data has 2 records (out of 849,999) where the variables RegAddressPostCode and SICCodeSicText_1 changed from the previous month.

    Displaying the changes

    Cross-check the result by inspecting the data in both the source and target datasets.

    Cross-checking the record in both datasets
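
    For example, assuming the record key column is CompanyNumber (an assumption for this sketch; use whatever key your datasets carry), the changed records can be pulled from both dataframes:

    #illustrative cross-check: inspect the changed records in both the source and target data.
    #CompanyNumber is an assumed key column name.
    changed_keys = [row[0] for row in df_changes.filter(df_changes.data_changes != '')
                    .select('CompanyNumber').collect()]

    for df in (df_source, df_target):
      df.filter(df.CompanyNumber.isin(changed_keys))\
        .select('CompanyNumber', 'RegAddressPostCode', 'SICCodeSicText_1')\
        .show(truncate=False)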