Event Driven Data Processing on Google Cloud Platform

Introduction

Venkata Phaneendra Reddy Janga
Mar 6, 2023

In Data Engineering, we often have scheduled workflows that run in a particular window of time. Even if the files arrive a bit early or late, the workflows start only within that scheduled window, which can waste computational resources and lead to excessive billing.

Let’s see how we can solve this problem on GCP using the architecture below.

Architecture Diagram

Flow:

When a file is created in the GCS bucket, a Cloud Function is triggered. The function makes a request to the Airflow REST API to run the DAG; the DAG creates a Dataproc cluster, submits the job (which loads data into BigQuery), and finally deletes the cluster.

Services Used:

  • Cloud Storage
  • Cloud Dataproc
  • Cloud Functions
  • Cloud Composer
  • BigQuery

Step-1: Cloud Storage:

Create two Cloud Storage buckets, one for ingesting files and another for storing output:

  • gs://landing-zone-dev
  • gs://staging-zone-dev
Creating Cloud Storage Bucket
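
If you prefer to script the bucket creation, a minimal sketch using the google-cloud-storage client is shown below; the region us-central1 is an assumption, use whichever region you plan to use for Dataproc and BigQuery.

from google.cloud import storage

client = storage.Client()
for name in ["landing-zone-dev", "staging-zone-dev"]:
    # Keep both buckets in the same region as the Dataproc cluster and BigQuery dataset.
    client.create_bucket(name, location="us-central1")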

Step-2: Cloud Composer:

Create a Cloud Composer environment. Cloud Composer is Google’s managed Apache Airflow service, which we will use to automate the entire workflow, right from creating the cluster to processing the file and loading it into the final table.

For Airflow 2, the REST API is enabled by default. If you’re using Airflow 1, please refer to the link below to enable it: https://cloud.google.com/composer/docs/how-to/using/triggering-with-gcf

The Cloud Composer DAG will perform the following tasks:

  • Create a Dataproc cluster.
  • Submit a PySpark job.
  • Delete the Dataproc cluster.

DAG Code: https://gitlab.com/dev_phani/event-driven-dataprocessing/-/blob/main/airlow_batch_new.py
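
The linked file is the full DAG; below is a condensed sketch of the same three-task pattern using the Dataproc operators from the Airflow Google provider. The project ID, region, cluster configuration and PySpark file URI are placeholders, not values from the article.

from datetime import datetime

from airflow import DAG
from airflow.providers.google.cloud.operators.dataproc import (
    DataprocCreateClusterOperator,
    DataprocDeleteClusterOperator,
    DataprocSubmitJobOperator,
)

PROJECT_ID = "my-project"           # placeholder
REGION = "us-central1"              # placeholder
CLUSTER_NAME = "ephemeral-cluster"  # placeholder

CLUSTER_CONFIG = {
    "master_config": {"num_instances": 1, "machine_type_uri": "n1-standard-2"},
    "worker_config": {"num_instances": 2, "machine_type_uri": "n1-standard-2"},
}

PYSPARK_JOB = {
    "reference": {"project_id": PROJECT_ID},
    "placement": {"cluster_name": CLUSTER_NAME},
    "pyspark_job": {"main_python_file_uri": "gs://staging-zone-dev/jobs/load_to_bq.py"},  # placeholder
}

with DAG(
    dag_id="event_driven_batch",
    start_date=datetime(2023, 3, 1),
    schedule_interval=None,  # triggered externally by the Cloud Function
    catchup=False,
) as dag:
    create_cluster = DataprocCreateClusterOperator(
        task_id="create_cluster",
        project_id=PROJECT_ID,
        region=REGION,
        cluster_name=CLUSTER_NAME,
        cluster_config=CLUSTER_CONFIG,
    )
    submit_job = DataprocSubmitJobOperator(
        task_id="submit_pyspark_job",
        project_id=PROJECT_ID,
        region=REGION,
        job=PYSPARK_JOB,
    )
    delete_cluster = DataprocDeleteClusterOperator(
        task_id="delete_cluster",
        project_id=PROJECT_ID,
        region=REGION,
        cluster_name=CLUSTER_NAME,
        trigger_rule="all_done",  # delete the cluster even if the job fails
    )
    create_cluster >> submit_job >> delete_cluster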

Creating Composer Environment
Airflow DAG

Step-3: Cloud Function:

Cloud Functions is an event-driven, serverless service that runs your code in response to events.

Use a Cloud Function to trigger the DAG as soon as a file lands in the GCS bucket: set Cloud Storage as the trigger, choose FINALIZE as the event type, and select the landing bucket. The FINALIZE event type generates an event whenever an object is created in the bucket.

How to trigger a DAG from a Cloud Function: https://cloud.google.com/composer/docs/how-to/using/triggering-with-gcf

Follow the above documentation to create the Cloud Function, replacing the Airflow web server ID and client ID with the values for your project.

Make sure the Cloud Function’s service account has the Composer User role.

Note: create the Cloud Composer environment before the Cloud Function, so that the required Composer details can be added to the function.

Function Code: https://gitlab.com/dev_phani/event-driven-dataprocessing/-/blob/main/fn_2_trigger_dag.py
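
The linked file is the author’s function; the sketch below only shows the general shape of such a function, assuming the IAP-based approach from the documentation above. The web server URL, client ID and DAG ID are placeholders that must be replaced for your project.

import google.auth.transport.requests
import google.oauth2.id_token
import requests

WEB_SERVER_URL = "https://example-tp.appspot.com"  # placeholder: Airflow web server URL
CLIENT_ID = "xxxxxxxx.apps.googleusercontent.com"  # placeholder: IAP client ID
DAG_ID = "event_driven_batch"                      # placeholder: DAG to trigger


def trigger_dag(event, context):
    """Background Cloud Function, fired on the GCS FINALIZE event."""
    # Obtain an OpenID Connect token for the IAP-protected Airflow web server.
    auth_request = google.auth.transport.requests.Request()
    id_token = google.oauth2.id_token.fetch_id_token(auth_request, CLIENT_ID)

    # Call the stable Airflow 2 REST API to create a DAG run, passing the
    # bucket and object name from the GCS event as the DAG run conf.
    response = requests.post(
        f"{WEB_SERVER_URL}/api/v1/dags/{DAG_ID}/dagRuns",
        headers={"Authorization": f"Bearer {id_token}"},
        json={"conf": {"bucket": event["bucket"], "name": event["name"]}},
    )
    response.raise_for_status()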

Function Configuration
Function Logs

Step-4: BigQuery:

BigQuery is Google’s fully managed, petabyte-scale data warehouse. We’ll use BigQuery as our data warehouse; create a dataset and a table to load the processed data into.

Make sure the BigQuery dataset, Dataproc cluster, and Cloud Storage buckets are created in the same region to avoid cross-region data movement and keep latency and cost down.
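
A minimal sketch of creating the dataset and table with the BigQuery client library; the dataset, table and schema below are placeholders chosen for illustration.

from google.cloud import bigquery

client = bigquery.Client()

# Dataset in the same region as the buckets and the Dataproc cluster.
dataset = bigquery.Dataset(f"{client.project}.sales_dataset")  # placeholder dataset
dataset.location = "us-central1"
client.create_dataset(dataset, exists_ok=True)

# Placeholder schema for the processed data.
schema = [
    bigquery.SchemaField("id", "INTEGER"),
    bigquery.SchemaField("value", "STRING"),
]
table = bigquery.Table(f"{client.project}.sales_dataset.sales_table", schema=schema)
client.create_table(table, exists_ok=True)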

Step-5: Cloud Dataproc:

Cloud Dataproc is Google’s managed Hadoop/Spark service. Use a Dataproc cluster to process the raw data. It’s good practice to separate storage from the cluster, i.e., store data in Cloud Storage instead of HDFS. With this approach we can delete the cluster after use and save cost.

Note: creation of the cluster is automated through Cloud Composer.
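
For reference, a minimal sketch of the kind of PySpark job the DAG submits: read the raw Parquet file from the landing bucket and load it into BigQuery. The table name is a placeholder, and the spark-bigquery connector is assumed to be available on the cluster.

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("load_to_bq").getOrCreate()

# Read the raw file from Cloud Storage (not HDFS), so the cluster stays disposable.
df = spark.read.parquet("gs://landing-zone-dev/sample_data.parquet")

# Write into BigQuery via the spark-bigquery connector, staging through GCS.
(df.write
   .format("bigquery")
   .option("table", "my-project.sales_dataset.sales_table")  # placeholder table
   .option("temporaryGcsBucket", "staging-zone-dev")
   .mode("append")
   .save())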

Now ingest some sample data into the GCS bucket. You can upload the file directly to the bucket, or use the command below.

gsutil cp sample_data.parquet gs://landing-zone-dev
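
Once the DAG run completes, a quick query can confirm that the data landed in BigQuery (the table name is a placeholder):

from google.cloud import bigquery

client = bigquery.Client()
rows = client.query(
    "SELECT COUNT(*) AS row_count FROM `my-project.sales_dataset.sales_table`"
).result()
print(next(iter(rows)).row_count)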

In Case of Multiple Buckets and Multiple DAGs:

In this case, publish the storage bucket events to a Cloud Pub/Sub topic and subscribe to it with a Cloud Function. Based on the bucket name in the Pub/Sub message, identify which DAG should be triggered (see the dispatcher sketch after the sample message below).

Architecture for Multiple Buckets Scenario

Step-1: Create Pub Sub Topic:

Pub/Sub is Google’s managed, scalable, real-time messaging service that guarantees at-least-once delivery.

Create a Pub/Sub topic to which the GCS notifications will be sent.
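
A minimal sketch of creating the topic with the Pub/Sub client library; the project and topic names are placeholders (the console or gcloud works just as well).

from google.cloud import pubsub_v1

publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path("my-project", "gcs-notifications")  # placeholders
publisher.create_topic(name=topic_path)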

Step-2: Create Notification on GCS:

Publish GCS event notifications to the Pub/Sub topic for the FINALIZE event type.

Command to create GCS notifications and publish them to the Pub/Sub topic:

gsutil notification create -f json \
-e OBJECT_FINALIZE \
-t topic-name gs://bucket-name

Step-3: Create Cloud Function:

Create a Cloud Function with the Pub/Sub topic as the trigger. From the Pub/Sub message, extract the bucket name and file name, and trigger the respective DAG.

Sample Pub Sub Message:

{
"kind": "storage#object",
"id": "bucket/object/generation_id",
"selfLink": "",
"name": "object_name",
"bucket": "bucket_name",
"generation": "generation_id",
"metageneration": "1",
"contentType": "application/octet-stream",
"timeCreated": "2023-02-26T10:10:00.845Z",
"updated": "2023-02-26T10:10:00.845Z",
"storageClass": "STANDARD",
"timeStorageClassUpdated": "2023-02-26T10:10:00.845Z",
"size": "0",
"md5Hash": "1B2M2Y8AsgTpgAmY7PhCfg==",
"mediaLink": "media_link",
"crc32c": "AAAAAA==",
"etag": "CKbl8YX5sv0CEAE="
}
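
A minimal sketch of such a dispatcher function, assuming a simple bucket-to-DAG mapping; the mapping and DAG IDs are placeholders, and the actual trigger call would be the same Airflow REST API request as in the single-bucket function above.

import base64
import json

# Placeholder mapping from landing bucket to the DAG that should process it.
BUCKET_TO_DAG = {
    "landing-zone-dev": "event_driven_batch",
    "landing-zone-sales": "sales_batch",
}


def dispatch(event, context):
    """Triggered by a message on the GCS notification topic."""
    # The GCS notification (the JSON shown above) arrives base64-encoded
    # in the Pub/Sub message body.
    payload = json.loads(base64.b64decode(event["data"]).decode("utf-8"))
    bucket, name = payload["bucket"], payload["name"]

    dag_id = BUCKET_TO_DAG.get(bucket)
    if dag_id is None:
        print(f"No DAG mapped for bucket {bucket}; ignoring {name}")
        return

    # Here you would make the same Airflow REST API call as in the
    # single-bucket function, passing dag_id and the file details as conf.
    print(f"Would trigger DAG {dag_id} for gs://{bucket}/{name}")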

Hope this will be helpful, Happy Learning :)


Venkata Phaneendra Reddy Janga

GCP Data Engineer working at Impetus Technologies, Hyderabad. Aspiring Cloud Architect. LinkedIn - www.linkedin.com/in/j-v-phanindra-reddy