Processing Companies House PSC stream data

We need to access the PSC stream API and extract information about persons with significant control. However, the result contains personal information such as name, partial date of birth, nationality and address with postcode. Please read the following articles – Processing UK Companies House PSC Data and Companies House Stream API – for more detailed information on Companies House PSC data.

We are only interested in the following fields without identifying individuals so that information can be linked with Companies House Profile data:

  • crn
  • resource_id
  • resource_kind
  • event_type
  • event_timepoint
  • event_published_at
  • country_of_residence
  • nationality
  • notified_on
  • natures_of_control
  • postal_code
  • address_line_1
  • locality
  • country

Technical approach

We will use Google Cloud Platform (GCP) and a free PostgreSQL database on the Neon Free Tier.

  • Google Cloud Shell – a Python script which calls the Companies House stream API using the Python requests package with the stream option set to True
import requests
# Endpoint of the Companies House streaming API for persons with significant control.
url = 'https://stream.companieshouse.gov.uk/persons-with-significant-control'
# NOTE(review): this value is passed to requests as `timeout`; it presumably limits how long
# to wait for the server to send data rather than the total stream duration — confirm.
stream_timeout = 600   # time out ten mins just as example, no need if not wanted
print('Streaming URL', url)
# stream_ch_api_key is not defined in this snippet and must be set beforehand.
# It is sent as the basic-auth user name with an empty password; stream=True defers
# downloading the response body.
r = requests.get(url, auth=(stream_ch_api_key, ''), stream=True, timeout=stream_timeout)
# Show the HTTP status and response headers of the opened stream.
print(r.status_code, r.headers)
  • save payload with relevant data to database (at Neon Free Tier PostgreSQL)
  • process JSONB payload data and create SQL view for accessing required data

Save JSON data to database

import dataset
# url
def construct_pg_url(postgres_user='username', postgres_password='password', postgres_host='eneon.tech', postgres_port='5432', postgres_database='neondb'):
  """Build a PostgreSQL connection URL from its components.

  The user name and password are percent-encoded so that characters such as
  '@', ':' or '/' inside the credentials cannot corrupt the URL structure.
  The port may be given as a string or as an integer.

  Returns:
    str: URL of the form postgresql://user:password@host:port/database
  """
  from urllib.parse import quote  # local import keeps the snippet self-contained
  # safe='' also encodes '/', which quote() leaves untouched by default.
  user = quote(str(postgres_user), safe='')
  password = quote(str(postgres_password), safe='')
  PG_URL = f"postgresql://{user}:{password}@{postgres_host}:{postgres_port}/{postgres_database}"
  return PG_URL
# Create the dataset.Database handle for the 'public' schema, using the URL that
# construct_pg_url() builds from its default arguments.
pgconn = dataset.Database(url=construct_pg_url(),schema='public')
# save data
def add_data(data):
  """Parse one JSON document and insert it as a single row.

  Args:
    data: JSON text (anything json.loads accepts).

  Returns:
    True once the insert call has completed. Parse errors and database
    errors are not caught here and propagate to the caller.
  """
  import json  # json.loads is used below, but this snippet never imports json
  # NOTE(review): 'table_name' looks like a placeholder — the SQL views in this
  # document read from table `ch`; confirm the intended table name.
  table = pgconn['table_name']
  row = json.loads(data)
  # The whole parsed document is stored under a single 'payload' column.
  table.insert(dict(payload=row))
  return True

SQL View

-- View exposing the selected PSC fields from the raw stream payloads stored in ch.
-- "if exists" lets the script run on a database where the view was never created;
-- a plain "drop view" raises an error there and aborts the script.
drop view if exists companies_house_psc;
create view companies_house_psc as (
    select
        -- assumes links.self has the form '/company/<8-character number>/...',
        -- so the company number starts at position 10 — TODO confirm for all PSC kinds
        substring(payload -> 'data' -> 'links' ->> 'self', 10, 8) as crn,
        payload ->> 'resource_id' as resource_id,
        payload ->> 'resource_kind' as resource_kind,
        payload -> 'event' ->> 'type' as event_type,
        payload -> 'event' ->> 'timepoint' as event_timepoint,
        payload -> 'event' ->> 'published_at' as event_published_at,
        payload -> 'data' ->> 'country_of_residence' as country_of_residence,
        payload -> 'data' ->> 'nationality' as nationality,
        payload -> 'data' ->> 'notified_on' as notified_on,
        -- ->> returns text, so a JSON array here comes back as its text form
        payload -> 'data' ->> 'natures_of_control' as natures_of_control,
        payload -> 'data' -> 'address' ->> 'postal_code' as postal_code,
        payload -> 'data' -> 'address' ->> 'address_line_1' as address_line_1,
        payload -> 'data' -> 'address' ->> 'locality' as locality,
        payload -> 'data' -> 'address' ->> 'country' as country
    from ch
    -- NOTE(review): "id > 10" skips the first rows and does not restrict
    -- resource_kind; if ch also holds other streams, their rows appear here
    -- too — confirm the intended filter.
    where id > 10
);

