Recently, I set up a process on a Raspberry Pi server and configured it to run in the background and listen for events from the Companies House streaming API. It has been running without any issues.
I want to use DynamoDB Local to store event data from the streaming API.
Steps for deploying DynamoDB Local
- Download the latest jar file from the AWS website
- Extract the contents and copy the extracted directory to a location of your choice.
- To start DynamoDB Local on a Google Colab instance, use the following commands:
# install boto3
!pip3 install boto3
%%bash
# download and extract DynamoDB Local
wget https://d1ni2b6xgvw0s0.cloudfront.net/v2.x/dynamodb_local_latest.tar.gz
tar xvf dynamodb_local_latest.tar.gz
# create a directory for local data files
mkdir -p /content/data
# store credentials in a file - only needed if you are using the AWS command line tool
# create the .aws directory first, since %%writefile will not create it
!mkdir -p /content/.aws
%%writefile /content/.aws/credentials
[default]
aws_access_key_id = admin
aws_secret_access_key = password
# region and output
%%writefile /content/.aws/config
[default]
region = gdrive
output = json
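The -dbPath in the next cell points at Google Drive, so the drive has to be mounted first. A minimal sketch using Colab's standard drive helper (the /content/drive mount point is Colab's default):
# mount Google Drive so DynamoDB Local can persist its database file there
from google.colab import drive
drive.mount('/content/drive')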
# run DynamoDB in the background
%%bash --bg
java -jar DynamoDBLocal.jar -sharedDb -dbPath /content/drive/MyDrive/dynamodb
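Once the server is running, you can sanity-check it from the command line. This is an optional sketch that assumes you install the AWS CLI (it is not preinstalled on Colab); the two environment variables point the CLI at the credentials and config files written above, since they live under /content rather than the default ~/.aws:
import os
# tell the AWS CLI where the credentials and config files are
os.environ['AWS_SHARED_CREDENTIALS_FILE'] = '/content/.aws/credentials'
os.environ['AWS_CONFIG_FILE'] = '/content/.aws/config'
# install the CLI and list tables on the local endpoint - expect an empty TableNames list for now
!pip3 install awscli
!aws dynamodb list-tables --endpoint-url http://localhost:8000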
Create a table in DynamoDB
Before processing the payload from the Companies House Streaming API, you must create a table.
# create DynamoDB session
import boto3

dyn_resource = boto3.resource("dynamodb",
                              region_name="gdrive",
                              aws_access_key_id="admin",
                              aws_secret_access_key="password",
                              endpoint_url="http://localhost:8000")
# create table - define partition key and sort key
table_name = "company_profile"
params = {
    "TableName": table_name,
    "KeySchema": [
        {"AttributeName": "resource_id", "KeyType": "HASH"},
        {"AttributeName": "data.company_status", "KeyType": "RANGE"},
    ],
    "AttributeDefinitions": [
        {"AttributeName": "resource_id", "AttributeType": "S"},
        {"AttributeName": "data.company_status", "AttributeType": "S"},
    ],
    "ProvisionedThroughput": {"ReadCapacityUnits": 10, "WriteCapacityUnits": 10},
}
table = dyn_resource.create_table(**params)
print(f"Creating {table_name}...")
table.wait_until_exists()
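To confirm the table is ready, refresh the resource's cached metadata and check its status (wait_until_exists() blocks until creation finishes, but it does not update the cached attributes):
# reload the table metadata and confirm it is ACTIVE
table.reload()
print(table.table_status)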
Calling the streaming API and saving data into DynamoDB
The Python Requests library makes the streaming API call, the results are stored in a DynamoDB table via the Boto3 package, and the standard json module handles the JSON payload.
import json
import requests

url = 'https://stream.companieshouse.gov.uk/companies'
# authenticate with your streaming API access key (placeholder below)
ch_api_key = '<your streaming API key>'
response = requests.get(url, auth=(ch_api_key, ''), stream=True)
print(response.status_code, response.headers)
Handling payload from Companies House Streaming API
The Companies House API returns a continuous stream of data as newline-delimited JSON. We need to convert each line into a JSON object and pick out the values for the partition key and sort key respectively.
"""
check for good requests status (200 = ok) and process the stream
"""
if response.status_code == 200:
for json_line in response.iter_lines(): # stream should give a continuous iterable
if json_line:
# Send data to DynamoDB
payload = json.loads(json_line)
table.put_item(Item={"resource_id": payload['resource_id'],
"data.company_status": payload['data']['company_status'],
"item": payload,})
print('data ',json_line)
else:
print('Empty pulse')
else:
print(f'Not 200, please ccheck')
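To verify that events are landing in the table, you can read a few items back. A minimal sketch using scan, which is fine for a quick check against a local table (avoid scans on production tables, as they read every item):
# fetch up to five stored events and print their keys
for item in table.scan(Limit=5)['Items']:
    print(item['resource_id'], item['data.company_status'])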