Testing is an important part of the software development lifecycle. Identifying defects during development and fixing them before deploying code into production saves time and money. Most importantly, it gives the business assurance that the quality of the code is acceptable and that it is fit for purpose. Good testing depends on acceptance criteria agreed by all interested parties.
Testing Process (manual or automated)
Usually the test process involves the following stages:
- Test Planning
  - review requirements
  - devise sprint test plan
- Test Design and Analysis
  - test conditions
  - test case design
  - test scripts
  - peer review
  - traceability
  - test data
  - sign-off
- Test Execution
  - functional testing
  - non-functional testing
  - reconciliation testing – counts checking
  - Data validations (see the sketch after this list)
    - new records
    - updated records
    - duplicate or erroneous records check
    - data format and data types
- Defect raising/fixing
- Defect retesting and Regression testing
- Test Reports
- Test Closure Activities
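The data validation checks listed above can be expressed directly in PySpark. The sketch below is illustrative only; the DataFrame names df_s (source) and df_t (target) and the CompanyNumber key are assumptions borrowed from the example later in this post.

# A hedged sketch of typical data validation checks in PySpark.
# df_s (source), df_t (target) and the CompanyNumber key are assumptions.
import pyspark.sql.functions as F

def reconcile_counts(df_s, df_t):
    # reconciliation testing: the source should not have fewer rows than the target
    return df_s.count() >= df_t.count()

def duplicate_key_count(df, key='CompanyNumber'):
    # duplicate records check: number of key values that appear more than once
    return df.groupBy(key).count().filter(F.col('count') > 1).count()

def new_record_count(df_s, df_t, key='CompanyNumber'):
    # new records: keys present in the source but not in the target
    return df_s.select(key).subtract(df_t.select(key)).count()

def schema_matches(df, expected):
    # data format and data types: compare column names and types against expectations
    return dict(df.dtypes) == expected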
Example of unit testing an ETL pipeline
PySpark is the recommended Python API for Spark; it is easy to use and provides good support for manipulating large datasets and performing ETL tasks.
Python provides good support for unit testing. We will use pytest to test the Companies House data mentioned in the previous post. Unit testing ETL code is just one part of making sure that your pipeline produces data fit for purpose. A PySpark ETL pipeline will usually take a long time to complete, so it is important to test the individual components before running the pipeline against large datasets.
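For example, an individual transformation can be unit tested against a small in-memory DataFrame long before the full pipeline runs against production-sized data. The sketch below is purely illustrative; the add_postcode_area function is a hypothetical transformation, not part of the Companies House pipeline.

# Minimal sketch of unit testing one transformation with a tiny DataFrame.
# add_postcode_area is a hypothetical example transformation.
import pytest
import pyspark.sql.functions as F
from pyspark.sql import SparkSession

def add_postcode_area(df):
    # derive the postcode area (leading letters of the postcode)
    return df.withColumn('PostcodeArea',
                         F.regexp_extract('RegAddressPostCode', r'^[A-Za-z]+', 0))

@pytest.fixture(scope='module')
def spark():
    return (SparkSession.builder
            .master('local[1]')
            .appName('unit-tests')
            .getOrCreate())

def test_add_postcode_area(spark):
    df = spark.createDataFrame(
        [('00000001', 'SW1A 1AA'), ('00000002', 'EC2V 7HN')],
        ['CompanyNumber', 'RegAddressPostCode'])
    result = add_postcode_area(df).orderBy('CompanyNumber').collect()
    assert [r.PostcodeArea for r in result] == ['SW', 'EC']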
Steps to unit test PySpark pipelines.
We set up some directories and files as a baseline.
Set up directories and pytest
>> mkdir tests
>> touch tests/test_companieshouse_data.py
We need a tests directory and a file, test_companieshouse_data.py, to hold our unit tests.
import os
import sys
sys.path.append(os.path.expanduser('~/apps/notebooks'))

import pytest

# local modules
import utils.sparksession as S
import utils.data_checks as dc

# get the shared Spark session
spark = S.spark

# extract the Companies House data: target (previous month) and source (current month)
df_t = spark.read.parquet('spark-warehouse/ch_20210301')
df_s = spark.read.parquet('spark-warehouse/ch_20210404')

# ensure the source record count is greater than or equal to the previous month's row count
def test_row_count():
    print("Target: {} Source: {}".format(df_t.count(), df_s.count()))
    assert df_t.count() <= df_s.count()

# identify changes between the two snapshots and verify that some are found
def test_find_changes():
    cols = [
        'CompanyName',
        'CompanyNumber',
        'CompanyCategory',
        'CompanyStatus',
        'RegAddressPostCode',
        'AccountsAccountCategory',
        'SICCodeSicText_1'
    ]
    df_changes = dc.find_changes(spark, df_s, df_t, cols)
    df_changes.filter(df_changes.data_changes != '').groupby('data_changes').count().show(truncate=False)
    assert df_changes.filter(df_changes.data_changes != '').count() > 0
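The test file imports two local helper modules that are not shown in this post. Purely as an illustration of what they might contain, here is a minimal sketch; the module contents below are assumptions based on how they are called above, not the actual code.

# utils/sparksession.py -- hypothetical sketch of the shared session helper
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName('companieshouse-etl').getOrCreate()


# utils/data_checks.py -- hypothetical sketch of find_changes; only the call
# signature comes from the test file above, the body is an assumption
import pyspark.sql.functions as F

def find_changes(spark, df_s, df_t, cols, key='CompanyNumber'):
    # the spark argument matches the call in the test but is unused in this sketch;
    # join the source and target snapshots on the company number and build a
    # data_changes column naming each column whose value differs
    s = df_s.select([F.col(c).alias('s_' + c) for c in cols])
    t = df_t.select([F.col(c).alias('t_' + c) for c in cols])
    joined = s.join(t, F.col('s_' + key) == F.col('t_' + key))
    diffs = [F.when(F.col('s_' + c) != F.col('t_' + c), F.lit(c)) for c in cols if c != key]
    # concat_ws skips nulls, so unchanged columns contribute nothing
    return joined.withColumn('data_changes', F.concat_ws(' ', *diffs))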
Output from the test run
$ pytest -s -q tests/test_companieshouse_data.py
21/05/08 13:32:37 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
21/05/08 13:32:47 WARN package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
Target: 849999 Source: 849999
.CompanyName
CompanyName
CompanyNumber
CompanyCategory
CompanyStatus
RegAddressPostCode
AccountsAccountCategory
SICCodeSicText_1
.......
-- Docs: https://docs.pytest.org/en/stable/warnings.html
2 passed, 1 warning in 214.18s (0:03:34)