In this article, we will use data from Companies House and impute SIC 2003 codes from the corresponding UK SIC 2007 codes found in the dataset.
Steps to impute SIC 2003
- Get the weighted tables with percentages mapping UK SIC 03 to UK SIC 07
- Process Companies House data
- Use data from step 1 to impute SIC2003 from SIC2007
- Assign SIC 2003 values to matching Companies House records
A company can have up to 4 SIC codes. A SIC code of 99999 is used for a dormant company and 74990 for a non-trading company. Some generic codes are given below:
- 62090 – Other information technology service activities
- 74909 – Other professional, scientific and technical activities not classified elsewhere
- 82990 – Other business support service activities not classified elsewhere
- 96090 – Other service activities not classified elsewhere.
Approach taken for imputing values
A SIC2007 code may have a one-to-many mapping to SIC2003 codes. We need to ensure that all SIC2003 values are assigned proportionally for a given SIC2007 code. For example, the dataset has 432 records with SIC2007 code 27900, and we need to distribute the SIC2003 codes (31620, 32100, 29430, 31100 and 31200) across them based on their weights (0.70, 0.15, 0.13, 0.012, 0.003). The final distribution, after ignoring codes whose weight index rounds to 0, is:
+-------+-----+
|Sic2003|count|
+-------+-----+
|  29430|   72|
|  31620|  288|
|  32100|   72|
+-------+-----+
Only the three SIC2003 codes with a non-zero weight index are used for SIC2007 code 27900.
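The arithmetic behind these counts can be checked with a few lines of plain Python (a minimal sketch; the weights and the 432-record count are taken from the example above, and the integer-division allocation is just for illustration):

# sketch: derive the expected SIC2003 distribution for SIC2007 code 27900
weights = {31620: 0.7034, 32100: 0.1510, 29430: 0.1305, 31100: 0.0118, 31200: 0.0033}
n_records = 432

# weight index: weight scaled by the number of candidate codes, rounded;
# codes whose index rounds to 0 drop out of the distribution
weight_index = {code: round(w * len(weights)) for code, w in weights.items()}
kept = {code: idx for code, idx in weight_index.items() if idx > 0}

# allocate the records in proportion to the surviving indices
total = sum(kept.values())
allocation = {code: n_records * idx // total for code, idx in kept.items()}
print(allocation)  # {31620: 288, 32100: 72, 29430: 72}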
Partitions or buckets
Create partitions to distribute SIC2003 codes based on the number of records that have the corresponding SIC2007 value. This could be done with the NTILE window function; however, NTILE requires the number of partitions to be known in advance. It can be emulated with the formula below:
CEIL((RANK() * TOTAL_NUMBER_SIC03_CODES) / NUMBER_OF_RECORDS )
CEIL(RANK() OVER (PARTITION BY SICCode_1 ORDER BY CompanyNumber) * s.sic03count /
COUNT(CompanyNumber) OVER (PARTITION BY SICCode_1)) AS NO_PARTITIONS,
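To convince yourself the formula reproduces NTILE, here is a small standalone check in plain Python (the record and bucket counts are hypothetical):

import math

# emulate NTILE: ceil(rank * k / n) spreads n ranked records over k buckets
n_records = 8   # records sharing one SIC2007 code (hypothetical)
k_buckets = 4   # number of SIC2003 rows to spread them across

buckets = [math.ceil(rank * k_buckets / n_records) for rank in range(1, n_records + 1)]
print(buckets)  # [1, 1, 2, 2, 3, 3, 4, 4] -- the same assignment NTILE(4) gives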
# s.sic03count is derived using:
CASE
  WHEN (WeightIndex > 0) THEN SUM(WeightIndex/WeightIndex) OVER (PARTITION BY Sic2007)
  ELSE 0
END AS sic03count,
WeightIndex/WeightIndex evaluates to 1 for non-zero weights and to NULL (0/0) for zero weights, so the windowed SUM counts only the exploded rows that carry a usable SIC2003 code.
+-----+-------+-------+---------------------+-----------+----------+---+
|rowno|Sic2003|Sic2007|WeightPCT |WeightIndex|sic03count|np |
+-----+-------+-------+---------------------+-----------+----------+---+
|1 |31620 |27900 |0.7034186019271911 |4.0 |6.0 |1 |
|1 |31620 |27900 |0.7034186019271911 |4.0 |6.0 |2 |
|1 |31620 |27900 |0.7034186019271911 |4.0 |6.0 |3 |
|1 |31620 |27900 |0.7034186019271911 |4.0 |6.0 |4 |
|2 |32100 |27900 |0.15102386407805274 |1.0 |6.0 |5 |
|3 |29430 |27900 |0.13048230402145936 |1.0 |6.0 |6 |
|4 |31100 |27900 |0.011763689585354366 |0.0 |0.0 |7 |
|5 |31200 |27900 |0.0033115403879425214|0.0 |0.0 |8 |
+-----+-------+-------+---------------------+-----------+----------+---+
You can see that row 1 has been repeated four times because it has a higher weight than the others.
Repeating rows n times
The positional explode function (posexplode) is used to repeat rows:
lateral view posexplode(split(space(WeightIndex - rowno),' ')) rge as diff, x
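As a standalone illustration (a minimal sketch assuming an active spark session; the demo values are made up): space(n) produces n blanks, splitting on a single space yields n + 1 elements, and posexplode turns each element into a row, so each input row is emitted WeightIndex - rowno + 1 times.

# sketch: repeat each row (WeightIndex - rowno + 1) times via posexplode
demo = spark.createDataFrame([(1, 4), (2, 1)], ['rowno', 'WeightIndex'])
demo.createOrReplaceTempView('demo')
spark.sql("""
    SELECT rowno, WeightIndex, diff
    FROM demo
    LATERAL VIEW posexplode(split(space(WeightIndex - rowno), ' ')) rge AS diff, x
""").show()
# rowno 1 appears 4 times (diff 0..3); rowno 2 appears once,
# since space() treats a negative length as zero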
Excerpts from ETL script
import pandas as pd

# read saved data
df = spark.read.parquet('spark-warehouse/ch202003_parquet')
df.createOrReplaceTempView('ch')
# create dataframe to hold SIC codes
file = "sicweightedtables0307_tcm77-261272.xls"
sic_pd = pd.read_excel(file,sheet_name="RU5digit")
df_sic = spark.createDataFrame(sic_pd)
df_sic.createOrReplaceTempView('sic2007')
# impute SIC 2003 codes
df2 = spark.sql("""WITH sic03 AS (
SELECT Sic2003,Sic2007,`Count %` AS Weight,
ROW_NUMBER() OVER (PARTITION BY Sic2007 ORDER BY `Count %` DESC) AS rowno,
`Count %`/SUM(`Count %`) OVER (PARTITION BY Sic2007) AS WeightPct,
ROUND(`Count %`/SUM(`Count %`) OVER (PARTITION BY Sic2007)*COUNT(Sic2003) OVER (PARTITION BY Sic2007)) AS WeightIndex
FROM sic2007
)
SELECT rowno, Sic2003,Sic2007,WeightPCT,WeightIndex,
CASE
WHEN (WeightIndex > 0) THEN SUM(WeightIndex/WeightIndex) OVER (PARTITION BY Sic2007)
ELSE 0
END AS sic03count,
ROW_NUMBER() OVER (PARTITION BY Sic2007 ORDER BY WeightIndex DESC) AS np
FROM sic03
lateral view posexplode(split(space(WeightIndex - rowno),' ')) rge as diff, x
""")
df2.createOrReplaceTempView('sic0307')
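# The sample table shown in the partitions section is this view's output for
# SIC2007 code 27900; it can be reproduced with, e.g. (assuming the code
# column is numeric as read from the spreadsheet):
spark.sql("SELECT * FROM sic0307 WHERE Sic2007 = 27900").show(truncate=False)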
# create needed partitions by joining SIC codes with Companies House data
# (cols_sic, defined earlier in the script, holds the comma-separated SIC code columns)
df3 = spark.sql(f"""
SELECT CompanyNumber,SICCode_1,CH_ROWNO,NO_ROWS,NO_PARTITIONS,SIC_NO_ROWS,SIC_LIST FROM (
SELECT CompanyNumber,SICCode_1,s.sic03count AS SIC_NO_ROWS,
ROW_NUMBER() OVER (PARTITION BY SICCode_1 ORDER BY CompanyNumber) CH_ROWNO,
COUNT(CompanyNumber) OVER (PARTITION BY SICCode_1) NO_ROWS,
CEIL(RANK() OVER (PARTITION BY SICCode_1 ORDER BY CompanyNumber) * s.sic03count /
COUNT(CompanyNumber) OVER (PARTITION BY SICCode_1)) AS NO_PARTITIONS,
COLLECT_LIST(concat_ws(',',{cols_sic})) OVER (PARTITION BY CompanyNumber) AS SIC_LIST
FROM ch
LEFT JOIN sic0307 s ON ch.SICCode_1 = s.Sic2007 AND s.np = 1
--WHERE SICCode_1 in ('05552','82990','43999','27900')
) ch_s
""")
df3.createOrReplaceTempView('ch82990')
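# sanity check (sketch): confirm records for one SIC2007 code are spread
# across the expected buckets; SICCode_1 is assumed to hold the bare
# 5-digit code after the processing step
spark.sql("""
    SELECT NO_PARTITIONS, COUNT(*) AS records
    FROM ch82990
    WHERE SICCode_1 = '27900'
    GROUP BY NO_PARTITIONS
    ORDER BY NO_PARTITIONS
""").show()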
# assign imputed SIC2003 for given SIC2007
df4 = spark.sql("""
WITH crn AS (
SELECT *
FROM ch82990)
SELECT c.*,s.Sic2003
FROM crn c
LEFT JOIN sic0307 s ON c.SICCode_1 = s.Sic2007 AND s.np = c.NO_PARTITIONS
""")