Reading data

from sqlalchemy import create_engine
# Connection string with placeholders for user, password, host, database and
# Neon project id. No f-string is needed because nothing is interpolated.
CONNSTR = 'postgresql://username:password@hostdb.neon.tech/databasename?options=endpoint%3Dproject_id'
engine = create_engine(CONNSTR)

from sqlalchemy import text
query = text("SELECT * FROM companies_house_psc")

# save result as pandas dataframe
import pandas as pd
# The context manager releases the connection as soon as the read finishes,
# instead of leaving it open for the rest of the session.
# NOTE(review): the name pgconn is also used earlier in this document for the
# dataset.Database handle that add_data() relies on — confirm the reuse is intended.
with engine.connect() as pgconn:
  psc_pd = pd.read_sql(query,con=pgconn)

# save the frame as parquet file
parquet_file = 'psc.parquet'
psc_pd.to_parquet(parquet_file, engine = 'pyarrow', compression = 'snappy')
PSC data

Company Profile end point

Request

GET https://stream.companieshouse.gov.uk/companies
-- Flattened view over the company-profile events stored in ch.
-- The CTE picks out the matching rows and extracts the "data" and "event"
-- sub-objects once; the outer select turns their members into text columns.
create view companies_house_profile as (
    with profile_event as (
        select
            payload,
            payload -> 'data' as company,
            payload -> 'event' as stream_event
        from ch
        where payload ->> 'resource_kind' = 'company-profile'
    )
    select
        -- envelope fields
        payload ->> 'resource_id' as resource_id,
        payload ->> 'resource_kind' as resource_kind,
        stream_event ->> 'type' as event_type,
        stream_event ->> 'timepoint' as event_timepoint,
        stream_event ->> 'published_at' as event_published_at,
        -- company attributes
        company ->> 'type' as company_type,
        company ->> 'company_number' as company_number,
        company ->> 'company_name' as company_name,
        company ->> 'jurisdiction' as jurisdiction,
        company ->> 'company_status' as company_status,
        company ->> 'date_of_creation' as date_of_creation,
        company ->> 'sic_codes' as sic_codes,
        -- registered office address
        company -> 'registered_office_address' ->> 'country' as country,
        company -> 'registered_office_address' ->> 'locality' as locality,
        company -> 'registered_office_address' ->> 'postal_code' as postal_code,
        company -> 'registered_office_address' ->> 'address_line_1' as address_line_1,
        company -> 'registered_office_address' ->> 'address_line_2' as address_line_2,
        company ->> 'can_file' as can_file,
        -- accounts
        company -> 'accounts' ->> 'next_due' as accounts_next_due,
        company -> 'accounts' ->> 'last_accounts' as accounts_last_accounts,
        company -> 'accounts' -> 'next_accounts' ->> 'due_on' as next_accounts_due_on,
        company -> 'accounts' -> 'next_accounts' ->> 'period_end_on' as next_accounts_period_end_on,
        company -> 'accounts' -> 'next_accounts' ->> 'period_start_on' as next_accounts_period_start_on,
        company -> 'accounts' ->> 'next_made_up_to' as next_made_up_to_date,
        company -> 'accounts' ->> 'accounting_reference_date' as next_accounting_reference_date,
        -- links to related resources
        company -> 'links' ->> 'self' as links_self,
        company -> 'links' ->> 'officers' as links_officers,
        company -> 'links' ->> 'filing_history' as links_filing_history,
        company -> 'links' ->> 'persons_with_significant_control' as links_persons_with_significant_control
    from profile_event
)

# sample data
{
   "data": {
      "etag": "978592003273fe2cb5bc47d404feea3e1d92a883",
      "type": "ltd",
      "links": {
         "self": "/company/14952843",
         "officers": "/company/14952843/officers",
         "filing_history": "/company/14952843/filing-history",
         "persons_with_significant_control": "/company/14952843/persons-with-significant-control"
      },
      "accounts": {
         "next_due": "2025-03-22",
         "last_accounts": {
            "type": "null"
         },
         "next_accounts": {
            "due_on": "2025-03-22",
            "period_end_on": "2024-06-30",
            "period_start_on": "2023-06-22"
         },
         "next_made_up_to": "2024-06-30",
         "accounting_reference_date": {
            "day": "30",
            "month": "06"
         }
      },
      "can_file": true,
      "sic_codes": [
         "47990"
      ],
      "company_name": "AL ARABIAN MARKETING LTD",
      "jurisdiction": "england-wales",
      "company_number": "14952843",
      "company_status": "active",
      "date_of_creation": "2023-06-22",
      "confirmation_statement": {
         "next_due": "2024-07-05",
         "next_made_up_to": "2024-06-21"
      },
      "registered_office_address": {
         "country": "United Kingdom",
         "locality": "Poole, Dorset,",
         "postal_code": "BH16 6FA",
         "address_line_1": "Lytchett House",
         "address_line_2": "13 Freeland Park, Wareham Road,"
      }
   },
   "event": {
      "type": "changed",
      "timepoint": 62657738,
      "published_at": "2023-06-22T11:30:11"
   },
   "resource_id": "14952843",
   "resource_uri": "/company/14952843",
   "resource_kind": "company-profile"
}
COMPANIES HOUSE PROFILE

Company filing history stream

This stream returns company filing history information, such as form type, description and dates.

Request

GET https://stream.companieshouse.gov.uk/filings

Insolvency cases stream

This stream returns information about company insolvency cases.

Request

GET https://stream.companieshouse.gov.uk/insolvency-cases

Charges stream

This stream returns information about company charges.

Request

GET https://stream.companieshouse.gov.uk/charges

Officers stream

This stream will return information about company officers.

Request

GET https://stream.companieshouse.gov.uk/officers

PSC statements stream

This stream returns information about persons with significant control statements.

Request

GET https://stream.companieshouse.gov.uk/persons-with-significant-control-statements

Disqualified officers stream

This stream returns information about disqualified officers (natural and corporate).

Request

GET https://stream.companieshouse.gov.uk/disqualified-officers

Company exemptions stream

The streaming API for company exemptions is currently still in development. It will be available at a later date.

When complete, this stream will return information about company exemptions.

Request

GET https://stream.companieshouse.gov.uk/company-exemptions

PSC statements stream

This stream returns information about persons with significant control statements.

Request

GET https://stream.companieshouse.gov.uk/persons-with-significant-control-statements

Metadata

Extract metadata using:

# Only the payload column is normalised below, so fetch just that column
# instead of every column of ch.
query = text("""
select payload from ch where payload ->> 'resource_kind' = 'company-psc-legal'
""")
# The context manager releases the connection once the read has finished.
with engine.connect() as conn:
  df = pd.read_sql_query(query, conn)
# Flatten the nested JSON documents into one column per dotted path.
df = pd.json_normalize(df['payload'])
# Print the resulting column names and dtypes.
df.info()
legal-person-beneficial-owner
resource_id
resource_uri
resource_kind
data.etag
data.kind
data.name
data.links.self
data.address.country
data.address.locality
data.address.premises
data.address.postal_code
data.address.address_line_1
data.address.address_line_2
data.notified_on
data.identification.legal_form
data.identification.legal_authority
data.natures_of_control
data.principal_office_address.country
data.principal_office_address.locality
data.principal_office_address.premises
data.principal_office_address.postal_code
data.principal_office_address.address_line_1
data.principal_office_address.address_line_2
event.type
event.timepoint
event.published_at
legal-person-beneficial-owner
company-psc-supersecure
resource_id
resource_uri
resource_kind
data.etag
data.kind
data.links.self
data.description
event.type
event.timepoint
event.published_at
company-psc-supersecure
individual-beneficial-owner
resource_id
resource_uri
resource_kind
data.etag
data.kind
data.name
data.links.self
data.address.region
data.address.country
data.address.locality
data.address.premises
data.address.address_line_1
data.address.address_line_2
data.nationality
data.notified_on
data.date_of_birth.year
data.date_of_birth.month
data.name_elements.surname
data.name_elements.forename
data.natures_of_control
data.address.postal_code
data.is_sanctioned
data.name_elements.middle_name
data.name_elements.title
event.type
event.timepoint
event.published_at
individual-beneficial-owner
company-psc-corporate
resource_id
resource_uri
resource_kind
data.etag
data.kind
data.name
data.links.self
data.address.country
data.address.locality
data.address.premises
data.address.postal_code
data.address.address_line_1
data.address.po_box
data.address.region
data.address.address_line_2
data.ceased_on
data.notified_on
data.identification.legal_form
data.identification.legal_authority
data.identification.place_registered
data.identification.country_registered
data.identification.registration_number
data.natures_of_control
event.type
event.timepoint
event.published_at
company-psc-corporate
corporate-entity-beneficial-owner
resource_id
resource_uri
resource_kind
data.etag
data.kind
data.name
data.links.self
data.address.country
data.address.locality
data.address.premises
data.address.postal_code
data.address.address_line_1
data.address.address_line_2
data.identification.place_registered
data.identification.registration_number
data.principal_office_address.address_line_2
data.address.region
data.is_sanctioned
data.identification.country_registered
data.notified_on
data.identification.legal_form
data.identification.legal_authority
data.natures_of_control
data.principal_office_address.country
data.principal_office_address.locality
data.principal_office_address.premises
data.principal_office_address.postal_code
data.principal_office_address.address_line_1
event.type
event.timepoint
event.published_at
corporate-entity-beneficial-owner
company-psc-legal
resource_id
resource_uri
resource_kind
data.etag
data.kind
data.name
data.links.self
data.address.region
data.address.country
data.address.locality
data.address.premises
data.address.postal_code
data.address.address_line_1
data.address.address_line_2
data.address.po_box
data.notified_on
data.ceased_on
data.identification.legal_form
data.identification.legal_authority
data.natures_of_control
event.type
event.timepoint
event.published_at
company-psc-legal

