Step-by-Step Guide to Accelerate ML Jobs with Rapids, Dask, Vertex and GPUs

Jesus
9 min readJan 20, 2023

Follow me

Sometimes we need to work with large data samples, although there are different ways to handle it, I never found an article that explained the step by step. So I decided to write and illustrate how to train 4 million of rows of data with an open-source gradient boosted trees algorithm (xgboost).

To keep it simple, I took advantage of the products I use everyday;

Infrastructure:

  • BigQuery is a completely serverless datawarehouse solution that supports queries over petabytes of information, it has the principle of processing distribution where resources are divided in 3 technologies: colossus which is a gigantic global file system, jupiter that is a super nebular networking and dremel, the processing architecture that shuffles, aggregates and do functions for the queries.
  • dask is an open-source python library for parallel computing.
  • Vertex AI is a managed unified platform to execute machine learning algorithms in no time by leveraging resources like GPUs/TPUs.

Logic:

Generally speaking we feed the model with large dataset, version it and deploy it.

graphic description

The dataset: The dataset was determined from US Forest Service (USFS) and it is composed by cartographic variables, the target to predict is the cover type for a given observation in a forest. Data was previously prepared for the simplicity of this article, so the target is already in a categorical format (1,2,3,4,5,6).

Training: This is a supervised learning (you have the labels/target/outcomes), one of the methods with a lot of relevance these days is xgboost, which is an open-sources software library which provides a regularization gradient boosting framework, is easy to use and very accurate in some cases.

Inference: Once the model has been trained, we need to evaluate it and deploy it somewhere to start making predictions, I will use a managed service from Google called Vertex Endpoints.

Machine Learning Artifacts are stored during the process.

Let’s get started.

Find the Data and Create your Table

You can find the dataset here. Creating a table for BigQuery is out of the scope, but it is hell easy:

BigQuery > projects (jchavezar-demo) > Create Dataset > Create Table

Hit me for more information.

Set Constants and Import Variables

By setting the values at the beginning of the code, help us to create a sharable escenario where others can use their own values and re-run what we have done, also it’s important to avoid words repetation and have a clean code.

I will create a temporary folder with all our artifacts during this process (DIR = ‘xgboost_custom’).

PROJECT_ID = 'jchavezar-demo'
REGION = 'us-central1'
DIR = 'xgboost_custom'
BQ_TABLE_DIR = 'vertex_datasets_public.cover_type_4Mrows'
MODEL_URI = 'gs://vtx-models/xgboost/cover_type'
STAGING_URI = 'gs://vtx-staging/xgboost/cover_type'
TRAIN_IMAGE_URI = 'us-central1-docker.pkg.dev/jchavezar-demo/trainings/xgboost-dask-gpu:latest'
PREDICTION_IMAGE_URI = 'us-central1-docker.pkg.dev/jchavezar-demo/custom-predictions/xgboost-dask-cpu:latest'

Import Libraries

from google.cloud import aiplatform as aip

Create Temporary Folder ($DIR) Structure

!rm -fr $DIR
!mkdir $DIR
!mkdir $DIR/trainer

Create the Training Code

With large datasets we need many resources to handle the ml job, so during the training proces I will use an open source solution called Dask, Dask is amazing because it works in a layer that integrates physical resources or workers (CPU, Memory and GPU) with ml frameworks (xgboost), it fires up python libraries like numpy, pandas and Scikit-learn by distributing the job through workers, it is used throughout the PyData ecosystem and is included in many cool libraries like RAPIDS data pipelines or dataframes with GPUs.

Scale Python Code

You might be thinking… why so many frameworks?, well let’s say, they work pretty cool together. Vertex AI platform helps you avoid all the crappy engineering stuff like container cluster management. Have you thought how difficult is to maintain these days a container environment from security, networking and storage perspective?, so with this method we focus only on the python libraries in upper layers: xgboost, dask and rapids.

