Deploy a Reusable Custom Data Pipeline Using Dataflow Flex Template in GCP

(Python SDK)

Amarachi Ogu
7 min read · Dec 2, 2022

Have you ever been in a situation where your pipeline works on your machine but breaks on the client’s machine?

Use a Dataflow Flex Template to deploy a reusable custom pipeline in a Docker container

Don’t be like this developer 😀. Use a Dataflow Flex Template.

A Dataflow Flex Template turns any Dataflow pipeline into a reusable template that anyone with the right permissions can deploy.

It packages an existing pipeline and its dependencies as a Docker image in your project’s Artifact Registry repository and creates a template specification (spec) file in a Google Cloud Storage (GCS) bucket. Anyone launching the pipeline only needs to refer to that spec file.

This separates pipeline construction (i.e., development and packaging) from pipeline execution, so the two can be handled by independent roles.

In the construction phase, the developer:

  • Creates a pipeline by writing code and tests.
  • Packages the pipeline as a Docker image and stages a template spec file on Cloud Storage.

In the execution phase, users execute the pipeline by referring to the template on Cloud Storage.

Flex Templates can be launched through several channels (Console, gcloud, REST API, Cloud Scheduler). Whichever channel is used, and whether the launch is manual or automated, the user needs no runtime environment of their own.
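
For example, a launch through the REST API comes down to a single POST request against the flexTemplates:launch method. The sketch below only builds the URL and JSON body and does not send anything. The helper name build_launch_request and the job name are made up for illustration, the bucket path and parameter values are the ones used later in this post, and authentication (an OAuth access token) is left out.

```python
import json


def build_launch_request(project_id, region, job_name, spec_path, parameters):
    """Return (url, body) for a Dataflow flexTemplates.launch call."""
    url = (
        "https://dataflow.googleapis.com/v1b3/"
        f"projects/{project_id}/locations/{region}/flexTemplates:launch"
    )
    body = {
        "launchParameter": {
            "jobName": job_name,
            # GCS path of the template spec file, not of the Docker image.
            "containerSpecGcsPath": spec_path,
            "parameters": parameters,
        }
    }
    return url, body


url, body = build_launch_request(
    project_id="PROJECT_ID",
    region="us-west1",
    job_name="twitter-pipeline-demo",  # hypothetical job name
    spec_path="gs://pipe-line-demo-bucket/dataflow/templates/streaming-pipeline",
    parameters={
        "input_subscription": "projects/PROJECT_ID/subscriptions/twitter-pipeline-sub",
        "output_table": "PROJECT_ID:twitter_data.agg_tweets",
    },
)
print(url)
print(json.dumps(body, indent=2))
```

The same body could be sent by Cloud Scheduler or any HTTP client, which is what makes the template launchable without a local development environment.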

Flex Templates therefore:

  • Help developers build and share reusable pipelines.
  • Allow users to launch pipelines without a development environment or any pipeline dependencies installed on the local machine.
  • Allow users to schedule jobs using cloud-native services (like Cloud Scheduler).

There are actually two kinds of Dataflow templates:

  1. Classic Templates
  2. Flex Templates.

You can find out more about Dataflow templates here.

This post walks through one approach to building and deploying a Dataflow Flex Template. It also covers some of the challenges faced while building the template and how they were resolved. Let’s get started!

Context

The pipeline used here is a streaming pipeline that reads messages from a Cloud Pub/Sub subscription, groups them into fixed windows, performs aggregations, and writes the resulting PCollections to a BigQuery table for analytical use.
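
To make "fixed windows" concrete: each message is assigned to a non-overlapping time bucket based on its timestamp, and the aggregation runs per bucket. The sketch below shows that idea in plain Python, without Beam. The 60-second window size, the sample events, and the function name are assumptions for illustration and are not taken from the pipeline code.

```python
from collections import defaultdict


def aggregate_fixed_windows(events, window_size_s):
    """Count events per fixed window.

    events: iterable of (unix_timestamp_seconds, payload) pairs.
    Returns {window_start: count}, where each window covers
    [window_start, window_start + window_size_s).
    """
    counts = defaultdict(int)
    for timestamp, _payload in events:
        # Round the timestamp down to the start of its window.
        window_start = timestamp - (timestamp % window_size_s)
        counts[window_start] += 1
    return dict(counts)


sample_events = [(0, "a"), (59, "b"), (60, "c"), (125, "d"), (130, "e")]
window_counts = aggregate_fixed_windows(sample_events, window_size_s=60)
print(window_counts)
```

In Beam, the same bucketing is expressed with beam.WindowInto(beam.window.FixedWindows(60)), and the runner takes care of timestamps and late data.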


The process of building this pipeline was explained in a previous blog. If you have not seen it yet, consider checking it out first.

The code for building the flex template is found in this GitHub repo.

Setting Up

To get started, complete the following tasks:

  1. Enable GCP APIs - In addition to the APIs already enabled in the previous blog, enable the Artifact Registry and Cloud Build APIs.
  2. Grant your service account permission to build a flex template - In addition to the existing permissions, grant the following roles: (a) Storage Object Admin, (b) Dataflow Worker, (c) Artifact Registry Repository Administrator.
  3. Create a Cloud Storage Bucket. In this case, it was named pipe-line-demo-bucket.
  4. Create a BigQuery dataset. In this case, it was named twitter_data (BigQuery dataset names cannot contain hyphens).
  5. Create an Artifact Registry repository.

Note: All but the last task were explained in a previous blog.

Create an Artifact Registry repository

Before we can upload our container image to Artifact Registry we need to create an Artifact Registry repository.

To create a new repository, run the following command:

gcloud artifacts repositories create REPOSITORY \
--repository-format=docker \
--location=REGION \
--async

Replace REPOSITORY and REGION with a name for your repository and region respectively. For each repository location in a project, repository names must be unique.

In this case, we name the repository twitter-pipeline and create it in the us-central1 region:

gcloud artifacts repositories create twitter-pipeline \
--repository-format=docker \
--location=us-central1 \
--async

File Layout

The code for this project can be seen in this GitHub repo.

.
├── .gitignore
├── Dockerfile
├── README.md
├── agg_and_write2bq.py
├── metadata.json
└── requirements.txt

  1. .gitignore: specifies intentionally untracked files that Git should ignore.
  2. Dockerfile: contains the specification for the container that packages the pipeline code.
  3. README.md: a brief overview of the code.
  4. agg_and_write2bq.py: contains all the pipeline code.
  5. metadata.json: contains metadata about the pipeline and validation rules for its parameters. For more info, refer to the documentation.
  6. requirements.txt: lists the packages to install in the container. It is left empty in this case.
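
As a rough idea of what such a metadata file can hold, the sketch below builds one for the two parameters this pipeline takes and checks a value against its regex, similar in spirit to the validation applied when a template is launched. The field names (name, label, helpText, regexes) follow the Flex Template metadata format. The descriptions and regular expressions are assumptions and are not copied from the repo.

```python
import json
import re

metadata = {
    "name": "Streaming aggregation pipeline",
    "description": "Reads from Pub/Sub, aggregates in fixed windows, writes to BigQuery.",
    "parameters": [
        {
            "name": "input_subscription",
            "label": "Input Pub/Sub subscription",
            "helpText": "projects/<PROJECT_ID>/subscriptions/<SUBSCRIPTION>",
            "regexes": ["^projects/[^/]+/subscriptions/[^/]+$"],
        },
        {
            "name": "output_table",
            "label": "Output BigQuery table",
            "helpText": "<PROJECT_ID>:<DATASET>.<TABLE>",
            "regexes": ["^[^:]+:[A-Za-z0-9_]+\\.[A-Za-z0-9_]+$"],
        },
    ],
}


def is_valid(param_name, value):
    """Return True if value fully matches every regex declared for param_name."""
    for param in metadata["parameters"]:
        if param["name"] == param_name:
            return all(re.fullmatch(rx, value) for rx in param["regexes"])
    raise KeyError(param_name)


# json.dumps produces the text that would be saved as metadata.json.
print(json.dumps(metadata, indent=2))
print(is_valid("output_table", "PROJECT_ID:twitter_data.agg_tweets"))
```

Declaring the expected shape of each parameter up front means a mistyped table or subscription name can be rejected at launch time rather than after workers have started.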

Build the Template

By the end of this phase, we will have:

  1. Used a Dockerfile to build a Docker image that contains the pipeline code.
  2. Pushed the Docker image to the Artifact Registry repository we created earlier.
  3. Deployed the template spec file to the GCS bucket we created.

The Dockerfile

# set base image
FROM gcr.io/dataflow-templates-base/python3-template-launcher-base

ARG WORKDIR=/dataflow/template
RUN mkdir -p ${WORKDIR}
WORKDIR ${WORKDIR}

# copy files
COPY requirements.txt .
COPY agg_and_write2bq.py .

# set environment variables
ENV FLEX_TEMPLATE_PYTHON_REQUIREMENTS_FILE="${WORKDIR}/requirements.txt"
ENV FLEX_TEMPLATE_PYTHON_PY_FILE="${WORKDIR}/agg_and_write2bq.py"

# Install apache-beam and other dependencies to launch the pipeline
RUN apt-get update
RUN pip install --no-cache-dir --upgrade pip
RUN pip install 'apache-beam[gcp]==2.43.0'
RUN pip install -U -r ./requirements.txt

# Since we already downloaded all the dependencies, there's no need to rebuild everything.
ENV PIP_NO_DEPS=True

Note the following about the Dockerfile:

  1. The image used in the Dockerfile is a Google-provided base image, which keeps the setup simple. You may choose the most recent tag from the Flex Templates base images. You can also use a custom base image. See more details here.
  2. For packages to be part of the Beam container, they must be listed in the requirements.txt file. Do not list apache-beam in requirements.txt; the container already has apache-beam.
  3. The environment variable FLEX_TEMPLATE_PYTHON_PY_FILE points to the pipeline code, and FLEX_TEMPLATE_PYTHON_REQUIREMENTS_FILE points to the requirements.txt file. Make sure both are set correctly. See the documentation for more details.
  4. The apache-beam version is pinned to 2.43.0. Check this documentation for the SDK version support status.

Build the image

Next, we use Cloud Build to build the Docker image and push it to the Artifact Registry repository we created earlier.

Run the following command to configure Docker to authenticate requests for Artifact Registry.

gcloud auth configure-docker us-central1-docker.pkg.dev

Next, build the image with Cloud Build and push it to Artifact Registry by running the following command, replacing PROJECT_ID with your project ID:

gcloud builds submit --tag us-central1-docker.pkg.dev/PROJECT_ID/twitter-pipeline/dataflow/stream-data:latest .
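
The image tag follows the Artifact Registry naming pattern LOCATION-docker.pkg.dev/PROJECT_ID/REPOSITORY/IMAGE:TAG. A small helper like the one below (hypothetical, not part of the repo) can keep the build command and the template command pointing at the same image.

```python
def image_uri(location, project_id, repository, image, tag="latest"):
    """Build an Artifact Registry Docker image URI from its parts."""
    return f"{location}-docker.pkg.dev/{project_id}/{repository}/{image}:{tag}"


uri = image_uri("us-central1", "PROJECT_ID", "twitter-pipeline", "dataflow/stream-data")
print(uri)
```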

Create the Flex Template

In this stage, we create the template spec file in a GCS bucket. It contains all of the information needed to run the job, such as the SDK information and metadata.

gcloud dataflow flex-template build gs://pipe-line-demo-bucket/dataflow/templates/streaming-pipeline \
--image "us-central1-docker.pkg.dev/PROJECT_ID/twitter-pipeline/dataflow/stream-data:latest" \
--sdk-language "PYTHON" \
--metadata-file "metadata.json"
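
The generated spec file is a small JSON document. The sketch below approximates its shape, assuming the three top-level keys image, sdkInfo, and metadata. The real file written by the build command may carry additional fields, and the metadata shown here is a trimmed placeholder rather than the contents of metadata.json.

```python
import json


def build_template_spec(image, sdk_language, metadata):
    """Approximate the JSON that the flex-template build step writes to GCS."""
    return {
        "image": image,
        "sdkInfo": {"language": sdk_language},
        "metadata": metadata,
    }


spec = build_template_spec(
    image="us-central1-docker.pkg.dev/PROJECT_ID/twitter-pipeline/dataflow/stream-data:latest",
    sdk_language="PYTHON",
    metadata={"name": "placeholder", "parameters": []},  # placeholder metadata
)
print(json.dumps(spec, indent=2))
```

Because the spec only points at the image, rebuilding the image under the same tag changes what the template runs without regenerating the spec file.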

Run the Flex Template pipeline

We’re finally there! We can now run the Apache Beam pipeline in Dataflow by referring to the template spec file and passing the template parameters required by the pipeline.

The parameters required in this case are input_subscription and output_table.

gcloud dataflow flex-template run "twitter-pipeline`date +%Y%m%d-%H%M%S`" \
--template-file-gcs-location "gs://pipe-line-demo-bucket/dataflow/templates/streaming-pipeline" \
--parameters input_subscription="projects/PROJECT_ID/subscriptions/twitter-pipeline-sub" \
--parameters output_table="PROJECT_ID:twitter_data.agg_tweets" \
--region "us-west1" \
--service-account-email "twitter-pipeline@PROJECT_ID.iam.gserviceaccount.com"
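
On the pipeline side, each --parameters entry arrives as a command-line flag of the same name. Here is a minimal sketch of how the pipeline code might pick them up with argparse. The exact parsing in agg_and_write2bq.py may differ, and the sample argument list only simulates what the template launcher would pass.

```python
import argparse


def parse_pipeline_args(argv):
    """Split template parameters from the remaining Beam/Dataflow options."""
    parser = argparse.ArgumentParser()
    parser.add_argument(
        "--input_subscription",
        required=True,
        help="projects/<PROJECT_ID>/subscriptions/<SUBSCRIPTION>",
    )
    parser.add_argument(
        "--output_table",
        required=True,
        help="<PROJECT_ID>:<DATASET>.<TABLE>",
    )
    # parse_known_args leaves unrecognized flags (runner, region, ...) untouched
    # so they can be handed on to Beam's PipelineOptions.
    return parser.parse_known_args(argv)


known_args, beam_args = parse_pipeline_args([
    "--input_subscription=projects/PROJECT_ID/subscriptions/twitter-pipeline-sub",
    "--output_table=PROJECT_ID:twitter_data.agg_tweets",
    "--region=us-west1",
])
print(known_args.output_table)
print(beam_args)
```

Keeping the flag names identical to the names declared in metadata.json is what ties the template parameters to the pipeline code.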

In the console, navigate to the Dataflow Jobs page to confirm that the job has started.

The resulting pipeline graph

Troubleshooting

While developing this Flex Template, two major errors were encountered:

  1. Failed to start the VM…UNAVAILABLE, reason: One or more operations had an error.

In the Google Cloud error model, UNAVAILABLE maps to HTTP 503: the service is unavailable, typically because the server is down.

The issue turned out to be with the selected region. To resolve it, choose a different region when running the Flex Template. In this case, changing the region from ‘us-central1’ to ‘us-west1’ worked.

  2. Failed to read the result file…Unable to open template file:

In this case, the issue turned out to be that the service account did not have all the required permissions.

The following additional permissions were granted to the service account:

  • Storage Object Admin (roles/storage.objectAdmin)
  • Dataflow Worker (roles/dataflow.worker)
  • Artifact Registry Repository Administrator (roles/artifactregistry.repoAdmin)

After granting these permissions, the issue was resolved.
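
One way to grant these roles is gcloud projects add-iam-policy-binding, run once per role. The sketch below only prints the commands so they can be reviewed before running. The service account email is the one used in the run command above, and the role ID for Artifact Registry Repository Administrator is assumed to be roles/artifactregistry.repoAdmin.

```python
ROLES = [
    "roles/storage.objectAdmin",
    "roles/dataflow.worker",
    "roles/artifactregistry.repoAdmin",  # assumed ID for Repository Administrator
]


def grant_commands(project_id, service_account_email, roles=ROLES):
    """Return one gcloud command string per role; nothing is executed."""
    member = f"serviceAccount:{service_account_email}"
    return [
        f"gcloud projects add-iam-policy-binding {project_id} "
        f'--member="{member}" --role="{role}"'
        for role in roles
    ]


commands = grant_commands(
    "PROJECT_ID", "twitter-pipeline@PROJECT_ID.iam.gserviceaccount.com"
)
for command in commands:
    print(command)
```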

Thanks for reading. I hope this provides you with some helpful insight on building a Dataflow Flex template.

You are welcome to follow me on LinkedIn or Twitter.

