Scheduling a Python Script to Run on GCP

I came across the article Setting up a Recurring Google Cloud Function With Terraform and used a serverless approach to schedule a Python script that runs periodically, invoking the Companies House PSC stream to fetch data and save it into a free PostgreSQL database. I am using a Google Cloud Function and Cloud Scheduler to accomplish this without the need to spin up a VM with a cron job running on it.

Terraform and Google Cloud Shell

I will use Terraform from the Google Cloud Shell terminal to develop and deploy the function, and finally to schedule a job that triggers it periodically. All of these tasks are orchestrated with Terraform. The following resources are required:

  • provider plugin (e.g. google), configured with the project_id and region
  • archive_file (generates an archive of the source code compressed as a .zip file)
  • google_storage_bucket
  • google_storage_bucket_object
  • google_cloudfunctions_function
  • google_service_account (creates a new service account)
  • google_cloudfunctions_function_iam_member (grants the service account permission to invoke the function)
  • google_cloud_scheduler_job

Python Script

The script pulls data from the Companies House PSC stream endpoint. Create a folder (here called gcf) with the following layout:

gcf/
├── main.py
├── gcp_psc_db.py
└── requirements.txt
# main.py
import os
import requests
import gcp_psc_db as psc

def ch_psc(request):
  url = 'https://stream.companieshouse.gov.uk/persons-with-significant-control'
  stream_timeout = 600  # ten-minute timeout as an example; drop it if not wanted
  print('Streaming URL', url)
  ch_key = os.environ.get('CH_KEY')

  # begin streaming; the timeout can be dropped if not required, and a
  # try/except around the request would make this more robust
  r = requests.get(url, auth=(ch_key, ''), stream=True, timeout=stream_timeout)
  print(r.status_code, r.headers)

  # check for a good request status (200 = OK) and process the stream
  if r.status_code == 200:
    for json_line in r.iter_lines():  # the stream yields a continuous iterable
      if json_line:
        psc.add_data(json_line)
        print(json_line)
      else:
        print('Empty pulse')  # the stream sends heartbeat newlines to keep the connection open
    return 'Stream ended or timed out'
  else:
    return 'Not 200, best check what to do'
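
Before deploying, the entry point can be exercised locally. Below is a minimal smoke-test sketch (the file name local_test.py and the placeholder values are my own, not part of the original setup); since ch_psc never reads its request argument, passing None is enough:

# local_test.py - hypothetical local smoke test, not part of the deployment
import os

# placeholders - set real values before running
os.environ.setdefault('CH_KEY', 'your-stream-key')
os.environ.setdefault('DB_URL', 'postgresql://user:pass@host:5432/dbname')

import main
main.ch_psc(None)  # streams (and inserts) until the ten-minute timeout fires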

# gcp_psc_db.py
import os
import json
import dataset

# insert one PSC event into the 'ch' table as a JSON payload
def add_data(data):
  db_url = os.environ.get('DB_URL')
  pgconn = dataset.Database(url=db_url, schema='public')
  table = pgconn['ch']
  row = json.loads(data)
  table.insert(dict(payload=row))
  return True
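
To confirm rows are landing in PostgreSQL, the same dataset library can read them back. A minimal sketch, assuming DB_URL is set and the ch table already has data (the file name query_check.py is hypothetical):

# query_check.py - hypothetical read-back check
import os
import dataset

db = dataset.connect(os.environ['DB_URL'])
for row in db['ch'].find(_limit=5):  # fetch a handful of stored events
  print(row)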

Python packages

Include the following pinned Python packages in the requirements.txt file; Cloud Functions installs them with pip during deployment:

requests==2.31.0
psycopg2==2.9.6
dataset==1.6.0

Terraform script

# keeping terraform state locally
terraform {
  backend "local" {}
}
# plugin
provider "google" {
  project = "project_id" # replace with your project ID
  region  = "europe-west2"
  zone    = "europe-west2-a"
}
# create a storage bucket - bucket names must be globally unique
resource "google_storage_bucket" "bucket" {
  name          = "ellogcf"
  location      = "europe-west2"
  force_destroy = true
}

# Generates an archive of the source code compressed as a .zip file.
data "archive_file" "src" {
    type        = "zip"
    source_dir  = "/home/gcf/apps/ex/src"
    output_path = "./tmp/psc_function.zip"
}
# upload the zipped source to the bucket; naming the object after the archive's
# MD5 means a changed source yields a new object name and a redeployed function
resource "google_storage_bucket_object" "archive" {
  name   = "${data.archive_file.src.output_md5}.zip"
  bucket = google_storage_bucket.bucket.name
  source = data.archive_file.src.output_path
}
# create Cloud function
resource "google_cloudfunctions_function" "function" {
  name        = "ch_psc_stream"
  description = "To extract Companies House PSC stream data using Stream API."
  runtime     = "python311"

  environment_variables = {
    CH_KEY = "ch_stream_key"          # placeholder: your Companies House stream API key
    DB_URL = "postgresql_credentials" # placeholder: your PostgreSQL connection URL
  }

  available_memory_mb   = 256
  source_archive_bucket = google_storage_bucket.bucket.name
  source_archive_object = google_storage_bucket_object.archive.name
  trigger_http          = true
  entry_point           = "ch_psc" # This is the name of the function that will be executed in your Python code
}

# create a new service account
resource "google_service_account" "ch-scheduler" {
  account_id   = "companieshouse"
  display_name = "Cloud Function Companies House Stream API Invoker Service Account"
}
# grant the new service account permission to invoke the function
resource "google_cloudfunctions_function_iam_member" "invoker" {
  project        = google_cloudfunctions_function.function.project
  region         = google_cloudfunctions_function.function.region
  cloud_function = google_cloudfunctions_function.function.name

  role   = "roles/cloudfunctions.invoker"
  member = "serviceAccount:${google_service_account.ch-scheduler.email}"
}
# create schedule job
resource "google_cloud_scheduler_job" "job" {
  name             = "ch-psc-scheduler"
  description      = "Trigger the ${google_cloudfunctions_function.function.name} Cloud Function every 30 mins."
  schedule         = "*/30 * * * *" # Every 30 mins
  time_zone        = "Europe/Dublin"
  attempt_deadline = "320s" # 5.3 mins

  http_target {
    http_method = "GET"
    uri         = google_cloudfunctions_function.function.https_trigger_url

    oidc_token {
      service_account_email = google_service_account.ch-scheduler.email
    }
  }
}
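
With the configuration saved, the standard Terraform workflow from Cloud Shell deploys everything. These are the usual commands, run from the directory containing the script:

terraform init    # download the google and archive providers
terraform plan    # preview the resources to be created
terraform apply   # create the bucket, function, service account and scheduler job

Once applied, Cloud Scheduler invokes the function every 30 minutes, authenticating with an OIDC token for the dedicated service account.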