from typing import Any, Callable, Dict, List, Optional, Sequence, Tuple, Union
from feast.entity import Entity
from feast.feature_view import FeatureView
from feast.batch_feature_view import BatchFeatureView
from feast.stream_feature_view import StreamFeatureView
from feast.infra.materialization import LocalMaterializationEngine, LocalMaterializationJob, MaterializationTask
from feast.infra.offline_stores.offline_store import OfflineStore
from feast.infra.online_stores.online_store import OnlineStore
from feast.repo_config import RepoConfig
class MyCustomEngine(LocalMaterializationEngine):
    """Example custom materialization engine that delegates to the local engine.

    The overrides below mark the two hooks a real engine would customize:
    `update` for engine-side infrastructure and `materialize` for launching
    batch jobs.
    """

    def __init__(
        self,
        *,
        repo_config: RepoConfig,
        offline_store: OfflineStore,
        online_store: OnlineStore,
        **kwargs,
    ):
        # No engine-specific setup here; forward everything to the
        # LocalMaterializationEngine base class.
        super().__init__(
            repo_config=repo_config,
            offline_store=offline_store,
            online_store=online_store,
            **kwargs,
        )

    def update(
        self,
        project: str,
        views_to_delete: Sequence[
            Union[BatchFeatureView, StreamFeatureView, FeatureView]
        ],
        views_to_keep: Sequence[
            Union[BatchFeatureView, StreamFeatureView, FeatureView]
        ],
        entities_to_delete: Sequence[Entity],
        entities_to_keep: Sequence[Entity],
    ):
        # Hook invoked on `feast apply`: create or tear down engine
        # infrastructure here. This example only prints a message.
        print("Creating new infrastructure is easy here!")
        pass

    def materialize(
        self, registry, tasks: List[MaterializationTask]
    ) -> List[LocalMaterializationJob]:
        # Runs one local materialization job per task, sequentially; a real
        # engine could fan these out to threads or remote batch jobs.
        print("Launching custom batch jobs or multithreading things is pretty easy...")
        return [
            self._materialize_one(
                registry,
                task.feature_view,
                task.start_time,
                task.end_time,
                task.project,
                task.tqdm_builder,
            )
            for task in tasks
]project: repo
registry: registry.db
batch_engine: feast_custom_engine.MyCustomEngine
online_store:
type: sqlite
path: online_store.db
offline_store:
  type: file
feast apply
Registered entity driver_id
Registered feature view driver_hourly_stats
Deploying infrastructure for driver_hourly_stats
Creating new infrastructure is easy here!PYTHONPATH=$PYTHONPATH:/home/my_user/my_custom_engine feast applyfrom datetime import datetime
from typing import Any, Callable, Dict, List, Optional, Sequence, Tuple, Union
from feast.entity import Entity
from feast.feature_table import FeatureTable
from feast.feature_view import FeatureView
from feast.infra.local import LocalProvider
from feast.infra.offline_stores.offline_store import RetrievalJob
from feast.protos.feast.types.EntityKey_pb2 import EntityKey as EntityKeyProto
from feast.protos.feast.types.Value_pb2 import Value as ValueProto
from feast.infra.registry.registry import Registry
from feast.repo_config import RepoConfig
class MyCustomProvider(LocalProvider):
    """Example custom provider wrapping LocalProvider.

    The overrides delegate the real work to LocalProvider and print
    messages where custom streaming/batch job management would go.
    """

    def __init__(self, config: RepoConfig, repo_path):
        super().__init__(config)
        # Add your custom init code here. This code runs on every Feast operation.

    def update_infra(
        self,
        project: str,
        tables_to_delete: Sequence[Union[FeatureTable, FeatureView]],
        tables_to_keep: Sequence[Union[FeatureTable, FeatureView]],
        entities_to_delete: Sequence[Entity],
        entities_to_keep: Sequence[Entity],
        partial: bool,
    ):
        # Let the local provider perform the actual infrastructure changes
        # first, then run any provider-specific job management.
        super().update_infra(
            project,
            tables_to_delete,
            tables_to_keep,
            entities_to_delete,
            entities_to_keep,
            partial,
        )
        print("Launching custom streaming jobs is pretty easy...")

    def materialize_single_feature_view(
        self,
        config: RepoConfig,
        feature_view: FeatureView,
        start_date: datetime,
        end_date: datetime,
        registry: Registry,
        project: str,
        # NOTE(review): `tqdm` is not imported in this snippet — confirm the
        # full module imports it for this annotation.
        tqdm_builder: Callable[[int], tqdm],
    ) -> None:
        # Delegate the actual materialization to LocalProvider.
        super().materialize_single_feature_view(
            config, feature_view, start_date, end_date, registry, project, tqdm_builder
        )
