Tag: GCP

  • Processing Companies House PSC stream data

    Need to access PSC stream API and extract information about persons with significant control. However the result has personal information such name, part of date of birth, nationality and address with postcode. Please read following articles – Processing UK Companies House PSC Data and Companies House Stream API more detailed information on Companies House PSC data.

    We are only interested in the following fields without identifying individuals so that information can be linked with Companies House Profile data:

    • crn
    • resource_id
    • resource_kind
    • event_type
    • event_timepoint
    • event_published_at
    • country_of_residence
    • nationality
    • notified_on
    • natures_of_control
    • postal_code
    • address_line_1
    • locality
    • country

    Technical approach

    We will use Google Cloud Platform (GCP) and free PostgreSQL database at Neon Free Tier.

    • Google Cloud Shell – python script which make call to Companies House stream API using python requests package with stream option and setting as true
    import requests
    url = 'https://stream.companieshouse.gov.uk/persons-with-significant-control'
    stream_timeout = 600   # time out ten mins just as example, no need if not wanted
    print('Streaming URL', url)
    r = requests.get(url, auth=(stream_ch_api_key, ''), stream=True, timeout=stream_timeout)
    print(r.status_code, r.headers)
    • save payload with relevant data to database (at Neon Free Tier PostgreSQL)
    • process JSONB payload data and create SQL view for accessing required data

    Save JSON data to database

    import dataset
    # url
    def construct_pg_url(postgres_user='username', postgres_password='password', postgres_host='eneon.tech', postgres_port='5432', postgres_database='neondb'):
      PG_URL = "postgresql://" + postgres_user + ":" + postgres_password + '@' + postgres_host + ':' + postgres_port + '/' + postgres_database
      return PG_URL
    pgconn = dataset.Database(url=construct_pg_url(),schema='public')
    # save data
    def add_data(data):
      table = pgconn['table_name']
      row = json.loads(data)
      table.insert(dict(payload=row))
      return True

    SQL View

    drop view companies_house_psc;
    create view companies_house_psc as (
    select
    substring(payload -> 'data' -> 'links' ->> 'self',10,8) as crn,
    payload ->> 'resource_id'  as resource_id,
    payload ->> 'resource_kind' as resource_kind,
    payload -> 'event' ->> 'type'  as event_type,
    payload -> 'event' ->> 'timepoint'  as event_timepoint,
    payload -> 'event' ->> 'published_at'  as event_published_at,
    payload -> 'data' ->> 'country_of_residence'  as country_of_residence,
    payload -> 'data' ->> 'nationality'  as nationality,
    payload -> 'data' ->> 'notified_on'  as notified_on,
    payload -> 'data' ->> 'natures_of_control'  as natures_of_control,
    payload -> 'data' -> 'address' ->> 'postal_code' as postal_code,
    payload -> 'data' -> 'address' ->> 'address_line_1' as address_line_1,
    payload -> 'data' -> 'address' ->> 'locality' as locality,
    payload -> 'data' -> 'address' ->> 'country' as country
    from ch where id > 10
    )

    Reading data

    from sqlalchemy import create_engine
    CONNSTR = f'postgresql://username:password@hostdb.neon.tech/databasename?options=endpoint%3Dproject_id'
    engine = create_engine(CONNSTR)
    pgconn = engine.connect()
    
    from sqlalchemy import text
    query = text("SELECT * FROM companies_house_psc")
    
    # save result as pandas dataframe
    import pandas as pd
    psc_pd = pd.read_sql(query,con=pgconn)
    
    # save the frame as parquet file
    parquet_file = 'psc.parquet'
    psc_pd.to_parquet(parquet_file, engine = 'pyarrow', compression = 'snappy')
    PSC data

    Company Profile end point

    Request

    GET https://stream.companieshouse.gov.uk/companies
    create view companies_house_profile as (
    select
    payload ->> 'resource_id'  as resource_id,
    payload ->> 'resource_kind' as resource_kind,
    payload -> 'event' ->> 'type'  as event_type,
    payload -> 'event' ->> 'timepoint'  as event_timepoint,
    payload -> 'event' ->> 'published_at'  as event_published_at,
    payload -> 'data' ->> 'type'  as company_type,
    payload -> 'data' ->> 'company_number'  as company_number,
    payload -> 'data' ->> 'company_name'  as company_name,
    payload -> 'data' ->> 'jurisdiction'  as jurisdiction,
    payload -> 'data' ->> 'company_status'  as company_status,
    payload -> 'data' ->> 'date_of_creation'  as date_of_creation,
    payload -> 'data' ->> 'sic_codes'  as sic_codes,
    payload -> 'data' -> 'registered_office_address' ->> 'country' as country,
    payload -> 'data' -> 'registered_office_address' ->> 'locality' as locality,
    payload -> 'data' -> 'registered_office_address' ->> 'postal_code' as postal_code,
    payload -> 'data' -> 'registered_office_address' ->> 'address_line_1' as address_line_1,
    payload -> 'data' -> 'registered_office_address' ->> 'address_line_2' as address_line_2,
    payload -> 'data' ->> 'can_file' as can_file,
    payload -> 'data' -> 'accounts' ->> 'next_due' as accounts_next_due,
    payload -> 'data' -> 'accounts' ->> 'last_accounts' as accounts_last_accounts,
    payload -> 'data' -> 'accounts' -> 'next_accounts' ->> 'due_on' as next_accounts_due_on,
    payload -> 'data' -> 'accounts' -> 'next_accounts' ->> 'period_end_on' as next_accounts_period_end_on,
    payload -> 'data' -> 'accounts' -> 'next_accounts' ->> 'period_start_on' as next_accounts_period_start_on,
    payload -> 'data' -> 'accounts' ->> 'next_made_up_to' as next_made_up_to_date,
    payload -> 'data' -> 'accounts' ->> 'accounting_reference_date' as next_accounting_reference_date,
    payload -> 'data' -> 'links' ->> 'self' as links_self,
    payload -> 'data' -> 'links' ->> 'officers' as links_officers,
    payload -> 'data' -> 'links' ->> 'filing_history' as links_filing_history,
    payload -> 'data' -> 'links' ->> 'persons_with_significant_control' as links_persons_with_significant_control
    from ch 
    where payload->>'resource_kind'='company-profile'
    )
    
    # sample data
    {
       "data": {
          "etag": "978592003273fe2cb5bc47d404feea3e1d92a883",
          "type": "ltd",
          "links": {
             "self": "/company/14952843",
             "officers": "/company/14952843/officers",
             "filing_history": "/company/14952843/filing-history",
             "persons_with_significant_control": "/company/14952843/persons-with-significant-control"
          },
          "accounts": {
             "next_due": "2025-03-22",
             "last_accounts": {
                "type": "null"
             },
             "next_accounts": {
                "due_on": "2025-03-22",
                "period_end_on": "2024-06-30",
                "period_start_on": "2023-06-22"
             },
             "next_made_up_to": "2024-06-30",
             "accounting_reference_date": {
                "day": "30",
                "month": "06"
             }
          },
          "can_file": true,
          "sic_codes": [
             "47990"
          ],
          "company_name": "AL ARABIAN MARKETING LTD",
          "jurisdiction": "england-wales",
          "company_number": "14952843",
          "company_status": "active",
          "date_of_creation": "2023-06-22",
          "confirmation_statement": {
             "next_due": "2024-07-05",
             "next_made_up_to": "2024-06-21"
          },
          "registered_office_address": {
             "country": "United Kingdom",
             "locality": "Poole, Dorset,",
             "postal_code": "BH16 6FA",
             "address_line_1": "Lytchett House",
             "address_line_2": "13 Freeland Park, Wareham Road,"
          }
       },
       "event": {
          "type": "changed",
          "timepoint": 62657738,
          "published_at": "2023-06-22T11:30:11"
       },
       "resource_id": "14952843",
       "resource_uri": "/company/14952843",
       "resource_kind": "company-profile"
    }
    COMPANIES HOUSSE PROFILE

    Company filing history stream

    This stream returns company filing history information, such as form type, description and dates.

    Request

    GET https://stream.companieshouse.gov.uk/filings

    Insolvency cases stream

    This stream returns information about company insolvency cases.

    Request

    GET https://stream.companieshouse.gov.uk/insolvency-cases

    Charges stream

    This stream returns information about company charges.

    Request

    GET https://stream.companieshouse.gov.uk/charges

    Officers stream

    This stream will return information about company officers.

    Request

    GET https://stream.companieshouse.gov.uk/officers

    PSC statements stream

    This stream returns information about persons with significant control statements.

    Request

    GET https://stream.companieshouse.gov.uk/persons-with-significant-control-statements

    Disqualified officers stream

    This stream returns information about disqualified officers (natural and corporate).

    Request

    GET https://stream.companieshouse.gov.uk/disqualified-officers

    Company exemptions stream

    The streaming API for company exemptions is currently still in development. It will be available at a later date.

    When complete, this stream will return information about company exemptions.

    Request

    GET https://stream.companieshouse.gov.uk/company-exemptions

    PSC statements stream

    This stream returns information about persons with significant control statements.

    Request

    GET https://stream.companieshouse.gov.uk/persons-with-significant-control-statements

    Metadata

    Extract metadata using:

    query = text("""
    select * from ch where payload ->> 'resource_kind' = 'company-psc-legal'
    """)
    conn = engine.connect()
    df = pd.read_sql_query(query, conn)
    df = pd.json_normalize(df['payload'])
    df.info()
    legal-person-beneficial-owner
    resource_id
    resource_uri
    resource_kind
    data.etag
    data.kind
    data.name
    data.links.self
    data.address.country
    data.address.locality
    data.address.premises
    data.address.postal_code
    data.address.address_line_1
    data.address.address_line_2
    data.notified_on
    data.identification.legal_form
    data.identification.legal_authority
    data.natures_of_control
    data.principal_office_address.country
    data.principal_office_address.locality
    data.principal_office_address.premises
    data.principal_office_address.postal_code
    data.principal_office_address.address_line_1
    data.principal_office_address.address_line_2
    event.type
    event.timepoint
    event.published_at
    legal-person-beneficial-owner
    company-psc-supersecure
    resource_id
    resource_uri
    resource_kind
    data.etag
    data.kind
    data.links.self
    data.description
    event.type
    event.timepoint
    event.published_at
    company-psc-supersecure
    individual-beneficial-owner
    resource_id
    resource_uri
    resource_kind
    data.etag
    data.kind
    data.name
    data.links.self
    data.address.region
    data.address.country
    data.address.locality
    data.address.premises
    data.address.address_line_1
    data.address.address_line_2
    data.nationality
    data.notified_on
    data.date_of_birth.year
    data.date_of_birth.month
    data.name_elements.surname
    data.name_elements.forename
    data.natures_of_control
    data.address.postal_code
    data.is_sanctioned
    data.name_elements.middle_name
    data.name_elements.title
    event.type
    event.timepoint
    event.published_at
    individual-beneficial-owner
    company-psc-corporate
    resource_id
    resource_uri
    resource_kind
    data.etag
    data.kind
    data.name
    data.links.self
    data.address.country
    data.address.locality
    data.address.premises
    data.address.postal_code
    data.address.address_line_1
    data.address.po_box
    data.address.region
    data.address.address_line_2
    data.ceased_on
    data.notified_on
    data.identification.legal_form
    data.identification.legal_authority
    data.identification.place_registered
    data.identification.country_registered
    data.identification.registration_number
    data.natures_of_control
    event.type
    event.timepoint
    event.published_at
    company-psc-corporate
    corporate-entity-beneficial-owner
    resource_id
    resource_uri
    resource_kind
    data.etag
    data.kind
    data.name
    data.links.self
    data.address.country
    data.address.locality
    data.address.premises
    data.address.postal_code
    data.address.address_line_1
    data.address.address_line_2
    data.identification.place_registered
    data.identification.registration_number
    data.principal_office_address.address_line_2
    data.address.region
    data.is_sanctioned
    data.identification.country_registered
    data.notified_on
    data.identification.legal_form
    data.identification.legal_authority
    data.natures_of_control
    data.principal_office_address.country
    data.principal_office_address.locality
    data.principal_office_address.premises
    data.principal_office_address.postal_code
    data.principal_office_address.address_line_1
    event.type
    event.timepoint
    event.published_at
    corporate-entity-beneficial-owner
    company-psc-legal
    resource_id
    resource_uri
    resource_kind
    data.etag
    data.kind
    data.name
    data.links.self
    data.address.region
    data.address.country
    data.address.locality
    data.address.premises
    data.address.postal_code
    data.address.address_line_1
    data.address.address_line_2
    data.address.po_box
    data.notified_on
    data.ceased_on
    data.identification.legal_form
    data.identification.legal_authority
    data.natures_of_control
    event.type
    event.timepoint
    event.published_at
    company-psc-legal

    Fields

    Data from streaming API is saved as binary json datatype and stored in a PostgreSQL database.

    create table ch
    (
       id       serial, 
       payload  jsonb not null default '{}'::jsonb
    );
    

    View

    Create view for respective stream api method as needed using variable value for payload ->> ‘resource_kind’

    Extract fields from json payload

    query = text("select payload from ch")
    conn = engine.connect()
    df = pd.read_sql_query(query, conn)
    df1 = pd.json_normalize(df['payload'])
    df1.info()
    # object datatype to string and NaN to ''
    import numpy as np
    df2 = df1.replace(np.nan, '', regex=True)
    df2 = df2.astype(str)
    # save into a file as parquet
    parquet_file = 'ch.parquet'
    df2.to_parquet(parquet_file, engine = 'pyarrow', compression = 'snappy')
    # pandas dataframe to pyspark
    from pyspark.sql import SparkSession
    spark = SparkSession.builder \
      .appName('PostgreSQL')\
      .enableHiveSupport()\
      .getOrCreate()
    ch = spark.createDataFrame(df2)
    ch.columns
    ['resource_id',
     'resource_uri',
     'resource_kind',
     'data.date',
     'data.type',
     'data.links.self',
     'data.barcode',
     'data.category',
     'data.description',
     'data.transaction_id',
     'data.description_values.made_up_date',
     'event.type',
     'event.timepoint',
     'event.published_at',
     'data.etag',
     'data.kind',
     'data.name',
     'data.address.region',
     'data.address.country',
     'data.address.locality',
     'data.address.premises',
     'data.address.postal_code',
     'data.address.address_line_1',
     'data.nationality',
     'data.notified_on',
     'data.date_of_birth.year',
     'data.date_of_birth.month',
     'data.name_elements.surname',
     'data.name_elements.forename',
     'data.natures_of_control',
     'data.country_of_residence',
     'data.address.address_line_2',
     'data.name_elements.title',
     'data.ceased_on',
     'data.identification.legal_form',
     'data.identification.legal_authority',
     'data.identification.place_registered',
     'data.identification.country_registered',
     'data.identification.registration_number',
     'data.paper_filed',
     'data.address.po_box',
     'data.description_values.change_date',
     'data.description_values.new_address',
     'data.description_values.old_address',
     'data.subcategory',
     'data.description_values.officer_name',
     'data.description_values.termination_date',
     'data.resolutions',
     'data.description_values.description',
     'data.links.document_metadata',
     'data.pages',
     'data.associated_filings',
     'event.fields_changed',
     'data.description_values.appointment_date',
     'data.address.care_of',
     'data.links.officers',
     'data.links.filing_history',
     'data.links.persons_with_significant_control',
     'data.accounts.last_accounts.type',
     'data.accounts.accounting_reference_date.day',
     'data.accounts.accounting_reference_date.month',
     'data.sic_codes',
     'data.company_name',
     'data.jurisdiction',
     'data.company_number',
     'data.company_status',
     'data.date_of_creation',
     'data.date_of_cessation',
     'data.registered_office_address.locality',
     'data.registered_office_address.postal_code',
     'data.registered_office_address.address_line_1',
     'data.confirmation_statement.next_due',
     'data.confirmation_statement.last_made_up_to',
     'data.confirmation_statement.next_made_up_to',
     'data.registered_office_address.country',
     'data.can_file',
     'data.registered_office_address.address_line_2',
     'data.registered_office_address.region',
     'data.subtype',
     'data.external_registration_number',
     'data.partial_data_available',
     'data.confirmation_statement.overdue',
     'data.description_values.psc_name',
     'data.description_values.cessation_date',
     'data.principal_office_address.country',
     'data.principal_office_address.locality',
     'data.principal_office_address.premises',
     'data.principal_office_address.postal_code',
     'data.principal_office_address.address_line_1',
     'data.accounts.overdue',
     'data.accounts.next_due',
     'data.accounts.next_accounts.due_on',
     'data.accounts.next_accounts.overdue',
     'data.accounts.next_accounts.period_end_on',
     'data.accounts.next_accounts.period_start_on',
     'data.accounts.next_made_up_to',
     'data.principal_office_address.address_line_2',
     'data.previous_company_names',
     'data.description_values.notification_date',
     'data.description_values.new_date',
     'data.description_values.charge_number',
     'data.description_values.charge_creation_date',
     'data.is_community_interest_company',
     'data.links.persons_with_significant_control_statements',
     'data.accounts.last_accounts.made_up_to',
     'data.accounts.last_accounts.period_end_on',
     'data.accounts.last_accounts.period_start_on',
     'data.last_full_members_list_date',
     'data.links.charges',
     'data.has_charges',
     'data.company_status_detail',
     'data.links.registers',
     'data.registered_office_address.po_box',
     'data.registered_office_is_in_dispute',
     'data.links.insolvency',
     'data.has_been_liquidated',
     'data.has_insolvency_history',
     'data.registered_office_address.care_of',
     'data.service_address.country',
     'data.service_address.locality',
     'data.service_address.postal_code',
     'data.service_address.address_line_1',
     'data.foreign_company_details.legal_form',
     'data.foreign_company_details.governed_by',
     'data.foreign_company_details.registration_number',
     'data.foreign_company_details.originating_registry.name',
     'data.foreign_company_details.originating_registry.country',
     'data.super_secure_managing_officer_count',
     'data.annual_return.overdue',
     'data.annual_return.last_made_up_to',
     'data.description_values.date',
     'data.description_values.capital',
     'data.service_address.address_line_2',
     'data.foreign_company_details.business_activity',
     'data.foreign_company_details.accounting_requirement.foreign_account_type',
     'data.foreign_company_details.accounting_requirement.terms_of_account_publication',
     'data.description_values.withdrawal_date',
     'data.foreign_company_details.accounts.must_file_within.months',
     'data.foreign_company_details.accounts.account_period_to.day',
     'data.foreign_company_details.accounts.account_period_to.month',
     'data.foreign_company_details.accounts.account_period_from.day',
     'data.foreign_company_details.accounts.account_period_from.month',
     'data.service_address.region',
     'data.has_super_secure_pscs',
     'data.undeliverable_registered_office_address',
     'data.annual_return.next_due',
     'data.annual_return.next_made_up_to',
     'data.description_values.default_address',
     'data.annotations',
     'data.description_values.brought_down_date',
     'data.action_date',
     'data.description_values.original_description',
     'data.description_values.change_type',
     'data.description_values.branch_number',
     'data.description_values.change_details',
     'data.description_values.form_attached',
     'data.description_values.close_date',
     'data.description_values.company_number',
     'data.description_values.change_address',
     'data.description_values.representative_details',
     'data.description_values.officer_address',
     'data.description_values.alt_capital',
     'data.is_sanctioned',
     'data.name_elements.middle_name']