Validating historical features with Great Expectations
In this tutorial, we will use the public dataset of Chicago taxi trips to demonstrate the data validation capabilities of Feast.
The original dataset is stored in BigQuery and consists of raw data for each taxi trip (one row per trip) since 2013.
We will generate several training datasets (aka historical features in Feast) for different periods and evaluate expectations made on one dataset against another.
Types of features we're ingesting and generating:
Features that aggregate raw data over daily intervals (e.g., trips per day, average fare or speed for a specific day).
Features computed in SQL while pulling data from BigQuery (like total trip time or total miles travelled).
Features calculated on the fly at request time using Feast's on-demand transformations.
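To make the third kind of feature concrete, here is a minimal sketch of deriving per-request values from stored daily aggregates. It uses plain Python rather than Feast's on-demand transformation API, and the function name `derive_features` and the exact formulas are assumptions made for illustration, not definitions taken from this tutorial.

```python
# Illustrative sketch only: the formulas below are assumptions,
# not the tutorial's actual on-demand feature definitions.
def derive_features(total_miles_travelled, total_trip_seconds, total_earned, trip_count):
    """Derive per-request features from one taxi's stored daily aggregates."""
    hours = total_trip_seconds / 3600
    return {
        "avg_fare": total_earned / trip_count,
        "avg_speed": total_miles_travelled / hours,  # miles per hour
        "earned_per_hour": total_earned / hours,
    }


features = derive_features(
    total_miles_travelled=120.0,
    total_trip_seconds=18_000,  # 5 hours of driving
    total_earned=300.0,
    trip_count=20,
)
print(features)
```

Nothing derived this way needs to be stored: only the daily aggregates are kept, and the ratios are recomputed whenever features are requested.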
Our plan:
Prepare environment
Pull data from BigQuery (optional)
Declare & apply features and feature views in Feast
Generate reference dataset
Develop & test profiler function
Run validation on different dataset using reference dataset & profiler
The original notebook and datasets for this tutorial can be found on GitHub.
0. Setup
Install the Feast Python SDK and Great Expectations:
!pip install 'feast[ge]'
1. Dataset preparation (Optional)
You can skip this step if you don't have a GCP account. In that case, use the parquet files that come with this tutorial instead.
We run some basic aggregations while pulling data from BigQuery, grouping by taxi_id and day:
data_query = """SELECT
    taxi_id,
    TIMESTAMP_TRUNC(trip_start_timestamp, DAY) as day,
    SUM(trip_miles) as total_miles_travelled,
    SUM(trip_seconds) as total_trip_seconds,
    SUM(fare) as total_earned,
    COUNT(*) as trip_count
FROM `bigquery-public-data.chicago_taxi_trips.taxi_trips`
WHERE
    trip_miles > 0 AND trip_seconds > 60 AND
    trip_start_timestamp BETWEEN '2019-01-01' and '2020-12-31' AND
    trip_total < 1000
GROUP BY taxi_id, TIMESTAMP_TRUNC(trip_start_timestamp, DAY)
"""
driver_stats_table = bq_client.query(data_query).to_arrow()

# Storing resulting dataset into parquet file
pyarrow.parquet.write_table(driver_stats_table, "trips_stats.parquet")
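For readers without BigQuery access, the following sketch mirrors what the aggregation query computes, using only the Python standard library on a handful of made-up trips. The `trips` rows are hypothetical data, and the date-range filter from the query is omitted for brevity.

```python
from collections import defaultdict
from datetime import datetime

# Hypothetical toy rows:
# (taxi_id, trip_start_timestamp, trip_miles, trip_seconds, fare, trip_total)
trips = [
    ("a", datetime(2019, 1, 1, 8, 0), 2.0, 600, 10.0, 12.0),
    ("a", datetime(2019, 1, 1, 17, 30), 3.0, 900, 14.0, 16.0),
    ("a", datetime(2019, 1, 2, 9, 0), 1.0, 300, 6.0, 7.0),
    ("b", datetime(2019, 1, 1, 10, 0), 0.0, 120, 5.0, 5.0),  # dropped: trip_miles is 0
    ("b", datetime(2019, 1, 1, 11, 0), 4.0, 30, 9.0, 9.0),   # dropped: trip_seconds <= 60
]

stats = defaultdict(lambda: {
    "total_miles_travelled": 0.0,
    "total_trip_seconds": 0,
    "total_earned": 0.0,
    "trip_count": 0,
})

for taxi_id, started, miles, seconds, fare, total in trips:
    # Same row filters as the WHERE clause (minus the date range).
    if not (miles > 0 and seconds > 60 and total < 1000):
        continue
    # Truncating the timestamp to a date plays the role of TIMESTAMP_TRUNC(..., DAY).
    row = stats[(taxi_id, started.date())]
    row["total_miles_travelled"] += miles
    row["total_trip_seconds"] += seconds
    row["total_earned"] += fare
    row["trip_count"] += 1

for (taxi_id, day), row in sorted(stats.items()):
    print(taxi_id, day, row)
```

Each resulting row corresponds to one (taxi_id, day) pair, which is the granularity of the stored daily statistics.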
def entities_query(year):
    return f"""SELECT
    distinct taxi_id
FROM `bigquery-public-data.chicago_taxi_trips.taxi_trips`
WHERE
    trip_miles > 0 AND trip_seconds > 0 AND
    trip_start_timestamp BETWEEN '{year}-01-01' and '{year}-12-31'
"""
batch_source = FileSource(
    timestamp_field="day",
    path="trips_stats.parquet",  # using parquet file that we created on previous step
    file_format=ParquetFormat()
)
A dataset profiler is a function that accepts a dataset and generates a set of its characteristics. These characteristics are then used to evaluate (validate) subsequent datasets.
Important: datasets are not compared to each other! Feast uses a reference dataset and a profiler function to generate a reference profile. This profile is then used to validate the tested dataset.
Feast uses Great Expectations as its validation engine and an ExpectationSuite as a dataset's profile. Hence, we need to develop a function that generates an ExpectationSuite. This function receives an instance of PandasDataset (a wrapper around pandas.DataFrame), so we can use both the Pandas DataFrame API and helper functions from PandasDataset during profiling.
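The distinction between comparing datasets and validating against a profile can be sketched without any library. The code below is a conceptual illustration only, not Feast's or Great Expectations' API: `build_profile`, `validate`, and `TOLERANCE` are hypothetical names, and the profile holds a single characteristic (an allowed range for the mean).

```python
from statistics import mean

TOLERANCE = 0.1  # hypothetical: allowed relative deviation of the mean


def build_profile(reference_values):
    """Profiler: turn the reference dataset into a small set of characteristics."""
    observed_mean = mean(reference_values)
    return {
        "min_mean": observed_mean * (1 - TOLERANCE),
        "max_mean": observed_mean * (1 + TOLERANCE),
    }


def validate(tested_values, profile):
    """Validation sees only the profile, never the reference rows themselves."""
    return profile["min_mean"] <= mean(tested_values) <= profile["max_mean"]


reference_trip_counts = [10, 12, 8, 10]  # mean is 10
profile = build_profile(reference_trip_counts)

similar_ok = validate([9, 11, 10, 10], profile)    # mean 10, inside the range
shifted_ok = validate([20, 22, 18, 20], profile)   # mean 20, outside the range
print(profile, similar_ok, shifted_ok)
```

Because only the profile is carried forward, the reference dataset can be profiled once and the result reused to validate any number of later datasets.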
DELTA = 0.1  # controlling allowed window in fraction of the value on scale [0, 1]

@ge_profiler
def stats_profiler(ds: PandasDataset) -> ExpectationSuite:
    # simple checks on data consistency
    ds.expect_column_values_to_be_between(
        "avg_speed",
        min_value=0,
        max_value=60,
        mostly=0.99  # allow some outliers
    )

    ds.expect_column_values_to_be_between(
        "total_miles_travelled",
        min_value=0,
        max_value=500,
        mostly=0.99  # allow some outliers
    )

    # expectation of means based on observed values
    observed_mean = ds.trip_count.mean()
    ds.expect_column_mean_to_be_between("trip_count",
                                        min_value=observed_mean * (1 - DELTA),
                                        max_value=observed_mean * (1 + DELTA))

    observed_mean = ds.earned_per_hour.mean()
    ds.expect_column_mean_to_be_between("earned_per_hour",
                                        min_value=observed_mean * (1 - DELTA),
                                        max_value=observed_mean * (1 + DELTA))

    # expectation of quantiles
    qs = [0.5, 0.75, 0.9, 0.95]
    observed_quantiles = ds.avg_fare.quantile(qs)

    ds.expect_column_quantile_values_to_be_between(
        "avg_fare",
        quantile_ranges={
            "quantiles": qs,
            "value_ranges": [[None, max_value] for max_value in observed_quantiles]
        })

    return ds.get_expectation_suite()
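The `mostly` argument used in the range checks sets the fraction of values that must fall inside the range for the expectation to pass. The sketch below illustrates that rule in plain Python; `passes_with_mostly` is a hypothetical helper, and it simplifies the real behavior by ignoring how missing values are handled.

```python
def passes_with_mostly(values, min_value, max_value, mostly):
    """Return True if at least `mostly` (a fraction in [0, 1]) of values are in range."""
    within = sum(min_value <= v <= max_value for v in values)
    return within / len(values) >= mostly


# 199 plausible speeds and one outlier: 99.5% of values are within [0, 60].
speeds = [20.0] * 199 + [150.0]

tolerant = passes_with_mostly(speeds, 0, 60, mostly=0.99)  # outlier is tolerated
strict = passes_with_mostly(speeds, 0, 60, mostly=1.0)     # every value must be in range
print(tolerant, strict)
```

Choosing `mostly` slightly below 1 keeps a few bad readings from failing validation while still catching a systematic shift.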
Verify that every expectation coded in the profiler is present in the resulting suite. If one is missing, it failed on the reference dataset; by default, Great Expectations drops such expectations silently.
Now we can create a validation reference from the dataset and the profiler function:
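This check can be automated rather than done by eye. The sketch below assumes the suite has been serialized to a dictionary with an "expectations" list whose entries carry "expectation_type" and "kwargs" keys, which is the usual JSON shape of a Great Expectations suite; treat that layout, the helper name `missing_expectations`, and the sample `suite_dict` as assumptions.

```python
def missing_expectations(suite_dict, expected):
    """List the (expectation_type, column) pairs that are absent from the suite."""
    present = {
        (e["expectation_type"], e["kwargs"].get("column"))
        for e in suite_dict["expectations"]
    }
    return [pair for pair in expected if pair not in present]


# Hypothetical serialized suite in which the trip_count mean check was dropped.
suite_dict = {
    "expectations": [
        {"expectation_type": "expect_column_values_to_be_between",
         "kwargs": {"column": "avg_speed", "min_value": 0, "max_value": 60}},
        {"expectation_type": "expect_column_quantile_values_to_be_between",
         "kwargs": {"column": "avg_fare"}},
    ]
}

expected = [
    ("expect_column_values_to_be_between", "avg_speed"),
    ("expect_column_mean_to_be_between", "trip_count"),
    ("expect_column_quantile_values_to_be_between", "avg_fare"),
]

missing = missing_expectations(suite_dict, expected)
print(missing)
```

An empty result means every expectation written in the profiler survived profiling on the reference dataset.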