In the previous article, "Processing GLIEF data in JSON format", we described how to ingest data into a Databricks (Community Edition) data lake using PySpark. However, we were unable to process the GLEIF Golden Copy file in JSON format because of memory issues and the complexity of its nested JSON objects. The input file size did not help either: after unzipping, the file was over 6.2 GB, and spark.read.json failed due to a lack of driver memory. We needed a different approach, so we tried processing the GLEIF files in XML instead of JSON, and it worked nicely.
GLEIF Golden Copy
The latest file contains around 2,306,465 records.
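The record count can be checked without loading the 6.2 GB file into memory. A minimal sketch, assuming plain Python on the driver and the standard library's xml.etree.ElementTree.iterparse: it streams the file, counts elements whose local name is LEIRecord (the namespace prefix is ignored), and removes each finished record from its parent so memory use stays roughly flat. count_records is a hypothetical helper, and the namespace URI in the sample is made up.

```python
import io
import xml.etree.ElementTree as ET


def count_records(source, local_name="LEIRecord"):
    """Stream an XML file (path or file object) and count elements named local_name.

    Finished records are cleared and detached from their parent, so only the
    record currently being parsed is held in memory.
    """
    count = 0
    stack = []  # open elements from the root down to the current one
    for event, elem in ET.iterparse(source, events=("start", "end")):
        if event == "start":
            stack.append(elem)
            continue
        stack.pop()
        # tags look like "{namespace-uri}LEIRecord"; compare only the local part
        if elem.tag.rsplit("}", 1)[-1] == local_name:
            count += 1
            elem.clear()
            if stack:
                stack[-1].remove(elem)
    return count


# Tiny inline sample with a hypothetical namespace URI; for the real file pass its
# path instead, e.g. "/databricks/driver/20230125-gleif-concatenated-file-lei2.xml".
SAMPLE = b"""<lei:LEIData xmlns:lei="urn:example">
  <lei:LEIRecords>
    <lei:LEIRecord><lei:LEI>A</lei:LEI></lei:LEIRecord>
    <lei:LEIRecord><lei:LEI>B</lei:LEI></lei:LEIRecord>
  </lei:LEIRecords>
</lei:LEIData>"""

print(count_records(io.BytesIO(SAMPLE)))  # prints 2
```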
The schema inferred by spark-xml (rowTag lei:LEIRecord) is:
root
|-- _xmlns:lei: string (nullable = true)
|-- lei:Entity: struct (nullable = true)
| |-- lei:EntityCategory: string (nullable = true)
| |-- lei:EntityCreationDate: timestamp (nullable = true)
| |-- lei:EntityStatus: string (nullable = true)
| |-- lei:EntitySubCategory: string (nullable = true)
| |-- lei:HeadquartersAddress: struct (nullable = true)
| | |-- _xml:lang: string (nullable = true)
| | |-- lei:AdditionalAddressLine: array (nullable = true)
| | | |-- element: string (containsNull = true)
| | |-- lei:AddressNumber: string (nullable = true)
| | |-- lei:AddressNumberWithinBuilding: string (nullable = true)
| | |-- lei:City: string (nullable = true)
| | |-- lei:Country: string (nullable = true)
| | |-- lei:FirstAddressLine: string (nullable = true)
| | |-- lei:MailRouting: string (nullable = true)
| | |-- lei:PostalCode: string (nullable = true)
| | |-- lei:Region: string (nullable = true)
| |-- lei:LegalAddress: struct (nullable = true)
| | |-- _xml:lang: string (nullable = true)
| | |-- lei:AdditionalAddressLine: array (nullable = true)
| | | |-- element: string (containsNull = true)
| | |-- lei:AddressNumber: string (nullable = true)
| | |-- lei:AddressNumberWithinBuilding: string (nullable = true)
| | |-- lei:City: string (nullable = true)
| | |-- lei:Country: string (nullable = true)
| | |-- lei:FirstAddressLine: string (nullable = true)
| | |-- lei:MailRouting: string (nullable = true)
| | |-- lei:PostalCode: string (nullable = true)
| | |-- lei:Region: string (nullable = true)
| |-- lei:LegalEntityEvents: struct (nullable = true)
| | |-- lei:LegalEntityEvent: array (nullable = true)
| | | |-- element: struct (containsNull = true)
| | | | |-- _event_status: string (nullable = true)
| | | | |-- _group_id: string (nullable = true)
| | | | |-- _group_sequence_no: long (nullable = true)
| | | | |-- _group_type: string (nullable = true)
| | | | |-- lei:AffectedFields: struct (nullable = true)
| | | | | |-- lei:AffectedField: array (nullable = true)
| | | | | | |-- element: struct (containsNull = true)
| | | | | | | |-- _VALUE: string (nullable = true)
| | | | | | | |-- _field_xpath: string (nullable = true)
| | | | |-- lei:LegalEntityEventEffectiveDate: timestamp (nullable = true)
| | | | |-- lei:LegalEntityEventRecordedDate: timestamp (nullable = true)
| | | | |-- lei:LegalEntityEventType: string (nullable = true)
| | | | |-- lei:ValidationDocuments: string (nullable = true)
| | | | |-- lei:ValidationReference: string (nullable = true)
| |-- lei:LegalForm: struct (nullable = true)
| | |-- lei:EntityLegalFormCode: string (nullable = true)
| | |-- lei:OtherLegalForm: string (nullable = true)
| |-- lei:LegalJurisdiction: string (nullable = true)
| |-- lei:LegalName: struct (nullable = true)
| | |-- _VALUE: string (nullable = true)
| | |-- _xml:lang: string (nullable = true)
| |-- lei:NextVersion: string (nullable = true)
| |-- lei:OtherAddresses: struct (nullable = true)
| | |-- lei:OtherAddress: array (nullable = true)
| | | |-- element: struct (containsNull = true)
| | | | |-- _type: string (nullable = true)
| | | | |-- _xml:lang: string (nullable = true)
| | | | |-- lei:AdditionalAddressLine: array (nullable = true)
| | | | | |-- element: string (containsNull = true)
| | | | |-- lei:AddressNumber: string (nullable = true)
| | | | |-- lei:AddressNumberWithinBuilding: string (nullable = true)
| | | | |-- lei:City: string (nullable = true)
| | | | |-- lei:Country: string (nullable = true)
| | | | |-- lei:FirstAddressLine: string (nullable = true)
| | | | |-- lei:MailRouting: string (nullable = true)
| | | | |-- lei:PostalCode: string (nullable = true)
| | | | |-- lei:Region: string (nullable = true)
| |-- lei:OtherEntityNames: struct (nullable = true)
| | |-- lei:OtherEntityName: array (nullable = true)
| | | |-- element: struct (containsNull = true)
| | | | |-- _VALUE: string (nullable = true)
| | | | |-- _type: string (nullable = true)
| | | | |-- _xml:lang: string (nullable = true)
| |-- lei:RegistrationAuthority: struct (nullable = true)
| | |-- lei:OtherRegistrationAuthorityID: string (nullable = true)
| | |-- lei:RegistrationAuthorityEntityID: string (nullable = true)
| | |-- lei:RegistrationAuthorityID: string (nullable = true)
| |-- lei:SuccessorEntity: array (nullable = true)
| | |-- element: struct (containsNull = true)
| | | |-- lei:SuccessorEntityName: struct (nullable = true)
| | | | |-- _VALUE: string (nullable = true)
| | | | |-- _xml:lang: string (nullable = true)
| | | |-- lei:SuccessorLEI: string (nullable = true)
| |-- lei:TransliteratedOtherAddresses: struct (nullable = true)
| | |-- lei:TransliteratedOtherAddress: array (nullable = true)
| | | |-- element: struct (containsNull = true)
| | | | |-- _type: string (nullable = true)
| | | | |-- _xml:lang: string (nullable = true)
| | | | |-- lei:AdditionalAddressLine: array (nullable = true)
| | | | | |-- element: string (containsNull = true)
| | | | |-- lei:AddressNumber: string (nullable = true)
| | | | |-- lei:AddressNumberWithinBuilding: string (nullable = true)
| | | | |-- lei:City: string (nullable = true)
| | | | |-- lei:Country: string (nullable = true)
| | | | |-- lei:FirstAddressLine: string (nullable = true)
| | | | |-- lei:MailRouting: string (nullable = true)
| | | | |-- lei:PostalCode: string (nullable = true)
| | | | |-- lei:Region: string (nullable = true)
| |-- lei:TransliteratedOtherEntityNames: struct (nullable = true)
| | |-- lei:TransliteratedOtherEntityName: array (nullable = true)
| | | |-- element: struct (containsNull = true)
| | | | |-- _VALUE: string (nullable = true)
| | | | |-- _type: string (nullable = true)
| | | | |-- _xml:lang: string (nullable = true)
|-- lei:Extension: struct (nullable = true)
| |-- ext:CIF: struct (nullable = true)
| | |-- _VALUE: string (nullable = true)
| | |-- _xmlns:ext: string (nullable = true)
| |-- leifr:EconomicActivity: struct (nullable = true)
| | |-- _xmlns:leifr: string (nullable = true)
| | |-- leifr:NACEClassCode: double (nullable = true)
| | |-- leifr:SousClasseNAF: string (nullable = true)
| |-- leifr:FundManagerBusinessRegisterID: struct (nullable = true)
| | |-- _VALUE: long (nullable = true)
| | |-- _xmlns:leifr: string (nullable = true)
| |-- leifr:FundNumber: struct (nullable = true)
| | |-- _VALUE: string (nullable = true)
| | |-- _xmlns:leifr: string (nullable = true)
| |-- leifr:LegalFormCodification: struct (nullable = true)
| | |-- _VALUE: long (nullable = true)
| | |-- _uri: string (nullable = true)
| | |-- _xmlns:leifr: string (nullable = true)
| |-- leifr:SIREN: struct (nullable = true)
| | |-- _VALUE: long (nullable = true)
| | |-- _xmlns:leifr: string (nullable = true)
|-- lei:LEI: string (nullable = true)
|-- lei:NextVersion: string (nullable = true)
|-- lei:Registration: struct (nullable = true)
| |-- lei:InitialRegistrationDate: timestamp (nullable = true)
| |-- lei:LastUpdateDate: timestamp (nullable = true)
| |-- lei:ManagingLOU: string (nullable = true)
| |-- lei:NextRenewalDate: timestamp (nullable = true)
| |-- lei:NextVersion: string (nullable = true)
| |-- lei:OtherValidationAuthorities: struct (nullable = true)
| | |-- lei:OtherValidationAuthority: array (nullable = true)
| | | |-- element: struct (containsNull = true)
| | | | |-- lei:OtherValidationAuthorityID: string (nullable = true)
| | | | |-- lei:ValidationAuthorityEntityID: string (nullable = true)
| | | | |-- lei:ValidationAuthorityID: string (nullable = true)
| |-- lei:RegistrationStatus: string (nullable = true)
| |-- lei:ValidationAuthority: struct (nullable = true)
| | |-- lei:OtherValidationAuthorityID: string (nullable = true)
| | |-- lei:ValidationAuthorityEntityID: string (nullable = true)
| | |-- lei:ValidationAuthorityID: string (nullable = true)
| |-- lei:ValidationSources: string (nullable = true)
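The column paths used in the select step follow this tree: nested struct fields are joined with dots (for example lei:Entity.lei:LegalAddress.lei:Country), XML attributes carry spark-xml's default "_" prefix, and element text sits in _VALUE. To list every selectable path instead of reading it off the printout, a small helper can walk the schema dict returned by df.schema.jsonValue(). This is a sketch; leaf_paths is a hypothetical helper, not part of PySpark, and it expands structs only, reporting arrays as single leaves.

```python
def leaf_paths(schema_json, prefix=""):
    """Return (dotted_path, type_name) pairs for the fields of a struct schema.

    schema_json is the dict produced by StructType.jsonValue(). Nested structs are
    expanded into dotted paths; arrays, maps and primitive types are reported as
    leaves (array elements would need explode() to become separate rows).
    """
    paths = []
    for field in schema_json["fields"]:
        path = prefix + field["name"]
        ftype = field["type"]
        if isinstance(ftype, dict) and ftype["type"] == "struct":
            # recurse into the nested struct, extending the dotted prefix
            paths.extend(leaf_paths(ftype, prefix=path + "."))
        else:
            # primitives are plain strings such as "string" or "timestamp";
            # arrays and maps are dicts whose "type" key names the kind
            paths.append((path, ftype if isinstance(ftype, str) else ftype["type"]))
    return paths


# A hand-written fragment of the GLEIF schema in jsonValue() form
SAMPLE_SCHEMA = {
    "type": "struct",
    "fields": [
        {"name": "lei:LEI", "type": "string", "nullable": True, "metadata": {}},
        {"name": "lei:Entity", "nullable": True, "metadata": {}, "type": {
            "type": "struct",
            "fields": [
                {"name": "lei:LegalName", "nullable": True, "metadata": {}, "type": {
                    "type": "struct",
                    "fields": [
                        {"name": "_VALUE", "type": "string", "nullable": True, "metadata": {}},
                        {"name": "_xml:lang", "type": "string", "nullable": True, "metadata": {}},
                    ],
                }},
                {"name": "lei:SuccessorEntity", "nullable": True, "metadata": {}, "type": {
                    "type": "array",
                    "containsNull": True,
                    "elementType": {
                        "type": "struct",
                        "fields": [
                            {"name": "lei:SuccessorLEI", "type": "string",
                             "nullable": True, "metadata": {}},
                        ],
                    },
                }},
            ],
        }},
    ],
}

for path, type_name in leaf_paths(SAMPLE_SCHEMA):
    print(path, type_name)
```

In the notebook, leaf_paths(df.schema.jsonValue()) would list paths that can be passed straight to F.col.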
Data Pipeline
- Download the data from the GLEIF website
- Create a DataFrame and select the required fields
- Write the DataFrame to BigQuery
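The download step is not shown in the pipeline code. A minimal sketch using only the Python standard library, assuming you already have the Golden Copy download link from the GLEIF website: the archive is streamed to disk in chunks rather than read into memory, then unzipped so Spark can read the XML. The helper names download and unzip, and the URL and paths in the usage comment, are placeholders.

```python
import shutil
import urllib.request
import zipfile
from pathlib import Path


def download(url, dest):
    """Stream url to dest on disk without holding the whole file in memory."""
    dest = Path(dest)
    with urllib.request.urlopen(url) as response, open(dest, "wb") as out:
        shutil.copyfileobj(response, out)
    return dest


def unzip(zip_path, dest_dir):
    """Extract every member of zip_path into dest_dir and return the extracted paths."""
    dest_dir = Path(dest_dir)
    with zipfile.ZipFile(zip_path) as zf:
        zf.extractall(dest_dir)
        return [dest_dir / name for name in zf.namelist()]


# Hypothetical usage on the Databricks driver (replace the placeholder URL):
# archive = download("https://example.com/golden-copy-lei2.xml.zip",
#                    "/databricks/driver/golden-copy-lei2.xml.zip")
# xml_files = unzip(archive, "/databricks/driver")
```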
from pyspark.sql import functions as F

# create the DataFrame: each lei:LEIRecord element becomes one row (format 'xml' is provided by the spark-xml library)
df = (spark.read.format('xml')
      .options(rowTag='lei:LEIRecord')
      .load('file:///databricks/driver/20230125-gleif-concatenated-file-lei2.xml'))

# select the required fields and give them flat, BigQuery-friendly names
df1 = df.select(
    F.col('lei:LEI').alias('LEI'),
    F.col('lei:Entity.lei:EntityCategory').alias('EntityCategory'),
    F.col('lei:Entity.lei:LegalName._VALUE').alias('LegalName'),
    F.col('lei:Entity.lei:LegalForm.lei:EntityLegalFormCode').alias('LegalFormCode'),
    F.col('lei:Entity.lei:LegalForm.lei:OtherLegalForm').alias('OtherLegalForm'),
    F.col('lei:Entity.lei:EntityStatus').alias('EntityStatus'),
    F.col('lei:Entity.lei:LegalAddress.lei:Country').alias('Country'),
    F.col('lei:Entity.lei:LegalJurisdiction').alias('Jurisdiction'),
    F.col('lei:Registration.lei:InitialRegistrationDate').alias('InitialRegistrationDate'),
    F.col('lei:Registration.lei:LastUpdateDate').alias('LastUpdateDate'),
    F.col('lei:Registration.lei:ManagingLOU').alias('ManagingLOU'),
    F.col('lei:Registration.lei:NextRenewalDate').alias('NextRenewalDate'),
)

# save the selected fields (df1, not the raw nested df) into BigQuery
df1.write \
    .format("bigquery") \
    .option("temporaryGcsBucket", "gsdatabucket/temp/") \
    .option("parentProject", "parent-project") \
    .option("dataset", "dataset") \
    .save("project.dataset.glief_lei_cc")
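The aliases in the select step matter because the raw spark-xml column names (lei:Entity, _xmlns:lei, and so on) contain colons. A quick pre-write check, assuming the classic BigQuery column-name rule (letters, digits and underscores only, starting with a letter or underscore, at most 300 characters); BigQuery's newer flexible column names relax this, and reserved prefixes are not checked, so treat it as a conservative sketch. invalid_bigquery_columns is a hypothetical helper.

```python
import re

# Conservative (classic) BigQuery column-name rule: letters, digits and underscores,
# starting with a letter or underscore, 1 to 300 characters in total.
_BQ_COLUMN = re.compile(r"[A-Za-z_][A-Za-z0-9_]{0,299}")


def invalid_bigquery_columns(columns):
    """Return the column names that do not satisfy the classic BigQuery rule."""
    return [name for name in columns if not _BQ_COLUMN.fullmatch(name)]


# In the notebook: invalid_bigquery_columns(df1.columns) should be empty,
# while invalid_bigquery_columns(df.columns) flags the raw "lei:..." names.
```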
Prerequisites for BigQuery
- Install the JAR files:
  - gcs_connector_hadoop3_latest.jar
  - spark_bigquery_latest_2_12.jar
- Copy the service account key file to the driver's local disk so that it is available to the Spark driver:
  - dbutils.fs.cp("/FileStore/key/glief_sevice.json", "file:///tmp/service.json")
Spark entry point for Google Colab
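from pyspark.sql import SparkSession

# local Spark session that pulls the spark-xml package from Maven
spark = (SparkSession.builder
         .config("spark.sql.warehouse.dir", "./spark-warehouse")
         # memory settings only apply if no Spark session is already running in the runtime
         .config("spark.driver.memory", "4g")
         # in local mode executors run inside the driver JVM, so this has little effect there
         .config("spark.executor.memory", "8g")
         .config("spark.jars.packages", "com.databricks:spark-xml_2.12:0.17.0")
         .appName('pyspark-xml')
         .enableHiveSupport()
         .getOrCreate()
         )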