ELT batch pipeline with Cloud Storage and BigQuery, orchestrated by Airflow/Cloud Composer
1. Explanation of the use case presented in this article
The goal of this article is to show a real-world use case for an ELT batch pipeline with Cloud Storage, BigQuery, Apache Airflow and Cloud Composer:
- The Extract part is managed in Cloud Storage
- The Load part is managed from Cloud Storage to BigQuery
- The Transform part is managed by a BigQuery SQL query
Everything is orchestrated by Airflow and Cloud Composer 2
Here is the use case diagram for this article:
- Extract/Load : a NEWLINE_DELIMITED_JSON file containing the raw data is loaded into a raw table
- Transform : a BigQuery SQL query applies all the needed transformations to compute the domain data and inserts the result into a domain table
An example of raw data in JSON format:
{
  "teamName": "PSG",
  "teamScore": 30,
  "scorers": [
    {
      "scorerFirstName": "Kylian",
      "scorerLastName": "Mbappe",
      "goals": 15,
      "goalAssists": 6,
      "games": 13
    },
    {
      "scorerFirstName": "Da Silva",
      "scorerLastName": "Neymar",
      "goals": 11,
      "goalAssists": 7,
      "games": 12
    },
    {
      "scorerFirstName": "Angel",
      "scorerLastName": "Di Maria",
      "goals": 7,
      "goalAssists": 8,
      "games": 13
    },
    {
      "scorerFirstName": "Lionel",
      "scorerLastName": "Messi",
      "goals": 12,
      "goalAssists": 8,
      "games": 13
    },
    {
      "scorerFirstName": "Marco",
      "scorerLastName": "Verrati",
      "goals": 3,
      "goalAssists": 10,
      "games": 13
    }
  ]
}
The corresponding computed domain data:
{
  "teamName": "PSG",
  "teamScore": 30,
  "teamTotalGoals": 48,
  "teamSlogan": "Paris est magique",
  "topScorerStats": {
    "firstName": "Kylian",
    "lastName": "Mbappe",
    "goals": 15,
    "games": 13
  },
  "bestPasserStats": {
    "firstName": "Marco",
    "lastName": "Verrati",
    "goalAssists": 10,
    "games": 13
  }
}
The goal is to calculate:
- The total goals per team
- The top scorer node
- The best passer node
- The slogan to set per team
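To make the expected transformation concrete before expressing it in SQL, here is a minimal plain-Python sketch of the same logic applied to the sample raw data above. The slogans dict is a stand-in for the team_slogan lookup table joined later in BigQuery:

# Minimal sketch of the domain computation in plain Python (the real pipeline does this in BigQuery SQL).
raw = {
    "teamName": "PSG",
    "teamScore": 30,
    "scorers": [
        {"scorerFirstName": "Kylian", "scorerLastName": "Mbappe", "goals": 15, "goalAssists": 6, "games": 13},
        {"scorerFirstName": "Da Silva", "scorerLastName": "Neymar", "goals": 11, "goalAssists": 7, "games": 12},
        {"scorerFirstName": "Angel", "scorerLastName": "Di Maria", "goals": 7, "goalAssists": 8, "games": 13},
        {"scorerFirstName": "Lionel", "scorerLastName": "Messi", "goals": 12, "goalAssists": 8, "games": 13},
        {"scorerFirstName": "Marco", "scorerLastName": "Verrati", "goals": 3, "goalAssists": 10, "games": 13},
    ],
}

# Hypothetical slogan lookup, mirroring the team_slogan table used in the SQL query below.
slogans = {"PSG": "Paris est magique"}

top_scorer = max(raw["scorers"], key=lambda s: s["goals"])
best_passer = max(raw["scorers"], key=lambda s: s["goalAssists"])

domain = {
    "teamName": raw["teamName"],
    "teamScore": raw["teamScore"],
    "teamTotalGoals": sum(s["goals"] for s in raw["scorers"]),
    "teamSlogan": slogans[raw["teamName"]],
    "topScorerStats": {
        "firstName": top_scorer["scorerFirstName"],
        "lastName": top_scorer["scorerLastName"],
        "goals": top_scorer["goals"],
        "games": top_scorer["games"],
    },
    "bestPasserStats": {
        "firstName": best_passer["scorerFirstName"],
        "lastName": best_passer["scorerLastName"],
        "goalAssists": best_passer["goalAssists"],
        "games": best_passer["games"],
    },
}

assert domain["teamTotalGoals"] == 48  # matches the domain example above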
I also created a video on this topic on my GCP YouTube channel; please subscribe to the channel to support my work for the Google Cloud community:
English version
French version
2. Structure of the project
2.1 The DAG configuration
Airflow gives the possibility to set variables via its configuration server. It's the most natural and efficient way to pass configuration to our DAGs.
Instead of setting our DAG configuration directly in the Python code, we can create a JSON file containing an object with a parent node:
{
  "team_league_elt": {
    "feature_name": "team_league_elt",
    "team_stats_raw_write_disposition": "WRITE_APPEND",
    "team_stats_raw_create_disposition": "CREATE_NEVER",
    "dataset": "mazlum_test",
    "team_stat_input_bucket": "mazlum_dev",
    "team_stat_source_object": "airflow/team_league/elt/*.json",
    "team_stat_table": "team_stat",
    "team_stat_raw_table": "team_stat_raw"
  }
}
In Cloud Composer, there is a folder called data in the environment bucket; we copy our JSON file there and then import it into the Airflow configuration server, via the following commands:
#!/usr/bin/env bash
set -e
set -o pipefail
set -u
echo "############# Deploying the data config variables of module ${FEATURE_NAME} to composer"
# deploy variables
gcloud composer environments storage data import \
--source ${CONFIG_FOLDER_NAME}/variables/${ENV}/variables.json \
--destination "${FEATURE_NAME}"/config \
--environment ${COMPOSER_ENVIRONMENT} \
--location ${LOCATION} \
--project ${PROJECT_ID}
gcloud beta composer environments run ${COMPOSER_ENVIRONMENT} \
--project ${PROJECT_ID} \
--location ${LOCATION} \
variables import \
-- /home/airflow/gcs/data/"${FEATURE_NAME}"/config/variables.json
echo "############# Variables of ${FEATURE_NAME} are well imported in environment ${COMPOSER_ENVIRONMENT} for project ${PROJECT_ID}"
After publishing the variables in Airflow, we can access them in the Python code via the Variable object:
variables = Variable.get("team_league_elt", deserialize_json=True)
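Since the value is deserialized, it behaves like a plain Python dict; as a quick sketch (key names taken from the variables.json shown earlier):

from airflow.models import Variable

# "team_league_elt" is the parent node of the JSON file, used as the Airflow variable key.
variables = Variable.get("team_league_elt", deserialize_json=True)

dataset = variables["dataset"]                                      # "mazlum_test"
raw_table = variables["team_stat_raw_table"]                        # "team_stat_raw"
write_disposition = variables["team_stats_raw_write_disposition"]   # "WRITE_APPEND"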
I created a class called Settings that retrieves the DAG variables and exposes all the configuration fields needed by our DAG code logic:
import os
from dataclasses import dataclass
from datetime import timedelta

from airflow.models import Variable
from airflow.utils.dates import days_ago

_variables = Variable.get("team_league_elt", deserialize_json=True)
_feature_name = _variables["feature_name"]


@dataclass
class Settings:
    dag_folder = os.getenv("DAGS_FOLDER")
    dag_default_args = {
        'depends_on_past': False,
        'email': ['airflow@example.com'],
        'email_on_failure': False,
        'email_on_retry': False,
        'retries': 0,
        'retry_delay': timedelta(minutes=5),
        "start_date": days_ago(1)
    }
    project_id = os.getenv("GCP_PROJECT")
    queries_path = os.path.join(
        dag_folder,
        _feature_name,
        'dag',
        'queries'
    )
    dataset = _variables["dataset"]
    team_stat_raw_table = _variables["team_stat_raw_table"]
    team_stat_table = _variables["team_stat_table"]
    variables = _variables
- Some elements are retrieved from the variables
- It's worth noting that Cloud Composer natively provides some predefined environment variables, like the GCP project ID and the DAG root folder
- We build the path containing our SQL query: feature_name/dag/queries (illustrated just below)
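As a small illustration of how these pieces fit together, assuming the standard Cloud Composer DAGs folder /home/airflow/gcs/dags:

from team_league_elt.dag.settings import Settings

settings = Settings()

# GCP_PROJECT and DAGS_FOLDER are provided by the Cloud Composer environment.
print(settings.project_id)    # the GCP project ID
print(settings.queries_path)  # e.g. /home/airflow/gcs/dags/team_league_elt/dag/queries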
2.2 Python local environment
The Python local environment uses PipEnv as a package manager and to automate the creation of the virtual env.
You can check this video from my GCP YouTube channel, which shows:
- How to have a comfortable Python local environment with PyEnv, PipEnv, DirEnv and IntelliJ IDEA, and how to navigate through all the files, classes and methods
- How to automate the creation of the virtual env for our Python project
2.3 The DAG code
The DAG root folder is team_league_elt and the file containing the DAG logic in Python is team_league_dag.py:
import airflow
from airflow.providers.google.cloud.operators.bigquery import BigQueryInsertJobOperator
from airflow.providers.google.cloud.transfers.gcs_to_bigquery import GCSToBigQueryOperator
from jinja2 import Template

from team_league_elt.dag.settings import Settings

settings = Settings()


def get_jinja_template(file_path: str) -> Template:
    with open(f'{settings.queries_path}/{file_path}') as fp:
        return Template(fp.read())


with airflow.DAG(
        "team_league_elt",
        default_args=settings.dag_default_args,
        schedule_interval=None) as dag:
    load_team_stats_raw_to_bq = GCSToBigQueryOperator(
        task_id='load_team_stats_raw_to_bq',
        bucket=settings.variables['team_stat_input_bucket'],
        source_objects=[settings.variables['team_stat_source_object']],
        destination_project_dataset_table=f'{settings.project_id}.{settings.dataset}.{settings.team_stat_raw_table}',
        source_format='NEWLINE_DELIMITED_JSON',
        compression='NONE',
        create_disposition=settings.variables['team_stats_raw_create_disposition'],
        write_disposition=settings.variables['team_stats_raw_write_disposition'],
        autodetect=True
    )

    compute_and_insert_team_stats_domain_query = get_jinja_template('compute_and_insert_team_stats_data.sql').render(
        project_id=settings.project_id,
        dataset=settings.dataset,
        team_stat_table=settings.team_stat_table,
        team_stat_raw_table=settings.team_stat_raw_table
    )

    compute_and_insert_team_stats_domain = BigQueryInsertJobOperator(
        task_id='compute_team_stats_domain',
        configuration={
            "query": {
                "query": compute_and_insert_team_stats_domain_query,
                "useLegacySql": False
            }
        },
        location='EU'
    )

    load_team_stats_raw_to_bq >> compute_and_insert_team_stats_domain
Extract and Load : a first operator, GCSToBigQueryOperator, loads the JSON file from GCS into BigQuery as raw data.
This operator takes:
- The input bucket
- The input objects (NEWLINE_DELIMITED_JSON)
- The output table (raw)
⚠️ We used autodetect mode in this example for simplicity, but we recommend using a BigQuery JSON schema instead, in order to have full control over the file ingestion and the column types in the output table.
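As a sketch of what that could look like, the same load task can declare an explicit schema via the operator's schema_fields parameter. The field names and types below are inferred from the sample raw file; adapt them to your real data:

# Sketch of the load task with an explicit schema instead of autodetect.
load_team_stats_raw_to_bq = GCSToBigQueryOperator(
    task_id='load_team_stats_raw_to_bq',
    bucket=settings.variables['team_stat_input_bucket'],
    source_objects=[settings.variables['team_stat_source_object']],
    destination_project_dataset_table=f'{settings.project_id}.{settings.dataset}.{settings.team_stat_raw_table}',
    source_format='NEWLINE_DELIMITED_JSON',
    create_disposition=settings.variables['team_stats_raw_create_disposition'],
    write_disposition=settings.variables['team_stats_raw_write_disposition'],
    autodetect=False,
    schema_fields=[
        {'name': 'teamName', 'type': 'STRING', 'mode': 'REQUIRED'},
        {'name': 'teamScore', 'type': 'INTEGER', 'mode': 'NULLABLE'},
        {'name': 'scorers', 'type': 'RECORD', 'mode': 'REPEATED', 'fields': [
            {'name': 'scorerFirstName', 'type': 'STRING', 'mode': 'NULLABLE'},
            {'name': 'scorerLastName', 'type': 'STRING', 'mode': 'NULLABLE'},
            {'name': 'goals', 'type': 'INTEGER', 'mode': 'NULLABLE'},
            {'name': 'goalAssists', 'type': 'INTEGER', 'mode': 'NULLABLE'},
            {'name': 'games', 'type': 'INTEGER', 'mode': 'NULLABLE'},
        ]},
    ],
)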
Transform : the second operator launches a BigQuery job with a SQL query, to apply the transformations and business logic:
INSERT INTO `{{project_id}}.{{dataset}}.{{team_stat_table}}`
(
teamName,
teamScore,
teamSlogan,
teamTotalGoals,
topScorerStats,
bestPasserStats,
ingestionDate
)
SELECT
team_stats.teamName,
team_stats.teamScore,
team_slogan.teamSlogan,
sum(scorer.goals) as teamTotalGoals,
ARRAY_AGG(
STRUCT(
scorer.scorerFirstName AS firstName,
scorer.scorerLastName AS lastName,
scorer.goals AS goals,
scorer.games AS games
)
ORDER BY scorer.goals DESC LIMIT 1
)[OFFSET(0)] AS topScorerStats,
ARRAY_AGG(
STRUCT(
scorer.scorerFirstName AS firstName,
scorer.scorerLastName AS lastName,
scorer.goalAssists AS goalAssists,
scorer.games AS games
)
ORDER BY scorer.goalAssists DESC LIMIT 1
)[OFFSET(0)] AS bestPasserStats,
current_timestamp() as ingestionDate
FROM `{{project_id}}.{{dataset}}.{{team_stat_raw_table}}` team_stats
INNER JOIN `{{project_id}}.{{dataset}}.team_slogan` team_slogan ON team_stats.teamName = team_slogan.teamName,
UNNEST(team_stats.scorers) AS scorer
GROUP BY
team_stats.teamName,
team_stats.teamScore,
team_slogan.teamSlogan;
- This query uses templating with Jinja2
- A GROUP BY is done on the team fields, because the calculation should be done per team
- The top scorer node is calculated with ARRAY_AGG and ORDER BY ... DESC on the expected field. The OFFSET(0) gets the first element of the resulting list
- The logic is the same for the best passer, but the ORDER BY ... DESC is applied on the concerned field
- A SUM is used to calculate the total goals per team
- A join is done on the team_slogan table to retrieve the slogan field per team
In the Python code, the Jinja2 template passes the configuration fields to the SQL query.
The rendered SQL query is then passed to the BigQueryInsertJobOperator.
The final line sequences the two tasks: task1 >> task2
The DAG folder is copied into the Composer bucket, composer-bucket/dags, via the following commands:
#!/usr/bin/env bash
set -e
set -o pipefail
set -u
# Remove current DAG folder.
gcloud composer environments storage dags delete \
${FEATURE_NAME} \
-q \
--environment ${COMPOSER_ENVIRONMENT} \
--location ${LOCATION} \
--project ${PROJECT_ID}
echo "############# Current existing DAG folder ${FEATURE_NAME} is well deleted in environment ${COMPOSER_ENVIRONMENT} for project ${PROJECT_ID}"
# Then replace it.
gcloud composer environments storage dags import \
--source ${FEATURE_NAME} \
--environment ${COMPOSER_ENVIRONMENT} \
--location ${LOCATION} \
--project ${PROJECT_ID}
echo "############# DAG folder ${FEATURE_NAME} is well imported in environment ${COMPOSER_ENVIRONMENT} for project ${PROJECT_ID}"
We delete the old version of the DAG and deploy the new one (we could also apply DAG versioning, but that is not the subject of this article).
2.4 The setup.py file for DAGs
Our DAG uses Python imports from the root folder team_league_elt:
from team_league_elt.dag.settings import Settings
For these imports to work correctly in Cloud Composer, we have to copy a setup.py file to the DAGs root folder: composer-bucket/dags/setup.py
We have the following dags_setup/setup.py file:
from setuptools import find_packages, setup

setup(
    name="composer_env_python",
    version="0.0.1",
    install_requires=[],
    packages=find_packages(),
)
We copy this file to the Composer bucket via the following command:
#!/usr/bin/env bash
set -e
set -o pipefail
set -u
gcloud composer environments storage dags import \
--source dags_setup/setup.py \
--environment ${COMPOSER_ENVIRONMENT} \
--location ${LOCATION} \
--project ${PROJECT_ID}
echo "############# DAGs setup.py file is well imported in environment ${COMPOSER_ENVIRONMENT} for project ${PROJECT_ID}"
2.5 Run the DAG manually from the Airflow Webserver
Go to the Airflow Webserver via the Composer home page or the environment configuration tab:
In the Webserver, launch our DAG manually with a click on the play button:
Access the DAG graph:
When we click on a task, we can access the Log and Rendered tabs:
Example of the Rendered tab for the SQL query task:
This shows the final SQL query executed by the task; all the dynamic templating variables have been replaced by their real values, e.g. {{project_id}}.{{dataset}}.{{team_stat_table}} becomes <your-project>.mazlum_test.team_stat.
Conclusion
This use case showed how simple it is to orchestrate an ELT batch pipeline in Google Cloud with Airflow.
We only used two existing operators, fully compatible with Google Cloud services like Cloud Storage and BigQuery.
We didn’t need to manage the wait between tasks because Airflow manages it for us.
The code is simple and lightweight with Python.
The deployment of the DAG and its configuration in Composer is also simple via gcloud commands.
Cloud Composer manages and installs Airflow for us, but we have to correctly configure the machine sizing depending on the number of DAGs (GKE Autopilot and environment size).
This article was not intended to discuss the cost and configuration aspects of the cluster; we prefer to dedicate a separate article to this topic.
I plan to write other articles about pipeline orchestration:
- Create the Cloud Composer cluster with Terraform code
- CI/CD to deploy this use case and all the commands presented in this article, with Cloud Build
- The same use case and ELT pipeline with Cloud Workflows
- Compare Cloud Composer with Cloud Workflows
- How to write an ETL pipeline with Composer/Airflow, Cloud Run or Dataflow and BigQuery
- How to write an ETL pipeline with Cloud Workflows, Cloud Run and BigQuery
- …..
All the code shared in this article is accessible from my GitHub repository:
If you like my articles and videos and want to see my posts, follow me on: