How to use Google Cloud Vertex AI to build an ML pipeline using Kubeflow?

outsidenox
12 min read · Nov 30, 2022

Deploying Machine Learning (ML) models to production at a company with a framework already in place is extremely challenging. As a team who wants to deliver ML models to solve problems in the real world, we, therefore, defined a set of processes to make ML development more reliable and productive.
Like many others, our company is embarking on a journey of on-premises-to-cloud migration. Knowing this, Google launched Vertex AI to pave the way for this transformation.

What is Vertex AI?

Vertex AI is a managed Machine Learning platform that brings Google’s cloud services for deploying and maintaining AI models together in one place. This solution allows the team to customize builds, deploy, and scale ML models much faster. Using other services, such as Cloud Build for CI/CD and Cloud Functions for automatically triggering Vertex AI Pipelines, allows us to develop an automated pipeline, reducing operational overhead and deployment times.

But, why do we use Machine Learning pipelines?

Let’s first understand why ML pipelines are critical for optimising and automating the end-to-end workflow of a Machine Learning project. The main features are the following:

  • Perform data validation;
  • Reproduce and orchestrate processes;
  • Set alarms for model monitoring;
  • Version models;
  • Easily upgrade or downgrade machines for model training;
  • Serve models from auto-scalable endpoints;
  • Detect data shift and concept drift;
  • Run A/B tests for different ML models;
  • Define each step as a unique module of the overall process.

This modular approach helps organisations get a holistic view of machine learning models, aiding the management of their end-to-end process.
To summarise, replacing manual processes with automatic ones decreases human error and allows for faster delivery, more accurate models, and a better return on investment.

Okay, super interesting! So how can we create ML pipelines on Vertex AI?

To answer this question we have to introduce Kubeflow pipelines.

Kubeflow Pipelines is an open-source platform for running, monitoring, auditing, and managing Machine Learning pipelines. This tool allows us to create portable and scalable ML workflows based on components.
This modular design has multiple advantages, such as better re-usability, workload handling, and easier debugging processes.

At this point, you know why we are advocating the use of Kubeflow Pipelines, so let’s build one!

The rest of this article is a step-by-step tutorial for running your first model using Kubeflow Pipelines. We also provide a notebook with the entire code; please clone the project on the link.

🧃 STEP 0 — Download the required packages

First, you must have Python 3.5 or a later version installed on your local machine. Kubeflow Pipelines SDK package only works for those Python versions. Next, you also need to install the Google Cloud AI Platform package to connect your code with Google’s Vertex AI.

!pip install kfp==1.8.14
!pip install google-cloud-aiplatform==1.18.1
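After installing, it can help to confirm that the pinned versions are the ones actually present in your environment. The snippet below is a minimal sketch using only the standard library; the helper name installed_version is our own and is not part of either package.

```python
from importlib import metadata  # standard library since Python 3.8


def installed_version(package_name):
    """Return the installed version string, or None if the package is absent."""
    try:
        return metadata.version(package_name)
    except metadata.PackageNotFoundError:
        return None


# The pins below are the ones used in this tutorial.
EXPECTED = {"kfp": "1.8.14", "google-cloud-aiplatform": "1.18.1"}

for name, wanted in EXPECTED.items():
    found = installed_version(name)
    status = "ok" if found == wanted else "check"
    print(f"{name}: expected {wanted}, found {found} [{status}]")
```

A "check" status only means the installed version differs from the pin (or the package is missing); it does not by itself mean the tutorial will fail.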

📚 STEP 1 — Import the necessary libraries

Let’s import the libraries and methods needed for this tutorial.

import json
from typing import NamedTuple
from datetime import datetime
import google.cloud.aiplatform as aiplatform
# compiler and dsl live in kfp.v2; dsl is needed later for @dsl.pipeline
from kfp.v2 import compiler, dsl
from kfp.v2.dsl import (component, Input, Model, Output, Dataset,
                        Artifact, OutputPath, ClassificationMetrics,
                        Metrics, InputPath)

📦 STEP 2: Building the components

The next step is to build each component separately, where each one has a specific job and meaning.

Each component is defined as a Python function. Then, we are going to apply a @component decorator, which specifies the base_image and packages_to_install in that component.

  • To define the packages_to_install at the beginning of your script you need to specify the Python libraries and the respective versions such as:
GCP_BIGQUERY = "google-cloud-bigquery==2.30.0"
PANDAS = "pandas==1.3.2"
SKLEARN = "scikit-learn==1.0.2"
NUMPY = "numpy==1.21.6"
  • To define the base_image, we must create a Docker image with Python and deploy it to the Artifact Registry on GCP, or you can use a pre-built image provided by Google. In this tutorial, we picked one from here.
BASE_IMAGE = "gcr.io/deeplearning-platform-release/xgboost-cpu"
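Because every entry in packages_to_install is a plain string, a typo in a pin only surfaces once the component starts running. As a small optional safeguard, the sketch below checks that each string is an exact name==version pin; the helper split_pin is hypothetical and not part of the Kubeflow Pipelines SDK.

```python
def split_pin(requirement):
    """Split 'name==version' into (name, version); raise if it is not an exact pin."""
    name, separator, version = requirement.partition("==")
    if not separator or not name.strip() or not version.strip():
        raise ValueError(f"not an exact pin: {requirement!r}")
    return name.strip(), version.strip()


# The same pins as defined above in this tutorial.
PINS = [
    "google-cloud-bigquery==2.30.0",
    "pandas==1.3.2",
    "scikit-learn==1.0.2",
    "numpy==1.21.6",
]

for pin in PINS:
    print(split_pin(pin))
```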

Next, you must define the Python function with the artifacts or parameters it needs. Artifacts represent large and complex data structures stored on Google Cloud Storage. This tutorial uses the following artifacts: Metrics, ClassificationMetrics, Dataset, and Model. Parameters, on the other hand, are small data types such as int, float, bool, dict, str, or list, which you can pass directly between components.

To pass variables and data structures between components, we need to introduce a couple of new keywords:

  • inputs -> Input data that can be an artifact or a parameter depending on the data type, referred to as Input[ARTIFACT] or InputPath();
  • outputs -> Return data written as files to a path, referred to as Output[ARTIFACT] or OutputPath().

InputPath() and OutputPath() are used when the data is stored in another complex format (e.g. pickle) that does not fit the typed artifacts used with Input or Output.
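To make the path-based hand-off concrete, here is a minimal sketch in plain Python, without Kubeflow: one function writes a pickle to a path it is given, and another reads it back. The functions produce and consume, the file name, and the toy payload are all hypothetical stand-ins; in a real pipeline the paths are supplied by the platform.

```python
import os
import pickle
import tempfile


def produce(output_path):
    """Stand-in for a component with an OutputPath(): write a pickle to the given path."""
    payload = {"x_train": [[5.1, 3.5], [6.2, 2.9]], "y_train": [0, 1]}
    with open(output_path, "wb") as file:
        pickle.dump(payload, file)


def consume(input_path):
    """Stand-in for a component with an InputPath(): read the pickle back."""
    with open(input_path, "rb") as file:
        return pickle.load(file)


with tempfile.TemporaryDirectory() as workdir:
    path = os.path.join(workdir, "dataset_train")  # hypothetical file name
    produce(path)
    restored = consume(path)

print(restored["y_train"])
```

The component only ever sees a path string, which is why any file format that Python can write and read can travel between components this way.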

Let’s put into practice all these new concepts that you have learned.

📥 STEP 3.1. Component — Ingest data from Big Query

For this tutorial, we use a public dataset provided by UCI -> Iris Data Set. There is a wide variety of public datasets on GCP Dataset Marketplace that you can use if you want to choose another.

The next step is to upload the data into Big Query. To this end, you must follow these instructions:

  • Go to Big Query -> SQL workspace -> Select your current project -> Create data set -> Create a new table

Then, you should be able to use the dataset (medium_article) and table (iris_dataset) created as shown below.

The Ingest Data component creates a table on Big Query with some initial transformations.

@component(base_image=BASE_IMAGE, packages_to_install=[GCP_BIGQUERY])
def query_to_table(
    query: str,
    project_id: str,
    dataset_id: str,
    table_id: str,
    location: str = "EU",
    query_job_config: dict = None,
) -> None:
    """
    Run the query and create a new BigQuery table
    """

    import google.cloud.bigquery as bq

    # Fall back to an empty configuration when none is passed
    if query_job_config is None:
        query_job_config = {}

    # Configure your query job
    job_config = bq.QueryJobConfig(destination=f"{project_id}.{dataset_id}.{table_id}",
                                   **query_job_config)

    # Initiate the Big Query client to connect with the project
    bq_client = bq.Client(project=project_id,
                          location=location)

    # Generate the query with all the job configurations
    query_job = bq_client.query(query,
                                job_config=job_config)

    # Wait for the query to finish so the table exists for the next component
    query_job.result()

After submitting the pipeline and passing through this component, a temporary table (temp_table_medium) appears inside the dataset (medium_article). The table may have some transformations depending on the performed query.
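One detail worth noting in this component: query_job_config defaults to None, and unpacking None with ** raises a TypeError in Python. The sketch below shows, in plain Python, one way to assemble the job options safely before handing them to BigQuery. The helper build_job_options and the example identifiers are hypothetical; only the destination format project.dataset.table comes from the component above.

```python
def build_job_options(project_id, dataset_id, table_id, query_job_config=None):
    """Merge optional job settings with the destination table identifier."""
    options = dict(query_job_config or {})  # None becomes an empty dict
    options["destination"] = f"{project_id}.{dataset_id}.{table_id}"
    return options


# Hypothetical identifiers, for illustration only.
print(build_job_options("my-project", "medium_article", "temp_table_medium"))
print(build_job_options("my-project", "medium_article", "temp_table_medium",
                        {"write_disposition": "WRITE_TRUNCATE"}))
```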

☁ STEP 3.2. Component — Ingest data from Big Query to Google Cloud Storage

The second component extracts the table from Big Query and stores it on Google Cloud Storage under the path defined in PIPELINE_ROOT (this parameter is defined in Step 5).

This component is essential for the subsequent steps of data manipulation and modelling in Python. Data must be serialized and deserialized to pass between components, so we extract it from BigQuery and store it on Google Cloud Storage.

@component(base_image=BASE_IMAGE, packages_to_install=[GCP_BIGQUERY])
def extract_table_to_gcs(
    project_id: str,
    dataset_id: str,
    table_id: str,
    dataset: Output[Dataset],
    location: str = "EU",
) -> None:
    """
    Extract a Big Query table into Google Cloud Storage.
    """

    import logging
    import os
    import google.cloud.bigquery as bq

    # Get the table generated on the previous component
    full_table_id = f"{project_id}.{dataset_id}.{table_id}"
    table = bq.table.Table(table_ref=full_table_id)

    # Initiate the Big Query client to connect with the project
    job_config = bq.job.ExtractJobConfig(**{})
    client = bq.client.Client(project=project_id, location=location)

    # Submit the extract table job to store on GCS
    extract_job = client.extract_table(table, dataset.uri, job_config=job_config)

    # Wait for the extract job to finish before the next component reads the file
    extract_job.result()

After submitting the pipeline and passing through this component, a file named dataset, containing the data extracted from Big Query, appears under the PIPELINE_ROOT on Google Cloud Storage (GCS). The file is named dataset because the component declares its output as dataset: Output[Dataset].

✂ STEP 3.3. Component — Create the training and test set

This component creates the training and test set using the data stored on GCS from the previous component.

@component(
    base_image=BASE_IMAGE, packages_to_install=[PANDAS, SKLEARN]
)
def create_sets(
    data_input: Input[Dataset],
    dataset_train: OutputPath(),
    dataset_test: OutputPath(),
    col_label: str,
    col_training: list
) -> NamedTuple("Outputs", [("dict_keys", dict), ("shape_train", int), ("shape_test", int)]):
    """
    Split data into train and test sets.
    """

    import logging
    import pickle

    import pandas as pd
    from sklearn import model_selection

    def convert_labels_to_categories(labels):
        """
        Build the encoding of the labels.
        :returns: Two dictionaries: label -> code and code -> label
        """
        try:
            dic_keys = {k: label for k, label in enumerate(sorted(labels.unique()))}
            dic_vals = {label: k for k, label in enumerate(sorted(labels.unique()))}
            return dic_vals, dic_keys
        except Exception as e:
            print(f'[ERROR] Something went wrong that is {e}')
            return {}, {}

    df_ = pd.read_csv(data_input.path)

    df_.dropna(inplace=True)

    logging.info(f"[START] CREATE SETS, starts with an initial shape of {df_.shape}")

    if len(df_) != 0:

        # Encode the labels as integer codes
        yy = df_[col_label]
        dic_vals, dic_keys = convert_labels_to_categories(yy)

        yy = yy.apply(lambda v: dic_vals[v])
        xx = df_[col_training]

        # Stratified 80/20 split with a fixed seed for reproducibility
        x_train, x_test, y_train, y_test = model_selection.train_test_split(xx, yy, test_size=0.2, random_state=0, stratify=yy)

        x_train_results = {'x_train': x_train, 'y_train': y_train}
        x_test_results = {'x_test': x_test, 'y_test': y_test}

        with open(dataset_train + ".pkl", 'wb') as file:
            pickle.dump(x_train_results, file)

        with open(dataset_test + ".pkl", 'wb') as file:
            pickle.dump(x_test_results, file)

        logging.info("[END] CREATE SETS, data set was split")

        return (dic_keys, len(x_train), len(x_test))

    else:
        logging.error("[END] CREATE SETS, data set is empty")
        return (None, None, None)

Since this component stores its outputs on GCS under the path PIPELINE_ROOT, we can see the two files (dataset_test.pkl and dataset_train.pkl) in the corresponding bucket.
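The label encoding used by this component is easy to try out in isolation. The sketch below reproduces the two dictionaries in plain Python on a few illustrative Iris labels (the sample list is ours, not read from the table). It also shows a detail that matters later: assuming the dict_keys output travels between components as JSON, its integer keys come back as strings, which is consistent with the evaluation component casting the classes with astype(str) before mapping them.

```python
import json

# Illustrative labels; the real ones come from the label column of the table.
labels = ["Iris-setosa", "Iris-virginica", "Iris-versicolor", "Iris-setosa"]

classes = sorted(set(labels))
dic_keys = {k: label for k, label in enumerate(classes)}  # code -> label
dic_vals = {label: k for k, label in enumerate(classes)}  # label -> code

encoded = [dic_vals[label] for label in labels]
print(encoded)

# JSON object keys are always strings, so integer keys do not survive a round trip.
round_tripped = json.loads(json.dumps(dic_keys))
print(round_tripped)
```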

💪🏾 STEP 3.4. Component — Let’s train the model!

At this point, we have the training and test set stored on the GCS. It is time to train our model with the training data.

For this tutorial, we use a Logistic Regression model. In this step, you can take advantage of the modular solution to scale up to a better machine or add an accelerator to speed up the training time. To learn more about this topic, please follow this link.

@component(
    base_image=BASE_IMAGE, packages_to_install=[SKLEARN, PANDAS]
)
def train_model(
    training_data: InputPath(),
    model: Output[Model],
) -> None:
    """
    Train a classification model.
    """

    import logging
    import os
    import pickle
    import joblib
    import numpy as np
    from sklearn.linear_model import LogisticRegression

    logging.getLogger().setLevel(logging.INFO)

    # you have to load the training data
    with open(training_data + ".pkl", 'rb') as file:
        train_data = pickle.load(file)

    X_train = train_data['x_train']
    y_train = train_data['y_train']

    logging.info(f"X_train shape {X_train.shape}")
    logging.info(f"y_train shape {y_train.shape}")

    logging.info("Starting Training...")

    clf = LogisticRegression(n_jobs=-1, random_state=42)
    train_model = clf.fit(X_train, y_train)

    # model.path is the local mount path of the GCS location; create it if needed
    os.makedirs(model.path, exist_ok=True)

    # ensure that you save the final model as a .joblib
    logging.info(f"Save model to: {model.path}")
    joblib.dump(train_model, model.path + "/model.joblib")

After submitting the pipeline and passing through this component, a model with the format .joblib is created and stored in GCS.

🎯 STEP 3.5. Component — Make some predictions

The following component generates predictions on unseen data, the test set, so that the model’s performance can be assessed. The predictions are also stored on GCS.

@component(
    base_image=BASE_IMAGE, packages_to_install=[PANDAS, SKLEARN]
)
def predict_model(
    test_data: InputPath(),
    model: Input[Model],
    predictions: Output[Dataset],
) -> None:
    """
    Create the predictions of the model.
    """

    import logging
    import os
    import pickle
    import joblib
    import pandas as pd

    logging.getLogger().setLevel(logging.INFO)

    # you have to load the test data
    with open(test_data + ".pkl", 'rb') as file:
        test_set = pickle.load(file)

    X_test = test_set['x_test']
    y_test = test_set['y_test']

    # load model (scikit-learn must be installed to unpickle it)
    model_path = os.path.join(model.path, "model.joblib")
    clf = joblib.load(model_path)
    y_pred = clf.predict(X_test)

    # store the true and predicted classes side by side
    df = pd.DataFrame({
        'class_true': y_test.tolist(),
        'class_pred': y_pred.tolist()}
    )

    # save dataframe
    df.to_csv(predictions.path, sep=",", header=True, index=False)

🔢 STEP 3.6. Component — Let’s get the evaluation metrics!

Let’s calculate the evaluation metrics based on the predictions created above. Since this is a classification model, suitable metrics include the F-score, recall, precision, and accuracy; the pipeline in Step 4 requests accuracy_score and f1_score.

@component(
    base_image=BASE_IMAGE, packages_to_install=[PANDAS, NUMPY, SKLEARN]
)
def evaluation_metrics(
    predictions: Input[Dataset],
    metrics_names: list,
    dict_keys: dict,
    metrics: Output[ClassificationMetrics],
    kpi: Output[Metrics],
    eval_metrics: Output[Metrics]
) -> None:
    """
    Create the evaluation metrics.
    """
    import json
    import logging
    from importlib import import_module
    import numpy as np
    import pandas as pd

    results = pd.read_csv(predictions.path)

    # Decode the numeric classes back into their label names
    results['class_true_clean'] = results['class_true'].astype(str).map(dict_keys)
    results['class_pred_clean'] = results['class_pred'].astype(str).map(dict_keys)

    # To fetch metrics from sklearn
    module = import_module("sklearn.metrics")
    metrics_dict = {}
    for each_metric in metrics_names:
        metric_func = getattr(module, each_metric)
        if each_metric == 'f1_score':
            # per-class F1 scores, averaged below
            metric_val = metric_func(results['class_true'], results['class_pred'], average=None)
        else:
            metric_val = metric_func(results['class_true'], results['class_pred'])

        # Save metric name and value
        metric_val = np.round(np.average(metric_val), 4)
        metrics_dict[f"{each_metric}"] = metric_val
        kpi.log_metric(f"{each_metric}", metric_val)

        # dumping kpi metadata to generate the metrics kpi
        with open(kpi.path, "w") as f:
            json.dump(kpi.metadata, f)
        logging.info(f"{each_metric}: {metric_val:.3f}")

    # dumping metrics_dict to generate the metrics table
    with open(eval_metrics.path, "w") as f:
        json.dump(metrics_dict, f)

    # to generate the confusion matrix plot
    confusion_matrix_func = getattr(module, "confusion_matrix")
    metrics.log_confusion_matrix(list(dict_keys.values()),
                                 confusion_matrix_func(results['class_true_clean'], results['class_pred_clean']).tolist(),)

    # dumping metrics metadata
    with open(metrics.path, "w") as f:
        json.dump(metrics.metadata, f)
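To see what these numbers mean, the sketch below recomputes a confusion matrix, the accuracy, and the averaged per-class F1 score (often called macro F1) in plain Python on a small made-up set of predictions. It is an illustration of the definitions only, not the scikit-learn implementation that the component calls, and the toy data is invented for the example.

```python
def confusion_matrix(y_true, y_pred, labels):
    """Rows are the true classes, columns are the predicted classes."""
    index = {label: i for i, label in enumerate(labels)}
    matrix = [[0] * len(labels) for _ in labels]
    for true_label, pred_label in zip(y_true, y_pred):
        matrix[index[true_label]][index[pred_label]] += 1
    return matrix


def accuracy(matrix):
    """Share of samples on the diagonal of the confusion matrix."""
    total = sum(sum(row) for row in matrix)
    correct = sum(matrix[i][i] for i in range(len(matrix)))
    return correct / total


def macro_f1(matrix):
    """Unweighted mean of the per-class F1 scores."""
    scores = []
    for i in range(len(matrix)):
        tp = matrix[i][i]
        fp = sum(matrix[row][i] for row in range(len(matrix))) - tp
        fn = sum(matrix[i]) - tp
        denominator = 2 * tp + fp + fn
        scores.append(2 * tp / denominator if denominator else 0.0)
    return sum(scores) / len(scores)


# Made-up predictions for three classes.
y_true = [0, 0, 1, 1, 2, 2]
y_pred = [0, 0, 1, 2, 2, 2]

cm = confusion_matrix(y_true, y_pred, labels=[0, 1, 2])
print(cm)
print(round(accuracy(cm), 4), round(macro_f1(cm), 4))
```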

Vertex AI allows us to visualise some metrics by taking advantage of the ClassificationMetrics and Metrics artifacts of the Kubeflow Pipelines package.

Use the method log_confusion_matrix(), as presented in the code snippet above, to generate a confusion matrix like the one shown below.

On the other hand, to plot the scalar metrics you must use the log_metric() method. The output is as follows:

💻 STEP 4: Building the Kubeflow pipeline with all the components

Let’s get all the components together and build the full Kubeflow pipeline.

To define our pipeline, we must apply the decorator @dsl.pipeline to a Python function. Here, we should specify the pipeline’s name (PIPELINE_NAME) and root path (PIPELINE_ROOT).

Then, you must define the pipeline function with all the needed parameters. To name each component in the pipeline, you should use the method set_display_name().

@dsl.pipeline(name=PIPELINE_NAME, pipeline_root=PIPELINE_ROOT)
def medium_pipeline(
        project_id: str,
        dataset_location: str,
        dataset_id: str,
        table_id: str,
        col_label: str,
        col_training: list):

    QUERY = """SELECT * FROM `project-id.medium_article.iris_dataset`"""
    METRICS_NAMES = ["accuracy_score", "f1_score"]

    ingest = query_to_table(query=QUERY,
                            table_id=table_id,
                            project_id=project_id,
                            dataset_id=dataset_id,
                            location=dataset_location,
                            query_job_config=json.dumps(dict(write_disposition="WRITE_TRUNCATE"))
                            ).set_display_name("Ingest Data")

    # From big query store in GCS
    ingested_dataset = (
        extract_table_to_gcs(
            project_id=project_id,
            dataset_id=dataset_id,
            table_id=table_id,
            location=dataset_location,
        )
        .after(ingest)
        .set_display_name("Extract Big Query to GCS")
    )

    # Split data
    split_data = create_sets(data_input=ingested_dataset.outputs["dataset"],
                             col_label=col_label,
                             col_training=col_training
                             ).set_display_name("Split data")

    # Train model
    training_model = train_model(
        training_data=split_data.outputs['dataset_train']).set_display_name("Train Model")

    # Predict model
    predict_data = predict_model(
        test_data=split_data.outputs['dataset_test'],
        model=training_model.outputs["model"]
    ).set_display_name("Create Predictions")

    # Evaluate model
    eval_metrics = evaluation_metrics(
        predictions=predict_data.outputs['predictions'],
        dict_keys=split_data.outputs['dict_keys'],
        metrics_names=json.dumps(METRICS_NAMES),
    ).set_display_name("Evaluation Metrics")
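The execution order of this pipeline follows from its dependencies: most are implied by one component consuming another's outputs, while the step from "Ingest Data" to "Extract Big Query to GCS" is declared explicitly with .after(ingest), because no output is passed between them. As a rough illustration, the sketch below writes the same dependencies down by hand and sorts them with the standard library (Python 3.9+). It only models the ordering and does not use Kubeflow.

```python
from graphlib import TopologicalSorter  # standard library since Python 3.9

# Each step maps to the set of steps it depends on (display names from the pipeline).
dependencies = {
    "Ingest Data": set(),
    "Extract Big Query to GCS": {"Ingest Data"},             # via .after(ingest)
    "Split data": {"Extract Big Query to GCS"},              # consumes "dataset"
    "Train Model": {"Split data"},                           # consumes "dataset_train"
    "Create Predictions": {"Split data", "Train Model"},     # consumes "dataset_test" and "model"
    "Evaluation Metrics": {"Split data", "Create Predictions"},  # consumes "dict_keys" and "predictions"
}

order = list(TopologicalSorter(dependencies).static_order())
for position, step in enumerate(order, start=1):
    print(position, step)
```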

🧮 STEP 5 — Define the variables

The next step is to compile and submit the pipeline created in the previous step. But first, you must define all the variables that are used by the components, as well as the environment variables.

The environment variables:

TIMESTAMP = datetime.now().strftime("%Y%m%d%H%M%S")

# Replace each placeholder string below with your own value
PROJECT_ID = "<your-project-id>"  # you have to fill in your project id
LOCATION = "<your-location>"  # you have to fill in the location of the data
PIPELINE_ROOT = "<your-pipeline-root>"  # the location where the pipeline's artifacts are stored
SERVICE_ACCOUNT = "<your-service-account>"  # the service account to connect with your project
PIPELINE_NAME = "medium-article"
JOBID = f"training-pipeline-{TIMESTAMP}"
ENABLE_CACHING = False
TEMPLATE_PATH = "medium_pipeline.json"

Below you can inspect the component variables to fill in.

DATASET_ID = "medium_article"
TABLE_ID = "temp_table_medium"
COL_LABEL = "class"
COL_TRAINING=["sepal_length", "sepal_width", "petal_length", "petal_width"]


PIPELINE_PARAMS = {"project_id": PROJECT_ID,
                   "dataset_location": LOCATION,
                   "table_id": TABLE_ID,
                   "dataset_id": DATASET_ID,
                   "col_label": COL_LABEL,
                   "col_training": COL_TRAINING}
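A mistyped key in PIPELINE_PARAMS is only reported once the job is submitted, so a quick local check can save a round trip. The sketch below compares the keys of a parameter dictionary against the parameter names of a function using the standard library. The stand-in function pipeline_signature mirrors the parameter names of medium_pipeline, and the helper check_params and the example values are hypothetical.

```python
import inspect


def pipeline_signature(project_id, dataset_location, dataset_id,
                       table_id, col_label, col_training):
    """Stand-in with the same parameter names as medium_pipeline."""


def check_params(func, params):
    """Return (missing, unexpected) parameter names, each sorted alphabetically."""
    expected = set(inspect.signature(func).parameters)
    missing = sorted(expected - set(params))
    unexpected = sorted(set(params) - expected)
    return missing, unexpected


# Example values; the project id and location are placeholders.
example_params = {
    "project_id": "<your-project-id>",
    "dataset_location": "<your-location>",
    "table_id": "temp_table_medium",
    "dataset_id": "medium_article",
    "col_label": "class",
    "col_training": ["sepal_length", "sepal_width", "petal_length", "petal_width"],
}

print(check_params(pipeline_signature, example_params))
```

Two empty lists mean that every pipeline parameter has a value and that no extra keys are present.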

🎬 STEP 6: Compile the Pipeline

At this point, the pipeline is built. Hurrah! Now, we must compile it, and for that, you need to define the pipeline_func and package_path.

compiler.Compiler().compile(
    pipeline_func=medium_pipeline,
    package_path=TEMPLATE_PATH)

🚀 Finally, STEP 7: Let’s submit this pipeline!!

aiplatform.init(project=PROJECT_ID, location=LOCATION)
pipeline_ = aiplatform.pipeline_jobs.PipelineJob(
    enable_caching=ENABLE_CACHING,
    display_name=PIPELINE_NAME,
    template_path=TEMPLATE_PATH,
    job_id=JOBID,
    parameter_values=PIPELINE_PARAMS)

pipeline_.submit(service_account=SERVICE_ACCOUNT)

After submitting the pipeline, it will be visible in the Vertex AI console. To get there, follow these steps:

  1. Go to the Google Cloud console and select the project that you are developing;
  2. Search for Vertex AI;
  3. Go to Pipelines and select the one you created. Next, you should be able to see your pipeline displayed as shown below.

🎉 Conclusion

This is the second article in our Medium collection about Vertex AI. In case you missed the first one, see Running Your First ML Model using GCP Vertex AI.

The current article introduces the topic of Machine Learning Operations and why it is critical for deploying Machine Learning models into production. Furthermore, we also present Vertex AI, a service launched by Google to address this crucial issue.
When you conclude this tutorial, you should be able to train your ML model using a Vertex AI pipeline. However, this is just the first step of an entire Machine Learning workflow, and there is still much to do. In the following articles, we will show how to deploy our models to an endpoint.
We hope you’ve enjoyed reading this article. Please stay tuned for the next one 😉.


outsidenox

Cloud enthusiasts; we use it on a daily basis. A team of Data Scientists and Machine Learning engineers.