Capturing delta data changes between two datasets

Suppose you want to capture delta changes on key variables as part of the data you receive. In most cases the dataset will not provide a change history, so you have to work out for yourself which variables were updated by comparing the new values with the previously stored data.

In this article, we will share an approach for keeping a history of data changes. You may group changes into the following categories (a sketch of how a transaction type might be assigned follows the list):

  1. transaction type 0 – for the initial data load into your data warehouse
  2. type 2 – for no changes
  3. type 3 onwards – for changes. You may want to group variables that are related to each other, such as an address group: addressline1, addressline2, addressline3 and postcode.
  4. type 99 – for records that are not present in the source data
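
As a rough illustration only (the function name, the missing_in_source flag and the default column names below are assumptions for this sketch, not part of the Companies House pipeline), a transaction type could be derived from the data_changes string produced later in this article:

from pyspark.sql import functions as F

# illustrative sketch: assign a transaction type per record
#  0  = initial load into the data warehouse
#  2  = record matched but no key variables changed
#  3  = one or more key variables (or variable groups) changed
#  99 = record exists in the target but is missing from the source
def assign_transaction_type(df, change_col="data_changes", missing_col="missing_in_source", initial_load=False):
  if initial_load:
    return df.withColumn("transaction_type", F.lit(0))
  return df.withColumn("transaction_type",
    F.when(F.col(missing_col), F.lit(99))
     .when(F.col(change_col) == "", F.lit(2))
     .otherwise(F.lit(3)))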

Using data from Companies House

See the previous post for the data structure and data quality checks.

For example, we need to keep change history for the following variables:

CompanyName

RegAddressAddressLine1
RegAddressPostCode

CompanyCategory
CompanyStatus

AccountsNextDueDate

SICCodeSicText_1

Note: for simplicity, we use the variable name as the transaction type.

Steps for identifying only changes:

  • join the two datasets and append a suffix to the target column names
  • for the key variables, compare the value in the source (new dataset) with the target (existing dataset)
  • create a new variable to keep the change history
  • drop the target columns and return the dataframe with the new field
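
The implementation below relies on two small helpers, dc.add_to_string_col and dc.check_changes, from the dc module introduced in the previous post. If you do not have that module to hand, minimal sketches inferred from how they are called (the originals may well differ) could look like this; they could live in a module imported as dc to match the calls below:

from pyspark.sql import functions as F

# minimal sketches of the helpers used below, inferred from their call sites
def check_changes(source_col, target_col):
  # True when the source value differs from the target value (null-safe comparison)
  return ~source_col.eqNullSafe(target_col)

def add_to_string_col(existing, name, changed):
  # append the variable name (comma separated) to the running change string when a change is flagged
  appended = F.when(existing == "", F.lit(name)).otherwise(F.concat(existing, F.lit(","), F.lit(name)))
  return F.when(changed, appended).otherwise(existing)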
#process change history
from pyspark.sql import functions as F
# dc is the helper module (add_to_string_col, check_changes) from the previous post

def find_changes(spark, df_source, df_source_exists, cols):
  change_col = "data_changes"
  key_col = df_source.columns[0]
  #alias the existing (target) columns with a _t suffix so they can be compared after the join
  df_t = df_source_exists.select(*(F.col(x).alias(x + '_t') for x in df_source_exists.columns))
  #identify records present in both datasets
  df_changes = df_source.join(df_t, df_source[key_col] == df_t[key_col + '_t'], "inner")
  df_changes = df_changes.withColumn(change_col, F.lit(""))
  for col in cols:
    #append the column name to data_changes when the source and target values differ
    df_changes = df_changes.withColumn(change_col, dc.add_to_string_col(
      df_changes[change_col], f"{col}",
      dc.check_changes(df_changes[col], df_changes[col + '_t'])))
    # in case you want to avoid overwriting previous non-null values
    #df_changes = df_changes.withColumn(col, F.coalesce(df_changes[col], df_changes[col + '_t']))
  df_changes.cache()
  #drop the suffixed target columns and return the dataframe with the new field
  df_changes = df_changes.drop(*df_t.columns)
  return df_changes
cols = [
  'CompanyName',
  'RegAddressAddressLine1',
  'RegAddressPostCode',
  'CompanyCategory',
  'CompanyStatus',
  'AccountsNextDueDate',
  'SICCodeSicText_1'
]
df_changes = find_changes(spark,df_source,df_target,cols)
#show changes
df_changes.filter(df_changes.data_changes != '').groupby('data_changes').count().show(truncate=False)
Capturing changes

It appears that April’s data has 2 records (out of 849,999) where two variables, RegAddressPostCode and SICCodeSicText_1, changed from the previous month.

Displaying the changes

Cross-checking the result by inspecting the data in both the source and the target.

cross checking record in both datasets
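
For reference, a cross-check along these lines could be run directly against the two dataframes (the company number below is a hypothetical placeholder rather than one of the records that actually changed):

# illustrative cross-check: compare one record in the new (source) and previous (target) datasets
key_col = df_source.columns[0]
company_number = "01234567"  # hypothetical key value for illustration
df_source.filter(F.col(key_col) == company_number) \
         .select(key_col, 'RegAddressPostCode', 'SICCodeSicText_1') \
         .show(truncate=False)
df_target.filter(F.col(key_col) == company_number) \
         .select(key_col, 'RegAddressPostCode', 'SICCodeSicText_1') \
         .show(truncate=False)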