Data validation on Vertex AI for metadata and content checks

Penny Qian
6 min read · May 23, 2023

Vertex AI is Google Cloud's unified machine learning platform, bringing its ML services together under one roof. When we start to develop ML solutions on Vertex AI, a common challenge is ensuring the quality of the data for downstream processing (analytics, visualizations, machine learning, etc.).

In principle, data validation should happen across the whole machine learning development cycle, from data ingestion to final predictions, as shown in the figure below. This article focuses on pre-training and post-prediction data validations.

Metadata validation

Metadata, in simple words, is data about data. Verifying metadata can be a lightweight way to sense whether the data is in good shape with minimal effort. To verify the metadata of incoming data tables, we need several steps:

  • read the dependency tables' metadata and convert it into an agreeable format
  • perform checks on the metadata of each table, which can simply be some pre-defined assertions
  • output the test results

Each of these steps can be implemented as a component, such as a Kubeflow component; together they form a Kubeflow pipeline defined with the @dsl.pipeline decorator.

In our case, the data is stored in GCP BigQuery, so we can simply fetch the metadata using the BigQuery client and perform the checks as below:

from datetime import datetime

import pytz
from google.cloud import bigquery

# PROJECT_ID and bq_tables (the list of dependency tables) are defined elsewhere,
# e.g. as pipeline parameters
client = bigquery.Client(project=PROJECT_ID)

# define a lambda function to get the delta days
get_delta_day = lambda td: td.days

# perform general checks
for bq_table in bq_tables:
    table = client.get_table(bq_table)
    print(
        "Got table '{}.{}.{}'.".format(
            table.project, table.dataset_id, table.table_id
        )
    )
    print("Table schema: {}".format(table.schema))
    print("Table description: {}".format(table.description))
    print("Table has {} rows".format(table.num_rows))

    assert table.num_rows > 0, "Table has no rows"
    assert (
        get_delta_day(datetime.now(pytz.utc) - table.modified) < 30
    ), "Table is not updated in the last 30 days"

# Table-specific checks
table1 = client.get_table("table_name")

assert (
    get_delta_day(datetime.now(pytz.utc) - table1.modified) < 3
), "Table is not updated in the last 3 days"

In Vertex AI, components can be linked using dependency logic, as sketched below. You can refer to our full pipeline example here, which contains two steps.
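
To give an idea of what that wiring looks like, here is a minimal sketch (not the actual pipeline from the linked example) of two Kubeflow components chained on Vertex AI. It assumes the KFP v2 SDK, and the component and pipeline names are hypothetical.

from kfp import dsl


@dsl.component(base_image="python:3.10")
def validate_metadata():
    # the BigQuery metadata assertions from the snippet above would live here
    ...


@dsl.component(base_image="python:3.10")
def validate_content():
    # the content checks (e.g. Great Expectations) would live here
    ...


@dsl.pipeline(name="data-validation-pipeline")
def data_validation_pipeline():
    metadata_task = validate_metadata()
    # only run content validation after the metadata checks have passed
    validate_content().after(metadata_task)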

Content validation

There are multiple tools to choose from for data validation; for example, Pandera and Great Expectations are two popular Python libraries that are often compared.

In this article, we chose to implement Great Expectations (GE) for the reasons below:

  • GE can serve data science-specific expectations (e.g. validating a distribution range, specifying expectations relevant to the business)
  • Provides a centralized and readable place for the test suites
  • Includes a nice dashboard for summarizing the test results
  • Can auto-generate expectations based on the data fed to it (a short sketch of this appears later, after the hand-written suite)

Before jumping into the implementation, here are a few key terms and concepts in the Great Expectations workflow:

  • Data Context: represents a GE project; it is created by initializing GE via the CLI and configured via the great_expectations.yml file
  • Datasources: a configuration that describes where the data is, how to connect to it, and which execution engine to use when running a Checkpoint.
  • Expectations: description of the expected behavior of data.
  • Batch Request: defines a batch of data to run Expectations on from a given data source.
  • Checkpoint config: defines which Expectation Suite to run against which data, and which actions to take during and after a run.

Implementation

The Great Expectations workflow is as follows: the data to be validated goes into GE, runs through the validations, and the results come out.

source: https://docs.greatexpectations.io/docs/

To integrate the above into Vertex AI, we need several steps, and we also use a GCS bucket to store the GE initialization folder together with the output results.

To start with the implementation, open your terminal and run these commands:

pip install great_expectations
great_expectations init
gsutil cp -r great_expectations gs://{your_gcs_bucket}

This will create a Great Expectations initialization folder and copy it to the GCS bucket. From a Vertex AI component, files stored in the GCS bucket can be accessed in the same way as the local filesystem, because GCS is mounted as a filesystem automatically via Cloud Storage FUSE.

One thing to note here is that the root directory /gcs is not readable: if you run ls /gcs, you will get an "Input/output error". However, it is fine to read a bucket root, such as ls /gcs/example-bucket. So in our case, we can access the Great Expectations folder via the path /gcs/{your_gcs_bucket}/great_expectations.

After initialization, we can start adding the data source to the Great Expectations config. In our case the main data source is the GCS bucket, so we need to specify it in the data connector part, as shown below:

import logging

import great_expectations as gx
from ruamel.yaml import YAML

yaml = YAML()

context = gx.data_context.DataContext(
    context_root_dir="/gcs/penny-trial-bucket/great_expectations"
)

datasource_yaml = rf"""
name: my_datasource
class_name: Datasource
execution_engine:
  class_name: PandasExecutionEngine
data_connectors:
  default_runtime_data_connector_name:
    class_name: RuntimeDataConnector
    batch_identifiers:
      - default_identifier_name
  default_inferred_data_connector_name:
    class_name: InferredAssetGCSDataConnector
    bucket_or_name: penny-trial-bucket
    prefix: great_expectations_test_csvs
    default_regex:
      pattern: (.*)\.csv
      group_names:
        - data_asset_name
"""

context.add_datasource(**yaml.load(datasource_yaml))
# Test the connection
context.test_yaml_config(yaml_config=datasource_yaml)
logging.info("Data Source updated")

After that, we can define a batch request that fetches a selection of records from the data source to be used in validation.

from great_expectations.core.batch import RuntimeBatchRequest

batch_request = RuntimeBatchRequest(
    datasource_name="my_datasource",
    data_connector_name="default_runtime_data_connector_name",
    # this can be anything that identifies this data asset for you
    data_asset_name="great_expectations_test_csvs/test-data1",
    # the GCS path of the file to validate
    runtime_parameters={
        "path": "gs://penny-trial-bucket/great_expectations_test_csvs/test-data1.csv"
    },
    batch_identifiers={"default_identifier_name": "default_identifier"},
)

We also create the test suite. The suite is then written to the GE initialization folder in the GCS bucket at `penny-trial-bucket/great_expectations/expectations/test_suite.json`.

suite = context.create_expectation_suite(
    expectation_suite_name="test_suite", overwrite_existing=True
)
validator = context.get_validator(
    batch_request=batch_request, expectation_suite_name="test_suite"
)

# Expect all `seq` values to not be null
validator.expect_column_values_to_not_be_null(column="seq")

# Expect all `name/first` values to be unique
validator.expect_column_values_to_be_unique(column="name/first")

# Expect `age` values to be between 0 and 150
validator.expect_column_values_to_be_between(
    column="age", min_value=0, max_value=150
)

# And save the final state to JSON
validator.save_expectation_suite(discard_failed_expectations=False)
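
As mentioned in the benefits above, GE can also auto-generate expectations from the data it is fed instead of writing each one by hand. Below is a minimal sketch, assuming the same validator as above and GE's UserConfigurableProfiler; the generated suite would typically be reviewed and trimmed manually.

from great_expectations.profile.user_configurable_profiler import (
    UserConfigurableProfiler,
)

# Profile the batch behind the validator to generate a starting expectation suite
profiler = UserConfigurableProfiler(profile_dataset=validator)
suite = profiler.build_suite()

# Persist the generated suite to the expectations folder
context.save_expectation_suite(expectation_suite=suite)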

Now the final step is to create a checkpoint that assigns the test suite we created to the batch of data we want to validate, and then outputs the test results. Conceptually, a checkpoint bundles batch requests, expectation suites, and validators:

source: https://docs.greatexpectations.io/docs/terms/checkpoint/

from great_expectations.render.renderer import ValidationResultsPageRenderer
from great_expectations.render.view import DefaultJinjaPageView

# Define the checkpoint
yaml_config = """
name: my_checkpoint
config_version: 1.0
class_name: SimpleCheckpoint
run_name_template: "%Y%m%d-%H%M%S"
validations:
  - batch_request:
      datasource_name: my_datasource
      data_connector_name: default_inferred_data_connector_name
      data_asset_name: great_expectations_test_csvs/test-data1
      data_connector_query:
        index: -1
    expectation_suite_name: test_suite
"""

# Add the checkpoint to context
context.add_checkpoint(**yaml.load(yaml_config))

# Run checkpoint to validate the data
checkpoint_result = context.run_checkpoint(checkpoint_name="my_checkpoint")

# Validation results are rendered as HTML
document_model = ValidationResultsPageRenderer().render(
    list(checkpoint_result.run_results.values())[0]["validation_result"]
)

# Write validation results as output HTML
with open("/gcs/penny-trial-bucket/output.html", "w") as writer:
    writer.write(DefaultJinjaPageView().render(document_model))

The final results are then written to the GCS bucket. The full Vertex AI pipeline and code are here.
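
If you also want the validation component itself to fail when the data does not meet the expectations, so that downstream steps are blocked, you can check the checkpoint result before finishing. A small sketch:

# Fail the Vertex AI component when any expectation in the suite fails
assert checkpoint_result.success, "Content validation failed, see output.html for details"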

Conclusions

In this article, we went through two general types of data validation: metadata validation and content validation. For each type, we also provided an implementation in Vertex AI as part of a model training or prediction pipeline.

These implementations are good for experiments and can be further improved for team collaboration. For example, the test suites can be stored and version-controlled so they can be shared across the team. Also, the components can be wrapped into containers so they can be imported into existing pipelines more easily.