Fields

Data from the streaming API is saved using the binary JSON datatype (jsonb) and stored in a PostgreSQL database.

-- Raw landing table: one row per document received from the streaming API.
-- "if not exists" makes the statement safe to re-run; the primary key gives
-- every row a unique, indexed id.
create table if not exists ch
(
   id       serial,
   payload  jsonb not null default '{}'::jsonb,
   constraint ch_pkey primary key (id)
);

View

Create a view for each stream API method as needed, filtering on the value of payload ->> 'resource_kind'.

Extract fields from json payload

query = text("select payload from ch")
# The context manager releases the connection once the read has finished,
# rather than leaving another open connection behind.
with engine.connect() as conn:
  df = pd.read_sql_query(query, conn)
# Flatten the nested JSON documents into one column per dotted path.
df1 = pd.json_normalize(df['payload'])
# Print the resulting column names and dtypes.
df1.info()
# Replace NaN with '' and then cast every column to str, so that all columns
# end up with a single string dtype.
import numpy as np
df2 = df1.replace(np.nan, '', regex=True)
df2 = df2.astype(str)
# Write the string-typed frame to 'ch.parquet' using pyarrow with snappy compression.
parquet_file = 'ch.parquet'
df2.to_parquet(parquet_file, engine = 'pyarrow', compression = 'snappy')
# Get or create a Spark session named 'PostgreSQL' with Hive support enabled,
# then convert the pandas frame into a Spark DataFrame.
from pyspark.sql import SparkSession
spark = SparkSession.builder \
  .appName('PostgreSQL')\
  .enableHiveSupport()\
  .getOrCreate()
