Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
project: feast_demo_aws
provider: aws
registry:
path: s3://[YOUR BUCKET YOU CREATED]/registry.pb
cache_ttl_seconds: 60
online_store: null
offline_store:
type: fileproject: feast_demo_gcp
provider: gcp
registry:
path: gs://[YOUR BUCKET YOU CREATED]/registry.pb
cache_ttl_seconds: 60
online_store: null
offline_store:
type: filefeature_store.yaml filepip install feastpip install 'feast[snowflake]'pip install 'feast[gcp]'pip install 'feast[aws]'feast init -t snowflake
Snowflake Deployment URL: ...
Snowflake User Name: ...
Snowflake Password: ...
Snowflake Role Name: ...
Snowflake Warehouse Name: ...
Snowflake Database Name: ...
Creating a new Feast repository in /<...>/tiny_pika.feast init -t gcp
Creating a new Feast repository in /<...>/tiny_pika.feast init -t aws
AWS Region (e.g. us-west-2): ...
Redshift Cluster ID: ...
Redshift Database Name: ...
Redshift User Name: ...
Redshift S3 Staging Location (s3://*): ...
Redshift IAM Role for S3 (arn:aws:iam::*:role/*): ...
Should I upload example data to Redshift (overwriting 'feast_driver_hourly_stats' table)? (Y/n):
Creating a new Feast repository in /<...>/tiny_pika.project: <your project name>
provider: <provider name>
online_store: redis
offline_store: file
registry:
registry_type: sql
path: postgresql://postgres:[email protected]:55001/feast
cache_ttl_seconds: 60repo_config = RepoConfig(
registry=RegistryConfig(path="gs://feast-test-gcs-bucket/registry.pb"),
project="feast_demo_gcp",
provider="gcp",
offline_store="file", # Could also be the OfflineStoreConfig e.g. FileOfflineStoreConfig
online_store="null", # Could also be the OnlineStoreConfig e.g. RedisOnlineStoreConfig
)
store = FeatureStore(config=repo_config)project: feast_demo_aws
provider: aws
registry: s3://feast-test-s3-bucket/registry.pb
online_store: null
offline_store:
type: filestore = FeatureStore(repo_path=".")from feast import FeatureStore
from feast.infra.offline_stores.bigquery_source import SavedDatasetBigQueryStorage
store = FeatureStore()
historical_job = store.get_historical_features(
features=["driver:avg_trip"],
entity_df=...,
)
dataset = store.create_saved_dataset(
from_=historical_job,
name='my_training_dataset',
storage=SavedDatasetBigQueryStorage(table_ref='<gcp-project>.<gcp-dataset>.my_training_dataset'),
tags={'author': 'oleksii'}
)
dataset.to_df()dataset = store.get_saved_dataset('my_training_dataset')
dataset.to_df()project: <your project name>
provider: <provider name>
online_store: redis
offline_store: file
registry:
registry_type: sql
path: postgresql://postgres:[email protected]:55001/feast
cache_ttl_seconds: 60pip install 'feast[redis]'feast init
Creating a new Feast repository in /<...>/tiny_pika.$ tree
.
└── tiny_pika
├── data
│ └── driver_stats.parquet
├── example.py
└── feature_store.yaml
1 directory, 3 files# Replace "tiny_pika" with your auto-generated dir name
cd tiny_pikafeast apply
# Processing example.py as example
# Done!feast teardownfeature_refs = [
"driver_trips:average_daily_rides",
"driver_trips:maximum_daily_rides",
"driver_trips:rating",
"driver_trips:rating:trip_completed",
]import pandas as pd
from datetime import datetime
entity_df = pd.DataFrame(
{
"event_timestamp": [pd.Timestamp(datetime.now(), tz="UTC")],
"driver_id": [1001]
}
)entity_df = "SELECT event_timestamp, driver_id FROM my_gcp_project.table"from feast import FeatureStore
fs = FeatureStore(repo_path="path/to/your/feature/repo")
training_df = fs.get_historical_features(
features=[
"driver_hourly_stats:conv_rate",
"driver_hourly_stats:acc_rate"
],
entity_df=entity_df
).to_df()feast materialize 2021-04-07T00:00:00 2021-04-08T00:00:00feast materialize 2021-04-07T00:00:00 2021-04-08T00:00:00 \
--views driver_hourly_statsfeast materialize-incremental 2021-04-08T00:00:00├── .github
│ └── workflows
│ ├── production.yml
│ └── staging.yml
│
├── staging
│ ├── driver_repo.py
│ └── feature_store.yaml
│
└── production
├── driver_repo.py
└── feature_store.yamlproject: staging
registry: gs://feast-ci-demo-registry/staging/registry.db
provider: gcp└── production
├── common
│ ├── __init__.py
│ ├── sources.py
│ └── entities.py
├── ranking
│ ├── __init__.py
│ ├── views.py
│ └── transformations.py
├── segmentation
│ ├── __init__.py
│ ├── views.py
│ └── transformations.py
└── feature_store.yaml├── .github
│ └── workflows
│ ├── production.yml
│ └── staging.yml
├── staging
│ └── feature_store.yaml
├── production
│ └── feature_store.yaml
└── driver_repo.pyfeast -f staging/feature_store.yaml applyfrom typing import Any, Callable, Dict, List, Optional, Sequence, Tuple, Union
from feast.entity import Entity
from feast.feature_view import FeatureView
from feast.batch_feature_view import BatchFeatureView
from feast.stream_feature_view import StreamFeatureView
from feast.infra.materialization import LocalMaterializationEngine, LocalMaterializationJob, MaterializationTask
from feast.infra.offline_stores.offline_store import OfflineStore
from feast.infra.online_stores.online_store import OnlineStore
from feast.repo_config import RepoConfig
class MyCustomEngine(LocalMaterializationEngine):
def __init__(
self,
*,
repo_config: RepoConfig,
offline_store: OfflineStore,
online_store: OnlineStore,
**kwargs,
):
super().__init__(
repo_config=repo_config,
offline_store=offline_store,
online_store=online_store,
**kwargs,
)
def update(
self,
project: str,
views_to_delete: Sequence[
Union[BatchFeatureView, StreamFeatureView, FeatureView]
],
views_to_keep: Sequence[
Union[BatchFeatureView, StreamFeatureView, FeatureView]
],
entities_to_delete: Sequence[Entity],
entities_to_keep: Sequence[Entity],
):
print("Creating new infrastructure is easy here!")
pass
def materialize(
self, registry, tasks: List[MaterializationTask]
) -> List[LocalMaterializationJob]:
print("Launching custom batch jobs or multithreading things is pretty easy...")
return [
self._materialize_one(
registry,
task.feature_view,
task.start_time,
task.end_time,
task.project,
task.tqdm_builder,
)
for task in tasks
]project: repo
registry: registry.db
batch_engine: feast_custom_engine.MyCustomEngine
online_store:
type: sqlite
path: online_store.db
offline_store:
type: filefeast applyRegistered entity driver_id
Registered feature view driver_hourly_stats
Deploying infrastructure for driver_hourly_stats
Creating new infrastructure is easy here!PYTHONPATH=$PYTHONPATH:/home/my_user/my_custom_engine feast apply$ tree -L 1 -d
.
├── docs
├── examples
├── go
├── infra
├── java
├── protos
├── sdk
└── ui$ tree --dirsfirst -L 1 infra
infra
├── contrib
├── feature_servers
├── materialization
├── offline_stores
├── online_stores
├── registry
├── transformation_servers
├── utils
├── __init__.py
├── aws.py
├── gcp.py
├── infra_object.py
├── key_encoding_utils.py
├── local.py
├── passthrough_provider.py
└── provider.pyfrom feast import BigQuerySource
BigQuerySource(
query="SELECT timestamp as ts, created, f1, f2 "
"FROM `my_project.my_dataset.my_features`",
)from feast import Entity, PushSource, ValueType, BigQuerySource, FeatureView, Feature, Field
from feast.types import Int64
push_source = PushSource(
name="push_source",
batch_source=BigQuerySource(table="test.test"),
)
user = Entity(name="user", join_keys=["user_id"])
fv = FeatureView(
name="feature view",
entities=[user],
schema=[Field(name="life_time_value", dtype=Int64)],
source=push_source,
)from feast import FeatureStore
import pandas as pd
from feast.data_source import PushMode
fs = FeatureStore(...)
feature_data_frame = pd.DataFrame()
fs.push("push_source_name", feature_data_frame, to=PushMode.ONLINE_AND_OFFLINE)from feast import FeatureStore
store = FeatureStore(...)
spark = SparkSession.builder.getOrCreate()
streamingDF = spark.readStream.format(...).load()
def feast_writer(spark_df, batch_id=None):
    """Push one Spark structured-streaming micro-batch into Feast.

    Intended as the callback for ``DataStreamWriter.foreachBatch``, which
    invokes it with the micro-batch DataFrame and the batch identifier.

    Args:
        spark_df: The micro-batch as a Spark DataFrame.
        batch_id: Identifier of the micro-batch supplied by ``foreachBatch``.
            Unused; optional so the function can still be called with the
            DataFrame alone.
    """
    # Spark SQL DataFrames are converted with toPandas(); to_pandas() is the
    # pandas-on-Spark spelling and does not exist on a streaming batch frame.
    pandas_df = spark_df.toPandas()
    store.push("driver_hourly_stats", pandas_df)
streamingDF.writeStream.foreachBatch(feast_writer).start()from datetime import timedelta
from feast import Field, FileSource, KafkaSource, stream_feature_view
from feast.data_format import JsonFormat
from feast.types import Float32
driver_stats_batch_source = FileSource(
name="driver_stats_source",
path="data/driver_stats.parquet",
timestamp_field="event_timestamp",
)
driver_stats_stream_source = KafkaSource(
name="driver_stats_stream",
kafka_bootstrap_servers="localhost:9092",
topic="drivers",
timestamp_field="event_timestamp",
batch_source=driver_stats_batch_source,
message_format=JsonFormat(
schema_json="driver_id integer, event_timestamp timestamp, conv_rate double, acc_rate double, created timestamp"
),
watermark_delay_threshold=timedelta(minutes=5),
)@stream_feature_view(
entities=[driver],
ttl=timedelta(seconds=8640000000),
mode="spark",
schema=[
Field(name="conv_percentage", dtype=Float32),
Field(name="acc_percentage", dtype=Float32),
],
timestamp_field="event_timestamp",
online=True,
source=driver_stats_stream_source,
)
def driver_hourly_stats_stream(df: DataFrame):
from pyspark.sql.functions import col
return (
df.withColumn("conv_percentage", col("conv_rate") * 100.0)
.withColumn("acc_percentage", col("acc_rate") * 100.0)
.drop("conv_rate", "acc_rate")
)from feast.infra.offline_stores.contrib.postgres_offline_store.postgres_source import (
PostgreSQLSource,
)
driver_stats_source = PostgreSQLSource(
name="feast_driver_hourly_stats",
query="SELECT * FROM feast_driver_hourly_stats",
timestamp_field="event_timestamp",
created_timestamp_column="created",
)from feast.infra.offline_stores.contrib.trino_offline_store.trino_source import (
TrinoSource,
)
driver_hourly_stats = TrinoSource(
event_timestamp_column="event_timestamp",
table_ref="feast.driver_stats",
created_timestamp_column="created",
)pip install feastfeast init my_project
cd my_project/feature_repoCreating a new Feast repository in /home/Jovyan/my_project.project: my_project
# By default, the registry is a file (but can be turned into a more scalable SQL-backed registry)
registry: data/registry.db
# The provider primarily specifies default offline / online stores & storing the registry in a given cloud
provider: local
online_store:
type: sqlite
path: data/online_store.db
entity_key_serialization_version: 2# This is an example feature definition file
from datetime import timedelta
import pandas as pd
from feast import (
Entity,
FeatureService,
FeatureView,
Field,
FileSource,
PushSource,
RequestSource,
)
from feast.on_demand_feature_view import on_demand_feature_view
from feast.types import Float32, Float64, Int64
# Define an entity for the driver. You can think of entity as a primary key used to
# fetch features.
driver = Entity(name="driver", join_keys=["driver_id"])
# Read data from parquet files. Parquet is convenient for local development mode. For
# production, you can use your favorite DWH, such as BigQuery. See Feast documentation
# for more info.
driver_stats_source = FileSource(
name="driver_hourly_stats_source",
path="%PARQUET_PATH%",
timestamp_field="event_timestamp",
created_timestamp_column="created",
)
# Our parquet files contain sample data that includes a driver_id column, timestamps and
# three feature columns. Here we define a Feature View that will allow us to serve this
# data to our model online.
driver_stats_fv = FeatureView(
# The unique name of this feature view. Two feature views in a single
# project cannot have the same name
name="driver_hourly_stats",
entities=[driver],
ttl=timedelta(days=1),
# The list of features defined below acts as a schema: it defines the features
# to materialize into a store, and the names are used as references
# during retrieval for building a training dataset or serving features
schema=[
Field(name="conv_rate", dtype=Float32),
Field(name="acc_rate", dtype=Float32),
Field(name="avg_daily_trips", dtype=Int64),
],
online=True,
source=driver_stats_source,
# Tags are user defined key/value pairs that are attached to each
# feature view
tags={"team": "driver_performance"},
)
# Defines a way to push data (to be available offline, online or both) into Feast.
driver_stats_push_source = PushSource(
name="driver_stats_push_source",
batch_source=driver_stats_source,
)
# Define a request data source which encodes features / information only
# available at request time (e.g. part of the user initiated HTTP request)
input_request = RequestSource(
name="vals_to_add",
schema=[
Field(name="val_to_add", dtype=Int64),
Field(name="val_to_add_2", dtype=Int64),
],
)
# Define an on demand feature view which can generate new features based on
# existing feature views and RequestSource features
@on_demand_feature_view(
    sources=[driver_stats_fv, input_request],
    schema=[
        Field(name="conv_rate_plus_val1", dtype=Float64),
        Field(name="conv_rate_plus_val2", dtype=Float64),
    ],
)
def transformed_conv_rate(inputs: pd.DataFrame) -> pd.DataFrame:
    """Add each request-time value to ``conv_rate``.

    Args:
        inputs: Frame holding the ``conv_rate``, ``val_to_add`` and
            ``val_to_add_2`` columns.

    Returns:
        A new frame with the columns ``conv_rate_plus_val1`` and
        ``conv_rate_plus_val2``, in that order.
    """
    base_rate = inputs["conv_rate"]
    return pd.DataFrame(
        {
            "conv_rate_plus_val1": base_rate + inputs["val_to_add"],
            "conv_rate_plus_val2": base_rate + inputs["val_to_add_2"],
        }
    )
# This groups features into a model version
driver_activity_v1 = FeatureService(
name="driver_activity_v1",
features=[
driver_stats_fv[["conv_rate"]], # Sub-selects a feature from a feature view
transformed_conv_rate, # Selects all features from the feature view
],
)
driver_activity_v2 = FeatureService(
name="driver_activity_v2", features=[driver_stats_fv, transformed_conv_rate]
)import pandas as pd
pd.read_parquet("data/driver_stats.parquet")feast applyCreated entity driver
Created feature view driver_hourly_stats
Created on demand feature view transformed_conv_rate
Created feature service driver_activity_v1
Created feature service driver_activity_v2
Created sqlite table my_project_driver_hourly_statsfrom datetime import datetime
import pandas as pd
from feast import FeatureStore
# Note: see https://docs.feast.dev/getting-started/concepts/feature-retrieval for
# more details on how to retrieve for all entities in the offline store instead
entity_df = pd.DataFrame.from_dict(
{
# entity's join key -> entity values
"driver_id": [1001, 1002, 1003],
# "event_timestamp" (reserved key) -> timestamps
"event_timestamp": [
datetime(2021, 4, 12, 10, 59, 42),
datetime(2021, 4, 12, 8, 12, 10),
datetime(2021, 4, 12, 16, 40, 26),
],
# (optional) label name -> label values. Feast does not process these
"label_driver_reported_satisfaction": [1, 5, 3],
# values we're using for an on-demand transformation
"val_to_add": [1, 2, 3],
"val_to_add_2": [10, 20, 30],
}
)
store = FeatureStore(repo_path=".")
training_df = store.get_historical_features(
entity_df=entity_df,
features=[
"driver_hourly_stats:conv_rate",
"driver_hourly_stats:acc_rate",
"driver_hourly_stats:avg_daily_trips",
"transformed_conv_rate:conv_rate_plus_val1",
"transformed_conv_rate:conv_rate_plus_val2",
],
).to_df()
print("----- Feature schema -----\n")
print(training_df.info())
print()
print("----- Example features -----\n")
print(training_df.head())----- Feature schema -----
<class 'pandas.core.frame.DataFrame'>
Int64Index: 3 entries, 0 to 2
Data columns (total 6 columns):
# Column Non-Null Count Dtype
--- ------ -------------- -----
0 event_timestamp 3 non-null datetime64[ns, UTC]
1 driver_id 3 non-null int64
2 label_driver_reported_satisfaction 3 non-null int64
3 conv_rate 3 non-null float32
4 acc_rate 3 non-null float32
5 avg_daily_trips 3 non-null int32
dtypes: datetime64[ns, UTC](1), float32(2), int32(1), int64(2)
memory usage: 132.0 bytes
None
----- Example features -----
event_timestamp driver_id ... acc_rate avg_daily_trips
0 2021-08-23 15:12:55.489091+00:00 1003 ... 0.077863 741
1 2021-08-23 15:49:55.489089+00:00 1002 ... 0.074327 113
2 2021-08-23 16:14:55.489075+00:00 1001 ... 0.105046 347
[3 rows x 6 columns]entity_df["event_timestamp"] = pd.to_datetime("now", utc=True)
training_df = store.get_historical_features(
entity_df=entity_df,
features=[
"driver_hourly_stats:conv_rate",
"driver_hourly_stats:acc_rate",
"driver_hourly_stats:avg_daily_trips",
"transformed_conv_rate:conv_rate_plus_val1",
"transformed_conv_rate:conv_rate_plus_val2",
],
).to_df()
print("\n----- Example features -----\n")
print(training_df.head())----- Example features -----
driver_id event_timestamp ... acc_rate avg_daily_trips conv_rate_plus_val1
0 1001 2022-08-08 18:22:06.555018+00:00 ... 0.864639 359 1.663844
1 1002 2022-08-08 18:22:06.555018+00:00 ... 0.695982 311 2.151189
2 1003 2022-08-08 18:22:06.555018+00:00 ... 0.949191 789 3.769165 CURRENT_TIME=$(date -u +"%Y-%m-%dT%H:%M:%S")
feast materialize-incremental $CURRENT_TIMEMaterializing 1 feature views to 2021-08-23 16:25:46+00:00 into the sqlite online
store.
driver_hourly_stats from 2021-08-22 16:25:47+00:00 to 2021-08-23 16:25:46+00:00:
100%|████████████████████████████████████████████| 5/5 [00:00<00:00, 592.05it/s]from pprint import pprint
from feast import FeatureStore
store = FeatureStore(repo_path=".")
feature_vector = store.get_online_features(
features=[
"driver_hourly_stats:conv_rate",
"driver_hourly_stats:acc_rate",
"driver_hourly_stats:avg_daily_trips",
],
entity_rows=[
# {join_key: entity_value}
{"driver_id": 1004},
{"driver_id": 1005},
],
).to_dict()
pprint(feature_vector){
'acc_rate': [0.5732735991477966, 0.7828438878059387],
'avg_daily_trips': [33, 984],
'conv_rate': [0.15498852729797363, 0.6263588070869446],
'driver_id': [1004, 1005]
}from feast import FeatureService
driver_stats_fs = FeatureService(
name="driver_activity_v1", features=[driver_hourly_stats_view]
)from pprint import pprint
from feast import FeatureStore
feature_store = FeatureStore('.') # Initialize the feature store
feature_service = feature_store.get_feature_service("driver_activity_v1")
feature_vector = feature_store.get_online_features(
features=feature_service,
entity_rows=[
# {join_key: entity_value}
{"driver_id": 1004},
{"driver_id": 1005},
],
).to_dict()
pprint(feature_vector){
'acc_rate': [0.5732735991477966, 0.7828438878059387],
'avg_daily_trips': [33, 984],
'conv_rate': [0.15498852729797363, 0.6263588070869446],
'driver_id': [1004, 1005]
}feast uiINFO: Started server process [66664]
08/17/2022 01:25:49 PM uvicorn.error INFO: Started server process [66664]
INFO: Waiting for application startup.
08/17/2022 01:25:49 PM uvicorn.error INFO: Waiting for application startup.
INFO: Application startup complete.
08/17/2022 01:25:49 PM uvicorn.error INFO: Application startup complete.
INFO: Uvicorn running on http://0.0.0.0:8888 (Press CTRL+C to quit)
08/17/2022 01:25:49 PM uvicorn.error INFO: Uvicorn running on http://0.0.0.0:8888 (Press CTRL+C to quit)test_workflow.pyfrom feast import BigQuerySource, Entity, FeatureView, Field
from
from feast import BigQuerySource, FeatureView, Field
from feast.types import Int64
global_stats_fv = FeatureView(
name="global_stats",
entities=[],
schema=[
Field(name="total_trips_today_by_all_drivers", dtype=Int64),
],
source=BigQuerySource(
table="feast-oss.demo_data.global_stats"
)
)from feast import BigQuerySource, Entity, FeatureView, Field
from feast.types import Int32, Int64
location = Entity(name="location", join_keys=["location_id"])
location_stats_fv= FeatureView(
name="location_stats",
entities=[location],
schema=[
Field(name="temperature", dtype=Int32),
Field(name="location_id", dtype=Int64),
],
source=BigQuerySource(
table="feast-oss.demo_data.location_stats"
),
)from location_stats_feature_view import location_stats_fv
temperatures_fs = FeatureService(
name="temperatures",
features=[
location_stats_fv
.with_name("origin_stats")
.with_join_key_map(
{"location_id": "origin_id"}
),
location_stats_fv
.with_name("destination_stats")
.with_join_key_map(
{"location_id": "destination_id"}
),
],
)from feast import Field
from feast.types import Float32
trips_today = Field(
name="trips_today",
dtype=Float32
)from feast import Field, RequestSource
from feast.on_demand_feature_view import on_demand_feature_view
from feast.types import Float64
# Define a request data source which encodes features / information only
# available at request time (e.g. part of the user initiated HTTP request)
input_request = RequestSource(
    name="vals_to_add",
    schema=[
        # NOTE(review): PrimitiveFeastType is not imported in this snippet —
        # presumably it comes from feast.types; confirm before running.
        Field(name="val_to_add", dtype=PrimitiveFeastType.INT64),
        # Keyword arguments are separated by a comma (the original used ':').
        Field(name="val_to_add_2", dtype=PrimitiveFeastType.INT64),
    ],
)
# Use the input data and feature view features to create new features
@on_demand_feature_view(
sources=[
driver_hourly_stats_view,
input_request
],
schema=[
Field(name='conv_rate_plus_val1', dtype=Float64),
Field(name='conv_rate_plus_val2', dtype=Float64)
]
)
def transformed_conv_rate(features_df: pd.DataFrame) -> pd.DataFrame:
df = pd.DataFrame()
df['conv_rate_plus_val1'] = (features_df['conv_rate'] + features_df['val_to_add'])
df['conv_rate_plus_val2'] = (features_df['conv_rate'] + features_df['val_to_add_2'])
return dffrom datetime import timedelta
from feast import Field, FileSource, KafkaSource, stream_feature_view
from feast.data_format import JsonFormat
from feast.types import Float32
driver_stats_batch_source = FileSource(
name="driver_stats_source",
path="data/driver_stats.parquet",
timestamp_field="event_timestamp",
)
driver_stats_stream_source = KafkaSource(
name="driver_stats_stream",
kafka_bootstrap_servers="localhost:9092",
topic="drivers",
timestamp_field="event_timestamp",
batch_source=driver_stats_batch_source,
message_format=JsonFormat(
schema_json="driver_id integer, event_timestamp timestamp, conv_rate double, acc_rate double, created timestamp"
),
watermark_delay_threshold=timedelta(minutes=5),
)
@stream_feature_view(
entities=[driver],
ttl=timedelta(seconds=8640000000),
mode="spark",
schema=[
Field(name="conv_percentage", dtype=Float32),
Field(name="acc_percentage", dtype=Float32),
],
timestamp_field="event_timestamp",
online=True,
source=driver_stats_stream_source,
)
def driver_hourly_stats_stream(df: DataFrame):
from pyspark.sql.functions import col
return (
df.withColumn("conv_percentage", col("conv_rate") * 100.0)
.withColumn("acc_percentage", col("acc_rate") * 100.0)
.drop("conv_rate", "acc_rate")
)from driver_ratings_feature_view import driver_ratings_fv
from driver_trips_feature_view import driver_stats_fv
driver_stats_fs = FeatureService(
name="driver_activity",
features=[driver_stats_fv, driver_ratings_fv[["lifetime_rating"]]]
)from feast import FeatureStore
feature_store = FeatureStore('.') # Initialize the feature store
feature_service = feature_store.get_feature_service("driver_activity")
features = feature_store.get_online_features(
features=feature_service, entity_rows=[entity_dict]
)from feast import FeatureStore
feature_store = FeatureStore('.') # Initialize the feature store
feature_service = feature_store.get_feature_service("driver_activity")
feature_store.get_historical_features(features=feature_service, entity_df=entity_df)online_features = fs.get_online_features(
features=[
'driver_locations:lon',
'drivers_activity:trips_today'
],
entity_rows=[
# {join_key: entity_value}
{'driver': 'driver_1001'}
]
)training_df = store.get_historical_features(
entity_df=entity_df,
features=store.get_feature_service("model_v1"),
).to_df()training_df = store.get_historical_features(
entity_df=entity_df,
features=[
"driver_hourly_stats:conv_rate",
"driver_hourly_stats:acc_rate",
"driver_daily_features:daily_miles_driven"
],
).to_df()entity_df = pd.DataFrame.from_dict(
{
"driver_id": [1001, 1002, 1003, 1004, 1001],
"event_timestamp": [
datetime(2021, 4, 12, 10, 59, 42),
datetime(2021, 4, 12, 8, 12, 10),
datetime(2021, 4, 12, 16, 40, 26),
datetime(2021, 4, 12, 15, 1, 12),
datetime.now()
]
}
)
training_df = store.get_historical_features(
entity_df=entity_df,
features=[
"driver_hourly_stats:conv_rate",
"driver_hourly_stats:acc_rate",
"driver_daily_features:daily_miles_driven"
],
).to_df()entity_sql = f"""
SELECT
driver_id,
event_timestamp
FROM {store.get_data_source("driver_hourly_stats_source").get_table_query_string()}
WHERE event_timestamp BETWEEN '2021-01-01' and '2021-12-31'
"""
training_df = store.get_historical_features(
entity_df=entity_sql,
features=[
"driver_hourly_stats:conv_rate",
"driver_hourly_stats:acc_rate",
"driver_daily_features:daily_miles_driven"
],
).to_df()fs = FeatureStore("my_feature_repo/")
print(fs.list_feature_views())fs = FeatureStore("my_feature_repo/")
fv = fs.get_feature_view("my_fv1")get_historical_features without providing an entity dataframe?A common use case in machine learning, this tutorial is an end-to-end, production-ready fraud prediction system. It predicts in real time whether a transaction made by a user is fraudulent.
!pip install 'feast[ge]'!pip install google-cloud-bigqueryimport pyarrow.parquet
from google.cloud.bigquery import Clientbq_client = Client(project='kf-feast')data_query = """SELECT
taxi_id,
TIMESTAMP_TRUNC(trip_start_timestamp, DAY) as day,
SUM(trip_miles) as total_miles_travelled,
SUM(trip_seconds) as total_trip_seconds,
SUM(fare) as total_earned,
COUNT(*) as trip_count
FROM `bigquery-public-data.chicago_taxi_trips.taxi_trips`
WHERE
trip_miles > 0 AND trip_seconds > 60 AND
trip_start_timestamp BETWEEN '2019-01-01' and '2020-12-31' AND
trip_total < 1000
GROUP BY taxi_id, TIMESTAMP_TRUNC(trip_start_timestamp, DAY)"""driver_stats_table = bq_client.query(data_query).to_arrow()
# Storing resulting dataset into parquet file
pyarrow.parquet.write_table(driver_stats_table, "trips_stats.parquet")def entities_query(year):
return f"""SELECT
distinct taxi_id
FROM `bigquery-public-data.chicago_taxi_trips.taxi_trips`
WHERE
trip_miles > 0 AND trip_seconds > 0 AND
trip_start_timestamp BETWEEN '{year}-01-01' and '{year}-12-31'
"""entities_2019_table = bq_client.query(entities_query(2019)).to_arrow()
# Storing entities (taxi ids) into parquet file
pyarrow.parquet.write_table(entities_2019_table, "entities.parquet")import pyarrow.parquet
import pandas as pd
from feast import FeatureView, Entity, FeatureStore, Field, BatchFeatureView
from feast.types import Float64, Int64
from feast.value_type import ValueType
from feast.data_format import ParquetFormat
from feast.on_demand_feature_view import on_demand_feature_view
from feast.infra.offline_stores.file_source import FileSource
from feast.infra.offline_stores.file import SavedDatasetFileStorage
from datetime import timedelta
batch_source = FileSource(
timestamp_field="day",
path="trips_stats.parquet", # using parquet file that we created on previous step
file_format=ParquetFormat()
)taxi_entity = Entity(name='taxi', join_keys=['taxi_id'])trips_stats_fv = BatchFeatureView(
name='trip_stats',
entities=['taxi'],
features=[
Field(name="total_miles_travelled", dtype=Float64),
Field(name="total_trip_seconds", dtype=Float64),
Field(name="total_earned", dtype=Float64),
Field(name="trip_count", dtype=Int64),
],
ttl=timedelta(seconds=86400),
source=batch_source,
)@on_demand_feature_view(
schema=[
Field("avg_fare", Float64),
Field("avg_speed", Float64),
Field("avg_trip_seconds", Float64),
Field("earned_per_hour", Float64),
],
sources=[
trips_stats_fv,
]
)
def on_demand_stats(inp):
out = pd.DataFrame()
out["avg_fare"] = inp["total_earned"] / inp["trip_count"]
out["avg_speed"] = 3600 * inp["total_miles_travelled"] / inp["total_trip_seconds"]
out["avg_trip_seconds"] = inp["total_trip_seconds"] / inp["trip_count"]
out["earned_per_hour"] = 3600 * inp["total_earned"] / inp["total_trip_seconds"]
return outstore = FeatureStore(".") # using feature_store.yaml that stored in the same directorystore.apply([taxi_entity, trips_stats_fv, on_demand_stats]) # writing to the registrytaxi_ids = pyarrow.parquet.read_table("entities.parquet").to_pandas()timestamps = pd.DataFrame()
timestamps["event_timestamp"] = pd.date_range("2019-06-01", "2019-07-01", freq='D')entity_df = pd.merge(taxi_ids, timestamps, how='cross')
entity_dfjob = store.get_historical_features(
entity_df=entity_df,
features=[
"trip_stats:total_miles_travelled",
"trip_stats:total_trip_seconds",
"trip_stats:total_earned",
"trip_stats:trip_count",
"on_demand_stats:avg_fare",
"on_demand_stats:avg_trip_seconds",
"on_demand_stats:avg_speed",
"on_demand_stats:earned_per_hour",
]
)
store.create_saved_dataset(
from_=job,
name='my_training_ds',
storage=SavedDatasetFileStorage(path='my_training_ds.parquet')
)<SavedDataset(name = my_training_ds, features = ['trip_stats:total_miles_travelled', 'trip_stats:total_trip_seconds', 'trip_stats:total_earned', 'trip_stats:trip_count', 'on_demand_stats:avg_fare', 'on_demand_stats:avg_trip_seconds', 'on_demand_stats:avg_speed', 'on_demand_stats:earned_per_hour'], join_keys = ['taxi_id'], storage = <feast.infra.offline_stores.file_source.SavedDatasetFileStorage object at 0x1276e7950>, full_feature_names = False, tags = {}, _retrieval_job = <feast.infra.offline_stores.file.FileRetrievalJob object at 0x12716fed0>, min_event_timestamp = 2019-06-01 00:00:00, max_event_timestamp = 2019-07-01 00:00:00)>import numpy as np
from feast.dqm.profilers.ge_profiler import ge_profiler
from great_expectations.core.expectation_suite import ExpectationSuite
from great_expectations.dataset import PandasDatasetds = store.get_saved_dataset('my_training_ds')
ds.to_df()DELTA = 0.1 # controlling allowed window in fraction of the value on scale [0, 1]
# Profiler that builds the Great Expectations suite for the saved dataset.
# It is decorated with ge_profiler and passed as `profiler=` to the dataset.
@ge_profiler
def stats_profiler(ds: PandasDataset) -> ExpectationSuite:
# simple checks on data consistency
ds.expect_column_values_to_be_between(
"avg_speed",
min_value=0,
max_value=60,
mostly=0.99 # allow some outliers
)
ds.expect_column_values_to_be_between(
"total_miles_travelled",
min_value=0,
max_value=500,
mostly=0.99 # allow some outliers
)
# expectation of means based on observed values
# The accepted range is the mean observed in this dataset, widened by the
# DELTA fraction on each side.
observed_mean = ds.trip_count.mean()
ds.expect_column_mean_to_be_between("trip_count",
min_value=observed_mean * (1 - DELTA),
max_value=observed_mean * (1 + DELTA))
observed_mean = ds.earned_per_hour.mean()
ds.expect_column_mean_to_be_between("earned_per_hour",
min_value=observed_mean * (1 - DELTA),
max_value=observed_mean * (1 + DELTA))
# expectation of quantiles
qs = [0.5, 0.75, 0.9, 0.95]
observed_quantiles = ds.avg_fare.quantile(qs)
# Each quantile only gets an upper bound (the observed value); the lower
# bound is left open by passing None.
ds.expect_column_quantile_values_to_be_between(
"avg_fare",
quantile_ranges={
"quantiles": qs,
"value_ranges": [[None, max_value] for max_value in observed_quantiles]
})
return ds.get_expectation_suite()ds.get_profile(profiler=stats_profiler)02/02/2022 02:43:47 PM INFO: 5 expectation(s) included in expectation_suite. result_format settings filtered.
<GEProfile with expectations: [
{
"expectation_type": "expect_column_values_to_be_between",
"kwargs": {
"column": "avg_speed",
"min_value": 0,
"max_value": 60,
"mostly": 0.99
},
"meta": {}
},
{
"expectation_type": "expect_column_values_to_be_between",
"kwargs": {
"column": "total_miles_travelled",
"min_value": 0,
"max_value": 500,
"mostly": 0.99
},
"meta": {}
},
{
"expectation_type": "expect_column_mean_to_be_between",
"kwargs": {
"column": "trip_count",
"min_value": 10.387244591346153,
"max_value": 12.695521167200855
},
"meta": {}
},
{
"expectation_type": "expect_column_mean_to_be_between",
"kwargs": {
"column": "earned_per_hour",
"min_value": 52.320624975640214,
"max_value": 63.94743052578249
},
"meta": {}
},
{
"expectation_type": "expect_column_quantile_values_to_be_between",
"kwargs": {
"column": "avg_fare",
"quantile_ranges": {
"quantiles": [
0.5,
0.75,
0.9,
0.95
],
"value_ranges": [
[
null,
16.4
],
[
null,
26.229166666666668
],
[
null,
36.4375
],
[
null,
42.0
]
]
}
},
"meta": {}
}
]>validation_reference = ds.as_reference(profiler=stats_profiler)_ = job.to_df(validation_reference=validation_reference)02/02/2022 02:43:52 PM INFO: 5 expectation(s) included in expectation_suite. result_format settings filtered.
02/02/2022 02:43:53 PM INFO: Validating data_asset_name None with expectation_suite_name defaultfrom feast.dqm.errors import ValidationFailedtimestamps = pd.DataFrame()
timestamps["event_timestamp"] = pd.date_range("2020-12-01", "2020-12-07", freq='D')entity_df = pd.merge(taxi_ids, timestamps, how='cross')
entity_dfjob = store.get_historical_features(
entity_df=entity_df,
features=[
"trip_stats:total_miles_travelled",
"trip_stats:total_trip_seconds",
"trip_stats:total_earned",
"trip_stats:trip_count",
"on_demand_stats:avg_fare",
"on_demand_stats:avg_trip_seconds",
"on_demand_stats:avg_speed",
"on_demand_stats:earned_per_hour",
]
)try:
df = job.to_df(validation_reference=validation_reference)
except ValidationFailed as exc:
print(exc.validation_report)02/02/2022 02:43:58 PM INFO: 5 expectation(s) included in expectation_suite. result_format settings filtered.
02/02/2022 02:43:59 PM INFO: Validating data_asset_name None with expectation_suite_name default
[
{
"expectation_config": {
"expectation_type": "expect_column_mean_to_be_between",
"kwargs": {
"column": "trip_count",
"min_value": 10.387244591346153,
"max_value": 12.695521167200855,
"result_format": "COMPLETE"
},
"meta": {}
},
"meta": {},
"result": {
"observed_value": 6.692920555429092,
"element_count": 35448,
"missing_count": 31055,
"missing_percent": 87.6071992778154
},
"exception_info": {
"raised_exception": false,
"exception_message": null,
"exception_traceback": null
},
"success": false
},
{
"expectation_config": {
"expectation_type": "expect_column_mean_to_be_between",
"kwargs": {
"column": "earned_per_hour",
"min_value": 52.320624975640214,
"max_value": 63.94743052578249,
"result_format": "COMPLETE"
},
"meta": {}
},
"meta": {},
"result": {
"observed_value": 68.99268345164135,
"element_count": 35448,
"missing_count": 31055,
"missing_percent": 87.6071992778154
},
"exception_info": {
"raised_exception": false,
"exception_message": null,
"exception_traceback": null
},
"success": false
},
{
"expectation_config": {
"expectation_type": "expect_column_quantile_values_to_be_between",
"kwargs": {
"column": "avg_fare",
"quantile_ranges": {
"quantiles": [
0.5,
0.75,
0.9,
0.95
],
"value_ranges": [
[
null,
16.4
],
[
null,
26.229166666666668
],
[
null,
36.4375
],
[
null,
42.0
]
]
},
"result_format": "COMPLETE"
},
"meta": {}
},
"meta": {},
"result": {
"observed_value": {
"quantiles": [
0.5,
0.75,
0.9,
0.95
],
"values": [
19.5,
28.1,
38.0,
44.125
]
},
"element_count": 35448,
"missing_count": 31055,
"missing_percent": 87.6071992778154,
"details": {
"success_details": [
false,
false,
false,
false
]
}
},
"exception_info": {
"raised_exception": false,
"exception_message": null,
"exception_traceback": null
},
"success": false
}
]
batch_engine:
type: bytewax
namespace: bytewax
image: bytewax/bytewax-feast:latest
env:
- name: AWS_ACCESS_KEY_ID
valueFrom:
secretKeyRef:
name: aws-credentials
key: aws-access-key-id
- name: AWS_SECRET_ACCESS_KEY
valueFrom:
secretKeyRef:
name: aws-credentials
key: aws-secret-access-keyfrom airflow.decorators import task
from feast import RepoConfig, FeatureStore
from feast.infra.online_stores.dynamodb import DynamoDBOnlineStoreConfig
from feast.repo_config import RegistryConfig
# Define Python callable
# Airflow task that materializes features for the data interval passed in
# through the data_interval_start / data_interval_end arguments.
@task()
def materialize(data_interval_start=None, data_interval_end=None):
# Build the Feast repo configuration directly in code.
repo_config = RepoConfig(
registry=RegistryConfig(path="s3://[YOUR BUCKET]/registry.pb"),
project="feast_demo_aws",
provider="aws",
offline_store="file",
online_store=DynamoDBOnlineStoreConfig(region="us-west-2"),
entity_key_serialization_version=2
)
store = FeatureStore(config=repo_config)
# Option 1: materialize just one feature view
# store.materialize_incremental(datetime.datetime.now(), feature_views=["my_fv_name"])
# Option 2: materialize all feature views incrementally
# store.materialize_incremental(datetime.datetime.now())
# Option 3: Let Airflow manage materialization state
# Add 1 hr overlap to account for late data
# NOTE(review): .subtract(hours=1) is not a stdlib datetime method; presumably
# Airflow supplies pendulum datetimes here, and both arguments default to
# None, so the call fails if they are not provided - confirm.
store.materialize(data_interval_start.subtract(hours=1), data_interval_end)from feast import FeatureStore
fs = FeatureStore(repo_path="production/")import mlflow.pyfunc
# Load model from MLflow
model_name = "my-model"
model_version = 1
model = mlflow.pyfunc.load_model(
model_uri=f"models:/{model_name}/{model_version}"
)
fs = FeatureStore(repo_path="production/")
# Read online features using the same model name and model version
feature_vector = fs.get_online_features(
features=fs.get_feature_service(f"{model_name}_v{model_version}"),
entity_rows=[{"driver_id": 1001}]
).to_dict()
# Make a prediction
prediction = model.predict(feature_vector)from feast import FeatureStore
import json

# Load the feature references from feature_refs.json.
# json.load() parses an open file object; json.loads() only accepts
# str/bytes and raises TypeError when handed the file handle.
with open('feature_refs.json', 'r') as f:
    feature_refs = json.load(f)
fs = FeatureStore(repo_path="production/")
# Read online features
feature_vector = fs.get_online_features(
features=feature_refs,
entity_rows=[{"driver_id": 1001}]
).to_dict()helm repo add feast-charts https://feast-helm-charts.storage.googleapis.com
helm repo updatehelm install feast-release feast-charts/feast-feature-server \
--set feature_store_yaml_base64=$(base64 feature_store.yaml) project: my_project
registry: data/registry.db
provider: local
online_store:
type: redis
connection_string: ${REDIS_CONNECTION_STRING}project: my_project
registry: data/registry.db
provider: local
online_store:
type: redis
connection_string: ${REDIS_CONNECTION_STRING:"0.0.0.0:6379"}from datetime import datetime
from typing import Any, Callable, Dict, List, Optional, Sequence, Tuple, Union
from feast.entity import Entity
from feast.feature_table import FeatureTable
from feast.feature_view import FeatureView
from feast.infra.local import LocalProvider
from feast.infra.offline_stores.offline_store import RetrievalJob
from feast.protos.feast.types.EntityKey_pb2 import EntityKey as EntityKeyProto
from feast.protos.feast.types.Value_pb2 import Value as ValueProto
from feast.infra.registry.registry import Registry
from feast.repo_config import RepoConfig
# Custom provider that subclasses LocalProvider and adds extra behavior after
# the parent's infrastructure update and materialization steps.
class MyCustomProvider(LocalProvider):
# NOTE(review): repo_path is accepted but not forwarded to the parent
# constructor - confirm this matches LocalProvider.__init__'s signature.
def __init__(self, config: RepoConfig, repo_path):
super().__init__(config)
# Add your custom init code here. This code runs on every Feast operation.
# Delegates to LocalProvider.update_infra with the same arguments, then runs
# the custom step (here only a print).
def update_infra(
self,
project: str,
tables_to_delete: Sequence[Union[FeatureTable, FeatureView]],
tables_to_keep: Sequence[Union[FeatureTable, FeatureView]],
entities_to_delete: Sequence[Entity],
entities_to_keep: Sequence[Entity],
partial: bool,
):
super().update_infra(
project,
tables_to_delete,
tables_to_keep,
entities_to_delete,
entities_to_keep,
partial,
)
print("Launching custom streaming jobs is pretty easy...")
# Delegates to LocalProvider.materialize_single_feature_view, then runs the
# custom step (here only a print).
# NOTE(review): tqdm is referenced in the annotation below but is not among
# the imports shown with this snippet - confirm it is imported, otherwise the
# class body raises NameError when it is defined.
def materialize_single_feature_view(
self,
config: RepoConfig,
feature_view: FeatureView,
start_date: datetime,
end_date: datetime,
registry: Registry,
project: str,
tqdm_builder: Callable[[int], tqdm],
) -> None:
super().materialize_single_feature_view(
config, feature_view, start_date, end_date, registry, project, tqdm_builder
)
print("Launching custom batch jobs is pretty easy...")project: repo
registry: registry.db
provider: feast_custom_provider.custom_provider.MyCustomProvider
online_store:
type: sqlite
path: online_store.db
offline_store:
type: filefeast applyRegistered entity driver_id
Registered feature view driver_hourly_stats
Deploying infrastructure for driver_hourly_stats
Launching custom streaming jobs is pretty easy...PYTHONPATH=$PYTHONPATH:/home/my_user/my_custom_provider feast apply$ tree
.
├── e2e
│ ├── test_go_feature_server.py
│ ├── test_python_feature_server.py
│ ├── test_universal_e2e.py
│ ├── test_usage_e2e.py
│ └── test_validation.py
├── feature_repos
│ ├── integration_test_repo_config.py
│ ├── repo_configuration.py
│ └── universal
│ ├── catalog
│ ├── data_source_creator.py
│ ├── data_sources
│ │ ├── __init__.py
│ │ ├── bigquery.py
│ │ ├── file.py
│ │ ├── redshift.py
│ │ └── snowflake.py
│ ├── entities.py
│ ├── feature_views.py
│ ├── online_store
│ │ ├── __init__.py
│ │ ├── datastore.py
│ │ ├── dynamodb.py
│ │ ├── hbase.py
│ │ └── redis.py
│ └── online_store_creator.py
├── materialization
│ └── test_lambda.py
├── offline_store
│ ├── test_feature_logging.py
│ ├── test_offline_write.py
│ ├── test_push_features_to_offline_store.py
│ ├── test_s3_custom_endpoint.py
│ └── test_universal_historical_retrieval.py
├── online_store
│ ├── test_push_features_to_online_store.py
│ └── test_universal_online.py
└── registration
├── test_feature_store.py
├── test_inference.py
├── test_registry.py
├── test_universal_cli.py
├── test_universal_odfv_feature_inference.py
└── test_universal_types.py
@pytest.mark.integration
@pytest.mark.universal_offline_stores
@pytest.mark.parametrize("full_feature_names", [True, False], ids=lambda v: f"full:{v}")
def test_historical_features(environment, universal_data_sources, full_feature_names):
    """Excerpt of the universal historical-retrieval integration test.

    Registers entities, feature views and two feature services in the
    environment's feature store, then requests historical features for an
    entity dataframe extended with request-data columns.

    The ``# ... more test code`` markers stand for elided portions of the
    test; names such as ``expected_df``, ``table_from_df_entities`` and
    ``event_timestamp`` are presumably defined there.

    Args:
        environment: Test environment exposing ``feature_store``.
        universal_data_sources: Tuple of ``(entities, datasets, data_sources)``.
        full_feature_names: Forwarded to ``get_historical_features``.
    """
    store = environment.feature_store

    (entities, datasets, data_sources) = universal_data_sources

    feature_views = construct_universal_feature_views(data_sources)

    # Add request-data columns: a running index and the same index offset by 100.
    entity_df_with_request_data = datasets.entity_df.copy(deep=True)
    num_rows = len(entity_df_with_request_data)
    entity_df_with_request_data["val_to_add"] = list(range(num_rows))
    entity_df_with_request_data["driver_age"] = list(range(100, 100 + num_rows))

    feature_service = FeatureService(
        name="convrate_plus100",
        features=[feature_views.driver[["conv_rate"]], feature_views.driver_odfv],
    )
    # The same location feature view is used twice under different names,
    # each with its own join-key mapping.
    feature_service_entity_mapping = FeatureService(
        name="entity_mapping",
        features=[
            feature_views.location.with_name("origin").with_join_key_map(
                {"location_id": "origin_id"}
            ),
            feature_views.location.with_name("destination").with_join_key_map(
                {"location_id": "destination_id"}
            ),
        ],
    )

    store.apply(
        [
            driver(),
            customer(),
            location(),
            feature_service,
            feature_service_entity_mapping,
            *feature_views.values(),
        ]
    )
    # ... more test code

    job_from_df = store.get_historical_features(
        entity_df=entity_df_with_request_data,
        features=[
            "driver_stats:conv_rate",
            "driver_stats:avg_daily_trips",
            "customer_profile:current_balance",
            "customer_profile:avg_passenger_count",
            "customer_profile:lifetime_trip_count",
            "conv_rate_plus_100:conv_rate_plus_100",
            "conv_rate_plus_100:conv_rate_plus_100_rounded",
            "conv_rate_plus_100:conv_rate_plus_val_to_add",
            "order:order_is_success",
            "global_stats:num_rides",
            "global_stats:avg_ride_length",
            "field_mapping:feature_name",
        ],
        full_feature_names=full_feature_names,
    )

    # Only exercised when the retrieval job reports remote export support.
    if job_from_df.supports_remote_storage_export():
        files = job_from_df.to_remote_storage()
        print(files)
        assert len(files) > 0  # This test should be way more detailed

    start_time = datetime.utcnow()
    actual_df_from_df_entities = job_from_df.to_df()
    # ... more test code

    validate_dataframes(
        expected_df,
        table_from_df_entities,
        sort_by=[event_timestamp, "order_id", "driver_id", "customer_id"],
        event_timestamp=event_timestamp,
    )
# ... more test code
@pytest.mark.universal_online_stores(only=["redis"])
@pytest.mark.integration
def your_test(environment: Environment):
    """Template for an integration test that registers a feature view.

    Creates a data source from a DataFrame through the environment's data
    source creator, builds a driver feature view and entity from it, and
    applies both to the environment's feature store.

    Args:
        environment: Test environment providing ``data_source_creator`` and
            ``feature_store``.
    """
    df = ...  # placeholder: build the source DataFrame for the test here
    data_source = environment.data_source_creator.create_data_source(
        df,
        destination_name=environment.feature_store.project,
    )
    your_fv = driver_feature_view(data_source)
    entity = driver(value_type=ValueType.UNKNOWN)
    # Apply the objects defined above through the environment's store; the
    # original referenced `fs` and `fv`, neither of which was defined.
    fs = environment.feature_store
    fs.apply([your_fv, entity])
# ... run test
Starting 6001
Starting 6002
Starting 6003
Starting 6004
Starting 6005
Starting 6006project: my_feature_repo
registry: data/registry.db
provider: local
offline_store:
type: snowflake.offline
account: snowflake_deployment.us-east-1
user: user_login
password: user_password
role: sysadmin
warehouse: demo_wh
database: FEASTproject: my_feature_repo
registry: data/registry.db
provider: aws
offline_store:
type: redshift
region: us-west-2
cluster_id: feast-cluster
database: feast-database
user: redshift-user
s3_staging_location: s3://feast-bucket/redshift
iam_role: arn:aws:iam::123456789012:role/redshift_s3_access_role{
"Statement": [
{
"Action": [
"s3:ListBucket",
"s3:PutObject",
"s3:GetObject",
"s3:DeleteObject"
],
"Effect": "Allow",
"Resource": [
"arn:aws:s3:::<bucket_name>/*",
"arn:aws:s3:::<bucket_name>"
]
},
{
"Action": [
"redshift-data:DescribeTable",
"redshift:GetClusterCredentials",
"redshift-data:ExecuteStatement"
],
"Effect": "Allow",
"Resource": [
"arn:aws:redshift:<region>:<account_id>:dbuser:<redshift_cluster_id>/<redshift_username>",
"arn:aws:redshift:<region>:<account_id>:dbname:<redshift_cluster_id>/<redshift_database_name>",
"arn:aws:redshift:<region>:<account_id>:cluster:<redshift_cluster_id>"
]
},
{
"Action": [
"redshift-data:DescribeStatement"
],
"Effect": "Allow",
"Resource": "*"
}
],
"Version": "2012-10-17"
}{
"Statement": [
{
"Action": "s3:*",
"Effect": "Allow",
"Resource": [
"arn:aws:s3:::feast-integration-tests",
"arn:aws:s3:::feast-integration-tests/*"
]
}
],
"Version": "2012-10-17"
}{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Principal": {
"Service": "redshift.amazonaws.com"
},
"Action": "sts:AssumeRole"
}
]
}project: my_project
registry: data/registry.db
provider: local
offline_store:
type: spark
spark_conf:
spark.master: "local[*]"
spark.ui.enabled: "false"
spark.eventLog.enabled: "false"
spark.sql.catalogImplementation: "hive"
spark.sql.parser.quotedRegexColumnNames: "true"
spark.sql.session.timeZone: "UTC"
online_store:
path: data/online_store.dbproject: my_project
registry: data/registry.db
provider: local
offline_store:
type: postgres
host: DB_HOST
port: DB_PORT
database: DB_NAME
db_schema: DB_SCHEMA
user: DB_USERNAME
password: DB_PASSWORD
sslmode: verify-ca
sslkey_path: /path/to/client-key.pem
sslcert_path: /path/to/client-cert.pem
sslrootcert_path: /path/to/server-ca.pem
online_store:
path: data/online_store.dbregistry:
registry_store_type: AzureRegistryStore
path: ${REGISTRY_PATH} # Environment Variable
project: production
provider: azure
online_store:
type: redis
connection_string: ${REDIS_CONN} # Environment Variable
offline_store:
type: mssql
connection_string: ${SQL_CONN} # Environment Variableproject: my_feature_repo
registry: data/registry.db
provider: local
online_store:
type: redis
connection_string: "localhost:6379"project: my_feature_repo
registry: data/registry.db
provider: local
online_store:
type: redis
redis_type: redis_cluster
connection_string: "redis1:6379,redis2:6379,ssl=true,password=my_password"project: my_feature_repo
registry: data/registry.db
provider: local
online_store:
type: redis
key_ttl_seconds: 604800
connection_string: "localhost:6379"project: my_feature_repo
registry: data/registry.db
provider: aws
online_store:
type: dynamodb
region: us-west-2{
"Statement": [
{
"Action": [
"dynamodb:CreateTable",
"dynamodb:DescribeTable",
"dynamodb:DeleteTable",
"dynamodb:BatchWriteItem",
"dynamodb:BatchGetItem"
],
"Effect": "Allow",
"Resource": [
"arn:aws:dynamodb:<region>:<account_id>:table/*"
]
}
],
"Version": "2012-10-17"
}project: my_feature_repo
registry: data/registry.db
provider: local
online_store:
type: postgres
host: DB_HOST
port: DB_PORT
database: DB_NAME
db_schema: DB_SCHEMA
user: DB_USERNAME
password: DB_PASSWORD
sslmode: verify-ca
sslkey_path: /path/to/client-key.pem
sslcert_path: /path/to/client-cert.pem
sslrootcert_path: /path/to/server-ca.pemproject: my_feature_repo
registry: data/registry.db
provider: local
online_store:
type: cassandra
hosts:
- 192.168.1.1
- 192.168.1.2
- 192.168.1.3
keyspace: KeyspaceName
port: 9042 # optional
username: user # optional
password: secret # optional
protocol_version: 5 # optional
load_balancing: # optional
local_dc: 'datacenter1' # optional
load_balancing_policy: 'TokenAwarePolicy(DCAwareRoundRobinPolicy)' # optional
read_concurrency: 100 # optional
write_concurrency: 100 # optionalproject: my_feature_repo
registry: data/registry.db
provider: local
online_store:
type: cassandra
secure_bundle_path: /path/to/secure/bundle.zip
keyspace: KeyspaceName
username: Client_ID
password: Client_Secret
protocol_version: 4 # optional
load_balancing: # optional
local_dc: 'eu-central-1' # optional
load_balancing_policy: 'TokenAwarePolicy(DCAwareRoundRobinPolicy)' # optional
read_concurrency: 100 # optional
write_concurrency: 100 # optional




training_retrieval_job = fs.get_historical_features(
entity_df=entity_df_or_sql_string,
features=fs.get_feature_service("driver_activity_v1"),
)
# Option 1: In memory model training
model = ml.fit(training_retrieval_job.to_df())
# Option 2: Unloading to blob storage. Further post-processing can occur before kicking off distributed training.
training_retrieval_job.to_remote_storage()entity_df = pd.DataFrame.from_dict(
{
"driver_id": [1001, 1002, 1003, 1004, 1001],
"event_timestamp": [
datetime(2021, 4, 12, 10, 59, 42),
datetime(2021, 4, 12, 8, 12, 10),
datetime(2021, 4, 12, 16, 40, 26),
datetime(2021, 4, 12, 15, 1, 12),
datetime.now()
]
}
)
training_df = store.get_historical_features(
entity_df=entity_df,
features=store.get_feature_service("model_v1"),
).to_df()
print(training_df.head())from feast import FeatureStore
store = FeatureStore(repo_path=".")
# Get the latest feature values for unique entities
entity_sql = f"""
SELECT
driver_id,
CURRENT_TIMESTAMP() as event_timestamp
FROM {store.get_data_source("driver_hourly_stats_source").get_table_query_string()}
WHERE event_timestamp BETWEEN '2021-01-01' and '2021-12-31'
GROUP BY driver_id
"""
batch_scoring_features = store.get_historical_features(
entity_df=entity_sql,
features=store.get_feature_service("model_v2"),
).to_df()
# predictions = model.predict(batch_scoring_features)from feast import RepoConfig, FeatureStore
from feast.repo_config import RegistryConfig
repo_config = RepoConfig(
registry=RegistryConfig(path="gs://feast-test-gcs-bucket/registry.pb"),
project="feast_demo_gcp",
provider="gcp",
)
store = FeatureStore(config=repo_config)
features = store.get_online_features(
features=[
"driver_hourly_stats:conv_rate",
"driver_hourly_stats:acc_rate",
"driver_daily_features:daily_miles_driven",
],
entity_rows=[
{
"driver_id": 1001,
}
],
).to_dict()import requests
import json
online_request = {
"features": [
"driver_hourly_stats:conv_rate",
],
"entities": {"driver_id": [1001, 1002]},
}
r = requests.post('http://localhost:6566/get-online-features', data=json.dumps(online_request))
print(json.dumps(r.json(), indent=4, sort_keys=True))from feast import FeatureStore, RepoConfig
from feast.repo_config import RegistryConfig
from feast.infra.online_stores.dynamodb import DynamoDBOnlineStoreConfig
from feast.infra.offline_stores.contrib.spark_offline_store.spark import SparkOfflineStoreConfig
repo_config = RepoConfig(
registry="s3://[YOUR_BUCKET]/feast-registry.db",
project="feast_repo",
provider="aws",
offline_store=SparkOfflineStoreConfig(
spark_conf={
"spark.ui.enabled": "false",
"spark.eventLog.enabled": "false",
"spark.sql.catalogImplementation": "hive",
"spark.sql.parser.quotedRegexColumnNames": "true",
"spark.sql.session.timeZone": "UTC"
}
),
batch_engine={
"type": "spark.engine",
"partitions": 10
},
online_store=DynamoDBOnlineStoreConfig(region="us-west-1"),
entity_key_serialization_version=2
)
store = FeatureStore(config=repo_config)...
offline_store:
type: snowflake.offline
...
batch_engine:
type: snowflake.engine
account: snowflake_deployment.us-east-1
user: user_login
password: user_password
role: sysadmin
warehouse: demo_wh
database: FEASTproject: my_feature_repo
registry: gs://my-bucket/data/registry.db
provider: gcpkubectl create secret generic -n bytewax aws-credentials --from-literal=aws-access-key-id='<access key id>' --from-literal=aws-secret-access-key='<secret access key>'batch_engine:
type: bytewax
namespace: bytewax
env:
- name: AWS_ACCESS_KEY_ID
valueFrom:
secretKeyRef:
name: aws-credentials
key: aws-access-key-id
- name: AWS_SECRET_ACCESS_KEY
valueFrom:
secretKeyRef:
name: aws-credentials
key: aws-secret-access-keybatch_engine:
type: bytewax
namespace: bytewax
image: bytewax/bytewax-feast:latestDOCKER_BUILDKIT=1 docker build . -f ./sdk/python/feast/infra/materialization/contrib/bytewax/Dockerfile -t <image tag>batch_engine:
type: bytewax
namespace: bytewax
image: <image tag>$ tree -a
.
├── data
│ └── driver_stats.parquet
├── driver_features.py
├── feature_store.yaml
└── .feastignore
1 directory, 4 filesproject: my_feature_repo_1
registry: data/metadata.db
provider: local
online_store:
path: data/online_store.db# Ignore virtual environment
venv
# Ignore a specific Python file
scripts/foo.py
# Ignore all Python files directly under scripts directory
scripts/*.py
# Ignore all "foo.py" anywhere under scripts directory
scripts/**/foo.pyfrom datetime import timedelta
from feast import BigQuerySource, Entity, Feature, FeatureView, Field
from feast.types import Float32, Int64, String
driver_locations_source = BigQuerySource(
table_ref="rh_prod.ride_hailing_co.drivers",
timestamp_field="event_timestamp",
created_timestamp_column="created_timestamp",
)
driver = Entity(
name="driver",
description="driver id",
)
driver_locations = FeatureView(
name="driver_locations",
entities=[driver],
ttl=timedelta(days=1),
schema=[
Field(name="lat", dtype=Float32),
Field(name="lon", dtype=String),
Field(name="driver", dtype=Int64),
],
source=driver_locations_source,
)
from feast.infra.offline_stores.contrib.mssql_offline_store.mssqlserver_source import (
MsSqlServerSource,
)
driver_hourly_table = "driver_hourly"
driver_source = MsSqlServerSource(
table_ref=driver_hourly_table,
event_timestamp_column="datetime",
created_timestamp_column="created",
)# Define Python callable
def materialize():
    """Incrementally materialize all feature views up to the current time.

    Builds the Feast repo configuration in code, creates a FeatureStore from
    it and calls ``materialize_incremental`` with the current UTC time.
    """
    repo_config = RepoConfig(
        registry=RegistryConfig(path="s3://[YOUR BUCKET]/registry.pb"),
        project="feast_demo_aws",
        provider="aws",
        offline_store="file",
        online_store=DynamoDBOnlineStoreConfig(region="us-west-2"),
    )
    store = FeatureStore(config=repo_config)
    # Pass an explicit timezone-aware UTC timestamp, matching the CLI example
    # that uses `date -u`. A naive local time is ambiguous.
    # NOTE(review): Feast appears to treat naive datetimes as UTC - confirm.
    store.materialize_incremental(datetime.datetime.now(tz=datetime.timezone.utc))
# (In production) Use Airflow PythonOperator
materialize_python = PythonOperator(
task_id='materialize_python',
python_callable=materialize,
)CURRENT_TIME=$(date -u +"%Y-%m-%dT%H:%M:%S")
feast materialize-incremental $CURRENT_TIME# Use BashOperator
materialize_bash = BashOperator(
task_id='materialize',
bash_command=f'feast materialize-incremental {datetime.datetime.now().replace(microsecond=0).isoformat()}',
)from feast import RedshiftSource
my_redshift_source = RedshiftSource(
query="SELECT timestamp as ts, created, f1, f2 "
"FROM redshift_table",
)from feast import SnowflakeSource
my_snowflake_source = SnowflakeSource(
database="FEAST",
schema="PUBLIC",
table="FEATURE_TABLE",
)from feast import SnowflakeSource
# Query-based Snowflake source selecting the timestamp, created and feature
# columns. Snowflake delimits identifiers with double quotes; backticks are
# not valid, so the table is written as a double-quoted
# database.schema.table name.
my_snowflake_source = SnowflakeSource(
    query="""
    SELECT
        timestamp_column AS "ts",
        "created",
        "f1",
        "f2"
    FROM
        "FEAST"."PUBLIC"."FEATURE_TABLE"
    """,
)

features = [
"driver_hourly_stats:conv_rate",
"driver_hourly_stats:acc_rate"
]fs = FeatureStore(repo_path="path/to/feature/repo")
online_features = fs.get_online_features(
features=features,
entity_rows=[
# {join_key: entity_value, ...}
{"driver_id": 1001},
{"driver_id": 1002}]
).to_dict(){
"driver_hourly_stats__acc_rate":[
0.2897740304470062,
0.6447265148162842
],
"driver_hourly_stats__conv_rate":[
0.6508077383041382,
0.14802511036396027
],
"driver_id":[
1001,
1002
]
}from feast.infra.offline_stores.contrib.spark_offline_store.spark_source import (
SparkSource,
)
my_spark_source = SparkSource(
table="FEATURE_TABLE",
)from feast.infra.offline_stores.contrib.spark_offline_store.spark_source import (
SparkSource,
)
my_spark_source = SparkSource(
query="SELECT timestamp as ts, created, f1, f2 "
"FROM spark_table",
)from feast.infra.offline_stores.contrib.spark_offline_store.spark_source import (
SparkSource,
)
my_spark_source = SparkSource(
path=f"{CURRENT_DIR}/data/driver_hourly_stats",
file_format="parquet",
timestamp_field="event_timestamp",
created_timestamp_column="created",
)driver = Entity(name='driver', join_keys=['driver_id'])from feast import Entity, FeatureView, Field, FileSource
from feast.types import Float32, Int64
from datetime import timedelta
driver = Entity(name="driver", join_keys=["driver_id"])
driver_stats_fv = FeatureView(
name="driver_hourly_stats",
entities=[driver],
schema=[
Field(name="trips_today", dtype=Int64),
Field(name="earnings_today", dtype=Float32),
],
ttl=timedelta(hours=2),
source=FileSource(
path="driver_hourly_stats.parquet"
)
)
# Read in entity dataframe
entity_df = pd.read_csv("entity_df.csv")
training_df = store.get_historical_features(
entity_df=entity_df,
features = [
'driver_hourly_stats:trips_today',
'driver_hourly_stats:earnings_today'
],
)
feast apply: this CLI command updates infrastructure and persists definitions in the object store registry.




$ feast repo-upgrade --write$ feast repo-upgrade --write
--- /Users/achal/feast/prompt_dory/example.py
+++ /Users/achal/feast/prompt_dory/example.py
@@ -13,7 +13,6 @@
path="/Users/achal/feast/prompt_dory/data/driver_stats.parquet",
event_timestamp_column="event_timestamp",
created_timestamp_column="created",
- date_partition_column="created"
)
# Define an entity for the driver. You can think of entity as a primary key used to
--- /Users/achal/feast/prompt_dory/example.py
+++ /Users/achal/feast/prompt_dory/example.py
@@ -3,7 +3,7 @@
from google.protobuf.duration_pb2 import Duration
import pandas as pd
-from feast import Entity, Feature, FeatureView, FileSource, ValueType, FeatureService, OnDemandFeatureView
+from feast import Entity, FeatureView, FileSource, ValueType, FeatureService, OnDemandFeatureView
# Read data from parquet files. Parquet is convenient for local development mode. For
# production, you can use your favorite DWH, such as BigQuery. See Feast documentation
--- /Users/achal/feast/prompt_dory/example.py
+++ /Users/achal/feast/prompt_dory/example.py
@@ -4,6 +4,7 @@
import pandas as pd
from feast import Entity, Feature, FeatureView, FileSource, ValueType, FeatureService, OnDemandFeatureView
+from feast import Field
# Read data from parquet files. Parquet is convenient for local development mode. For
# production, you can use your favorite DWH, such as BigQuery. See Feast documentation
--- /Users/achal/feast/prompt_dory/example.py
+++ /Users/achal/feast/prompt_dory/example.py
@@ -28,9 +29,9 @@
entities=[driver_id],
ttl=Duration(seconds=86400 * 365),
features=[
- Feature(name="conv_rate", dtype=ValueType.FLOAT),
- Feature(name="acc_rate", dtype=ValueType.FLOAT),
- Feature(name="avg_daily_trips", dtype=ValueType.INT64),
+ Field(name="conv_rate", dtype=ValueType.FLOAT),
+ Field(name="acc_rate", dtype=ValueType.FLOAT),
+ Field(name="avg_daily_trips", dtype=ValueType.INT64),
],
online=True,
batch_source=driver_hourly_stats,from datetime import timedelta
from feast import Field, FileSource, KinesisSource, stream_feature_view
from feast.data_format import JsonFormat
from feast.types import Float32
driver_stats_batch_source = FileSource(
name="driver_stats_source",
path="data/driver_stats.parquet",
timestamp_field="event_timestamp",
)
driver_stats_stream_source = KinesisSource(
name="driver_stats_stream",
stream_name="drivers",
timestamp_field="event_timestamp",
batch_source=driver_stats_batch_source,
record_format=JsonFormat(
schema_json="driver_id integer, event_timestamp timestamp, conv_rate double, acc_rate double, created timestamp"
),
watermark_delay_threshold=timedelta(minutes=5),
)@stream_feature_view(
entities=[driver],
ttl=timedelta(seconds=8640000000),
mode="spark",
schema=[
Field(name="conv_percentage", dtype=Float32),
Field(name="acc_percentage", dtype=Float32),
],
timestamp_field="event_timestamp",
online=True,
source=driver_stats_stream_source,
)
def driver_hourly_stats_stream(df: DataFrame):
from pyspark.sql.functions import col
return (
df.withColumn("conv_percentage", col("conv_rate") * 100.0)
.withColumn("acc_percentage", col("acc_rate") * 100.0)
.drop("conv_rate", "acc_rate")
)project: my_feature_repo
registry: data/registry.db
provider: gcp
online_store:
type: datastore
project_id: my_gcp_project
namespace: my_datastore_namespace
Initial demonstration of Snowflake as an offline and online store with Feast, using the Snowflake demo template.
pip install 'feast[snowflake]'feast init -t snowflake {feature_repo_name}
Snowflake Deployment URL (exclude .snowflakecomputing.com):
Snowflake User Name::
Snowflake Password::
Snowflake Role Name (Case Sensitive)::
Snowflake Warehouse Name (Case Sensitive)::
Snowflake Database Name (Case Sensitive)::
Should I upload example data to Snowflake (overwrite table)? [Y/n]: Y
cd {feature_repo_name}project: ...
registry: ...
provider: local
offline_store:
type: snowflake.offline
account: SNOWFLAKE_DEPLOYMENT_URL #drop .snowflakecomputing.com
user: USERNAME
password: PASSWORD
role: ROLE_NAME #case sensitive
warehouse: WAREHOUSE_NAME #case sensitive
database: DATABASE_NAME #case cap sensitive
batch_engine:
type: snowflake.engine
account: SNOWFLAKE_DEPLOYMENT_URL #drop .snowflakecomputing.com
user: USERNAME
password: PASSWORD
role: ROLE_NAME #case sensitive
warehouse: WAREHOUSE_NAME #case sensitive
database: DATABASE_NAME #case cap sensitive
online_store:
type: snowflake.online
account: SNOWFLAKE_DEPLOYMENT_URL #drop .snowflakecomputing.com
user: USERNAME
password: PASSWORD
role: ROLE_NAME #case sensitive
warehouse: WAREHOUSE_NAME #case sensitive
database: DATABASE_NAME #case cap sensitivepython test.pyfrom datetime import datetime, timedelta
import pandas as pd
from driver_repo import driver, driver_stats_fv
from feast import FeatureStore
fs = FeatureStore(repo_path=".")
fs.apply([driver, driver_stats_fv])entity_df = pd.DataFrame(
{
"event_timestamp": [
pd.Timestamp(dt, unit="ms", tz="UTC").round("ms")
for dt in pd.date_range(
start=datetime.now() - timedelta(days=3),
end=datetime.now(),
periods=3,
)
],
"driver_id": [1001, 1002, 1003],
}
)
features = ["driver_hourly_stats:conv_rate", "driver_hourly_stats:acc_rate"]
training_df = fs.get_historical_features(
features=features, entity_df=entity_df
).to_df()
fs.materialize_incremental(end_date=datetime.now())
online_features = fs.get_online_features(
features=features,
entity_rows=[
# {join_key: entity_value}
{"driver_id": 1001},
{"driver_id": 1002}
],
).to_dict()
project: my_feature_repo
registry: gs://my-bucket/data/registry.db
provider: gcp
offline_store:
type: bigquery
dataset: feast_bq_datasetproject: feature_repo
registry: data/registry.db
provider: local
offline_store:
type: feast_trino.trino.TrinoOfflineStore
host: localhost
port: 8080
catalog: memory
connector:
type: memory
online_store:
path: data/online_store.dbproject: my_feature_repo
registry: data/registry.db
provider: aws
online_store:
type: dynamodb
region: us-west-2
offline_store:
type: redshift
region: us-west-2
cluster_id: feast-cluster
database: feast-database
user: redshift-user
s3_staging_location: s3://feast-bucket/redshift
iam_role: arn:aws:iam::123456789012:role/redshift_s3_access_roleproject: loyal_spider
registry: data/registry.db
provider: local
online_store:
type: sqlite
path: data/online_store.dbregistry:
registry_store_type: AzureRegistryStore
path: ${REGISTRY_PATH} # Environment Variable
project: production
provider: azure
online_store:
type: redis
connection_string: ${REDIS_CONN} # Environment Variableproject: my_feature_repo
registry: data/registry.db
provider: local
online_store:
type: snowflake.online
account: SNOWFLAKE_DEPLOYMENT_URL
user: SNOWFLAKE_USER
password: SNOWFLAKE_PASSWORD
role: SNOWFLAKE_ROLE
warehouse: SNOWFLAKE_WAREHOUSE
database: SNOWFLAKE_DATABASEdriver_stats_fv = FeatureView(
...
tags={"snowflake-online-store/online_path": '"FEAST"."ONLINE"'},
)
sdk/python/tests/integration/feature_repos/repo_configuration.py
# Only prints out runtime warnings once.
warnings.simplefilter("once", RuntimeWarning)
def update(
    self,
    config: RepoConfig,
    tables_to_delete: Sequence[Union[FeatureTable, FeatureView]],
    tables_to_keep: Sequence[Union[FeatureTable, FeatureView]],
    entities_to_delete: Sequence[Entity],
    entities_to_keep: Sequence[Entity],
    partial: bool,
):
    """
    An example of creating and managing the tables needed for a MySQL-backed online store.

    For every entry in ``tables_to_keep`` a table (created only if missing) and an
    ``entity_key`` index are created; for every entry in ``tables_to_delete`` the
    index and then the table are dropped. ``entities_to_delete``,
    ``entities_to_keep`` and ``partial`` are accepted but not used here.
    """
    warnings.warn(
        "This online store is an experimental feature in alpha development. "
        "Some functionality may still be unstable so functionality can change in the future.",
        RuntimeWarning,
    )

    cursor = self._get_conn(config).cursor(buffered=True)
    project_name = config.project

    for kept in tables_to_keep:
        # Resolved once per table; assumes _table_id is a pure name builder.
        table_name = _table_id(project_name, kept)
        cursor.execute(
            f"CREATE TABLE IF NOT EXISTS {table_name} "
            "(entity_key VARCHAR(512), feature_name VARCHAR(256), value BLOB, "
            "event_ts timestamp, created_ts timestamp, "
            "PRIMARY KEY(entity_key, feature_name))"
        )
        cursor.execute(f"CREATE INDEX {table_name}_ek ON {table_name} (entity_key);")

    for dropped in tables_to_delete:
        table_name = _table_id(project_name, dropped)
        # The index is removed explicitly before the table itself.
        cursor.execute(f"DROP INDEX {table_name}_ek ON {table_name};")
        cursor.execute(f"DROP TABLE IF EXISTS {table_name}")
def teardown(
    self,
    config: RepoConfig,
    tables: Sequence[Union[FeatureTable, FeatureView]],
    entities: Sequence[Entity],
):
    """
    Drop the index and the table backing each entry in ``tables``.

    ``entities`` is accepted but not used by this implementation.
    """
    warnings.warn(
        "This online store is an experimental feature in alpha development. "
        "Some functionality may still be unstable so functionality can change in the future.",
        RuntimeWarning,
    )

    cursor = self._get_conn(config).cursor(buffered=True)
    project_name = config.project

    for doomed in tables:
        # Resolved once per table; assumes _table_id is a pure name builder.
        table_name = _table_id(project_name, doomed)
        cursor.execute(f"DROP INDEX {table_name}_ek ON {table_name};")
        cursor.execute(f"DROP TABLE IF EXISTS {table_name}")
# Only prints out runtime warnings once.
warnings.simplefilter("once", RuntimeWarning)
def online_write_batch(
    self,
    config: RepoConfig,
    table: Union[FeatureTable, FeatureView],
    data: List[
        Tuple[EntityKeyProto, Dict[str, ValueProto], datetime, Optional[datetime]]
    ],
    progress: Optional[Callable[[int], Any]],
) -> None:
    """
    Write a batch of feature rows to the online store.

    Each element of ``data`` is ``(entity_key, values, timestamp, created_ts)``.
    The entity key is serialized and hex-encoded, both timestamps are passed
    through ``_to_naive_utc`` (``created_ts`` only when it is not ``None``), and
    every feature value is handed to ``self.write_to_table``.
    """
    warnings.warn(
        "This online store is an experimental feature in alpha development. "
        "Some functionality may still be unstable so functionality can change in the future.",
        RuntimeWarning,
    )

    cursor = self._get_conn(config).cursor(buffered=True)
    project_name = config.project

    # NOTE(review): indentation was lost in the source; the commit and the
    # progress callback are placed inside the per-entity loop - confirm.
    for entity_key, values, timestamp, created_ts in data:
        serialized_key = serialize_entity_key(
            entity_key,
            entity_key_serialization_version=config.entity_key_serialization_version,
        )
        entity_key_hex = serialized_key.hex()
        event_ts = _to_naive_utc(timestamp)
        created = None if created_ts is None else _to_naive_utc(created_ts)

        for feature_name, feature_value in values.items():
            self.write_to_table(
                created,
                cursor,
                entity_key_hex,
                feature_name,
                project_name,
                table,
                event_ts,
                feature_value,
            )

        self._conn.commit()
        if progress:
            progress(1)
def online_read(
self,
config: RepoConfig,
table: Union[FeatureTable, FeatureView],
entity_keys: List[EntityKeyProto],
requested_features: Optional[List[str]] = None,
) -> List[Tuple[Optional[datetime], Optional[Dict[str, ValueProto]]]]:
warnings.warn(
"This online store is an experimental feature in alpha development. "
"Some functionality may still be unstable so functionality can change in the future.",
RuntimeWarning,
)
conn = self._get_conn(config)
cur = conn.cursor(buffered=True)
result: List[Tuple[Optional[datetime], Optional[Dict[str, ValueProto]]]] = []
project = config.project
for entity_key in entity_keys:
entity_key_bin = serialize_entity_key(
entity_key,
entity_key_serialization_version=config.entity_key_serialization_version,
).hex()
print(f"entity_key_bin: {entity_key_bin}")
cur.execute(
f"SELECT feature_name, value, event_ts FROM {_table_id(project, table)} WHERE entity_key = %s",
(entity_key_bin,),
)
res = {}
res_ts = None
for feature_name, val_bin, ts in cur.fetchall():
val = ValueProto()
val.ParseFromString(val_bin)
res[feature_name] = val
res_ts = ts
if not res:
result.append((None, None))
else:
result.append((res_ts, res))
return result
class MySQLOnlineStoreConfig(FeastConfigBaseModel):
type: Literal["feast_custom_online_store.mysql.MySQLOnlineStore"] = "feast_custom_online_store.mysql.MySQLOnlineStore"
host: Optional[StrictStr] = None
user: Optional[StrictStr] = None
password: Optional[StrictStr] = None
database: Optional[StrictStr] = None
online_store:
type: feast_custom_online_store.mysql.MySQLOnlineStore
user: foo
password: bardef online_write_batch(
self,
config: RepoConfig,
table: Union[FeatureTable, FeatureView],
data: List[
Tuple[EntityKeyProto, Dict[str, ValueProto], datetime, Optional[datetime]]
],
progress: Optional[Callable[[int], Any]],
) -> None:
online_store_config = config.online_store
assert isinstance(online_store_config, MySQLOnlineStoreConfig)
connection = mysql.connector.connect(
host=online_store_config.host or "127.0.0.1",
user=online_store_config.user or "root",
password=online_store_config.password,
database=online_store_config.database or "feast",
autocommit=True
)
project: test_custom
registry: data/registry.db
provider: local
online_store:
# Make sure to specify the type as the fully qualified path that Feast can import.
type: feast_custom_online_store.mysql.MySQLOnlineStore
user: foo
password: barproject: test_custom
registry: data/registry.db
provider: local
online_store: feast_custom_online_store.mysql.MySQLOnlineStore
make test-python
from feast.infra.offline_stores.contrib.postgres_offline_store.tests.data_source import (
PostgreSQLDataSourceCreator,
)
AVAILABLE_ONLINE_STORES = {"postgres": (None, PostgreSQLDataSourceCreator)}{
"sqlite": ({"type": "sqlite"}, None),
# Specifies sqlite as the online store. The `None` object specifies to not use a containerized docker container.
}class RedisOnlineStoreCreator(OnlineStoreCreator):
def __init__(self, project_name: str, **kwargs):
super().__init__(project_name)
def create_online_store(self) -> Dict[str, str]:
self.container.start()
log_string_to_wait_for = "Ready to accept connections"
wait_for_logs(
container=self.container, predicate=log_string_to_wait_for, timeout=10
)
self.container.stop()
test-python-universal-cassandra:
PYTHONPATH='.' \
FULL_REPO_CONFIGS_MODULE=sdk.python.feast.infra.online_stores.contrib.cassandra_repo_configuration \
PYTEST_PLUGINS=sdk.python.tests.integration.feature_repos.universal.online_store.cassandra \
FEAST_USAGE=False \
IS_TEST=True \
python -m pytest -x --integration \
sdk/python/tests
export PYTHON=<version>
make lock-python-ci-dependencies
make build-sphinx
FeatureStore.get_historical_features()
make test-python
export FULL_REPO_CONFIGS_MODULE='feast_custom_offline_store.feast_tests'
make test-python-universal # Only prints out runtime warnings once.
warnings.simplefilter("once", RuntimeWarning)
def get_historical_features(self,
                            config: RepoConfig,
                            feature_views: List[FeatureView],
                            feature_refs: List[str],
                            entity_df: Union[pd.DataFrame, str],
                            registry: Registry, project: str,
                            full_feature_names: bool = False) -> RetrievalJob:
    """Perform a point-in-time correct join of features onto an entity dataframe.

    The entity dataframe carries the entity key and timestamp. More details about
    how this should work at
    https://docs.feast.dev/v/v0.6-branch/user-guide/feature-retrieval#3.-historical-feature-retrieval.

    This is a template: it prints a trace message, emits the experimental-feature
    RuntimeWarning and returns ``None`` until an implementation is supplied.
    """
    # The print call used to sit inside the docstring and therefore never ran.
    print("Getting historical features from my offline store")
    warnings.warn(
        "This offline store is an experimental feature in alpha development. "
        "Some functionality may still be unstable so functionality can change in the future.",
        RuntimeWarning,
    )
    # Implementation here.
    pass
def pull_latest_from_table_or_query(
    self,
    config: RepoConfig,
    data_source: DataSource,
    join_key_columns: List[str],
    feature_name_columns: List[str],
    timestamp_field: str,
    created_timestamp_column: Optional[str],
    start_date: datetime,
    end_date: datetime,
) -> RetrievalJob:
    """Pull data from the offline store for use in materialization.

    Template only: prints a trace message, emits the experimental-feature
    RuntimeWarning and returns ``None``.
    """
    print("Pulling latest features from my offline store")
    experimental_notice = (
        "This offline store is an experimental feature in alpha development. "
        "Some functionality may still be unstable so functionality can change in the future."
    )
    warnings.warn(experimental_notice, RuntimeWarning)
    # Implementation here.
    return None
@staticmethod
def pull_all_from_table_or_query(
    config: RepoConfig,
    data_source: DataSource,
    join_key_columns: List[str],
    feature_name_columns: List[str],
    timestamp_field: str,
    start_date: datetime,
    end_date: datetime,
) -> RetrievalJob:
    """Optional method returning a retrieval job for a date range.

    The job should cover all join key columns, feature name columns and the
    event timestamp column for rows that occur between ``start_date`` and
    ``end_date``. Template only: emits the experimental-feature RuntimeWarning
    and returns ``None``.

    Declared as a static method because the signature takes no ``self``;
    without the decorator a call on a store instance would bind the instance
    to ``config``.
    """
    warnings.warn(
        "This offline store is an experimental feature in alpha development. "
        "Some functionality may still be unstable so functionality can change in the future.",
        RuntimeWarning,
    )
    # Implementation here.
    pass
@staticmethod
def write_logged_features(
    config: RepoConfig,
    data: Union[pyarrow.Table, Path],
    source: LoggingSource,
    logging_config: LoggingConfig,
    registry: BaseRegistry,
):
    """Optional method to have Feast support logging your online features.

    Template only: emits the experimental-feature RuntimeWarning and returns
    ``None``.

    Declared as a static method because the signature takes no ``self``;
    without the decorator a call on a store instance would bind the instance
    to ``config``.
    """
    warnings.warn(
        "This offline store is an experimental feature in alpha development. "
        "Some functionality may still be unstable so functionality can change in the future.",
        RuntimeWarning,
    )
    # Implementation here.
    pass
@staticmethod
def offline_write_batch(
    config: RepoConfig,
    feature_view: FeatureView,
    table: pyarrow.Table,
    progress: Optional[Callable[[int], Any]],
):
    """Optional method to have Feast support the offline push api for your offline store.

    Template only: emits the experimental-feature RuntimeWarning and returns
    ``None``.

    Declared as a static method because the signature takes no ``self``;
    without the decorator a call on a store instance would bind the instance
    to ``config``.
    """
    warnings.warn(
        "This offline store is an experimental feature in alpha development. "
        "Some functionality may still be unstable so functionality can change in the future.",
        RuntimeWarning,
    )
    # Implementation here.
    pass


class CustomFileOfflineStoreConfig(FeastConfigBaseModel):
    """ Custom offline store config for local (file-based) store """

    # Fully qualified class path that selects this offline store.
    type: Literal["feast_custom_offline_store.file.CustomFileOfflineStore"] = (
        "feast_custom_offline_store.file.CustomFileOfflineStore"
    )
    # URI for your offline store (in this case it would be a path).
    uri: str

project: my_project
registry: data/registry.db
provider: local
offline_store:
type: feast_custom_offline_store.file.CustomFileOfflineStore
uri: <File URI>
online_store:
path: data/online_store.db
def get_historical_features(self,
config: RepoConfig,
feature_views: List[FeatureView],
feature_refs: List[str],
entity_df: Union[pd.DataFrame, str],
registry: Registry, project: str,
full_feature_names: bool = False) -> RetrievalJob:
warnings.warn(
"This offline store is an experimental feature in alpha development. "
"Some functionality may still be unstable so functionality can change in the future.",
RuntimeWarning,
)
offline_store_config = config.offline_store
assert isinstance(offline_store_config, CustomFileOfflineStoreConfig)
store_type = offline_store_config.type
class CustomFileRetrievalJob(RetrievalJob):
def __init__(self, evaluation_function: Callable):
    """Set up a lazy historical retrieval job.

    ``evaluation_function`` is only stored here; nothing is computed until
    one of the conversion methods invokes it.
    """
    # Per the original note, the callable executes a stored procedure that
    # computes the historical retrieval.
    self.evaluation_function = evaluation_function
def to_df(self):
    """Run the stored evaluation function and return its result."""
    print("Getting a pandas DataFrame from a File is easy!")
    # Evaluation is deferred until this point, so the retrieval is only
    # computed when the dataframe is actually requested.
    return self.evaluation_function()
def to_arrow(self):
    """Run the stored evaluation function and convert its result to a pyarrow Table."""
    print("Getting a pandas DataFrame from a File is easy!")
    # Evaluation is deferred until this point; the result is handed to
    # pyarrow.Table.from_pandas for conversion.
    frame = self.evaluation_function()
    arrow_table = pyarrow.Table.from_pandas(frame)
    return arrow_table
def to_remote_storage(self):
# Optional method to write to an offline storage location to support scalable batch materialization.
pass
class CustomFileDataSource(FileSource):
"""Custom data source class for local files"""
def __init__(
    self,
    timestamp_field: Optional[str] = "",
    path: Optional[str] = None,
    field_mapping: Optional[Dict[str, str]] = None,
    created_timestamp_column: Optional[str] = "",
    date_partition_column: Optional[str] = "",
):
    """Create a custom file data source.

    Args:
        timestamp_field: Forwarded to the parent initializer.
        path: Stored on the instance as ``_path``.
        field_mapping: Forwarded to the parent initializer.
        created_timestamp_column: Forwarded to the parent initializer.
        date_partition_column: Forwarded to the parent initializer.
    """
    # NOTE(review): the opening of this call was lost in the source (only the
    # second sentence, the category and the closing parenthesis survived).
    # The first sentence is reconstructed from the sibling offline-store
    # warnings - confirm the intended wording.
    warnings.warn(
        "This offline store is an experimental feature in alpha development. "
        "Some functionality may still be unstable so functionality can change in the future.",
        RuntimeWarning,
    )
    # All arguments are passed by keyword: positional arguments after a
    # keyword argument are a SyntaxError.
    # NOTE(review): `path` is not forwarded to the parent initializer, as in
    # the original - confirm the base class does not require it.
    super(CustomFileDataSource, self).__init__(
        timestamp_field=timestamp_field,
        created_timestamp_column=created_timestamp_column,
        field_mapping=field_mapping,
        date_partition_column=date_partition_column,
    )
    self._path = path
@staticmethod
def from_proto(data_source: DataSourceProto):
    """Build a CustomFileDataSource from a DataSourceProto.

    The file path is read from the ``path`` key of the JSON document stored
    (UTF-8 encoded) in ``custom_options.configuration``; the remaining fields
    are copied from the proto.
    """
    raw_configuration = data_source.custom_options.configuration
    options = json.loads(str(raw_configuration, encoding="utf8"))
    return CustomFileDataSource(
        timestamp_field=data_source.timestamp_field,
        path=options["path"],
        field_mapping=dict(data_source.field_mapping),
        created_timestamp_column=data_source.created_timestamp_column,
        date_partition_column=data_source.date_partition_column,
    )
def to_proto(self) -> DataSourceProto:
config_json = json.dumps({"path": self.path})
data_source_proto = DataSourceProto(
type=DataSourceProto.CUSTOM_SOURCE,
custom_options=DataSourceProto.CustomSourceOptions(
configuration=bytes(config_json, encoding="utf8")
),
)
data_source_proto.timestamp_field = self.timestamp_field
data_source_proto.created_timestamp_column = self.created_timestamp_column
data_source_proto.date_partition_column = self.date_partition_column
return data_source_proto
project: test_custom
registry: data/registry.db
provider: local
offline_store:
# Make sure to specify the type as the fully qualified path that Feast can import.
type: feast_custom_offline_store.file.CustomFileOfflineStoreproject: test_custom
registry: data/registry.db
provider: local
offline_store: feast_custom_offline_store.file.CustomFileOfflineStoredriver_hourly_stats = CustomFileDataSource(
path="feature_repo/data/driver_stats.parquet",
timestamp_field="event_timestamp",
created_timestamp_column="created",
)
driver_hourly_stats_view = FeatureView(
source=driver_hourly_stats,
...
)test-python-universal-spark:
PYTHONPATH='.' \
FULL_REPO_CONFIGS_MODULE=sdk.python.feast.infra.offline_stores.contrib.spark_repo_configuration \
PYTEST_PLUGINS=feast.infra.offline_stores.contrib.spark_offline_store.tests \
FEAST_USAGE=False IS_TEST=True \
python -m pytest -n 8 --integration \
-k "not test_historical_retrieval_fails_on_validation and \
not test_historical_retrieval_with_validation and \
not test_historical_features_persisting and \
not test_historical_retrieval_fails_on_validation and \
not test_universal_cli and \
not test_go_feature_server and \
not test_feature_logging and \
not test_reorder_columns and \
not test_logged_features_validation and \
not test_lambda_materialization_consistency and \
not test_offline_write and \
not test_push_features_to_offline_store.py and \
not gcs_registry and \
not s3_registry and \
not test_universal_types" \
sdk/python/tests
export PYTHON=<version>
make lock-python-ci-dependencies
make build-sphinx
# Should go in sdk/python/feast/infra/offline_stores/contrib/postgres_repo_configuration.py
from feast.infra.offline_stores.contrib.postgres_offline_store.tests.data_source import (
PostgreSQLDataSourceCreator,
)
AVAILABLE_OFFLINE_STORES = [("local", PostgreSQLDataSourceCreator)]