Tag: terraform

  • Scheduling a Python Script to Run on GCP

    I came across the article Setting up a Recurring Google Cloud Function With Terraform and used a serverless approach to schedule a Python script that runs periodically, invoking the Companies House PSC stream to pull data and save it into a free PostgreSQL database. I am using Google Cloud Functions and Cloud Scheduler to accomplish this without the need to spin up a VM with a cronjob running on it.

    Terraform and Google Cloud Shell

    I will use Terraform from the Google Cloud Shell terminal to develop and deploy the function and, finally, to schedule a job that runs it periodically. All of these tasks are orchestrated with Terraform. The following resources are required for the task:

    • provider plugin (e.g. google), configured with the project_id and region
    • archive_file (generates an archive of the source code compressed as a .zip file)
    • google_storage_bucket
    • google_storage_bucket_object
    • google_cloudfunctions_function
    • google_service_account (creates a new service account)
    • google_cloudfunctions_function_iam_member (needed for invoking the scheduled job)
    • google_cloud_scheduler_job

    Python Script

    The script pulls data from the Companies House PSC stream endpoint. Create a folder with the following layout:

    gcf/
    ├── main.py
    ├── gcp_psc_db.py
    └── requirements.txt
    # main.py
    import os
    import requests
    import gcp_psc_db as psc
    
    def ch_psc(request):
      url = 'https://stream.companieshouse.gov.uk/persons-with-significant-control'
      stream_timeout = 600  # ten-minute timeout, just as an example; drop it if not wanted
      print('Streaming URL', url)
      ch_key = os.environ.get('CH_KEY')
    
      # begin streaming; the timeout can be dropped if not required,
      # and wrapping the request in a try/except would be a sensible addition
      r = requests.get(url, auth=(ch_key, ''), stream=True, timeout=stream_timeout)
      print(r.status_code, r.headers)
    
      # check for a good response status (200 = OK) and process the stream
      if r.status_code == 200:
        for json_line in r.iter_lines():  # the stream yields a continuous iterable
          if json_line:
            psc.add_data(json_line)
            print(json_line)
          else:
            print('Empty pulse')  # heartbeat that keeps the connection alive
        return 'Stream ended'
      return f'Unexpected status {r.status_code}, best check what to do'
    
    # gcp_psc_db.py
    import os
    import json
    import dataset
    
    # parse a JSON line from the stream and insert it into the `ch` table
    def add_data(data):
      db_url = os.environ.get('DB_URL')
      pgconn = dataset.connect(db_url, schema='public')
      table = pgconn['ch']
      row = json.loads(data)
      table.insert(dict(payload=row))
      return True

    Python packages

    Include the following Python packages in the requirements.txt file.

    # requirements.txt
    requests==2.31.0
    psycopg2==2.9.6
    dataset==1.6.0
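
    Before deploying, it is worth a quick local smoke test. The sketch below is a hypothetical helper (local_test.py, not part of the deployment): it assumes you have run pip install -r requirements.txt, exported CH_KEY and DB_URL in your shell, and that the PostgreSQL database behind DB_URL is reachable from your machine.

    # local_test.py (hypothetical helper, for local testing only)
    import os
    import main
    
    if __name__ == '__main__':
      assert os.environ.get('CH_KEY'), 'export CH_KEY before running'
      assert os.environ.get('DB_URL'), 'export DB_URL before running'
      # ch_psc ignores its `request` argument, so None is fine locally
      print(main.ch_psc(None))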

    Terraform script

    # keeping terraform state locally
    terraform {
      backend "local" {}
    }
    # plugin
    provider "google" {
      project     = "project_id"
      region      = "europe-west2"
      zone        = "europe-west2-a"
    }
    # create storage bucket - its name must be globally unique
    resource "google_storage_bucket" "bucket" {
      name          = "ellogcf"
      location      = "europe-west2"
      force_destroy = true
    }
    
    # Generates an archive of the source code compressed as a .zip file.
    data "archive_file" "src" {
      type        = "zip"
      source_dir  = "/home/gcf/apps/ex/src"
      output_path = "./tmp/psc_function.zip"
    }
    # upload the zipped source code to the storage bucket
    resource "google_storage_bucket_object" "archive" {
      name   = "${data.archive_file.src.output_md5}.zip"
      bucket = google_storage_bucket.bucket.name
      source = "./tmp/psc_function.zip"
    }
    # create Cloud function
    resource "google_cloudfunctions_function" "function" {
      name        = "ch_psc_stream"
      description = "To extract Companies House PSC stream data using Stream API."
      runtime     = "python311"
     
      environment_variables = {
        CH_KEY = "ch_stream_key"           # placeholder: your Companies House stream API key
        DB_URL = "postgresql_credentials"  # placeholder: your PostgreSQL connection URL
      }
    
      available_memory_mb   = 256
      source_archive_bucket = google_storage_bucket.bucket.name
      source_archive_object = google_storage_bucket_object.archive.name
      trigger_http          = true
      entry_point           = "ch_psc" # This is the name of the function that will be executed in your Python code
    }
    
    # create a new service account
    resource "google_service_account" "ch-scheduler" {
      account_id   = "companieshouse"
      display_name = "Cloud Function Companies House Stream API Invoker Service Account"
    }
    # set up IAM policy so the service account can invoke the function
    resource "google_cloudfunctions_function_iam_member" "invoker" {
      project        = google_cloudfunctions_function.function.project
      region         = google_cloudfunctions_function.function.region
      cloud_function = google_cloudfunctions_function.function.name
    
      role   = "roles/cloudfunctions.invoker"
      member = "serviceAccount:${google_service_account.ch-scheduler.email}"
    }  
    # create schedule job
    resource "google_cloud_scheduler_job" "job" {
      name             = "ch-psc-scheduler"
      description      = "Trigger the ${google_cloudfunctions_function.function.name} Cloud Function every 30 mins."
      schedule         = "*/30 * * * *" # Every 30 mins
      time_zone        = "Europe/Dublin"
      attempt_deadline = "320s" # 5.3 mins
    
      http_target {
        http_method = "GET"
        uri         = google_cloudfunctions_function.function.https_trigger_url
    
        oidc_token {
          service_account_email = google_service_account.ch-scheduler.email
        }
      }
    }
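
    Once terraform apply completes, the wiring can be checked without waiting for the scheduler. The Python sketch below is a hypothetical helper, not part of the deployment: it mints an OIDC identity token for the function URL, the same way the scheduler's oidc_token block does, and then calls the function. FUNCTION_URL is an assumption - substitute the function's actual https_trigger_url - and the script must run with credentials for a service account that holds the invoker role (for example, GOOGLE_APPLICATION_CREDENTIALS pointing at a key for the account created above).

    # invoke_test.py (hypothetical helper to trigger the secured function manually)
    import requests
    import google.auth.transport.requests
    from google.oauth2 import id_token
    
    # assumption: replace with the deployed function's real https_trigger_url
    FUNCTION_URL = 'https://europe-west2-project_id.cloudfunctions.net/ch_psc_stream'
    
    # mint an OIDC identity token whose audience is the function URL,
    # mirroring what the scheduler's oidc_token block does
    auth_req = google.auth.transport.requests.Request()
    token = id_token.fetch_id_token(auth_req, FUNCTION_URL)
    
    resp = requests.get(FUNCTION_URL, headers={'Authorization': f'Bearer {token}'})
    print(resp.status_code, resp.text)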