ch = spark.createDataFrame(df2)
# Bare expression: presumably run in a notebook/REPL to display the column names
# (the list that follows in this document).
ch.columns
['resource_id',
 'resource_uri',
 'resource_kind',
 'data.date',
 'data.type',
 'data.links.self',
 'data.barcode',
 'data.category',
 'data.description',
 'data.transaction_id',
 'data.description_values.made_up_date',
 'event.type',
 'event.timepoint',
 'event.published_at',
 'data.etag',
 'data.kind',
 'data.name',
 'data.address.region',
 'data.address.country',
 'data.address.locality',
 'data.address.premises',
 'data.address.postal_code',
 'data.address.address_line_1',
 'data.nationality',
 'data.notified_on',
 'data.date_of_birth.year',
 'data.date_of_birth.month',
 'data.name_elements.surname',
 'data.name_elements.forename',
 'data.natures_of_control',
 'data.country_of_residence',
 'data.address.address_line_2',
 'data.name_elements.title',
 'data.ceased_on',
 'data.identification.legal_form',
 'data.identification.legal_authority',
 'data.identification.place_registered',
 'data.identification.country_registered',
 'data.identification.registration_number',
 'data.paper_filed',
 'data.address.po_box',
 'data.description_values.change_date',
 'data.description_values.new_address',
 'data.description_values.old_address',
 'data.subcategory',
 'data.description_values.officer_name',
 'data.description_values.termination_date',
 'data.resolutions',
 'data.description_values.description',
 'data.links.document_metadata',
 'data.pages',
 'data.associated_filings',
 'event.fields_changed',
 'data.description_values.appointment_date',
 'data.address.care_of',
 'data.links.officers',
 'data.links.filing_history',
 'data.links.persons_with_significant_control',
 'data.accounts.last_accounts.type',
 'data.accounts.accounting_reference_date.day',
 'data.accounts.accounting_reference_date.month',
 'data.sic_codes',
 'data.company_name',
 'data.jurisdiction',
 'data.company_number',
 'data.company_status',
 'data.date_of_creation',
 'data.date_of_cessation',
 'data.registered_office_address.locality',
 'data.registered_office_address.postal_code',
 'data.registered_office_address.address_line_1',
 'data.confirmation_statement.next_due',
 'data.confirmation_statement.last_made_up_to',
 'data.confirmation_statement.next_made_up_to',
 'data.registered_office_address.country',
 'data.can_file',
 'data.registered_office_address.address_line_2',
 'data.registered_office_address.region',
 'data.subtype',
 'data.external_registration_number',
 'data.partial_data_available',
 'data.confirmation_statement.overdue',
 'data.description_values.psc_name',
 'data.description_values.cessation_date',
 'data.principal_office_address.country',
 'data.principal_office_address.locality',
 'data.principal_office_address.premises',
 'data.principal_office_address.postal_code',
 'data.principal_office_address.address_line_1',
 'data.accounts.overdue',
 'data.accounts.next_due',
 'data.accounts.next_accounts.due_on',
 'data.accounts.next_accounts.overdue',
 'data.accounts.next_accounts.period_end_on',
 'data.accounts.next_accounts.period_start_on',
 'data.accounts.next_made_up_to',
 'data.principal_office_address.address_line_2',
 'data.previous_company_names',
 'data.description_values.notification_date',
 'data.description_values.new_date',
 'data.description_values.charge_number',
 'data.description_values.charge_creation_date',
 'data.is_community_interest_company',
 'data.links.persons_with_significant_control_statements',
 'data.accounts.last_accounts.made_up_to',
 'data.accounts.last_accounts.period_end_on',
 'data.accounts.last_accounts.period_start_on',
 'data.last_full_members_list_date',
 'data.links.charges',
 'data.has_charges',
 'data.company_status_detail',
 'data.links.registers',
 'data.registered_office_address.po_box',
 'data.registered_office_is_in_dispute',
 'data.links.insolvency',
 'data.has_been_liquidated',
 'data.has_insolvency_history',
 'data.registered_office_address.care_of',
 'data.service_address.country',
 'data.service_address.locality',
 'data.service_address.postal_code',
 'data.service_address.address_line_1',
 'data.foreign_company_details.legal_form',
 'data.foreign_company_details.governed_by',
 'data.foreign_company_details.registration_number',
 'data.foreign_company_details.originating_registry.name',
 'data.foreign_company_details.originating_registry.country',
 'data.super_secure_managing_officer_count',
 'data.annual_return.overdue',
 'data.annual_return.last_made_up_to',
 'data.description_values.date',
 'data.description_values.capital',
 'data.service_address.address_line_2',
 'data.foreign_company_details.business_activity',
 'data.foreign_company_details.accounting_requirement.foreign_account_type',
 'data.foreign_company_details.accounting_requirement.terms_of_account_publication',
 'data.description_values.withdrawal_date',
 'data.foreign_company_details.accounts.must_file_within.months',
 'data.foreign_company_details.accounts.account_period_to.day',
 'data.foreign_company_details.accounts.account_period_to.month',
 'data.foreign_company_details.accounts.account_period_from.day',
 'data.foreign_company_details.accounts.account_period_from.month',
 'data.service_address.region',
 'data.has_super_secure_pscs',
 'data.undeliverable_registered_office_address',
 'data.annual_return.next_due',
 'data.annual_return.next_made_up_to',
 'data.description_values.default_address',
 'data.annotations',
 'data.description_values.brought_down_date',
 'data.action_date',
 'data.description_values.original_description',
 'data.description_values.change_type',
 'data.description_values.branch_number',
 'data.description_values.change_details',
 'data.description_values.form_attached',
 'data.description_values.close_date',
 'data.description_values.company_number',
 'data.description_values.change_address',
 'data.description_values.representative_details',
 'data.description_values.officer_address',
 'data.description_values.alt_capital',
 'data.is_sanctioned',
 'data.name_elements.middle_name']