I came across the article "Setting up a Recurring Google Cloud Function With Terraform" and used a serverless approach to schedule a Python script that runs periodically, invokes the Companies House PSC stream to fetch data, and saves it into a free PostgreSQL database. I am using a Google Cloud Function and Cloud Scheduler to accomplish this task without needing to spin up a VM with a cronjob running on it.
Terraform and Google Cloud Shell
I will use Terraform from the Google Cloud Shell terminal to develop and deploy the function, and finally to schedule a job that runs it periodically. All of these tasks are orchestrated with Terraform. The following resources are required (the deployment commands are shown after this list):
- provider plugin (e.g. google), configured with the project_id and region
- archive_file (generates an archive of the source code compressed as a .zip file)
- google_storage_bucket
- google_storage_bucket_object
- google_cloudfunctions_function
- google_service_account (creates a new service account)
- google_cloudfunctions_function_iam_member (grants that service account permission to invoke the function)
- google_cloud_scheduler_job
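With those resources declared (for example in a main.tf file), the standard Terraform workflow deploys everything in one pass. A minimal sketch, run from the directory holding the configuration:

# download the google and archive providers and initialise the state
terraform init
# preview the resources that will be created
terraform plan
# create the bucket, function, service account, IAM binding and scheduler job
terraform apply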
Python Script
The Python script pulls data from the Companies House PSC stream endpoint. Create a folder with the following layout:
# main.py
import os
import requests
import gcp_psc_db as psc


def ch_psc(request):
    url = 'https://stream.companieshouse.gov.uk/persons-with-significant-control'
    stream_timeout = 600  # ten-minute read timeout, just as an example; drop it if not wanted

    # Begin streaming. The timeout is optional, and a try/except around the
    # request would make this more robust.
    print('Streaming URL', url)
    ch_key = os.environ.get('CH_KEY')
    r = requests.get(url, auth=(ch_key, ''), stream=True, timeout=stream_timeout)
    print(r.status_code, r.headers)

    # Check for a good request status (200 = OK) and process the stream.
    if r.status_code == 200:
        for json_line in r.iter_lines():  # the stream yields a continuous iterable of lines
            if json_line:
                psc.add_data(json_line)
                print(json_line)
            else:
                print('Empty pulse')  # heartbeat newlines keep the connection alive
        return 'Stream closed'  # an HTTP function must return a valid response
    else:
        return 'Not 200, best check what to do'
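Before wiring up the scheduler, the entry point can be exercised locally. A minimal, hypothetical smoke test (the request argument is never used, so passing None is fine); it assumes CH_KEY and DB_URL are exported in your shell and will stream until the read timeout fires:

# local_test.py - hypothetical smoke test, not part of the deployed source
import main

if __name__ == '__main__':
    main.ch_psc(None)  # streams and inserts rows until the read timeout fires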
# gcp_psc_db.py
import os
import json
import dataset


# insert one stream event into the 'ch' table as a JSON payload
def add_data(data):
    db_url = os.environ.get('DB_URL')
    pgconn = dataset.connect(db_url, schema='public')
    table = pgconn['ch']
    row = json.loads(data)
    table.insert(dict(payload=row))
    return True
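dataset creates the ch table on first insert and infers column types from the data, so there is no schema to manage up front. A quick hypothetical check that events are landing, again assuming DB_URL is set:

# check_rows.py - hypothetical helper to inspect the stored events
import os
import dataset

pgconn = dataset.connect(os.environ['DB_URL'], schema='public')
for row in pgconn['ch'].find(_limit=5):
    print(row['id'], row['payload'])  # 'id' is the primary key dataset adds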
Python packages
Include the following Python packages in the requirements.txt file:
- requests==2.31.0
- psycopg2==2.9.6 (psycopg2-binary is a common substitute if the build cannot compile psycopg2 from source)
- dataset==1.6.0
Terraform script
# keeping the Terraform state locally
terraform {
  backend "local" {}
}

# provider plugin
provider "google" {
  project = "project_id"
  region  = "europe-west2"
  zone    = "europe-west2-a"
}
# create a storage bucket - the name must be globally unique
resource "google_storage_bucket" "bucket" {
  name          = "ellogcf"
  location      = "europe-west2"
  force_destroy = true
}
# generate an archive of the source code compressed as a .zip file
data "archive_file" "src" {
  type        = "zip"
  source_dir  = "/home/gcf/apps/ex/src"
  output_path = "./tmp/psc_function.zip"
}
# upload the zipped source to the bucket; the MD5-based object name changes
# whenever the source changes, which forces a redeploy of the function
resource "google_storage_bucket_object" "archive" {
  name   = "${data.archive_file.src.output_md5}.zip"
  bucket = google_storage_bucket.bucket.name
  source = data.archive_file.src.output_path
}
# create the Cloud Function
resource "google_cloudfunctions_function" "function" {
  name        = "ch_psc_stream"
  description = "To extract Companies House PSC stream data using the Stream API."
  runtime     = "python311"

  environment_variables = {
    CH_KEY = "ch_stream_key"
    DB_URL = "postgresql_credentials"
  }

  available_memory_mb   = 256
  source_archive_bucket = google_storage_bucket.bucket.name
  source_archive_object = google_storage_bucket_object.archive.name
  trigger_http          = true
  entry_point           = "ch_psc" # the name of the Python function to execute
}
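Hardcoding CH_KEY and DB_URL as literals works for a first pass, but they are secrets. A safer sketch is to declare them as Terraform input variables (hypothetical names) and reference them as var.ch_key and var.db_url in environment_variables; the values can then be supplied via TF_VAR_ch_key and TF_VAR_db_url in the Cloud Shell session rather than committed to the configuration:

# variables.tf - hypothetical input variables for the two secrets
variable "ch_key" {
  type      = string
  sensitive = true
}

variable "db_url" {
  type      = string
  sensitive = true
}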
# create a new service account
resource "google_service_account" "ch-scheduler" {
  account_id   = "companieshouse"
  display_name = "Cloud Function Companies House Stream API Invoker Service Account"
}
# set up the IAM policy so the service account may invoke the function
resource "google_cloudfunctions_function_iam_member" "invoker" {
  project        = google_cloudfunctions_function.function.project
  region         = google_cloudfunctions_function.function.region
  cloud_function = google_cloudfunctions_function.function.name
  role           = "roles/cloudfunctions.invoker"
  member         = "serviceAccount:${google_service_account.ch-scheduler.email}"
}
# create the scheduled job
resource "google_cloud_scheduler_job" "job" {
  name             = "ch-psc-scheduler"
  description      = "Trigger the ${google_cloudfunctions_function.function.name} Cloud Function every 30 mins."
  schedule         = "*/30 * * * *" # every 30 mins
  time_zone        = "Europe/Dublin"
  attempt_deadline = "320s" # about 5.3 mins

  http_target {
    http_method = "GET"
    uri         = google_cloudfunctions_function.function.https_trigger_url

    oidc_token {
      service_account_email = google_service_account.ch-scheduler.email
    }
  }
}
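After terraform apply finishes, the job fires every half hour, but the wiring can be verified immediately. Assuming the gcloud CLI in Cloud Shell points at the same project:

# trigger the job now instead of waiting for the next half-hour tick
gcloud scheduler jobs run ch-psc-scheduler --location=europe-west2

# tail the function's logs to confirm data is streaming
gcloud functions logs read ch_psc_stream --region=europe-west2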