Processing GLEIF data in JSON format

UPDATE – 13/04/2024 – There is a workaround that involves editing the JSON file with Linux tools such as sed and awk. The file contains an array of JSON objects. The issue is that Apache Spark reads the whole contents at once instead of processing each JSON object separately.

Problem

Download the dataset and unzip it. After unzipping https://leidata-preview.gleif.org/storage/golden-copy-files/2024/03/13/905647/20240313-0800-gleif-goldencopy-lei2-golden-copy.json.zip (832 MB), the file size increases to 11 GB.

  • There is not enough memory available in Google Colab to process this dataset.
  • The JSON file contains an array of JSON objects, and each record is spread across many lines (multi-line JSON objects).

Workaround

  • Use sed to replace the first and last lines (i.e. remove the array notation).
  • Use awk to insert a newline after the end of each JSON object.
  • Use awk to remove leading and trailing whitespace (optional step). This reduces the file size to 6.1 GB.
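
The same transformation can also be sketched in Python for readers who prefer not to use sed and awk. This is a minimal sketch, not the commands used above: the function names iter_records and to_json_lines are hypothetical, and it assumes the layout shown in the sample record (a "records" array whose elements are all JSON objects, with no "[" character before the array opens). It streams the input in chunks, so the whole file never has to fit in memory, and writes one compact object per line.

```python
import io
import json


def iter_records(stream, chunk_size=1 << 16):
    """Yield the raw text of each object in the first JSON array of `stream`."""
    in_array = False   # True once the opening "[" has been seen
    depth = 0          # brace depth inside the current record (0 = between records)
    in_string = False  # True while inside a JSON string literal
    escaped = False    # True right after a backslash inside a string
    buf = []
    while True:
        chunk = stream.read(chunk_size)
        if not chunk:
            return
        for ch in chunk:
            if not in_array:
                in_array = ch == "["
                continue
            if depth == 0:
                if ch == "{":      # start of a new record
                    depth, buf = 1, [ch]
                elif ch == "]":    # end of the records array
                    return
                continue           # skip commas and whitespace between records
            buf.append(ch)
            if in_string:
                if escaped:
                    escaped = False
                elif ch == "\\":
                    escaped = True
                elif ch == '"':
                    in_string = False
            elif ch == '"':
                in_string = True
            elif ch == "{":
                depth += 1
            elif ch == "}":
                depth -= 1
                if depth == 0:     # record complete
                    yield "".join(buf)


def to_json_lines(src, dst):
    """Write one compact JSON object per line; return the record count."""
    count = 0
    for raw in iter_records(src):
        record = json.loads(raw)
        dst.write(json.dumps(record, ensure_ascii=False, separators=(",", ":")) + "\n")
        count += 1
    return count


# Tiny in-memory demonstration using a trimmed version of the sample layout.
sample = io.StringIO(
    '{"records":[\n{\n  "LEI": {"$": "529900P5TAD0ABFTMV10"},\n'
    '  "Entity": {"LegalName": {"$": "Land Hessen"}}\n},\n'
    '{\n  "LEI": {"$": "5493007BPZS3LU7J0U61"}\n}]}'
)
out = io.StringIO()
n = to_json_lines(sample, out)
print(n)
print(out.getvalue())
```

For a real file, open the source and destination with open(..., encoding="utf-8") and pass the file objects instead of the StringIO buffers. A character-by-character loop in pure Python will likely be much slower than sed and awk on an 11 GB file, so treat this as an illustration of the idea rather than a faster replacement.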

Time taken

It took about 30 minutes to read the data and save it as Parquet. Not an ideal solution, but it does the job.
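
Once every JSON object sits on its own line, the read-and-save step might look like the sketch below. The function name json_lines_to_parquet, the application name, and the paths in the usage comment are hypothetical; only spark.read.json and DataFrame.write.parquet are standard PySpark calls.

```python
def json_lines_to_parquet(src_path, dst_path, app_name="gleif-jsonl-to-parquet"):
    """Read a JSON Lines file with Spark and save it as Parquet."""
    # Imported lazily so that this function can be defined even on a
    # machine where pyspark is not installed.
    from pyspark.sql import SparkSession

    spark = SparkSession.builder.appName(app_name).getOrCreate()
    # Default (non-multiLine) mode: one JSON object per line, so Spark can
    # split the file and does not need to hold the whole document in memory.
    df = spark.read.json(src_path)
    df.write.mode("overwrite").parquet(dst_path)
    # Return a DataFrame backed by the Parquet output for further queries.
    return spark.read.parquet(dst_path)


# Example call (hypothetical paths):
# lei_df = json_lines_to_parquet("/content/gleif-lei2.jsonl", "/content/gleif-lei2.parquet")
# lei_df.printSchema()
```

Schema inference still needs a full pass over the input, which probably accounts for part of the run time; supplying an explicit schema to spark.read.json may shorten it.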

Read a large JSON file using Apache Spark

UPDATE – 24/01/2023 – The GLEIF golden copy is a very complex JSON file and needs more driver memory. Using wholeTextFiles may not work; instead, increase the driver memory and use spark.read.json(filename, multiLine=True).
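
A minimal sketch of that advice follows. The helper names build_session and read_multiline_json, the application name, and the "12g" value are assumptions for illustration; pick a value that fits the memory of the machine. The setting only takes effect if no Spark session is already running in the process, because the driver JVM's heap size is fixed when it starts.

```python
def build_session(driver_memory="12g", app_name="gleif-multiline"):
    """Create a Spark session with a larger driver heap."""
    # Imported lazily so that this function can be defined even on a
    # machine where pyspark is not installed.
    from pyspark.sql import SparkSession

    return (
        SparkSession.builder
        .appName(app_name)
        # Must be set before the JVM starts; getOrCreate() on an existing
        # session would return that session with its original memory limit.
        .config("spark.driver.memory", driver_memory)
        .getOrCreate()
    )


def read_multiline_json(spark, path):
    """Parse a file that holds one large, multi-line JSON document."""
    return spark.read.json(path, multiLine=True)


# Example call (the path is the one used later in this post):
# spark = build_session()
# df = read_multiline_json(spark, "file:///tmp/20230124-0800-gleif-goldencopy-repex-golden-copy.json")
```

In a notebook, restart the runtime before calling build_session if a session was created earlier with the default memory settings.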

!git clone https://gitlab.com/akkasali910/companieshouse-data
!pip3 install google.colab
  • You need to install PySpark in Google Colab:
    • pip3 install pyspark
    • Use the code below to create the Spark session.
from pyspark.sql import SparkSession
from pyspark import SparkContext
from pyspark.sql import functions as F
from pyspark.sql import types as T
from datetime import datetime
import logging

spark = SparkSession.builder\
       .config("spark.sql.warehouse.dir","./spark-warehouse") \
       .appName('pyspark-cdc')\
       .enableHiveSupport()\
       .getOrCreate()
  • Remove metadata (Databricks only; dbutils is not available in Google Colab)
listFiles = dbutils.fs.ls("/databricks/driver/gleif")
dbutils.fs.rm("/databricks/driver/gleif", recurse=True)

GLEIF file is too large

  • Reading it using spark.read.json with multiLine=True may not work.
  • Workaround: use wholeTextFiles.
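import json
from pyspark import StorageLevel

sc = spark.sparkContext
# wholeTextFiles yields (path, content) pairs, one per file
multiline_rdd = sc.wholeTextFiles('dbfs:/FileStore/shared_uploads/info@mirpurutc.com/lei_sample.json')
multiline_rdd.persist(StorageLevel.MEMORY_AND_DISK)
# Compact each file onto a single line with a JSON round trip. Stripping with
# re.sub(r"\s+", "", ...) would also delete spaces inside string values
# (e.g. "Land Hessen" would become "LandHessen").
json_rdd = multiline_rdd.map(lambda x: json.dumps(json.loads(x[1]), ensure_ascii=False))
df = spark.read.json(json_rdd)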
[Figure: GLEIF data]

Processing GLEIF Reporting Exceptions file

# By default, Spark expects each line of a JSON file to hold one complete record.
# multiLine=True makes Spark parse each file as a single JSON document instead.
from pyspark import StorageLevel

#glief_file = 'file:///tmp/20230120-0800-gleif-goldencopy-lei2-golden-copy.json'
glief_file = 'file:///tmp/20230124-0800-gleif-goldencopy-repex-golden-copy.json'
df = spark.read.json(glief_file, multiLine=True)
df.persist(StorageLevel.MEMORY_AND_DISK)
df.printSchema()
[Figure: GLEIF Reporting Exceptions]

Ref: https://www.gleif.org/en/lei-data/gleif-data-quality-management

Sample golden copy record

{"records":[
{
    "LEI": {
        "$": "529900P5TAD0ABFTMV10"
    },
    "Entity": {
        "LegalName": {
            "@xml:lang": "de",
            "$": "Land Hessen"
        },
        "LegalAddress": {
            "@xml:lang": "de",
            "FirstAddressLine": {
                "$": "Friedrich-Ebert-Allee 8"
            },
            "City": {
                "$": "Wiesbaden"
            },
            "Region": {
                "$": "DE-HE"
            },
            "Country": {
                "$": "DE"
            },
            "PostalCode": {
                "$": "65185"
            }
        },
        "HeadquartersAddress": {
            "@xml:lang": "de",
            "FirstAddressLine": {
                "$": "Friedrich-Ebert-Allee 8"
            },
            "City": {
                "$": "Wiesbaden"
            },
            "Region": {
                "$": "DE-HE"
            },
            "Country": {
                "$": "DE"
            },
            "PostalCode": {
                "$": "65185"
            }
        },
        "RegistrationAuthority": {
            "RegistrationAuthorityID": {
                "$": "RA999999"
            }
        },
        "LegalJurisdiction": {
            "$": "DE"
        },
        "EntityCategory": {
            "$": "RESIDENT_GOVERNMENT_ENTITY"
        },
        "EntitySubCategory": {
            "$": "STATE_GOVERNMENT"
        },
        "LegalForm": {
            "EntityLegalFormCode": {
                "$": "8888"
            },
            "OtherLegalForm": {
                "$": "Legal Entity of Public Law"
            }
        },
        "EntityStatus": {
            "$": "ACTIVE"
        },
        "EntityCreationDate": {
            "$": "1945-09-19T00:00:00+02:00"
        }
    },
    "Registration": {
        "InitialRegistrationDate": {
            "$": "2017-07-14T16:41:21+02:00"
        },
        "LastUpdateDate": {
            "$": "2021-05-25T17:07:10.059+02:00"
        },
        "RegistrationStatus": {
            "$": "ISSUED"
        },
        "NextRenewalDate": {
            "$": "2021-07-14T16:41:21+02:00"
        },
        "ManagingLOU": {
            "$": "5299000J2N45DDNE4Y28"
        },
        "ValidationSources": {
            "$": "ENTITY_SUPPLIED_ONLY"
        },
        "ValidationAuthority": {
            "ValidationAuthorityID": {
                "$": "RA999999"
            }
        }
    },
    "Extension": null
},
{
    "LEI": {
        "$": "5493007BPZS3LU7J0U61"
    },
    "Entity": {
        "LegalName": {
            "@xml:lang": "en",
            "$": "Canadian Air Transport Security Authority"
        },
        "OtherEntityNames": {
            "OtherEntityName": [
                {
                    "@xml:lang": "fr",
                    "@type": "ALTERNATIVE_LANGUAGE_LEGAL_NAME",
                    "$": "Administration canadienne de la sûreté du transport aérien"
                }
            ]
        },
        "LegalAddress": {
            "@xml:lang": "en",
            "FirstAddressLine": {
                "$": "Sun Life Financial Centre"
            },
            "AdditionalAddressLine": [
                {
                    "$": "99 Bank Street"
                },
                {
                    "$": "5th Floor"
                }
            ],
            "City": {
                "$": "Ottawa"
            },
            "Region": {
                "$": "CA-ON"
            },
            "Country": {
                "$": "CA"
            },
            "PostalCode": {
                "$": "K1P 6B9"
            }
        },
        "HeadquartersAddress": {
            "@xml:lang": "en",
            "FirstAddressLine": {
                "$": "Sun Life Financial Centre"
            },
            "AdditionalAddressLine": [
                {
                    "$": "99 Bank Street"
                },
                {
                    "$": "5th Floor"
                }
            ],
            "City": {
                "$": "Ottawa"
            },
            "Region": {
                "$": "CA-ON"
            },
            "Country": {
                "$": "CA"
            },
            "PostalCode": {
                "$": "K1P 6B9"
            }
        },
        "RegistrationAuthority": {
            "RegistrationAuthorityID": {
                "$": "RA999999"
            }
        },
        "LegalJurisdiction": {
            "$": "CA"
        },
        "EntityCategory": {
            "$": "RESIDENT_GOVERNMENT_ENTITY"
        },
        "EntitySubCategory": {
            "$": "CENTRAL_GOVERNMENT"
        },
        "LegalForm": {
            "EntityLegalFormCode": {
                "$": "8888"
            },
            "OtherLegalForm": {
                "$": "Crown Corporation"
            }
        },
        "EntityStatus": {
            "$": "ACTIVE"
        },
        "EntityCreationDate": {
            "$": "2002-04-01T00:00:00-04:00"
        }
    },
    "Registration": {
        "InitialRegistrationDate": {
            "$": "2016-11-01T02:40:00+01:00"
        },
        "LastUpdateDate": {
            "$": "2021-05-25T17:17:17.512+02:00"
        },
        "RegistrationStatus": {
            "$": "ISSUED"
        },
        "NextRenewalDate": {
            "$": "2021-09-01T18:25:00+02:00"
        },
        "ManagingLOU": {
            "$": "EVK05KS7XY1DEII3R011"
        },
        "ValidationSources": {
            "$": "ENTITY_SUPPLIED_ONLY"
        },
        "ValidationAuthority": {
            "ValidationAuthorityID": {
                "$": "RA999999"
            }
        }
    },
    "Extension": null
}]}
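
Judging from the sample above, each element's text appears to sit under a "$" key and XML attributes under "@"-prefixed keys, which makes the records awkward to query directly. The sketch below shows one possible way to flatten a record into dotted column names using plain Python. The function name flatten and the naming scheme (dots for nesting, [i] for list positions) are choices made for this illustration and are not part of the GLEIF format.

```python
import json


def flatten(node, prefix=""):
    """Flatten a nested record into a {dotted.path: value} dictionary."""
    flat = {}
    if isinstance(node, dict):
        for key, value in node.items():
            if key == "$":
                # "$" holds the element's own text: store it under the parent path
                flat[prefix] = value
            else:
                child = f"{prefix}.{key}" if prefix else key
                flat.update(flatten(value, child))
    elif isinstance(node, list):
        for index, item in enumerate(node):
            flat.update(flatten(item, f"{prefix}[{index}]"))
    elif node is not None:
        # plain scalars such as the "@xml:lang" attribute; nulls are dropped
        flat[prefix] = node
    return flat


# A trimmed copy of the second sample record
record = json.loads("""
{
  "LEI": {"$": "5493007BPZS3LU7J0U61"},
  "Entity": {
    "LegalName": {"@xml:lang": "en", "$": "Canadian Air Transport Security Authority"},
    "LegalAddress": {
      "AdditionalAddressLine": [{"$": "99 Bank Street"}, {"$": "5th Floor"}],
      "City": {"$": "Ottawa"}
    }
  },
  "Extension": null
}
""")

flat = flatten(record)
for path, value in flat.items():
    print(path, "=", value)
```

The same idea carries over to Spark by selecting nested fields with backticks around the special key, for example F.col("LEI.`$`"), although list-valued fields such as AdditionalAddressLine would need explode or element access there.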

Error messages

Reading a large JSON file that contains an array of JSON objects may result in an OOM error (Java OutOfMemoryError). If that happens, the Java garbage collector (GC) will not be able to free up enough memory, and you need to restart the Java session.

Spark context used to process the JSON file on Google Colab

Example of Schema