These are the parts of my code, but you can find the full code here.

def load_data(self):
'''Load data from BigQuery to Dask'''
_ = self.bq_table_dir.split('.')

ddf = dask_bigquery.read_gbq(
project_id='jchavezar-demo',
dataset_id=_[0],
table_id=_[1]
).dropna()

print(f"[INFO] ------ Splitting dataset")
df_train, df_eval = ddf.random_split([0.8, 0.2], random_state=123)
self.df_train_features = df_train.drop('Cover_Type', axis=1)
self.df_eval_features = df_eval.drop('Cover_Type', axis=1)
self.df_train_labels = df_train.pop('Cover_Type')
self.df_eval_labels = df_eval.pop('Cover_Type')

load_data: this function call the dask-bigquery library to query bigquery table, the interesting part is that dask will load this data into workers (cpu+memory, gpu+memory) so you can fire up the chips. Then we split and shuffle the data which is a good practice in ML to avoid bias.

    def model_train(self):
print("[INFO] ------ Creating dask cluster")
scheduler_ip = subprocess.check_output(['hostname','--all-ip-addresses'])
scheduler_ip = scheduler_ip.decode('UTF-8').split()[0]

with LocalCUDACluster(
ip=scheduler_ip,
n_workers=self.num_workers,
threads_per_worker=self.threads_per_worker
) as cluster:
with Client(cluster) as client:
print('[INFO]: ------ Calling main function ')

print("[INFO]: ------ Dataset for dask")
dtrain = dxgb.DaskDeviceQuantileDMatrix(client, self.df_train_features, self.df_train_labels)
dvalid = dxgb.DaskDeviceQuantileDMatrix(client, self.df_eval_features, self.df_eval_labels)

print("[INFO]: ------ Training...")
output = xgb.dask.train(
client,
{
"verbosity": 2,
"tree_method": "gpu_hist",
"objective": "multi:softprob",
"eval_metric": ["mlogloss"],
"learning_rate": 0.1,
"gamma": 0.9,
"subsample": 0.5,
"max_depth": 9,
"num_class": 8
},
dtrain,
num_boost_round=10,
evals=[(dvalid, "valid1")],
early_stopping_rounds=5
)
model = output["booster"]
best_model = model[: model.best_iteration]
print(f"[INFO] ------ Best model: {best_model}")
best_model.save_model("/tmp/model.json")
model_metrics = output["history"]["valid1"]
with open("/tmp/metadata.json", "w") as outfile:
json.dump(model_metrics, outfile)

model_train: This part bring machine learning to work with dask, by caling LocalCUDACluster we are telling the code to work on GPUs. Now the important part is that Dask is a distributed system so that’s why is called “LocalCluster” because at the end treat all the resources as bunch of workers, that’s why we have some additional lines of code like scheduler_ip which lets the code know the ip address of all it workers. (It’s pretty cool), btw while I was creating the snippets on my jupyterlab+radpids I noticed that you have an extension which is an access to monitor your workers:

The training algorithm has different parameters like:

three_method: in this case I wanted to train using gpus so I selected “gpu_hist”.

objective: in dt like neural networks we want to reduce the error, that’s called the crossentropy loss function, in this case we set the function based on softmax which is a way to shorten values in a probabilistic way:

eval_metric: is the metric we want to optimize; the crossentropy loss function described earlier.

I also used some optimization values to make training time shorter like early_stopping_rounds. The last part of the code is to get the best value accross all the iterations and save it as model.json to later upload it in Google Cloud Storage.

    def storage_artifacts(self):        
print('[INFO] ------ Storing Artifacts on Google Cloud Storage')
bucket = os.environ['AIP_MODEL_DIR'].split('/')[2]
blob_name = '/'.join(os.environ['AIP_MODEL_DIR'].split('/')[3:])
bucket ='vtx-models'
storage_client = storage.Client(project=self.project)
bucket = storage_client.bucket(bucket)

