!pip install 'feast[ge]'!pip install google-cloud-bigqueryimport pyarrow.parquet
from google.cloud.bigquery import Clientbq_client = Client(project='kf-feast')data_query = """SELECT
taxi_id,
TIMESTAMP_TRUNC(trip_start_timestamp, DAY) as day,
SUM(trip_miles) as total_miles_travelled,
SUM(trip_seconds) as total_trip_seconds,
SUM(fare) as total_earned,
COUNT(*) as trip_count
FROM `bigquery-public-data.chicago_taxi_trips.taxi_trips`
WHERE
trip_miles > 0 AND trip_seconds > 60 AND
trip_start_timestamp BETWEEN '2019-01-01' and '2020-12-31' AND
trip_total < 1000
GROUP BY taxi_id, TIMESTAMP_TRUNC(trip_start_timestamp, DAY)"""driver_stats_table = bq_client.query(data_query).to_arrow()
# Storing resulting dataset into parquet file
pyarrow.parquet.write_table(driver_stats_table, "trips_stats.parquet")def entities_query(year):
return f"""SELECT
distinct taxi_id
FROM `bigquery-public-data.chicago_taxi_trips.taxi_trips`
WHERE
trip_miles > 0 AND trip_seconds > 0 AND
trip_start_timestamp BETWEEN '{year}-01-01' and '{year}-12-31'
"""entities_2019_table = bq_client.query(entities_query(2019)).to_arrow()
# Storing entities (taxi ids) into parquet file
pyarrow.parquet.write_table(entities_2019_table, "entities.parquet")import pyarrow.parquet
import pandas as pd
from feast import FeatureView, Entity, FeatureStore, Field, BatchFeatureView
from feast.types import Float64, Int64
from feast.value_type import ValueType
from feast.data_format import ParquetFormat
from feast.on_demand_feature_view import on_demand_feature_view
from feast.infra.offline_stores.file_source import FileSource
from feast.infra.offline_stores.file import SavedDatasetFileStorage
from datetime import timedelta
# Offline source backed by the parquet file produced by the extraction step.
batch_source = FileSource(
    timestamp_field="day",
    path="trips_stats.parquet",
    file_format=ParquetFormat(),
)

# Entity: a single taxi cab, joined on taxi_id.
taxi_entity = Entity(name='taxi', join_keys=['taxi_id'])

# Daily per-taxi aggregate features served from the batch source.
trips_stats_fv = BatchFeatureView(
    name='trip_stats',
    entities=['taxi'],
    features=[
        Field(name="total_miles_travelled", dtype=Float64),
        Field(name="total_trip_seconds", dtype=Float64),
        Field(name="total_earned", dtype=Float64),
        Field(name="trip_count", dtype=Int64),
    ],
    ttl=timedelta(days=1),  # one day, i.e. 86400 seconds
    source=batch_source,
)


# Derived metrics computed on the fly from the daily aggregates.
@on_demand_feature_view(
    schema=[
        Field("avg_fare", Float64),
        Field("avg_speed", Float64),
        Field("avg_trip_seconds", Float64),
        Field("earned_per_hour", Float64),
    ],
    sources=[
        trips_stats_fv,
    ],
)
def on_demand_stats(inp):
    """Per-day averages: fare, speed, trip duration and hourly earnings."""
    result = pd.DataFrame()
    result["avg_fare"] = inp["total_earned"] / inp["trip_count"]
    # factor 3600 converts per-second rates into per-hour rates
    result["avg_speed"] = 3600 * inp["total_miles_travelled"] / inp["total_trip_seconds"]
    result["avg_trip_seconds"] = inp["total_trip_seconds"] / inp["trip_count"]
    result["earned_per_hour"] = 3600 * inp["total_earned"] / inp["total_trip_seconds"]
    return result


store = FeatureStore(".")  # using feature_store.yaml that stored in the same directory
store.apply([taxi_entity, trips_stats_fv, on_demand_stats])  # writing to the registry

taxi_ids = pyarrow.parquet.read_table("entities.parquet").to_pandas()
timestamps = pd.DataFrame()
timestamps["event_timestamp"] = pd.date_range("2019-06-01", "2019-07-01", freq='D')

# Cross join: pair every taxi id with every day of June 2019.
entity_df = pd.merge(taxi_ids, timestamps, how='cross')
entity_df

job = store.get_historical_features(
    entity_df=entity_df,
    features=[
        "trip_stats:total_miles_travelled",
        "trip_stats:total_trip_seconds",
        "trip_stats:total_earned",
        "trip_stats:trip_count",
        "on_demand_stats:avg_fare",
        "on_demand_stats:avg_trip_seconds",
        "on_demand_stats:avg_speed",
        "on_demand_stats:earned_per_hour",
    ],
)

# Persist the retrieval result as a named, reusable saved dataset.
store.create_saved_dataset(
    from_=job,
    name='my_training_ds',
    storage=SavedDatasetFileStorage(path='my_training_ds.parquet'),
)
# (notebook output: <SavedDataset(name = my_training_ds, ...)> omitted)

import numpy as np
from feast.dqm.profilers.ge_profiler import ge_profiler
from great_expectations.core.expectation_suite import ExpectationSuite
from great_expectations.dataset import PandasDataset

# Reload the saved dataset registered in the previous step.
ds = store.get_saved_dataset('my_training_ds')
ds.to_df()