print("Launching custom batch jobs is pretty easy...")project: repo
registry: registry.db
provider: feast_custom_provider.custom_provider.MyCustomProvider
online_store:
type: sqlite
path: online_store.db
offline_store:
  type: file
feast apply
Registered entity driver_id
Registered feature view driver_hourly_stats
Deploying infrastructure for driver_hourly_stats
Launching custom streaming jobs is pretty easy...PYTHONPATH=$PYTHONPATH:/home/my_user/my_custom_provider feast applyfeature_store.yamlsdk/python/tests/integration/feature_repos/repo_configuration.py# Only prints out runtime warnings once.
warnings.simplefilter("once", RuntimeWarning)
def update(
    self,
    config: RepoConfig,
    tables_to_delete: Sequence[Union[FeatureTable, FeatureView]],
    tables_to_keep: Sequence[Union[FeatureTable, FeatureView]],
    entities_to_delete: Sequence[Entity],
    entities_to_keep: Sequence[Entity],
    partial: bool,
):
    """
    An example of creating and managing the tables needed for a mysql-backed online store.

    Creates one table per view in `tables_to_keep` (rows keyed by
    entity_key + feature_name, with an extra entity_key index) and drops
    the tables for `tables_to_delete`. MySQL DDL statements implicitly
    commit, so no explicit commit is needed here.
    """
    warnings.warn(
        "This online store is an experimental feature in alpha development. "
        "Some functionality may still be unstable so functionality can change in the future.",
        RuntimeWarning,
    )
    conn = self._get_conn(config)
    cur = conn.cursor(buffered=True)
    project = config.project
    for table in tables_to_keep:
        table_name = _table_id(project, table)
        cur.execute(
            f"CREATE TABLE IF NOT EXISTS {table_name} (entity_key VARCHAR(512), feature_name VARCHAR(256), value BLOB, event_ts timestamp, created_ts timestamp, PRIMARY KEY(entity_key, feature_name))"
        )
        # BUG FIX: MySQL has no CREATE INDEX IF NOT EXISTS, so the original
        # unconditional CREATE INDEX crashed on the second `feast apply`.
        # Probe information_schema and only create the index when missing.
        cur.execute(
            "SELECT 1 FROM information_schema.statistics WHERE table_schema = DATABASE() AND table_name = %s AND index_name = %s",
            (table_name, f"{table_name}_ek"),
        )
        if cur.fetchone() is None:
            cur.execute(
                f"CREATE INDEX {table_name}_ek ON {table_name} (entity_key);"
            )
    for table in tables_to_delete:
        # BUG FIX: dropping the table also drops its indexes; the original
        # unconditional DROP INDEX errored when the table was already gone.
        cur.execute(f"DROP TABLE IF EXISTS {_table_id(project, table)}")
def teardown(
    self,
    config: RepoConfig,
    tables: Sequence[Union[FeatureTable, FeatureView]],
    entities: Sequence[Entity],
):
    """Drop the MySQL table (and its entity-key index) backing each view in `tables`."""
    warnings.warn(
        "This online store is an experimental feature in alpha development. "
        "Some functionality may still be unstable so functionality can change in the future.",
        RuntimeWarning,
    )
    connection = self._get_conn(config)
    cursor = connection.cursor(buffered=True)
    project_name = config.project
    for feature_table in tables:
        table_name = _table_id(project_name, feature_table)
        # Remove the entity-key index first, then the table itself.
        cursor.execute(f"DROP INDEX {table_name}_ek ON {table_name};")
        cursor.execute(f"DROP TABLE IF EXISTS {table_name}")
# Only prints out runtime warnings once.
warnings.simplefilter("once", RuntimeWarning)
def online_write_batch(
    self,
    config: RepoConfig,
    table: Union[FeatureTable, FeatureView],
    data: List[
        Tuple[EntityKeyProto, Dict[str, ValueProto], datetime, Optional[datetime]]
    ],
    progress: Optional[Callable[[int], Any]],
) -> None:
    """Write a batch of feature rows for `table` into the MySQL online store.

    Each element of `data` is (entity_key, {feature_name: value}, event_ts,
    created_ts). Timestamps are normalized to naive UTC before writing, and
    `progress` (when given) is advanced by one per entity key written.
    """
    warnings.warn(
        "This online store is an experimental feature in alpha development. "
        "Some functionality may still be unstable so functionality can change in the future.",
        RuntimeWarning,
    )
    conn = self._get_conn(config)
    cur = conn.cursor(buffered=True)
    project = config.project
    for entity_key, values, timestamp, created_ts in data:
        # Entity keys are stored hex-encoded so they can be compared as text.
        entity_key_bin = serialize_entity_key(
            entity_key,
            entity_key_serialization_version=config.entity_key_serialization_version,
        ).hex()
        timestamp = _to_naive_utc(timestamp)
        if created_ts is not None:
            created_ts = _to_naive_utc(created_ts)
        for feature_name, val in values.items():
            self.write_to_table(created_ts, cur, entity_key_bin, feature_name, project, table, timestamp, val)
        # BUG FIX: commit on the connection we actually opened. The original
        # called self._conn.commit(), but the connection lives in the local
        # variable `conn`, so writes were committed on the wrong (or missing)
        # connection object.
        conn.commit()
        if progress:
            progress(1)
def online_read(
    self,
    config: RepoConfig,
    table: Union[FeatureTable, FeatureView],
    entity_keys: List[EntityKeyProto],
    requested_features: Optional[List[str]] = None,
) -> List[Tuple[Optional[datetime], Optional[Dict[str, ValueProto]]]]:
    """Read the stored feature values for each entity key from MySQL.

    Returns one (event_ts, {feature_name: value}) pair per entity key, in
    input order, or (None, None) when no rows exist for that key.
    """
    warnings.warn(
        "This online store is an experimental feature in alpha development. "
        "Some functionality may still be unstable so functionality can change in the future.",
        RuntimeWarning,
    )
    conn = self._get_conn(config)
    cur = conn.cursor(buffered=True)
    result: List[Tuple[Optional[datetime], Optional[Dict[str, ValueProto]]]] = []
    project = config.project
    for entity_key in entity_keys:
        entity_key_bin = serialize_entity_key(
            entity_key,
            entity_key_serialization_version=config.entity_key_serialization_version,
        ).hex()
        # BUG FIX: removed a leftover debug print that dumped every entity
        # key to stdout on each read.
        cur.execute(
            f"SELECT feature_name, value, event_ts FROM {_table_id(project, table)} WHERE entity_key = %s",
            (entity_key_bin,),
        )
        res = {}
        res_ts = None
        # res_ts keeps the event_ts of the last fetched row; rows for one
        # entity key share a timestamp per the write path above.
        for feature_name, val_bin, ts in cur.fetchall():
            val = ValueProto()
            val.ParseFromString(val_bin)
            res[feature_name] = val
            res_ts = ts
        if not res:
            result.append((None, None))
        else:
            result.append((res_ts, res))