for i in ["model.json", "metadata.json"]:
blob = bucket.blob(f'{blob_name}{i}')
blob.upload_from_filename(f'/tmp/{i}')

storage_artifacts: The final step of the function is to get google cloud storage bucket address to save all the artifacts. I wrapped it all up in a python class function to keep the cleaness of my code. Full code here.

Create Image and Push it to Google Artifacts Repository

Creating Google Artifacts Repository is out of scope in this article but you can follow this instructions, it is just a couple of sdk instruction lines. Remember to autheticate your notebook thorugh the terminal with this command: gcloud auth configure-docker us-central1-docker.pkg.dev (chage the region).

%%writefile $DIR/Dockerfile
FROM rapidsai/rapidsai-nightly:22.12-cuda11.2-base-ubuntu20.04-py3.8

RUN pip install google-cloud-storage \
&& pip install gcsfs \
&& pip install pandas \
&& pip install dask-bigquery

COPY trainer trainer/

ENTRYPOINT ["python", "trainer/train.py"]
!docker build -t $TRAIN_IMAGE_URI $DIR/.
!docker push $TRAIN_IMAGE_URI

Create Vertex Training from Code [CustomJob]

Now that I have my container with the training code in google cloud repository ready, I can run it by using Vertex AI platform.

aip.init(
project=PROJECT_ID,
location=REGION)

num_gpus = 4

worker_pool_specs = [
{
"machine_spec": {
"machine_type": "n1-standard-4",
"accelerator_type": "NVIDIA_TESLA_V100",
"accelerator_count": num_gpus
},
"replica_count": "1",
"container_spec": {
"image_uri": TRAIN_IMAGE_URI,
"args": [
"--bq_table_dir", "vertex_datasets_public.cover_type_4Mrows",
"--num_workers", f"{num_gpus}",
"--threads_per_worker", "4"
]
}
},
]

job = aip.CustomJob(
display_name = '05cb-bqdask-xgboost-customjob',
worker_pool_specs = worker_pool_specs,
base_output_dir = MODEL_URI,
staging_bucket = STAGING_URI
)

model = job.run(
)

machine_spec: sets the size of the vm that we want to use for the training.

accelerator_type and count: this tells the job to use a gpu model and the quantity.

container_spec: this is to specify the container on top of the vm and pass arguments to the code, like the number of rowkers or the number of gpus.

aip.CustomJob: is for creating the ml job with all the worker pool specifications.

Trainig Loss Chart

Artifacts were saved during the training process, I will take them from Google Cloud Storage and plot the logistic or cross-entropy loss curve:

import json
import matplotlib.pyplot as plt
from google.cloud import storage
from google.cloud import storage

client = storage.Client()
bucket = client.get_bucket('vtx-models')
blob = bucket.blob('xgboost/cover_type/model/metadata.json')
blob.download_to_filename('metadata.json')

# Opening JSON file
with open('metadata.json') as json_file:
data = json.load(json_file)

plt.plot(data['mlogloss'])
plt.show()
mlogloss chart

Upload Model

The next step is to take the model into the Model Registry and keep the version. For that we first create the folder structure:

!rm -fr $DIR/prediction
!mkdir $DIR/prediction

And create code, I use Flask as a web server platform to expose the prediction service through an endpoint.

Prediction Code

%%writefile $DIR/prediction/app.py

import os
import logging
import pandas as pd
import xgboost as xgb
from flask import Flask, request, Response, jsonify
from google.cloud import storage

client = storage.Client(project=os.environ['PROJECT_ID'])

# Model Download from gcs

fname = "model.json"

with open(fname, "wb") as model:
client.download_blob_to_file(
f"{os.environ['AIP_STORAGE_URI']}/{fname}", model
)

# Loading model
print("[INFO] ------ Loading model from: {}".format(fname))
model = xgb.Booster(model_file=fname)

