ELT batch pipeline with Cloud Storage and BigQuery, orchestrated by Airflow/Composer

Mazlum Tosun
Google Cloud - Community
8 min read · May 4, 2023


1. Explanation of the use case presented in this article

The goal of this article is to show a real-world use case for an ELT batch pipeline built with Cloud Storage, BigQuery, Apache Airflow and Cloud Composer:

  • The Extract part is managed in Cloud Storage
  • The Load part is managed from Cloud Storage to BigQuery
  • The Transform part is managed by a BigQuery SQL query

Everything is orchestrated by Airflow and Cloud Composer 2.

Here is the use case diagram for this article:

  • Extract/Load: a NEWLINE_DELIMITED_JSON file containing the raw data is loaded into a raw table
  • Transform: a BigQuery SQL query applies all the needed transformations to compute the domain data and inserts the result into a domain table

An example of raw data in JSON format:

{
  "teamName": "PSG",
  "teamScore": 30,
  "scorers": [
    {
      "scorerFirstName": "Kylian",
      "scorerLastName": "Mbappe",
      "goals": 15,
      "goalAssists": 6,
      "games": 13
    },
    {
      "scorerFirstName": "Da Silva",
      "scorerLastName": "Neymar",
      "goals": 11,
      "goalAssists": 7,
      "games": 12
    },
    {
      "scorerFirstName": "Angel",
      "scorerLastName": "Di Maria",
      "goals": 7,
      "goalAssists": 8,
      "games": 13
    },
    {
      "scorerFirstName": "Lionel",
      "scorerLastName": "Messi",
      "goals": 12,
      "goalAssists": 8,
      "games": 13
    },
    {
      "scorerFirstName": "Marco",
      "scorerLastName": "Verrati",
      "goals": 3,
      "goalAssists": 10,
      "games": 13
    }
  ]
}

The corresponding computed domain data:

{
  "teamName": "PSG",
  "teamScore": 30,
  "teamTotalGoals": 48,
  "teamSlogan": "Paris est magique",
  "topScorerStats": {
    "firstName": "Kylian",
    "lastName": "Mbappe",
    "goals": 15,
    "games": 13
  },
  "bestPasserStats": {
    "firstName": "Marco",
    "lastName": "Verrati",
    "goalAssists": 10,
    "games": 13
  }
}

The goal is to calculate the following (a minimal Python sketch of this logic follows the list):

  • The total goals per team
  • The top scorer node
  • The best passer node
  • The slogan per team
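
To make these calculations concrete, here is a minimal Python sketch of the same domain logic, for illustration only: the real pipeline performs this transformation in BigQuery SQL (see section 2.3), and the compute_domain_data helper with its slogan parameter is not part of the project code.

def compute_domain_data(raw: dict, slogan: str) -> dict:
    """Compute the domain fields from one raw team record (illustrative only)."""
    scorers = raw["scorers"]
    top_scorer = max(scorers, key=lambda s: s["goals"])
    best_passer = max(scorers, key=lambda s: s["goalAssists"])
    return {
        "teamName": raw["teamName"],
        "teamScore": raw["teamScore"],
        "teamTotalGoals": sum(s["goals"] for s in scorers),
        # In the SQL version, the slogan comes from a join on the team_slogan table.
        "teamSlogan": slogan,
        "topScorerStats": {
            "firstName": top_scorer["scorerFirstName"],
            "lastName": top_scorer["scorerLastName"],
            "goals": top_scorer["goals"],
            "games": top_scorer["games"],
        },
        "bestPasserStats": {
            "firstName": best_passer["scorerFirstName"],
            "lastName": best_passer["scorerLastName"],
            "goalAssists": best_passer["goalAssists"],
            "games": best_passer["games"],
        },
    }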

I also created a video on this topic on my GCP YouTube channel; please subscribe to the channel to support my work for the Google Cloud community:

English version

French version

2. Structure of the project

2.1 The DAG configuration

Airflow allows passing variables through its configuration server. It's the most natural and efficient way to set the configuration for our DAGs.

Instead of setting our DAG configuration directly in the Python code, we can create a JSON file containing an object with a parent node:

{
  "team_league_elt": {
    "feature_name": "team_league_elt",
    "team_stats_raw_write_disposition": "WRITE_APPEND",
    "team_stats_raw_create_disposition": "CREATE_NEVER",
    "dataset": "mazlum_test",
    "team_stat_input_bucket": "mazlum_dev",
    "team_stat_source_object": "airflow/team_league/elt/*.json",
    "team_stat_table": "team_stat",
    "team_stat_raw_table": "team_stat_raw"
  }
}

In Cloud Composer, there is a folder called data; we copy our JSON file there and then import it into the Airflow configuration server, via the following commands:

#!/usr/bin/env bash

set -e
set -o pipefail
set -u

echo "############# Deploying the data config variables of module ${FEATURE_NAME} to composer"

# deploy variables
gcloud composer environments storage data import \
--source ${CONFIG_FOLDER_NAME}/variables/${ENV}/variables.json \
--destination "${FEATURE_NAME}"/config \
--environment ${COMPOSER_ENVIRONMENT} \
--location ${LOCATION} \
--project ${PROJECT_ID}

gcloud beta composer environments run ${COMPOSER_ENVIRONMENT} \
--project ${PROJECT_ID} \
--location ${LOCATION} \
variables import \
-- /home/airflow/gcs/data/"${FEATURE_NAME}"/config/variables.json

echo "############# Variables of ${FEATURE_NAME} are well imported in environment ${COMPOSER_ENVIRONMENT} for project ${PROJECT_ID}"

After publishing the variables in Airflow, we can access them in the Python code via the Variable object:

variables = Variable.get("team_league_elt", deserialize_json=True)
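
As a side note, if the variable is not deployed yet (for example on a fresh environment), Variable.get accepts a default_var argument so the DAG parsing does not fail with a KeyError; a minimal sketch, where the empty dict fallback is just an illustration:

# Optional: fall back to an empty dict if the "team_league_elt" variable does not exist yet.
variables = Variable.get("team_league_elt", deserialize_json=True, default_var={})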

I created a class called Settings that retrieves the DAG variables and exposes all the configuration fields for our DAG code logic:

import os
from dataclasses import dataclass
from datetime import timedelta

from airflow.models import Variable
from airflow.utils.dates import days_ago

_variables = Variable.get("team_league_elt", deserialize_json=True)
_feature_name = _variables["feature_name"]


@dataclass
class Settings:
    dag_folder = os.getenv("DAGS_FOLDER")
    dag_default_args = {
        'depends_on_past': False,
        'email': ['airflow@example.com'],
        'email_on_failure': False,
        'email_on_retry': False,
        'retries': 0,
        'retry_delay': timedelta(minutes=5),
        "start_date": days_ago(1)
    }
    project_id = os.getenv("GCP_PROJECT")
    queries_path = os.path.join(
        dag_folder,
        _feature_name,
        'dag',
        'queries'
    )

    dataset = _variables["dataset"]
    team_stat_raw_table = _variables["team_stat_raw_table"]
    team_stat_table = _variables["team_stat_table"]

    variables = _variables

  • Some elements are retrieved from the Airflow variables
  • It's worth noting that Cloud Composer natively provides some predefined environment variables, such as the GCP project ID (GCP_PROJECT) and the DAG root folder (DAGS_FOLDER)
  • We build the path containing our SQL query: feature_name/dag/queries

2.2 Python local environment

The Python local environment uses Pipenv as the package manager and to automate the creation of the virtual environment.

You can check this video from my GCP YouTube channel, which shows:

  • How to have a comfortable Python local environment with pyenv, Pipenv, direnv and IntelliJ IDEA, and navigate across all the files, classes and methods
  • How to automate the creation of the virtual environment for our Python project

2.3 The DAG code

The DAG root folder is team_league_elt and the Python file containing the DAG logic is team_league_dag.py:

import airflow
from airflow.providers.google.cloud.operators.bigquery import BigQueryInsertJobOperator
from airflow.providers.google.cloud.transfers.gcs_to_bigquery import GCSToBigQueryOperator
from jinja2 import Template

from team_league_elt.dag.settings import Settings

settings = Settings()


def get_jinja_template(file_path: str) -> Template:
    with open(f'{settings.queries_path}/{file_path}') as fp:
        return Template(fp.read())


with airflow.DAG(
        "team_league_elt",
        default_args=settings.dag_default_args,
        schedule_interval=None) as dag:
    load_team_stats_raw_to_bq = GCSToBigQueryOperator(
        task_id='load_team_stats_raw_to_bq',
        bucket=settings.variables['team_stat_input_bucket'],
        source_objects=[settings.variables['team_stat_source_object']],
        destination_project_dataset_table=f'{settings.project_id}.{settings.dataset}.{settings.team_stat_raw_table}',
        source_format='NEWLINE_DELIMITED_JSON',
        compression='NONE',
        create_disposition=settings.variables['team_stats_raw_create_disposition'],
        write_disposition=settings.variables['team_stats_raw_write_disposition'],
        autodetect=True
    )

    compute_and_insert_team_stats_domain_query = get_jinja_template('compute_and_insert_team_stats_data.sql').render(
        project_id=settings.project_id,
        dataset=settings.dataset,
        team_stat_table=settings.team_stat_table,
        team_stat_raw_table=settings.team_stat_raw_table
    )

    compute_and_insert_team_stats_domain = BigQueryInsertJobOperator(
        task_id='compute_team_stats_domain',
        configuration={
            "query": {
                "query": compute_and_insert_team_stats_domain_query,
                "useLegacySql": False
            }
        },
        location='EU'
    )

    load_team_stats_raw_to_bq >> compute_and_insert_team_stats_domain

Extract and Load: the first operator, GCSToBigQueryOperator, loads the JSON file from GCS into the BigQuery raw table.

This operator takes:

  • The input bucket
  • The input objects (NEWLINE_DELIMITED_JSON)
  • The output table (raw)

⚠️ We used autodetect mode in this example for simplicity, but we recommend using a BigQuery JSON schema instead, in order to have full control over the file ingestion and the column types in the output table.
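
For instance, an explicit schema could be passed to the operator via its schema_fields parameter (or schema_object, pointing to a JSON schema file stored in GCS). Here is a minimal, illustrative sketch with assumed column types for the raw table; it would replace the operator definition inside the DAG block above:

# Illustrative BigQuery schema for the raw table, instead of autodetect.
# The nested "scorers" field is declared as a REPEATED RECORD.
team_stats_raw_schema = [
    {"name": "teamName", "type": "STRING", "mode": "REQUIRED"},
    {"name": "teamScore", "type": "INTEGER", "mode": "NULLABLE"},
    {
        "name": "scorers",
        "type": "RECORD",
        "mode": "REPEATED",
        "fields": [
            {"name": "scorerFirstName", "type": "STRING", "mode": "NULLABLE"},
            {"name": "scorerLastName", "type": "STRING", "mode": "NULLABLE"},
            {"name": "goals", "type": "INTEGER", "mode": "NULLABLE"},
            {"name": "goalAssists", "type": "INTEGER", "mode": "NULLABLE"},
            {"name": "games", "type": "INTEGER", "mode": "NULLABLE"},
        ],
    },
]

load_team_stats_raw_to_bq = GCSToBigQueryOperator(
    task_id='load_team_stats_raw_to_bq',
    bucket=settings.variables['team_stat_input_bucket'],
    source_objects=[settings.variables['team_stat_source_object']],
    destination_project_dataset_table=f'{settings.project_id}.{settings.dataset}.{settings.team_stat_raw_table}',
    source_format='NEWLINE_DELIMITED_JSON',
    schema_fields=team_stats_raw_schema,  # replaces autodetect=True
    create_disposition=settings.variables['team_stats_raw_create_disposition'],
    write_disposition=settings.variables['team_stats_raw_write_disposition'],
)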

Transform: the second operator launches a BigQuery job with a SQL query, to apply the transformations and business logic:

INSERT INTO `{{project_id}}.{{dataset}}.{{team_stat_table}}`
(
    teamName,
    teamScore,
    teamSlogan,
    teamTotalGoals,
    topScorerStats,
    bestPasserStats,
    ingestionDate
)
SELECT
    team_stats.teamName,
    team_stats.teamScore,
    team_slogan.teamSlogan,
    SUM(scorer.goals) AS teamTotalGoals,
    ARRAY_AGG(
        STRUCT(
            scorer.scorerFirstName AS firstName,
            scorer.scorerLastName AS lastName,
            scorer.goals AS goals,
            scorer.games AS games
        )
        ORDER BY scorer.goals DESC LIMIT 1
    )[OFFSET(0)] AS topScorerStats,
    ARRAY_AGG(
        STRUCT(
            scorer.scorerFirstName AS firstName,
            scorer.scorerLastName AS lastName,
            scorer.goalAssists AS goalAssists,
            scorer.games AS games
        )
        ORDER BY scorer.goalAssists DESC LIMIT 1
    )[OFFSET(0)] AS bestPasserStats,
    CURRENT_TIMESTAMP() AS ingestionDate
FROM `{{project_id}}.{{dataset}}.{{team_stat_raw_table}}` team_stats
INNER JOIN `{{project_id}}.{{dataset}}.team_slogan` team_slogan ON team_stats.teamName = team_slogan.teamName,
UNNEST(team_stats.scorers) AS scorer
GROUP BY
    team_stats.teamName,
    team_stats.teamScore,
    team_slogan.teamSlogan;

  • This query uses Jinja2 templating
  • A GROUP BY is done on the team fields, because the calculation should be done per team
  • The top scorer node is calculated with ARRAY_AGG and ORDER BY DESC on the goals field. The OFFSET(0) gets the first element in the list
  • The logic is the same for the best passer, but the ORDER BY DESC is applied on the goalAssists field
  • A SUM is used to calculate the total goals per team
  • A join is done on the team_slogan table to retrieve the slogan per team

In the Python code, the Jinja2 template injects the configuration fields into the SQL query.
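
To inspect the rendered query locally, outside Airflow, the same Jinja2 rendering can be reproduced in a small standalone script; a minimal sketch, assuming the repository layout described above and using illustrative placeholder values:

from jinja2 import Template

# Render the query template locally with placeholder values to inspect the final SQL.
# The path and the rendered values below are illustrative.
with open('team_league_elt/dag/queries/compute_and_insert_team_stats_data.sql') as fp:
    query = Template(fp.read()).render(
        project_id='my-gcp-project',
        dataset='mazlum_test',
        team_stat_table='team_stat',
        team_stat_raw_table='team_stat_raw',
    )

print(query)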

The SQL query is then passed to the BigQueryInsertJobOperator.

The final line sequences the two tasks: task1 >> task2

The DAG folder is copied into the Composer bucket (composer-bucket/dags) via the following commands:

#!/usr/bin/env bash

set -e
set -o pipefail
set -u

# Remove current DAG folder.
gcloud composer environments storage dags delete \
${FEATURE_NAME} \
-q \
--environment ${COMPOSER_ENVIRONMENT} \
--location ${LOCATION} \
--project ${PROJECT_ID}

echo "############# Current existing DAG folder ${FEATURE_NAME} is well deleted in environment ${COMPOSER_ENVIRONMENT} for project ${PROJECT_ID}"

# Then replace it.
gcloud composer environments storage dags import \
--source ${FEATURE_NAME} \
--environment ${COMPOSER_ENVIRONMENT} \
--location ${LOCATION} \
--project ${PROJECT_ID}

echo "############# DAG folder ${FEATURE_NAME} is well imported in environment ${COMPOSER_ENVIRONMENT} for project ${PROJECT_ID}"

We delete the old version of the DAG and deploy the new one (we could also apply DAG versioning, but that is not the subject of this article).

2.4 The setup.py file for DAGs

Our DAG uses Python imports from the root folder team_league_elt:

from team_league_elt.dag.settings import Settings

For these imports to work correctly in Cloud Composer, we have to copy a setup.py file to the DAGs root folder: composer-bucket/dags/setup.py

We have the following dags_setup/setup.py file:

from setuptools import find_packages, setup

setup(
    name="composer_env_python",
    version="0.0.1",
    install_requires=[],
    packages=find_packages(),
)

We copy this file into the Composer bucket via the following command:

#!/usr/bin/env bash

set -e
set -o pipefail
set -u

gcloud composer environments storage dags import \
--source dags_setup/setup.py \
--environment ${COMPOSER_ENVIRONMENT} \
--location ${LOCATION} \
--project ${PROJECT_ID}

echo "############# DAGs setup.py file is well imported in environment ${COMPOSER_ENVIRONMENT} for project ${PROJECT_ID}"

2.5 Run the DAG manually from the Airflow Webserver

Go to the Airflow Webserver via the Composer home page or the environment configuration tab:

In the Webserver, manually launch our DAG by clicking on the play button:

Access the DAG graph:

When we click on a task, we can access the Log and Rendered tabs:

Example of the Rendered tab for the SQL query task:

This shows the final SQL query executed by the task; all the dynamic templating variables were replaced by their real values.

Conclusion

This use case showed how simple it is to orchestrate an ELT batch pipeline in Google Cloud with Airflow.

We only used two existing operators, fully compatible with Google Cloud services like Cloud Storage and BigQuery.

We didn’t need to manage the wait between tasks because Airflow manages it for us.

The code is simple and lightweight with Python.

The deployment of DAG and configuration in Composer is also simple via gcloud commands.

Cloud Composer installs and manages Airflow for us, but we have to correctly configure the machine sizing depending on the number of DAGs (GKE Autopilot and environment size).

This article was not intended to discuss the cost and configuration aspects of the cluster; we prefer to cover that in a dedicated article.

I plan to write other articles about pipeline orchestration:

  • Create the Cloud Composer cluster with Terraform code
  • CI/CD to deploy this use case and all the commands presented in this article, with Cloud Build
  • The same use case and ELT pipeline with Cloud Workflows
  • A comparison of Cloud Composer with Cloud Workflows
  • How to write an ETL pipeline with Composer/Airflow, Cloud Run or Dataflow, and BigQuery
  • How to write an ETL pipeline with Cloud Workflows, Cloud Run and BigQuery
  • …

All the code shared in this article is accessible from my GitHub repository:

If you like my articles and videos and want to see my posts, follow me on:
