Validating historical features with Great Expectations
In this tutorial, we will use the public Chicago taxi trips dataset to demonstrate the data validation capabilities of Feast.
  • The original dataset is stored in BigQuery and consists of raw data for each taxi trip (one row per trip) since 2013.
  • We will generate several training datasets (aka historical features in Feast) for different periods and evaluate expectations made on one dataset against another.
Types of features we're ingesting and generating:
  • Features that aggregate raw data into daily intervals (e.g., trips per day, average fare or speed for a specific day, etc.).
  • Features using SQL while pulling data from BigQuery (like total trips time or total miles travelled).
  • Features calculated on the fly when requested, using Feast's on-demand transformations.
Our plan:
  1. Prepare environment
  2. Pull data from BigQuery (optional)
  3. Declare & apply features and feature views in Feast
  4. Generate reference dataset
  5. Develop & test profiler function
  6. Run validation on different dataset using reference dataset & profiler
The original notebook and datasets for this tutorial can be found on GitHub.

0. Setup

Install the Feast Python SDK and Great Expectations:

```
!pip install 'feast[ge]'
```

1. Dataset preparation (Optional)

You can skip this step if you don't have a GCP account. In that case, use the parquet files that come with this tutorial instead.
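If you do skip the BigQuery pull, a minimal sketch of loading the provided files (assuming they sit next to the notebook under the same names produced by the steps below):

```python
import pyarrow.parquet

# Daily aggregates per taxi (same schema as produced by the BigQuery query below)
trips_stats_table = pyarrow.parquet.read_table("trips_stats.parquet")

# Distinct taxi ids observed in 2019
entities_table = pyarrow.parquet.read_table("entities.parquet")
```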
```
!pip install google-cloud-bigquery
```
```python
import pyarrow.parquet

from google.cloud.bigquery import Client
```
```python
bq_client = Client(project='kf-feast')  # replace with a GCP project you have access to
```
Run some basic aggregations while pulling data from BigQuery, grouping by taxi_id and day:
```python
data_query = """SELECT
    taxi_id,
    TIMESTAMP_TRUNC(trip_start_timestamp, DAY) as day,
    SUM(trip_miles) as total_miles_travelled,
    SUM(trip_seconds) as total_trip_seconds,
    SUM(fare) as total_earned,
    COUNT(*) as trip_count
FROM `bigquery-public-data.chicago_taxi_trips.taxi_trips`
WHERE
    trip_miles > 0 AND trip_seconds > 60 AND
    trip_start_timestamp BETWEEN '2019-01-01' and '2020-12-31' AND
    trip_total < 1000
GROUP BY taxi_id, TIMESTAMP_TRUNC(trip_start_timestamp, DAY)"""
```
```python
driver_stats_table = bq_client.query(data_query).to_arrow()

# Storing resulting dataset into parquet file
pyarrow.parquet.write_table(driver_stats_table, "trips_stats.parquet")
```
```python
def entities_query(year):
    return f"""SELECT
    distinct taxi_id
FROM `bigquery-public-data.chicago_taxi_trips.taxi_trips`
WHERE
    trip_miles > 0 AND trip_seconds > 0 AND
    trip_start_timestamp BETWEEN '{year}-01-01' and '{year}-12-31'
"""
```
```python
entities_2019_table = bq_client.query(entities_query(2019)).to_arrow()

# Storing entities (taxi ids) into parquet file
pyarrow.parquet.write_table(entities_2019_table, "entities.parquet")
```

2. Declaring features

```python
import pyarrow.parquet
import pandas as pd

from feast import FeatureView, Entity, FeatureStore, Field, BatchFeatureView
from feast.types import Float64, Int64
from feast.value_type import ValueType
from feast.data_format import ParquetFormat
from feast.on_demand_feature_view import on_demand_feature_view
from feast.infra.offline_stores.file_source import FileSource
from feast.infra.offline_stores.file import SavedDatasetFileStorage
from datetime import timedelta
```
```python
batch_source = FileSource(
    timestamp_field="day",
    path="trips_stats.parquet",  # using the parquet file that we created in the previous step
    file_format=ParquetFormat()
)
```
```python
taxi_entity = Entity(name='taxi', join_keys=['taxi_id'])
```
```python
trips_stats_fv = BatchFeatureView(
    name='trip_stats',
    entities=['taxi'],
    features=[
        Field(name="total_miles_travelled", dtype=Float64),
        Field(name="total_trip_seconds", dtype=Float64),
        Field(name="total_earned", dtype=Float64),
        Field(name="trip_count", dtype=Int64),
    ],
    ttl=timedelta(seconds=86400),
    source=batch_source,
)
```
Read more about feature views in the Feast docs.
```python
@on_demand_feature_view(
    schema=[
        Field(name="avg_fare", dtype=Float64),
        Field(name="avg_speed", dtype=Float64),
        Field(name="avg_trip_seconds", dtype=Float64),
        Field(name="earned_per_hour", dtype=Float64),
    ],
    sources=[
        trips_stats_fv,
    ]
)
def on_demand_stats(inp):
    out = pd.DataFrame()
    out["avg_fare"] = inp["total_earned"] / inp["trip_count"]
    out["avg_speed"] = 3600 * inp["total_miles_travelled"] / inp["total_trip_seconds"]
    out["avg_trip_seconds"] = inp["total_trip_seconds"] / inp["trip_count"]
    out["earned_per_hour"] = 3600 * inp["total_earned"] / inp["total_trip_seconds"]
    return out
```
Read more about on-demand feature views here.
```python
store = FeatureStore(".")  # using the feature_store.yaml that is stored in the same directory
```
```python
store.apply([taxi_entity, trips_stats_fv, on_demand_stats])  # writing to the registry
```
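As an optional sanity check (not part of the original notebook), you can list the objects back from the registry to confirm they were applied:

```python
# Quick check: the registry should now contain the entity and both feature views
print([entity.name for entity in store.list_entities()])
print([fv.name for fv in store.list_feature_views()])
print([fv.name for fv in store.list_on_demand_feature_views()])
```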

3. Generating training (reference) dataset

```python
taxi_ids = pyarrow.parquet.read_table("entities.parquet").to_pandas()
```
Generating a range of timestamps with daily frequency:
```python
timestamps = pd.DataFrame()
timestamps["event_timestamp"] = pd.date_range("2019-06-01", "2019-07-01", freq='D')
```
A cross merge (i.e., a Cartesian product) produces an entity dataframe with each taxi_id repeated for each timestamp:
```python
entity_df = pd.merge(taxi_ids, timestamps, how='cross')
entity_df
```
|        | taxi_id                                            | event_timestamp |
|--------|----------------------------------------------------|-----------------|
| 0      | 91d5288487e87c5917b813ba6f75ab1c3a9749af906a2d...  | 2019-06-01      |
| 1      | 91d5288487e87c5917b813ba6f75ab1c3a9749af906a2d...  | 2019-06-02      |
| 2      | 91d5288487e87c5917b813ba6f75ab1c3a9749af906a2d...  | 2019-06-03      |
| 3      | 91d5288487e87c5917b813ba6f75ab1c3a9749af906a2d...  | 2019-06-04      |
| 4      | 91d5288487e87c5917b813ba6f75ab1c3a9749af906a2d...  | 2019-06-05      |
| ...    | ...                                                | ...             |
| 156979 | 7ebf27414a0c7b128e7925e1da56d51a8b81484f7630cf...  | 2019-06-27      |
| 156980 | 7ebf27414a0c7b128e7925e1da56d51a8b81484f7630cf...  | 2019-06-28      |
| 156981 | 7ebf27414a0c7b128e7925e1da56d51a8b81484f7630cf...  | 2019-06-29      |
| 156982 | 7ebf27414a0c7b128e7925e1da56d51a8b81484f7630cf...  | 2019-06-30      |
| 156983 | 7ebf27414a0c7b128e7925e1da56d51a8b81484f7630cf...  | 2019-07-01      |

156984 rows × 2 columns
Retrieving historical features for the resulting entity dataframe and persisting the output as a saved dataset:
```python
job = store.get_historical_features(
    entity_df=entity_df,
    features=[
        "trip_stats:total_miles_travelled",
        "trip_stats:total_trip_seconds",
        "trip_stats:total_earned",
        "trip_stats:trip_count",
        "on_demand_stats:avg_fare",
        "on_demand_stats:avg_trip_seconds",
        "on_demand_stats:avg_speed",
        "on_demand_stats:earned_per_hour",
    ]
)

store.create_saved_dataset(
    from_=job,
    name='my_training_ds',
    storage=SavedDatasetFileStorage(path='my_training_ds.parquet')
)
```
```
<SavedDataset(name = my_training_ds, features = ['trip_stats:total_miles_travelled', 'trip_stats:total_trip_seconds', 'trip_stats:total_earned', 'trip_stats:trip_count', 'on_demand_stats:avg_fare', 'on_demand_stats:avg_trip_seconds', 'on_demand_stats:avg_speed', 'on_demand_stats:earned_per_hour'], join_keys = ['taxi_id'], storage = <feast.infra.offline_stores.file_source.SavedDatasetFileStorage object at 0x1276e7950>, full_feature_names = False, tags = {}, _retrieval_job = <feast.infra.offline_stores.file.FileRetrievalJob object at 0x12716fed0>, min_event_timestamp = 2019-06-01 00:00:00, max_event_timestamp = 2019-07-01 00:00:00)>
```
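Since the saved dataset was persisted to a parquet file, it can also be inspected directly with pandas if needed (an optional check using the same path we passed to SavedDatasetFileStorage):

```python
import pandas as pd

# Peek at the persisted training dataset without going through the registry
pd.read_parquet("my_training_ds.parquet").head()
```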

4. Developing dataset profiler

A dataset profiler is a function that accepts a dataset and generates a set of its characteristics. These characteristics will then be used to evaluate (validate) subsequent datasets.
Important: datasets are not compared to each other! Feast uses a reference dataset and a profiler function to generate a reference profile. This profile is then used during validation of the tested dataset.
```python
import numpy as np

from feast.dqm.profilers.ge_profiler import ge_profiler

from great_expectations.core.expectation_suite import ExpectationSuite
from great_expectations.dataset import PandasDataset
```
Loading the saved dataset first and exploring the data:
```python
ds = store.get_saved_dataset('my_training_ds')
ds.to_df()
```
|        | total_earned | avg_trip_seconds | taxi_id | total_miles_travelled | trip_count | earned_per_hour | event_timestamp | total_trip_seconds | avg_fare | avg_speed |
|--------|--------------|------------------|---------|-----------------------|------------|-----------------|-----------------|--------------------|----------|-----------|
| 0      | 68.25  | 2270.000000 | 91d5288487e87c5917b813ba6f75ab1c3a9749af906a2d... | 24.70 | 2.0  | 54.118943 | 2019-06-01 00:00:00+00:00 | 4540.0  | 34.125000 | 19.585903 |
| 1      | 221.00 | 560.500000  | 7a4a6162eaf27805aef407d25d5cb21fe779cd962922cb... | 54.18 | 24.0 | 59.143622 | 2019-06-01 00:00:00+00:00 | 13452.0 | 9.208333  | 14.499554 |
| 2      | 160.50 | 1010.769231 | f4c9d05b215d7cbd08eca76252dae51cdb7aca9651d4ef... | 41.30 | 13.0 | 43.972603 | 2019-06-01 00:00:00+00:00 | 13140.0 | 12.346154 | 11.315068 |
| 3      | 183.75 | 697.550000  | c1f533318f8480a59173a9728ea0248c0d3eb187f4b897... | 37.30 | 20.0 | 47.415956 | 2019-06-01 00:00:00+00:00 | 13951.0 | 9.187500  | 9.625116  |
| 4      | 217.75 | 1054.076923 | 455b6b5cae6ca5a17cddd251485f2266d13d6a2c92f07c... | 69.69 | 13.0 | 57.206451 | 2019-06-01 00:00:00+00:00 | 13703.0 | 16.750000 | 18.308692 |
| ...    | ...    | ...         | ...                                                | ...   | ...  | ...       | ...                       | ...     | ...       | ...       |
| 156979 | 38.00  | 1980.000000 | 0cccf0ec1f46d1e0beefcfdeaf5188d67e170cdff92618... | 14.90 | 1.0  | 69.090909 | 2019-07-01 00:00:00+00:00 | 1980.0  | 38.000000 | 27.090909 |
| 156980 | 135.00 | 551.250000  | beefd3462e3f5a8e854942a2796876f6db73ebbd25b435... | 28.40 | 16.0 | 55.102041 | 2019-07-01 00:00:00+00:00 | 8820.0  | 8.437500  | 11.591837 |
| 156981 | NaN    | NaN         | 9a3c52aa112f46cf0d129fafbd42051b0fb9b0ff8dcb0e... | NaN   | NaN  | NaN       | 2019-07-01 00:00:00+00:00 | NaN     | NaN       | NaN       |
| 156982 | 63.00  | 815.000000  | 08308c31cd99f495dea73ca276d19a6258d7b4c9c88e43... | 19.96 | 4.0  | 69.570552 | 2019-07-01 00:00:00+00:00 | 3260.0  | 15.750000 | 22.041718 |
| 156983 | NaN    | NaN         | 7ebf27414a0c7b128e7925e1da56d51a8b81484f7630cf... | NaN   | NaN  | NaN       | 2019-07-01 00:00:00+00:00 | NaN     | NaN       | NaN       |

156984 rows × 10 columns
Feast uses Great Expectations as a validation engine and ExpectationSuite as a dataset's profile. Hence, we need to develop a function that will generate an ExpectationSuite. This function will receive an instance of PandasDataset (a wrapper around pandas.DataFrame), so we can utilize both the Pandas DataFrame API and some helper functions from PandasDataset during profiling.
```python
DELTA = 0.1  # controlling allowed window in fraction of the value on scale [0, 1]

@ge_profiler
def stats_profiler(ds: PandasDataset) -> ExpectationSuite:
    # simple checks on data consistency
    ds.expect_column_values_to_be_between(
        "avg_speed",
        min_value=0,
        max_value=60,
        mostly=0.99  # allow some outliers
    )

    ds.expect_column_values_to_be_between(
        "total_miles_travelled",
        min_value=0,
        max_value=500,
        mostly=0.99  # allow some outliers
    )

    # expectation of means based on observed values
    observed_mean = ds.trip_count.mean()
    ds.expect_column_mean_to_be_between("trip_count",
                                        min_value=observed_mean * (1 - DELTA),
                                        max_value=observed_mean * (1 + DELTA))

    observed_mean = ds.earned_per_hour.mean()
    ds.expect_column_mean_to_be_between("earned_per_hour",
                                        min_value=observed_mean * (1 - DELTA),
                                        max_value=observed_mean * (1 + DELTA))

    # expectation of quantiles
    qs = [0.5, 0.75, 0.9, 0.95]
    observed_quantiles = ds.avg_fare.quantile(qs)

    ds.expect_column_quantile_values_to_be_between(
        "avg_fare",
        quantile_ranges={
            "quantiles": qs,
            "value_ranges": [[None, max_value] for max_value in observed_quantiles]
        })

    return ds.get_expectation_suite()
```
Testing our profiler function:
```python
ds.get_profile(profiler=stats_profiler)
```
```
02/02/2022 02:43:47 PM INFO: 5 expectation(s) included in expectation_suite. result_format settings filtered.
<GEProfile with expectations: [
    {
        "expectation_type": "expect_column_values_to_be_between",
        "kwargs": {
            "column": "avg_speed",
            "min_value": 0,
            "max_value": 60,
            "mostly": 0.99
        },
        "meta": {}
    },
    {
        "expectation_type": "expect_column_values_to_be_between",
        "kwargs": {
            "column": "total_miles_travelled",
            "min_value": 0,
            "max_value": 500,
            "mostly": 0.99
        },
        "meta": {}
    },
    {
        "expectation_type": "expect_column_mean_to_be_between",
        "kwargs": {
            "column": "trip_count",
            "min_value": 10.387244591346153,
            "max_value": 12.695521167200855
        },
        "meta": {}
    },
    {
        "expectation_type": "expect_column_mean_to_be_between",
        "kwargs": {
            "column": "earned_per_hour",
            "min_value": 52.320624975640214,
            "max_value": 63.94743052578249
        },
        "meta": {}
    },
    {
        "expectation_type": "expect_column_quantile_values_to_be_between",
        "kwargs": {
            "column": "avg_fare",
            "quantile_ranges": {
                "quantiles": [0.5, 0.75, 0.9, 0.95],
                "value_ranges": [
                    [null, 16.4],
                    [null, 26.229166666666668],
                    [null, 36.4375],
                    [null, 42.0]
                ]
            }
        },
        "meta": {}
    }
]>
```
Verify that all expectations that we coded in our profiler are present here. If some expectation is missing, it means that it failed to pass on the reference dataset (failing silently is the default behavior of Great Expectations).
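If an expectation is missing, a quick way to see why is to run the suspect check directly against the reference data and inspect the result (a hedged sketch using Great Expectations' PandasDataset wrapper, not part of the original notebook):

```python
from great_expectations.dataset import PandasDataset

# Wrap the reference dataframe and run a single expectation by hand
reference_df = PandasDataset(ds.to_df())
check = reference_df.expect_column_values_to_be_between(
    "avg_speed", min_value=0, max_value=60, mostly=0.99
)
print(check.success)  # False would explain why the expectation was dropped from the suite
print(check.result)   # element counts and unexpected values for the check
```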
Now we can create a validation reference from the dataset and the profiler function:
```python
validation_reference = ds.as_reference(profiler=stats_profiler)
```
and test it against our existing retrieval job:
```python
_ = job.to_df(validation_reference=validation_reference)
```
```
02/02/2022 02:43:52 PM INFO: 5 expectation(s) included in expectation_suite. result_format settings filtered.
02/02/2022 02:43:53 PM INFO: Validating data_asset_name None with expectation_suite_name default
```
Validation passed successfully, as no exceptions were raised.

5. Validating new historical retrieval

Creating new timestamps for Dec 2020:
```python
from feast.dqm.errors import ValidationFailed
```
```python
timestamps = pd.DataFrame()
timestamps["event_timestamp"] = pd.date_range("2020-12-01", "2020-12-07", freq='D')
```
```python
entity_df = pd.merge(taxi_ids, timestamps, how='cross')
entity_df
```
|       | taxi_id                                            | event_timestamp |
|-------|----------------------------------------------------|-----------------|
| 0     | 91d5288487e87c5917b813ba6f75ab1c3a9749af906a2d...  | 2020-12-01      |
| 1     | 91d5288487e87c5917b813ba6f75ab1c3a9749af906a2d...  | 2020-12-02      |
| 2     | 91d5288487e87c5917b813ba6f75ab1c3a9749af906a2d...  | 2020-12-03      |
| 3     | 91d5288487e87c5917b813ba6f75ab1c3a9749af906a2d...  | 2020-12-04      |
| 4     | 91d5288487e87c5917b813ba6f75ab1c3a9749af906a2d...  | 2020-12-05      |
| ...   | ...                                                | ...             |
| 35443 | 7ebf27414a0c7b128e7925e1da56d51a8b81484f7630cf...  | 2020-12-03      |
| 35444 | 7ebf27414a0c7b128e7925e1da56d51a8b81484f7630cf...  | 2020-12-04      |
| 35445 | 7ebf27414a0c7b128e7925e1da56d51a8b81484f7630cf...  | 2020-12-05      |
| 35446 | 7ebf27414a0c7b128e7925e1da56d51a8b81484f7630cf...  | 2020-12-06      |
| 35447 | 7ebf27414a0c7b128e7925e1da56d51a8b81484f7630cf...  | 2020-12-07      |

35448 rows × 2 columns
```python
job = store.get_historical_features(
    entity_df=entity_df,
    features=[
        "trip_stats:total_miles_travelled",
        "trip_stats:total_trip_seconds",
        "trip_stats:total_earned",
        "trip_stats:trip_count",
        "on_demand_stats:avg_fare",
        "on_demand_stats:avg_trip_seconds",
        "on_demand_stats:avg_speed",
        "on_demand_stats:earned_per_hour",
    ]
)
```
Execute the retrieval job with the validation reference:
```python
try:
    df = job.to_df(validation_reference=validation_reference)
except ValidationFailed as exc:
    print(exc.validation_report)
```
```
02/02/2022 02:43:58 PM INFO: 5 expectation(s) included in expectation_suite. result_format settings filtered.
02/02/2022 02:43:59 PM INFO: Validating data_asset_name None with expectation_suite_name default

[
    {
        "expectation_config": {
            "expectation_type": "expect_column_mean_to_be_between",
            "kwargs": {
                "column": "trip_count",
                "min_value": 10.387244591346153,
                "max_value": 12.695521167200855,
                "result_format": "COMPLETE"
            },
            "meta": {}
        },
        "meta": {},
        "result": {
            "observed_value": 6.692920555429092,
            "element_count": 35448,
            "missing_count": 31055,
            "missing_percent": 87.6071992778154
        },
        "exception_info": {
            "raised_exception": false,
            "exception_message": null,
            "exception_traceback": null
        },
        "success": false
    },
    {
        "expectation_config": {
            "expectation_type": "expect_column_mean_to_be_between",
            "kwargs": {
                "column": "earned_per_hour",
                "min_value": 52.320624975640214,
                "max_value": 63.94743052578249,
                "result_format": "COMPLETE"
            },
            "meta": {}
        },
        "meta": {},
        "result": {
            "observed_value": 68.99268345164135,
            "element_count": 35448,
            "missing_count": 31055,
            "missing_percent": 87.6071992778154
        },
        "exception_info": {
            "raised_exception": false,
            "exception_message": null,
            "exception_traceback": null
        },
        "success": false
    },
    {
        "expectation_config": {
            "expectation_type": "expect_column_quantile_values_to_be_between",
            "kwargs": {
                "column": "avg_fare",
                "quantile_ranges": {
                    "quantiles": [0.5, 0.75, 0.9, 0.95],
                    "value_ranges": [
                        [null, 16.4],
                        [null, 26.229166666666668],
                        [null, 36.4375],
                        [null, 42.0]
                    ]
                },
                "result_format": "COMPLETE"
            },
            "meta": {}
        },
        "meta": {},
        "result": {
            "observed_value": {
                "quantiles": [0.5, 0.75, 0.9, 0.95],
                "values": [19.5, 28.1, 38.0, 44.125]
            },
            "element_count": 35448,
            "missing_count": 31055,
            "missing_percent": 87.6071992778154,
            "details": {
                "success_details": [false, false, false, false]
            }
        },
        "exception_info": {
            "raised_exception": false,
            "exception_message": null,
            "exception_traceback": null
        },
        "success": false
    }
]
```
Validation failed since several expectations didn't pass:
  • Trip count (mean) decreased by more than 10% (which is expected when comparing Dec 2020 vs June 2019)
  • Average fare increased: all quantiles are higher than expected
  • Earned per hour (mean) increased by more than 10% (most probably due to the increased fare)
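In a pipeline you would typically react to such a failure programmatically. A hedged sketch, assuming (as the output above suggests) that the printed validation report is a JSON list of expectation results:

```python
import json

try:
    df = job.to_df(validation_reference=validation_reference)
except ValidationFailed as exc:
    # Parse the report's string form and list the failed expectations per column
    report = json.loads(str(exc.validation_report))
    for item in report:
        if not item["success"]:
            config = item["expectation_config"]
            print(config["expectation_type"], "failed for column", config["kwargs"].get("column"))
```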