# Creation of the Flask app
app = Flask(__name__)

# Flask route for Liveness checks
@app.route(os.environ['AIP_HEALTH_ROUTE'])
def isalive():
status_code = Response(status=200)
return status_code

# Flask route for predictions
@app.route(os.environ['AIP_PREDICT_ROUTE'],methods=['GET','POST'])
def prediction():
_features = ['Id','Elevation', 'Aspect', 'Slope', 'Horizontal_Distance_To_Hydrology', 'Vertical_Distance_To_Hydrology', 'Horizontal_Distance_To_Roadways',
'Hillshade_9am', 'Hillshade_Noon', 'Hillshade_3pm','Horizontal_Distance_To_Fire_Points', 'Wilderness_Area1', 'Wilderness_Area2', 'Wilderness_Area3',
'Wilderness_Area4', 'Soil_Type1', 'Soil_Type2', 'Soil_Type3', 'Soil_Type4', 'Soil_Type5', 'Soil_Type6', 'Soil_Type7', 'Soil_Type8', 'Soil_Type9',
'Soil_Type10','Soil_Type11','Soil_Type12','Soil_Type13','Soil_Type14','Soil_Type15','Soil_Type16','Soil_Type17','Soil_Type18','Soil_Type19',
'Soil_Type20', 'Soil_Type21', 'Soil_Type22', 'Soil_Type23', 'Soil_Type24', 'Soil_Type25', 'Soil_Type26', 'Soil_Type27', 'Soil_Type28', 'Soil_Type29',
'Soil_Type30', 'Soil_Type31', 'Soil_Type32', 'Soil_Type33', 'Soil_Type34', 'Soil_Type35', 'Soil_Type36', 'Soil_Type37', 'Soil_Type38', 'Soil_Type39', 'Soil_Type40']
data = request.get_json(silent=True, force=True)
dmf = xgb.DMatrix(pd.DataFrame(data["instances"], columns=_features))
response = pd.DataFrame(model.predict(dmf))
logging.info(f"Response: {response}")
return jsonify({"Cover Type": str(response.idxmax(axis=1)[0])})

if __name__ == "__main__":
app.run(debug=True, host='0.0.0.0', port=8080)

Create Files, Dockerfiles and Push it to Registry

requirements.txt

%%writefile $DIR/prediction/requirements.txt

google-cloud-storage
numpy
pandas
flask
xgboost

Dockerfile

%%writefile $DIR/prediction/Dockerfile

FROM python:3.7-buster

RUN mkdir my-model

COPY app.py ./app.py
COPY requirements.txt ./requirements.txt
RUN pip install -r requirements.txt

# Flask Env Variable
ENV FLASK_APP=app

# Expose port 8080
EXPOSE 8080

CMD flask run --host=0.0.0.0 --port=8080

I use google cloud build, which is a platform to create and push code, we usually use it as a CI/CD tool.

!gcloud builds submit --tag $PREDICTION_IMAGE_URI $DIR/prediction/. --timeout 3000

Once the webserver code is built, I can upload the model into Model Registry:

model = aip.Model.upload(
display_name = '05-cb-cover_type',
serving_container_image_uri = PREDICTION_IMAGE_URI,
artifact_uri = f'{MODEL_URI}/model',
serving_container_environment_variables = {
'PROJECT_ID': PROJECT_ID,
}
)

The next and final step is to deploy the model using Vertex Endpoints AI:

endpoint = model.deploy(
deployed_model_display_name = '05-cb-cover_ep_dep',
traffic_percentage = 100,
machine_type = 'n1-standard-4',
min_replica_count = 1,
max_replica_count = 1,
)

Now we can test it by calling the API directly or through the console:

Vertex AI > Endpoints > Model > Deploy and Test > JSON request

For the prediction I didn’t use GPU since is the data is tabular and the sample is 1 row only.

I hope this will help, let me know your thoughts.

Until the next one.

Full code.

--

--