return resultclass MySQLOnlineStoreConfig(FeastConfigBaseModel):
type: Literal["feast_custom_online_store.mysql.MySQLOnlineStore"] = "feast_custom_online_store.mysql.MySQLOnlineStore"
host: Optional[StrictStr] = None
user: Optional[StrictStr] = None
password: Optional[StrictStr] = None
database: Optional[StrictStr] = Noneonline_store:
type: feast_custom_online_store.mysql.MySQLOnlineStore
user: foo
password: bardef online_write_batch(
self,
config: RepoConfig,
table: Union[FeatureTable, FeatureView],
data: List[
Tuple[EntityKeyProto, Dict[str, ValueProto], datetime, Optional[datetime]]
],
progress: Optional[Callable[[int], Any]],
) -> None:
online_store_config = config.online_store
assert isinstance(online_store_config, MySQLOnlineStoreConfig)
connection = mysql.connector.connect(
host=online_store_config.host or "127.0.0.1",
user=online_store_config.user or "root",
password=online_store_config.password,
database=online_store_config.database or "feast",
autocommit=True
)project: test_custom
registry: data/registry.db
provider: local
online_store:
# Make sure to specify the type as the fully qualified path that Feast can import.
type: feast_custom_online_store.mysql.MySQLOnlineStore
user: foo
password: barproject: test_custom
registry: data/registry.db
provider: local
online_store: feast_custom_online_store.mysql.MySQLOnlineStoremake test-pythonfrom feast.infra.offline_stores.contrib.postgres_offline_store.tests.data_source import (
PostgreSQLDataSourceCreator,
)
AVAILABLE_ONLINE_STORES = {"postgres": (None, PostgreSQLDataSourceCreator)}{
"sqlite": ({"type": "sqlite"}, None),
# Specifies sqlite as the online store. The `None` object specifies to not use a containerized docker container.
}class RedisOnlineStoreCreator(OnlineStoreCreator):
def __init__(self, project_name: str, **kwargs):
    # Extra kwargs are accepted for interface compatibility but ignored here.
    super().__init__(project_name)
def create_online_store(self) -> Dict[str, str]:
    # NOTE(review): annotated to return Dict[str, str] but no return
    # statement is visible in this snippet — confirm the connection config
    # dict is returned in the full example.
    self.container.start()
    # Block until Redis reports readiness before handing the store to tests.
    log_string_to_wait_for = "Ready to accept connections"
    wait_for_logs(
        container=self.container, predicate=log_string_to_wait_for, timeout=10
    )
self.container.stop()test-python-universal-cassandra:
PYTHONPATH='.' \
FULL_REPO_CONFIGS_MODULE=sdk.python.feast.infra.online_stores.contrib.cassandra_repo_configuration \
PYTEST_PLUGINS=sdk.python.tests.integration.feature_repos.universal.online_store.cassandra \
FEAST_USAGE=False \
IS_TEST=True \
python -m pytest -x --integration \
sdk/python/testsexport PYTHON=<version>
make lock-python-ci-dependenciesmake build-sphinxget_historical_featuresFeatureStore.get_historical_features()BigQueryDataSourceCreatormake test-pythonexport FULL_REPO_CONFIGS_MODULE='feast_custom_offline_store.feast_tests'
make test-python-universal # Only prints out runtime warnings once.
warnings.simplefilter("once", RuntimeWarning)
def get_historical_features(self,
                            config: RepoConfig,
                            feature_views: List[FeatureView],
                            feature_refs: List[str],
                            entity_df: Union[pd.DataFrame, str],
                            registry: Registry, project: str,
                            full_feature_names: bool = False) -> RetrievalJob:
    """Perform a point-in-time correct join of features onto an entity dataframe (entity key and timestamp).

    More details about how this should work at
    https://docs.feast.dev/v/v0.6-branch/user-guide/feature-retrieval#3.-historical-feature-retrieval.
    """
    # BUG FIX: this print statement was garbled into the docstring in the
    # original; it belongs in the body, like the other example methods.
    print("Getting historical features from my offline store")
    warnings.warn(
        "This offline store is an experimental feature in alpha development. "
        "Some functionality may still be unstable so functionality can change in the future.",
        RuntimeWarning,
    )
    # Implementation here.
    pass
def pull_latest_from_table_or_query(self,
                                    config: RepoConfig,
                                    data_source: DataSource,
                                    join_key_columns: List[str],
                                    feature_name_columns: List[str],
                                    timestamp_field: str,
                                    created_timestamp_column: Optional[str],
                                    start_date: datetime,
                                    end_date: datetime) -> RetrievalJob:
    """Pulls data from the offline store for use in materialization."""
    # Announce the call, then emit the standard alpha-stability notice.
    # The actual retrieval is left to the implementer.
    print("Pulling latest features from my offline store")
    stability_notice = (
        "This offline store is an experimental feature in alpha development. "
        "Some functionality may still be unstable so functionality can change in the future."
    )
    warnings.warn(stability_notice, RuntimeWarning)
    pass
