Event Driven Data Processing on Google Cloud Platform
Introduction
In Data Engineering, we often have scheduled workflows that run in a particular window of time. Even if the files arrive a bit early or late, the workflows still start only in that scheduled window; this can waste computational resources and lead to excessive billing.
Let’s see how we can solve this problem on GCP using the architecture below.
Flow:
When a file is created in the GCS bucket, a Cloud Function is triggered. The function makes a request to the Airflow REST API to run the DAG; the DAG creates a Dataproc cluster, submits the job (which loads the data into BigQuery), and finally deletes the cluster.
Services Used :
- Cloud Storage
- Cloud Dataproc
- Cloud Functions
- Cloud Composer
- BigQuery
Step-1: Cloud Storage:
Create two Cloud Storage buckets, one for ingesting files and another for storing output:
- gs://landing-zone-dev
- gs://staging-zone-dev
Step-2: Cloud Composer:
Create a Cloud Composer environment. Cloud Composer is Google’s managed Airflow service, with which we will automate the entire workflow, from creating the cluster to processing the file and loading it into the final table.
In Airflow 2, the REST API is enabled by default. If you’re using Airflow 1, refer to the link below to enable it: https://cloud.google.com/composer/docs/how-to/using/triggering-with-gcf
The Cloud Composer DAG performs the following tasks:
- Create a Dataproc cluster.
- Submit the PySpark job.
- Delete the Dataproc cluster.
DAG Code: https://gitlab.com/dev_phani/event-driven-dataprocessing/-/blob/main/airlow_batch_new.py
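The linked DAG wires these three tasks together with Airflow’s Dataproc operators. As a rough sketch, the cluster and job specifications it passes to those operators might look like the dictionaries below — the project, region, machine types, bucket paths, and init-action path are all assumptions for illustration, not values from the linked code:

```python
# Hypothetical specs for the Dataproc operators in the DAG.
# All names below are placeholders -- replace with your own values.

PROJECT_ID = "my-project"          # assumption
REGION = "us-central1"             # assumption: keep consistent with your buckets
CLUSTER_NAME = "ephemeral-cluster" # assumption

# Passed to DataprocCreateClusterOperator(cluster_config=CLUSTER_CONFIG, ...)
CLUSTER_CONFIG = {
    "master_config": {"num_instances": 1, "machine_type_uri": "n2-standard-2"},
    "worker_config": {"num_instances": 2, "machine_type_uri": "n2-standard-2"},
    "initialization_actions": [
        {
            # installs the Spark-BigQuery connector (see the connectors
            # init action referenced in Step-5); path is an assumption
            "executable_file": "gs://my-init-actions/connectors/connectors.sh"
        }
    ],
}

# Passed to DataprocSubmitJobOperator(job=PYSPARK_JOB, ...)
PYSPARK_JOB = {
    "reference": {"project_id": PROJECT_ID},
    "placement": {"cluster_name": CLUSTER_NAME},
    "pyspark_job": {"main_python_file_uri": "gs://staging-zone-dev/jobs/load_to_bq.py"},
}
```

Because the cluster is created and deleted per run, any one-off setup (like the connector init action) has to live in this config rather than be done by hand.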
Step-3: Cloud Function:
Cloud Functions is an event-driven serverless service that runs your code in response to events.
Use a Cloud Function to trigger the DAG as soon as a file lands in the GCS bucket: set Cloud Storage as the trigger, choose FINALIZE as the event type, and select the landing bucket. The FINALIZE event type generates an event whenever an object is created in the bucket.
How to trigger DAG from cloud function: https://cloud.google.com/composer/docs/how-to/using/triggering-with-gcf
Follow the documentation above to create the Cloud Function, replacing the Airflow web server ID and client ID with the values for your project.
Make sure the Cloud Function’s service account has the Composer User role.
Note: create the Cloud Composer environment before the Cloud Function, so the required Composer details can be added to the function.
Function Code: https://gitlab.com/dev_phani/event-driven-dataprocessing/-/blob/main/fn_2_trigger_dag.py
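The core of the function is a POST to the Airflow 2 stable REST API’s dag-runs endpoint. Here is a minimal sketch; the web server URL and DAG ID are assumptions, and the authentication step (attaching an ID token for the Composer web server) is elided — see the Google documentation linked above for the full auth flow:

```python
import json
import urllib.request

def build_dag_run_payload(bucket: str, object_name: str) -> dict:
    """Build the request body for POST /api/v1/dags/{dag_id}/dagRuns,
    passing the bucket and object name to the DAG via `conf`."""
    return {"conf": {"bucket": bucket, "name": object_name}}

def trigger_dag(event, context):
    """Cloud Function entry point for a GCS finalize event (sketch only)."""
    web_server_url = "https://example-composer-webserver"  # assumption: your Airflow web server
    dag_id = "event_driven_batch"                          # assumption: your DAG ID
    payload = build_dag_run_payload(event["bucket"], event["name"])
    req = urllib.request.Request(
        f"{web_server_url}/api/v1/dags/{dag_id}/dagRuns",
        data=json.dumps(payload).encode(),
        # auth elided: add an Authorization header with an ID token here
        headers={"Content-Type": "application/json"},
        method="POST",
    )
    with urllib.request.urlopen(req) as resp:
        return resp.read()
```

Passing the bucket and file name in `conf` lets the DAG point its PySpark job at the exact file that triggered the run.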
Step-4: BigQuery:
BigQuery is Google’s fully managed, petabyte-scale data warehouse. We’ll use BQ as our data warehouse; create a dataset and a table to load the processed data into.
Make sure the BigQuery dataset, Dataproc cluster, and Cloud Storage buckets are created in the same region, to avoid cross-region data transfer and reduce latency.
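For reference, a dataset and table can be created with DDL run in the BigQuery console (or via `bq query --use_legacy_sql=false`). The dataset, table, and column names below are assumptions — adjust them to match your file’s schema:

```python
# Hypothetical dataset/table DDL for the processed output.
PROJECT_ID = "my-project"   # assumption
DATASET = "staging_ds"      # assumption
TABLE = "sales_data"        # assumption

ddl = f"""
CREATE SCHEMA IF NOT EXISTS `{PROJECT_ID}.{DATASET}`
  OPTIONS (location = 'us-central1');  -- same region as buckets and cluster

CREATE TABLE IF NOT EXISTS `{PROJECT_ID}.{DATASET}.{TABLE}` (
  id INT64,
  name STRING,
  amount NUMERIC,
  load_ts TIMESTAMP
);
"""
print(ddl)
```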
Step-5: Cloud Dataproc:
Cloud Dataproc is Google’s managed Hadoop/Spark service. Use a Dataproc cluster to process the raw data. It’s good practice to separate storage from the cluster, i.e., store data in Cloud Storage instead of HDFS. With this approach we can delete the cluster after use and save cost.
- Create the Dataproc cluster with the Spark-BigQuery connector. For this, use the initialization action at https://github.com/GoogleCloudDataproc/initialization-actions/tree/master/connectors; this init action lets Dataproc read from and write to BigQuery.
Note: creation of cluster is automated using cloud composer.
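The PySpark job the DAG submits can be quite short: read the raw Parquet from the landing bucket and write it to BigQuery through the Spark-BigQuery connector installed by the init action. A sketch under assumed bucket, project, dataset, and table names:

```python
# Sketch of the PySpark job: GCS Parquet -> BigQuery via the Spark-BigQuery
# connector. All bucket/project/dataset/table names are assumptions.

def bq_table_ref(project: str, dataset: str, table: str) -> str:
    """Fully-qualified table reference for the connector's `table` option."""
    return f"{project}.{dataset}.{table}"

def main():
    from pyspark.sql import SparkSession  # available on the Dataproc cluster

    spark = SparkSession.builder.appName("gcs-to-bq").getOrCreate()

    # Read the raw file from the landing bucket (path is an assumption).
    df = spark.read.parquet("gs://landing-zone-dev/sample_data.parquet")

    (df.write.format("bigquery")
       .option("table", bq_table_ref("my-project", "staging_ds", "sales_data"))
       # the connector stages data in GCS before loading it into BigQuery
       .option("temporaryGcsBucket", "staging-zone-dev")
       .mode("append")
       .save())

if __name__ == "__main__":
    main()
```

Note the staging bucket created in Step-1 doubles as the connector’s temporary GCS bucket here.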
Now ingest some sample data into the GCS bucket. You can upload the file directly in the console, or use the command below.
gsutil cp sample_data.parquet gs://landing-zone-dev
In Case of Multiple Buckets and Multiple DAGs:
In this case, publish the storage bucket events to a Cloud Pub/Sub topic and subscribe to it with a Cloud Function. Based on the bucket in the Pub/Sub message, identify which DAG should be triggered.
Step-1: Create a Pub/Sub Topic:
Pub/Sub is Google’s managed, scalable, real-time messaging service, which guarantees at-least-once delivery.
Create a Pub/Sub topic to receive the GCS notifications.
Step-2: Create a Notification on GCS:
Publish GCS event notifications to the Pub/Sub topic for the FINALIZE event type.
Command to create GCS notifications and publish them to the Pub/Sub topic:
gsutil notification create -f json \
  -e OBJECT_FINALIZE \
  -t topic-name gs://bucket-name
Step-3: Create Cloud Function:
Create a Cloud Function with the Pub/Sub topic as the trigger; from the Pub/Sub message, extract the bucket name and file name and trigger the corresponding DAG.
Sample Pub Sub Message:
{
  "kind": "storage#object",
  "id": "bucket/object/generation_id",
  "selfLink": "",
  "name": "object_name",
  "bucket": "bucket_name",
  "generation": "generation_id",
  "metageneration": "1",
  "contentType": "application/octet-stream",
  "timeCreated": "2023-02-26T10:10:00.845Z",
  "updated": "2023-02-26T10:10:00.845Z",
  "storageClass": "STANDARD",
  "timeStorageClassUpdated": "2023-02-26T10:10:00.845Z",
  "size": "0",
  "md5Hash": "1B2M2Y8AsgTpgAmY7PhCfg==",
  "mediaLink": "media_link",
  "crc32c": "AAAAAA==",
  "etag": "CKbl8YX5sv0CEAE="
}
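The routing logic can be a simple bucket-to-DAG lookup over the decoded message. A sketch — the bucket and DAG names in the mapping are assumptions, and the actual REST call to Airflow is elided (it is the same as in the single-bucket case):

```python
import base64
import json

# Hypothetical bucket -> DAG routing table; names are assumptions.
BUCKET_TO_DAG = {
    "sales-landing-zone": "sales_batch_dag",
    "orders-landing-zone": "orders_batch_dag",
}

def route_event(message: dict) -> tuple[str, str]:
    """Pick the DAG to trigger from a decoded GCS notification payload
    (a dict shaped like the sample message above)."""
    bucket = message["bucket"]
    object_name = message["name"]
    dag_id = BUCKET_TO_DAG.get(bucket)
    if dag_id is None:
        raise ValueError(f"No DAG configured for bucket {bucket!r}")
    return dag_id, object_name

def handler(event, context):
    """Cloud Function entry point: Pub/Sub delivers the payload base64-encoded
    in event["data"]."""
    message = json.loads(base64.b64decode(event["data"]).decode())
    dag_id, object_name = route_event(message)
    # ...trigger `dag_id` via the Airflow REST API, passing `object_name` in conf...
    return dag_id, object_name
```

Keeping the mapping in one dictionary makes adding a new bucket/DAG pair a one-line change.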
Hope this will be helpful, Happy Learning :)