# controlling allowed window in fraction of the value on scale [0, 1]
DELTA = 0.1
@ge_profiler
def stats_profiler(ds: PandasDataset) -> ExpectationSuite:
    """Derive an expectation suite from statistics observed on *ds*."""
    # Hard consistency bounds; mostly=0.99 tolerates rare outliers.
    ds.expect_column_values_to_be_between(
        "avg_speed",
        min_value=0,
        max_value=60,
        mostly=0.99,
    )
    ds.expect_column_values_to_be_between(
        "total_miles_travelled",
        min_value=0,
        max_value=500,
        mostly=0.99,
    )

    # Means must stay within +/- DELTA of what this reference data shows.
    for column in ("trip_count", "earned_per_hour"):
        observed_mean = ds[column].mean()
        ds.expect_column_mean_to_be_between(
            column,
            min_value=observed_mean * (1 - DELTA),
            max_value=observed_mean * (1 + DELTA),
        )

    # Observed quantiles become upper bounds (no lower bound).
    qs = [0.5, 0.75, 0.9, 0.95]
    observed_quantiles = ds.avg_fare.quantile(qs)
    ds.expect_column_quantile_values_to_be_between(
        "avg_fare",
        quantile_ranges={
            "quantiles": qs,
            "value_ranges": [[None, upper] for upper in observed_quantiles],
        },
    )
    return ds.get_expectation_suite()


ds.get_profile(profiler=stats_profiler)
# (notebook log + <GEProfile ...> output follows)
<GEProfile with expectations: [
{
"expectation_type": "expect_column_values_to_be_between",
"kwargs": {
"column": "avg_speed",
"min_value": 0,
"max_value": 60,
"mostly": 0.99
},
"meta": {}
},
{
"expectation_type": "expect_column_values_to_be_between",
"kwargs": {
"column": "total_miles_travelled",
"min_value": 0,
"max_value": 500,
"mostly": 0.99
},
"meta": {}
},
{
"expectation_type": "expect_column_mean_to_be_between",
"kwargs": {
"column": "trip_count",
"min_value": 10.387244591346153,
"max_value": 12.695521167200855
},
"meta": {}
},
{
"expectation_type": "expect_column_mean_to_be_between",
"kwargs": {
"column": "earned_per_hour",
"min_value": 52.320624975640214,
"max_value": 63.94743052578249
},
"meta": {}
},
{
"expectation_type": "expect_column_quantile_values_to_be_between",
"kwargs": {
"column": "avg_fare",
"quantile_ranges": {
"quantiles": [
0.5,
0.75,
0.9,
0.95
],
"value_ranges": [
[
null,
16.4
],
[
null,
26.229166666666668
],
[
null,
36.4375
],
[
null,
42.0
]
]
}
},
"meta": {}
}
# (preceding notebook output truncated)

# Bundle the saved dataset with the profiler into a validation reference.
validation_reference = ds.as_reference(profiler=stats_profiler)

# Sanity check: the training data must validate against its own profile.
_ = job.to_df(validation_reference=validation_reference)

from feast.dqm.errors import ValidationFailed

# Entity dataframe for a different period (first week of December 2020).
timestamps = pd.DataFrame()
timestamps["event_timestamp"] = pd.date_range("2020-12-01", "2020-12-07", freq='D')
entity_df = pd.merge(taxi_ids, timestamps, how='cross')
entity_df

job = store.get_historical_features(
    entity_df=entity_df,
    features=[
        "trip_stats:total_miles_travelled",
        "trip_stats:total_trip_seconds",
        "trip_stats:total_earned",
        "trip_stats:trip_count",
        "on_demand_stats:avg_fare",
        "on_demand_stats:avg_trip_seconds",
        "on_demand_stats:avg_speed",
        "on_demand_stats:earned_per_hour",
    ],
)

# Retrieval now validates against the reference; a mismatch raises.
try:
    df = job.to_df(validation_reference=validation_reference)
except ValidationFailed as exc:
    print(exc.validation_report)
# (notebook log + validation report output follows)
02/02/2022 02:43:59 PM INFO: Validating data_asset_name None with expectation_suite_name default
[
{
"expectation_config": {
"expectation_type": "expect_column_mean_to_be_between",
"kwargs": {
"column": "trip_count",
"min_value": 10.387244591346153,
"max_value": 12.695521167200855,
"result_format": "COMPLETE"
},
"meta": {}
},
"meta": {},
"result": {
"observed_value": 6.692920555429092,
"element_count": 35448,
"missing_count": 31055,
"missing_percent": 87.6071992778154
},
"exception_info": {
"raised_exception": false,
"exception_message": null,
"exception_traceback": null
},
"success": false
},
{
"expectation_config": {
"expectation_type": "expect_column_mean_to_be_between",
"kwargs": {
"column": "earned_per_hour",
"min_value": 52.320624975640214,
"max_value": 63.94743052578249,
"result_format": "COMPLETE"
},
"meta": {}
},
"meta": {},
"result": {
"observed_value": 68.99268345164135,
"element_count": 35448,
"missing_count": 31055,
"missing_percent": 87.6071992778154
},
"exception_info": {
"raised_exception": false,
"exception_message": null,
"exception_traceback": null
},
"success": false
},
{
"expectation_config": {
"expectation_type": "expect_column_quantile_values_to_be_between",
"kwargs": {
"column": "avg_fare",
"quantile_ranges": {
"quantiles": [
0.5,
0.75,
0.9,
0.95
],
"value_ranges": [
[
null,
16.4
],
[
null,
26.229166666666668
],
[
null,
36.4375
],
[
null,
42.0
]
]
},
"result_format": "COMPLETE"
},
"meta": {}
},
"meta": {},
"result": {
"observed_value": {
"quantiles": [
0.5,
0.75,
0.9,
0.95
],
"values": [
19.5,
28.1,
38.0,
44.125
]
},
"element_count": 35448,
"missing_count": 31055,
"missing_percent": 87.6071992778154,
"details": {
"success_details": [
false,
false,
false,
false
]
}
},
"exception_info": {
"raised_exception": false,
"exception_message": null,
"exception_traceback": null
},
"success": false
}
]