def pull_all_from_table_or_query(
    config: RepoConfig,
    data_source: DataSource,
    join_key_columns: List[str],
    feature_name_columns: List[str],
    timestamp_field: str,
    start_date: datetime,
    end_date: datetime,
) -> RetrievalJob:
    """Optional method that returns a RetrievalJob covering all join key
    columns, feature name columns, and event timestamp columns between
    `start_date` and `end_date`.
    """
    # Only the alpha-stability warning is emitted; retrieval is left
    # to the implementer.
    stability_notice = (
        "This offline store is an experimental feature in alpha development. "
        "Some functionality may still be unstable so functionality can change in the future."
    )
    warnings.warn(stability_notice, RuntimeWarning)
    pass
def write_logged_features(
    config: RepoConfig,
    data: Union[pyarrow.Table, Path],
    source: LoggingSource,
    logging_config: LoggingConfig,
    registry: BaseRegistry,
):
    """Optional method to have Feast support logging your online features."""
    # Only the alpha-stability warning is emitted; the logging sink is left
    # to the implementer.
    stability_notice = (
        "This offline store is an experimental feature in alpha development. "
        "Some functionality may still be unstable so functionality can change in the future."
    )
    warnings.warn(stability_notice, RuntimeWarning)
    pass
def offline_write_batch(
    config: RepoConfig,
    feature_view: FeatureView,
    table: pyarrow.Table,
    progress: Optional[Callable[[int], Any]],
):
    """ Optional method to have Feast support the offline push api for your offline store."""
    # NOTE(review): defined without `self` — presumably a @staticmethod like
    # the other OfflineStore hooks; confirm against the enclosing class.
    warnings.warn(
        "This offline store is an experimental feature in alpha development. "
        "Some functionality may still be unstable so functionality can change in the future.",
        RuntimeWarning,
    )
    # Implementation here.
passclass CustomFileOfflineStoreConfig(FeastConfigBaseModel):
""" Custom offline store config for local (file-based) store """
type: Literal["feast_custom_offline_store.file.CustomFileOfflineStore"] \
= "feast_custom_offline_store.file.CustomFileOfflineStore"
uri: str # URI for your offline store(in this case it would be a path)project: my_project
registry: data/registry.db
provider: local
offline_store:
type: feast_custom_offline_store.file.CustomFileOfflineStore
uri: <File URI>
online_store:
path: data/online_store.db def get_historical_features(self,
config: RepoConfig,
feature_views: List[FeatureView],
feature_refs: List[str],
entity_df: Union[pd.DataFrame, str],
registry: Registry, project: str,
full_feature_names: bool = False) -> RetrievalJob:
warnings.warn(
"This offline store is an experimental feature in alpha development. "
"Some functionality may still be unstable so functionality can change in the future.",
RuntimeWarning,
)
offline_store_config = config.offline_store
assert isinstance(offline_store_config, CustomFileOfflineStoreConfig)
store_type = offline_store_config.typeclass CustomFileRetrievalJob(RetrievalJob):
def __init__(self, evaluation_function: Callable):
    """Initialize a lazy historical retrieval job."""
    # The evaluation function executes a stored procedure to compute a
    # historical retrieval; nothing runs until to_df()/to_arrow() is called.
    self.evaluation_function = evaluation_function
def to_df(self):
    """Materialize the retrieval as a pandas DataFrame."""
    # Deferred execution: the stored evaluation function runs only now,
    # at the last possible moment.
    print("Getting a pandas DataFrame from a File is easy!")
    return self.evaluation_function()
def to_arrow(self):
    """Materialize the retrieval as a pyarrow Table."""
    # BUG FIX: the message previously said "pandas DataFrame" — copy-pasted
    # from to_df(); this method produces an Arrow table.
    print("Getting an Arrow Table from a File is easy!")
    # Only execute the evaluation function at the last moment, then convert
    # the resulting DataFrame to Arrow.
    df = self.evaluation_function()
    return pyarrow.Table.from_pandas(df)
