Using DynamoDB local on Google Colab

Steps for deploying DynamoDB local

  • Download the latest jar file from the AWS website.
  • Extract the contents and copy the extracted directory to a location of your choice.
  • To start DynamoDB local on a Google Colab instance, use the following commands:
# install boto3
!pip3 install boto3
 
%%bash
wget https://d1ni2b6xgvw0s0.cloudfront.net/v2.x/dynamodb_local_latest.tar.gz
tar xvf dynamodb_local_latest.tar.gz
mkdir /content/data

# store credentials in a file - only if you are using aws command line tool
%%writefile /content/.aws/credentials 
[default]
aws_access_key_id = admin
aws_secret_access_key = password   

# region and output
%%writefile /content/.aws/config
[default]
region = gdrive
output = json

# run DynamoDB in the background
%%bash --bg
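# the -dbPath directory must already exist (here it is on Google Drive, which must be mounted)
java -Djava.library.path=./DynamoDBLocal_lib -jar DynamoDBLocal.jar -sharedDb -dbPath /content/drive/MyDrive/dynamodb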
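
Because the server is started in the background, the next cell can run before DynamoDB local is ready to accept connections. One way to guard against this is to poll the port first. The following is a minimal sketch that uses only the standard library; the helper name `wait_for_port` and its default timeout are assumptions made for illustration, and port 8000 is taken from the endpoint URL used later in this post.

```python
import socket
import time


def wait_for_port(host: str = "localhost", port: int = 8000, timeout: float = 30.0) -> bool:
    """Return True once a TCP connection to host:port succeeds, False on timeout.

    Hypothetical helper: it only checks that something is listening on the port,
    not that the listener is actually DynamoDB local.
    """
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        try:
            # A successful connect means a process is listening on the port.
            with socket.create_connection((host, port), timeout=1.0):
                return True
        except OSError:
            # Connection refused or timed out: wait briefly, then retry.
            time.sleep(0.5)
    return False
```

In a notebook, calling `wait_for_port()` right after the background cell and checking that it returns `True` gives a simple readiness check before creating the boto3 resource.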

Create a table in DynamoDB

# create a DynamoDB resource that points at the local endpoint
import boto3

dyn_resource = boto3.resource(
    "dynamodb",
    region_name="gdrive",
    aws_access_key_id="admin",
    aws_secret_access_key="password",
    endpoint_url="http://localhost:8000",
)

# create table - define partition_key and sort_key
table_name = "company_profile"
params = {
    "TableName": table_name,
    "KeySchema": [
        {"AttributeName": "resource_id", "KeyType": "HASH"},
        {"AttributeName": "data.company_status", "KeyType": "RANGE"},
    ],
    "AttributeDefinitions": [
        {"AttributeName": "resource_id", "AttributeType": "S"},
        {"AttributeName": "data.company_status", "AttributeType": "S"},
    ],
    "ProvisionedThroughput": {"ReadCapacityUnits": 10, "WriteCapacityUnits": 10},
}
table = dyn_resource.create_table(**params)
print(f"Creating {table_name}...")
table.wait_until_exists()
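
Every attribute named in KeySchema must also appear in AttributeDefinitions, which is easy to get wrong when the dictionary is typed by hand. A small helper can derive both lists from the key names. This is only a sketch: `build_table_params` is a hypothetical name, and it assumes both keys are strings (type "S"), as in the table above.

```python
def build_table_params(table_name, partition_key, sort_key=None,
                       read_units=10, write_units=10):
    """Build keyword arguments for create_table (hypothetical helper).

    Assumes every key attribute is a string ("S"), as in the example table.
    """
    key_schema = [{"AttributeName": partition_key, "KeyType": "HASH"}]
    if sort_key is not None:
        key_schema.append({"AttributeName": sort_key, "KeyType": "RANGE"})

    # Derive the definitions from the schema so the two lists cannot drift apart.
    attribute_definitions = [
        {"AttributeName": key["AttributeName"], "AttributeType": "S"}
        for key in key_schema
    ]

    return {
        "TableName": table_name,
        "KeySchema": key_schema,
        "AttributeDefinitions": attribute_definitions,
        "ProvisionedThroughput": {
            "ReadCapacityUnits": read_units,
            "WriteCapacityUnits": write_units,
        },
    }
```

The result can be passed on unchanged, for example `dyn_resource.create_table(**build_table_params("company_profile", "resource_id", "data.company_status"))`.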

Calling streaming API and saving data into DynamoDB

The Python Requests HTTP library makes the streaming API call, and the Python Boto3 package stores the result in a DynamoDB table. The Python json module handles the JSON payload, which arrives as bytes.

import requests

url = 'https://stream.companieshouse.gov.uk/companies'
# use streaming API access code (ch_api_key must already hold your key)
response = requests.get(url, auth=(ch_api_key, ''), stream=True)
print(response.status_code, response.headers)

Handling payload from Companies House Streaming API

The Companies House streaming API returns a continuous stream of JSON data, delivered line by line as bytes. We need to convert each line to a Python object and assign values from it to the partition key and sort key attributes respectively.

"""
Check for a good request status (200 = OK) and process the stream.
"""
import json

if response.status_code == 200:
    for json_line in response.iter_lines():  # stream should give a continuous iterable
        if json_line:
            # send data to DynamoDB
            payload = json.loads(json_line)
            table.put_item(
                Item={
                    "resource_id": payload["resource_id"],
                    "data.company_status": payload["data"]["company_status"],
                    "item": payload,
                }
            )
            print('data ', json_line)
        else:
            print('Empty pulse')
else:
    print(f'Not 200 (got {response.status_code}), please check')
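
The conversion step in the loop above can also be pulled out into a small function, which makes it possible to try it on a single line without a live stream. The sketch below is one way to do that; `to_item` is a hypothetical name and the sample line is made up for illustration, not real Companies House data. It parses floats as `Decimal`, because Boto3 does not accept Python floats in DynamoDB items.

```python
import json
from decimal import Decimal


def to_item(json_line: bytes) -> dict:
    """Convert one line of the stream into a DynamoDB item (hypothetical helper)."""
    # parse_float=Decimal avoids Python floats, which Boto3 rejects in items.
    payload = json.loads(json_line, parse_float=Decimal)
    return {
        "resource_id": payload["resource_id"],                     # partition key
        "data.company_status": payload["data"]["company_status"],  # sort key
        "item": payload,                                           # full payload
    }


# Made-up sample line, for illustration only.
sample = b'{"resource_id": "00000000", "data": {"company_status": "active"}}'
item = to_item(sample)
print(item["resource_id"], item["data.company_status"])  # prints: 00000000 active
```

Inside the loop, the call would then become `table.put_item(Item=to_item(json_line))`.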

put an item into DynamoDB
Inspecting a record from DynamoDB
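
A single record can be read back with the table's `get_item` call. Because the table has both a partition key and a sort key, both values must be supplied. The following is a minimal sketch; `fetch_record` is a hypothetical helper name, and the `table` argument is assumed to be the Boto3 table object created earlier.

```python
def fetch_record(table, resource_id, company_status):
    """Return the stored item for the given keys, or None if there is no match.

    Hypothetical helper: `table` is expected to behave like a Boto3 DynamoDB Table.
    """
    response = table.get_item(
        Key={
            "resource_id": resource_id,             # partition key
            "data.company_status": company_status,  # sort key
        }
    )
    # get_item omits the "Item" entry when nothing matches the key.
    return response.get("Item")
```

For example, `fetch_record(table, payload["resource_id"], payload["data"]["company_status"])` should return the item that was just written by the streaming loop.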