def to_remote_storage(self):
# Optional method to write to an offline storage location to support scalable batch materialization.
passclass CustomFileDataSource(FileSource):
"""Custom data source class for local files"""
def __init__(
    self,
    timestamp_field: Optional[str] = "",
    path: Optional[str] = None,
    field_mapping: Optional[Dict[str, str]] = None,
    created_timestamp_column: Optional[str] = "",
    date_partition_column: Optional[str] = "",
):
    """Create a custom file data source pointing at `path`."""
    # BUG FIX: the opening of this warnings.warn(...) call was lost in the
    # original (only the message tail and closing paren remained).
    # TODO(review): confirm the exact wording of the first sentence against
    # the full module; the alpha phrasing below matches the other snippets.
    warnings.warn(
        "This custom data source is an experimental feature in alpha development. "
        "Some functionality may still be unstable so functionality can change in the future.",
        RuntimeWarning,
    )
    # BUG FIX: positional arguments after a keyword argument are a
    # SyntaxError; pass everything to the FileSource base by keyword.
    super(CustomFileDataSource, self).__init__(
        timestamp_field=timestamp_field,
        created_timestamp_column=created_timestamp_column,
        field_mapping=field_mapping,
        date_partition_column=date_partition_column,
    )
    self._path = path
@staticmethod
def from_proto(data_source: DataSourceProto):
    """Rebuild a CustomFileDataSource from its serialized proto form."""
    # The file path is round-tripped through the JSON blob stored in the
    # proto's custom_options.
    options_json = str(
        data_source.custom_options.configuration, encoding="utf8"
    )
    stored_path = json.loads(options_json)["path"]
    return CustomFileDataSource(
        timestamp_field=data_source.timestamp_field,
        path=stored_path,
        field_mapping=dict(data_source.field_mapping),
        created_timestamp_column=data_source.created_timestamp_column,
        date_partition_column=data_source.date_partition_column,
    )
def to_proto(self) -> DataSourceProto:
    """Serialize this source as a CUSTOM_SOURCE proto with a JSON options blob."""
    # Pack the path into the custom_options payload, UTF-8 encoded.
    options_payload = json.dumps({"path": self.path}).encode("utf8")
    data_source_proto = DataSourceProto(
        type=DataSourceProto.CUSTOM_SOURCE,
        custom_options=DataSourceProto.CustomSourceOptions(
            configuration=options_payload
        ),
    )
    # Timestamp-related columns are plain proto fields, set directly.
    data_source_proto.timestamp_field = self.timestamp_field
    data_source_proto.created_timestamp_column = self.created_timestamp_column
    data_source_proto.date_partition_column = self.date_partition_column
return data_source_protoproject: test_custom
registry: data/registry.db
provider: local
offline_store:
# Make sure to specify the type as the fully qualified path that Feast can import.
type: feast_custom_offline_store.file.CustomFileOfflineStoreproject: test_custom
registry: data/registry.db
provider: local
offline_store: feast_custom_offline_store.file.CustomFileOfflineStoredriver_hourly_stats = CustomFileDataSource(
path="feature_repo/data/driver_stats.parquet",
timestamp_field="event_timestamp",
created_timestamp_column="created",
)
driver_hourly_stats_view = FeatureView(
source=driver_hourly_stats,
...
)test-python-universal-spark:
PYTHONPATH='.' \
FULL_REPO_CONFIGS_MODULE=sdk.python.feast.infra.offline_stores.contrib.spark_repo_configuration \
PYTEST_PLUGINS=feast.infra.offline_stores.contrib.spark_offline_store.tests \
FEAST_USAGE=False IS_TEST=True \
python -m pytest -n 8 --integration \
-k "not test_historical_retrieval_fails_on_validation and \
not test_historical_retrieval_with_validation and \
not test_historical_features_persisting and \
not test_historical_retrieval_fails_on_validation and \
not test_universal_cli and \
not test_go_feature_server and \
not test_feature_logging and \
not test_reorder_columns and \
not test_logged_features_validation and \
not test_lambda_materialization_consistency and \
not test_offline_write and \
not test_push_features_to_offline_store.py and \
not gcs_registry and \
not s3_registry and \
not test_universal_types" \
sdk/python/testsexport PYTHON=<version>
make lock-python-ci-dependenciesmake build-sphinx# Should go in sdk/python/feast/infra/offline_stores/contrib/postgres_repo_configuration.py
from feast.infra.offline_stores.contrib.postgres_offline_store.tests.data_source import (
PostgreSQLDataSourceCreator,
)
AVAILABLE_OFFLINE_STORES = [("local", PostgreSQLDataSourceCreator)]