Feast batch materialization operations (materialize and materialize-incremental) execute through a BatchMaterializationEngine.
Custom batch materialization engines allow Feast users to extend Feast to customize the materialization process. Examples include:
Setting up custom materialization-specific infrastructure during feast apply (e.g. setting up Spark clusters or Lambda Functions)
Launching custom batch ingestion (materialization) jobs (Spark, Beam, AWS Lambda)
Tearing down custom materialization-specific infrastructure during feast teardown (e.g. tearing down Spark clusters, or deleting Lambda Functions)
Feast comes with built-in materialization engines, e.g., the LocalMaterializationEngine, and an experimental LambdaMaterializationEngine. However, users can develop their own materialization engines by creating a class that implements the contract in the BatchMaterializationEngine class.
The fastest way to add custom logic to Feast is to extend an existing materialization engine. The most generic engine is the LocalMaterializationEngine, which contains no cloud-specific logic. The guide that follows will extend the LocalMaterializationEngine with operations that print text to the console. It is up to you as a developer to add your custom code to the engine methods, but the guide below will provide the necessary scaffolding to get you started.
The first step is to define a custom materialization engine class. We've created the MyCustomEngine below.
from typing import Any, Callable, Dict, List, Optional, Sequence, Tuple, Union
from feast.entity import Entity
from feast.feature_view import FeatureView
from feast.batch_feature_view import BatchFeatureView
from feast.stream_feature_view import StreamFeatureView
from feast.infra.materialization import LocalMaterializationEngine, LocalMaterializationJob, MaterializationTask
from feast.infra.offline_stores.offline_store import OfflineStore
from feast.infra.online_stores.online_store import OnlineStore
from feast.repo_config import RepoConfig
class MyCustomEngine(LocalMaterializationEngine):
def __init__(
self,
*,
repo_config: RepoConfig,
offline_store: OfflineStore,
online_store: OnlineStore,
**kwargs,
):
super().__init__(
repo_config=repo_config,
offline_store=offline_store,
online_store=online_store,
**kwargs,
)
def update(
self,
project: str,
views_to_delete: Sequence[
Union[BatchFeatureView, StreamFeatureView, FeatureView]
],
views_to_keep: Sequence[
Union[BatchFeatureView, StreamFeatureView, FeatureView]
],
entities_to_delete: Sequence[Entity],
entities_to_keep: Sequence[Entity],
):
print("Creating new infrastructure is easy here!")
pass
def materialize(
self, registry, tasks: List[MaterializationTask]
) -> List[LocalMaterializationJob]:
print("Launching custom batch jobs or multithreading things is pretty easy...")
return [
self._materialize_one(
registry,
task.feature_view,
task.start_time,
task.end_time,
task.project,
task.tqdm_builder,
)
for task in tasks
]
Notice how in the above engine we have only overwritten two of the methods on the LocalMaterializationEngine, namely update and materialize. These two methods are convenient to replace if you are planning to launch custom batch jobs.
Configure your feature_store.yaml file to point to your new engine class:
project: repo
registry: registry.db
batch_engine: feast_custom_engine.MyCustomEngine
online_store:
type: sqlite
path: online_store.db
offline_store:
type: file
Notice how the batch_engine field above points to the module and class where your engine can be found.
Now you should be able to use your engine by running a Feast command:
feast apply
Registered entity driver_id
Registered feature view driver_hourly_stats
Deploying infrastructure for driver_hourly_stats
Creating new infrastructure is easy here!
It may also be necessary to add the module root path to your PYTHONPATH as follows:
PYTHONPATH=$PYTHONPATH:/home/my_user/my_custom_engine feast apply
That's it. You should now have a fully functional custom engine!
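As a quick sanity check, you can trigger the custom engine with an ordinary materialization command and look for the print statements from your overridden methods in the output (the timestamps below are placeholders):
feast materialize 2022-01-01T00:00:00 2022-02-01T00:00:00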
Feast is highly pluggable and configurable:
One can use existing plugins (offline store, online store, batch materialization engine, providers) and configure those using the built in options. See reference documentation for details.
The other way to customize Feast is to build your own custom components, and then point Feast to delegate to them.
Below are some guides on how to add new custom components:
Adding a new offline store
Adding a new online store
Adding a custom batch materialization engine
Adding a custom provider
All Feast operations execute through a provider: operations like materializing data from the offline store to the online store, updating infrastructure like databases, launching streaming ingestion jobs, building training datasets, and reading features from the online store.
Custom providers allow Feast users to extend Feast to execute any custom logic. Examples include:
Launching custom streaming ingestion jobs (Spark, Beam)
Launching custom batch ingestion (materialization) jobs (Spark, Beam)
Adding custom validation to feature repositories during feast apply
Adding custom infrastructure setup logic which runs during feast apply
Extending Feast commands with in-house metrics, logging, or tracing
Feast comes with built-in providers, e.g., LocalProvider, GcpProvider, and AwsProvider. However, users can develop their own providers by creating a class that implements the contract in the Provider class.
The fastest way to add custom logic to Feast is to extend an existing provider. The most generic provider is the LocalProvider which contains no cloud-specific logic. The guide that follows will extend the LocalProvider with operations that print text to the console. It is up to you as a developer to add your custom code to the provider methods, but the guide below will provide the necessary scaffolding to get you started.
The first step is to define a custom provider class. We've created the MyCustomProvider below.
from datetime import datetime
from typing import Any, Callable, Dict, List, Optional, Sequence, Tuple, Union
from feast.entity import Entity
from feast.feature_table import FeatureTable
from feast.feature_view import FeatureView
from feast.infra.local import LocalProvider
from feast.infra.offline_stores.offline_store import RetrievalJob
from feast.protos.feast.types.EntityKey_pb2 import EntityKey as EntityKeyProto
from feast.protos.feast.types.Value_pb2 import Value as ValueProto
from feast.infra.registry.registry import Registry
from feast.repo_config import RepoConfig
from tqdm import tqdm
class MyCustomProvider(LocalProvider):
def __init__(self, config: RepoConfig, repo_path):
super().__init__(config)
# Add your custom init code here. This code runs on every Feast operation.
def update_infra(
self,
project: str,
tables_to_delete: Sequence[Union[FeatureTable, FeatureView]],
tables_to_keep: Sequence[Union[FeatureTable, FeatureView]],
entities_to_delete: Sequence[Entity],
entities_to_keep: Sequence[Entity],
partial: bool,
):
super().update_infra(
project,
tables_to_delete,
tables_to_keep,
entities_to_delete,
entities_to_keep,
partial,
)
print("Launching custom streaming jobs is pretty easy...")
def materialize_single_feature_view(
self,
config: RepoConfig,
feature_view: FeatureView,
start_date: datetime,
end_date: datetime,
registry: Registry,
project: str,
tqdm_builder: Callable[[int], tqdm],
) -> None:
super().materialize_single_feature_view(
config, feature_view, start_date, end_date, registry, project, tqdm_builder
)
print("Launching custom batch jobs is pretty easy...")Notice how in the above provider we have only overwritten two of the methods on the LocalProvider, namely update_infra and materialize_single_feature_view. These two methods are convenient to replace if you are planning to launch custom batch or streaming jobs. update_infra can be used for launching idempotent streaming jobs, and materialize_single_feature_view can be used for launching batch ingestion jobs.
It is possible to overwrite all the methods on the provider class. In fact, it isn't even necessary to subclass an existing provider like LocalProvider. The only requirement for the provider class is that it follows the Provider contract.
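For illustration, a minimal sketch of a standalone provider written directly against the Provider contract might look like the following. The class and module layout here are hypothetical, and the exact set of abstract methods to implement comes from the Provider class in your installed Feast version.
from feast.infra.provider import Provider
from feast.repo_config import RepoConfig


class MyStandaloneProvider(Provider):
    def __init__(self, config: RepoConfig):
        self.config = config  # no parent provider; this class owns all behavior

    def update_infra(
        self,
        project,
        tables_to_delete,
        tables_to_keep,
        entities_to_delete,
        entities_to_keep,
        partial,
    ):
        # Create or update whatever infrastructure your deployment needs.
        pass

    def teardown_infra(self, project, tables, entities):
        # Clean up the infrastructure created above.
        pass

    # ...plus the read, write, and materialization methods required by the contract.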
Configure your feature_store.yaml file to point to your new provider class:
project: repo
registry: registry.db
provider: feast_custom_provider.custom_provider.MyCustomProvider
online_store:
type: sqlite
path: online_store.db
offline_store:
type: file
Notice how the provider field above points to the module and class where your provider can be found.
Now you should be able to use your provider by running a Feast command:
feast apply
Registered entity driver_id
Registered feature view driver_hourly_stats
Deploying infrastructure for driver_hourly_stats
Launching custom streaming jobs is pretty easy...
It may also be necessary to add the module root path to your PYTHONPATH as follows:
PYTHONPATH=$PYTHONPATH:/home/my_user/my_custom_provider feast apply
That's it. You should now have a fully functional custom provider!
Have a look at the custom provider demo repository for a fully functional example of a custom provider. Feel free to fork it when creating your own custom provider!
Feast makes adding support for a new offline store easy. Developers can simply implement the OfflineStore interface to add support for a new store (other than the existing stores like Parquet files, Redshift, and Bigquery).
In this guide, we will show you how to extend the existing File offline store and use it in a feature repo. While we will be implementing a specific store, this guide should be representative for adding support for any new offline store.
The full working code for this guide can be found at feast-dev/feast-custom-offline-store-demo.
The process for using a custom offline store consists of 8 steps:
Defining an OfflineStore class.
Defining an OfflineStoreConfig class.
Defining a RetrievalJob class for this offline store.
Defining a DataSource class for the offline store.
Referencing the OfflineStore in a feature repo's feature_store.yaml file.
Testing the OfflineStore class.
Updating dependencies.
Adding documentation.
New offline stores go in sdk/python/feast/infra/offline_stores/contrib/. Contrib plugins are:
Not guaranteed to implement all interface methods
Not guaranteed to be stable.
Should have warnings for users to indicate this is a contrib plugin that is not maintained by the core Feast maintainers.
To move an offline store plugin out of contrib, you need:
A GitHub Actions workflow (i.e. make test-python-integration) is set up to run all tests against the offline store, and the tests pass.
At least two contributors own the plugin (ideally tracked in our OWNERS / CODEOWNERS file).
The OfflineStore class contains a couple of methods to read features from the offline store. Unlike the OnlineStore class, Feast does not manage any infrastructure for the offline store.
To fully implement the interface for the offline store, you will need to implement these methods:
pull_latest_from_table_or_query is invoked when running materialization (using the feast materialize or feast materialize-incremental commands, or the corresponding FeatureStore.materialize() method). This method pulls data from the offline store, and the FeatureStore class takes care of writing this data into the online store.
get_historical_features is invoked when reading values from the offline store using the FeatureStore.get_historical_features() method. Typically, this method is used to retrieve features when training ML models.
(optional) offline_write_batch is a method that supports directly pushing a pyarrow table to a feature view. Given a feature view with a specific schema, this function should write the pyarrow table to the batch source defined. More details about the push api can be found here. This method only needs implementation if you want to support the push api in your offline store.
(optional) pull_all_from_table_or_query is a method that pulls all the data from an offline store from a specified start date to a specified end date. This method is only used for SavedDatasets as part of data quality monitoring validation.
(optional) write_logged_features is a method that takes a pyarrow table or a path that points to a parquet file and writes the data to a destination defined by a LoggingSource and LoggingConfig. This method is only used internally for SavedDatasets.
# Only prints out runtime warnings once.
warnings.simplefilter("once", RuntimeWarning)
def get_historical_features(self,
config: RepoConfig,
feature_views: List[FeatureView],
feature_refs: List[str],
entity_df: Union[pd.DataFrame, str],
registry: Registry, project: str,
full_feature_names: bool = False) -> RetrievalJob:
""" Perform point-in-time correct join of features onto an entity dataframe(entity key and timestamp). More details about how this should work at https://docs.feast.dev/v/v0.6-branch/user-guide/feature-retrieval#3.-historical-feature-retrieval.
print("Getting historical features from my offline store")."""
warnings.warn(
"This offline store is an experimental feature in alpha development. "
"Some functionality may still be unstable so functionality can change in the future.",
RuntimeWarning,
)
# Implementation here.
pass
def pull_latest_from_table_or_query(self,
config: RepoConfig,
data_source: DataSource,
join_key_columns: List[str],
feature_name_columns: List[str],
timestamp_field: str,
created_timestamp_column: Optional[str],
start_date: datetime,
end_date: datetime) -> RetrievalJob:
""" Pulls data from the offline store for use in materialization."""
print("Pulling latest features from my offline store")
warnings.warn(
"This offline store is an experimental feature in alpha development. "
"Some functionality may still be unstable so functionality can change in the future.",
RuntimeWarning,
)
# Implementation here.
pass
def pull_all_from_table_or_query(
config: RepoConfig,
data_source: DataSource,
join_key_columns: List[str],
feature_name_columns: List[str],
timestamp_field: str,
start_date: datetime,
end_date: datetime,
) -> RetrievalJob:
""" Optional method that returns a Retrieval Job for all join key columns, feature name columns, and the event timestamp columns that occur between the start_date and end_date."""
warnings.warn(
"This offline store is an experimental feature in alpha development. "
"Some functionality may still be unstable so functionality can change in the future.",
RuntimeWarning,
)
# Implementation here.
pass
def write_logged_features(
config: RepoConfig,
data: Union[pyarrow.Table, Path],
source: LoggingSource,
logging_config: LoggingConfig,
registry: BaseRegistry,
):
""" Optional method to have Feast support logging your online features."""
warnings.warn(
"This offline store is an experimental feature in alpha development. "
"Some functionality may still be unstable so functionality can change in the future.",
RuntimeWarning,
)
# Implementation here.
pass
def offline_write_batch(
config: RepoConfig,
feature_view: FeatureView,
table: pyarrow.Table,
progress: Optional[Callable[[int], Any]],
):
""" Optional method to have Feast support the offline push api for your offline store."""
warnings.warn(
"This offline store is an experimental feature in alpha development. "
"Some functionality may still be unstable so functionality can change in the future.",
RuntimeWarning,
)
# Implementation here.
pass
Most offline stores will have to perform some custom mapping of offline store datatypes to Feast value types.
The functions to implement here are source_datatype_to_feast_value_type and get_column_names_and_types in your DataSource class.
source_datatype_to_feast_value_type is used to convert your DataSource's datatypes to feast value types.
get_column_names_and_types retrieves the column names and corresponding datasource types.
Add any helper functions for type conversion to sdk/python/feast/type_map.py.
Be sure to implement correct type mapping so that Feast can process your feature columns without incorrect casting, which can potentially cause loss of information or incorrect data.
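For example, a rough sketch of these two methods on a hypothetical file-backed data source might look like the following; the pandas-dtype-to-Feast mapping shown is illustrative rather than exhaustive, and the self._path attribute is an assumption.
from typing import Callable, Dict, Iterable, Tuple

import pandas as pd

from feast.data_source import DataSource
from feast.repo_config import RepoConfig
from feast.value_type import ValueType


class MyCustomDataSource(DataSource):  # hypothetical subclass for illustration
    @staticmethod
    def source_datatype_to_feast_value_type() -> Callable[[str], ValueType]:
        # Map the source's type names to Feast value types. Shared helpers for
        # this usually live in sdk/python/feast/type_map.py.
        mapping: Dict[str, ValueType] = {
            "int64": ValueType.INT64,
            "float64": ValueType.DOUBLE,
            "bool": ValueType.BOOL,
            "object": ValueType.STRING,
        }
        return lambda source_type: mapping.get(source_type, ValueType.UNKNOWN)

    def get_column_names_and_types(
        self, config: RepoConfig
    ) -> Iterable[Tuple[str, str]]:
        # Return (column name, source type name) pairs by inspecting the data.
        df = pd.read_parquet(self._path)
        return [(col, str(dtype)) for col, dtype in df.dtypes.items()]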
Additional configuration may be needed to allow the OfflineStore to talk to the backing store. For example, Redshift needs configuration information like the connection information for the Redshift instance, credentials for connecting to the database, etc.
To facilitate configuration, all OfflineStore implementations are required to also define a corresponding OfflineStoreConfig class in the same file. This OfflineStoreConfig class should inherit from the FeastConfigBaseModel class, which is defined here.
The FeastConfigBaseModel is a pydantic class, which parses yaml configuration into python objects. Pydantic also allows the model classes to define validators for the config classes, to make sure that the config classes are correctly defined.
This config class must contain a type field, which contains the fully qualified class name of its corresponding OfflineStore class.
Additionally, the name of the config class must be the same as the OfflineStore class, with the Config suffix.
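Because the config class is a pydantic model, it can also declare validators to catch misconfiguration early. A minimal sketch is shown here; the class name, type path, and uri field are illustrative, and the decorator assumes pydantic v1-style validators.
from pydantic import StrictStr, validator

from feast.repo_config import FeastConfigBaseModel


class ExampleOfflineStoreConfig(FeastConfigBaseModel):
    type: StrictStr = "my_module.ExampleOfflineStore"  # hypothetical class path
    uri: StrictStr = ""

    @validator("uri")
    def uri_must_be_set(cls, value):
        # Fail at config-parsing time (e.g. during feast apply) if uri is missing.
        if not value:
            raise ValueError("uri must be set for this offline store")
        return value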
An example of the config class for the custom file offline store:
class CustomFileOfflineStoreConfig(FeastConfigBaseModel):
""" Custom offline store config for local (file-based) store """
type: Literal["feast_custom_offline_store.file.CustomFileOfflineStore"] \
= "feast_custom_offline_store.file.CustomFileOfflineStore"
uri: str  # URI for your offline store (in this case it would be a path)
This configuration can be specified in the feature_store.yaml as follows:
project: my_project
registry: data/registry.db
provider: local
offline_store:
type: feast_custom_offline_store.file.CustomFileOfflineStore
uri: <File URI>
online_store:
path: data/online_store.db
This configuration information is available to the methods of the OfflineStore via the config: RepoConfig parameter, which is passed into the methods of the OfflineStore interface, specifically at the config.offline_store field of the config parameter. The fields in the feature_store.yaml should map directly to your OfflineStoreConfig class that is detailed above in Section 2.
def get_historical_features(self,
config: RepoConfig,
feature_views: List[FeatureView],
feature_refs: List[str],
entity_df: Union[pd.DataFrame, str],
registry: Registry, project: str,
full_feature_names: bool = False) -> RetrievalJob:
warnings.warn(
"This offline store is an experimental feature in alpha development. "
"Some functionality may still be unstable so functionality can change in the future.",
RuntimeWarning,
)
offline_store_config = config.offline_store
assert isinstance(offline_store_config, CustomFileOfflineStoreConfig)
store_type = offline_store_config.type
The offline store methods aren't expected to perform their read operations eagerly. Instead, they are expected to execute lazily, and they do so by returning a RetrievalJob instance, which represents the execution of the actual query against the underlying store.
Custom offline stores may need to implement their own instances of the RetrievalJob interface.
The RetrievalJob interface exposes two methods - to_df and to_arrow. The expectation is for the retrieval job to be able to return the rows read from the offline store as a pandas DataFrame or as an Arrow table, respectively.
Users who want to have their offline store support scalable batch materialization for online use cases (detailed in this RFC) will also need to implement to_remote_storage to distribute the reading and writing of offline store records to blob storage (such as S3). This may be used by a custom Materialization Engine to parallelize the materialization of data by processing it in chunks. If this is not implemented, Feast will default to local materialization (pulling all records into memory to materialize).
class CustomFileRetrievalJob(RetrievalJob):
def __init__(self, evaluation_function: Callable):
"""Initialize a lazy historical retrieval job"""
# The evaluation function executes a stored procedure to compute a historical retrieval.
self.evaluation_function = evaluation_function
def to_df(self):
# Only execute the evaluation function to build the final historical retrieval dataframe at the last moment.
print("Getting a pandas DataFrame from a File is easy!")
df = self.evaluation_function()
return df
def to_arrow(self):
# Only execute the evaluation function to build the final historical retrieval dataframe at the last moment.
print("Getting a pandas DataFrame from a File is easy!")
df = self.evaluation_function()
return pyarrow.Table.from_pandas(df)
def to_remote_storage(self):
# Optional method to write to an offline storage location to support scalable batch materialization.
pass
Before this offline store can be used as the batch source for a feature view in a feature repo, a subclass of the DataSource base class needs to be defined. This class is responsible for holding information needed by specific feature views to support reading historical values from the offline store. For example, a feature view using Redshift as the offline store may need to know which table contains historical feature values.
The data source class should implement two methods - from_proto, and to_proto.
For custom offline stores that are not being implemented in the main feature repo, the custom_options field should be used to store any configuration needed by the data source. In this case, the implementer is responsible for serializing this configuration into bytes in the to_proto method and reading the value back from bytes in the from_proto method.
class CustomFileDataSource(FileSource):
"""Custom data source class for local files"""
def __init__(
self,
timestamp_field: Optional[str] = "",
path: Optional[str] = None,
field_mapping: Optional[Dict[str, str]] = None,
created_timestamp_column: Optional[str] = "",
date_partition_column: Optional[str] = "",
):
"Some functionality may still be unstable so functionality can change in the future.",
RuntimeWarning,
)
super(CustomFileDataSource, self).__init__(
timestamp_field=timestamp_field,
created_timestamp_column,
field_mapping,
date_partition_column,
)
self._path = path
@staticmethod
def from_proto(data_source: DataSourceProto):
custom_source_options = str(
data_source.custom_options.configuration, encoding="utf8"
)
path = json.loads(custom_source_options)["path"]
return CustomFileDataSource(
field_mapping=dict(data_source.field_mapping),
path=path,
timestamp_field=data_source.timestamp_field,
created_timestamp_column=data_source.created_timestamp_column,
date_partition_column=data_source.date_partition_column,
)
def to_proto(self) -> DataSourceProto:
config_json = json.dumps({"path": self.path})
data_source_proto = DataSourceProto(
type=DataSourceProto.CUSTOM_SOURCE,
custom_options=DataSourceProto.CustomSourceOptions(
configuration=bytes(config_json, encoding="utf8")
),
)
data_source_proto.timestamp_field = self.timestamp_field
data_source_proto.created_timestamp_column = self.created_timestamp_column
data_source_proto.date_partition_column = self.date_partition_column
return data_source_proto
After implementing these classes, the custom offline store can be used by referencing it in a feature repo's feature_store.yaml file, specifically in the offline_store field. The value specified should be the fully qualified class name of the OfflineStore.
As long as your OfflineStore class is available in your Python environment, it will be imported by Feast dynamically at runtime.
To use our custom file offline store, we can use the following feature_store.yaml:
project: test_custom
registry: data/registry.db
provider: local
offline_store:
# Make sure to specify the type as the fully qualified path that Feast can import.
type: feast_custom_offline_store.file.CustomFileOfflineStore
If additional configuration for the offline store is not required, then we can omit the other fields and only specify the type of the offline store class as the value for the offline_store.
project: test_custom
registry: data/registry.db
provider: local
offline_store: feast_custom_offline_store.file.CustomFileOfflineStore
Finally, the custom data source class can be used in the feature repo to define a data source, and referenced in a feature view definition.
driver_hourly_stats = CustomFileDataSource(
path="feature_repo/data/driver_stats.parquet",
timestamp_field="event_timestamp",
created_timestamp_column="created",
)
driver_hourly_stats_view = FeatureView(
source=driver_hourly_stats,
...
)
Even if you have created the OfflineStore class in a separate repo, you can still test your implementation against the Feast test suite, as long as you have Feast as a submodule in your repo.
In order to test against the test suite, you need to create a custom DataSourceCreator that implements our testing infrastructure methods: create_data_source and, optionally, created_saved_dataset_destination.
create_data_source should create a datasource based on the dataframe passed in. It may be implemented by uploading the contents of the dataframe into the offline store and returning a datasource object pointing to that location. See BigQueryDataSourceCreator for an implementation of a data source creator.
created_saved_dataset_destination is invoked when users need to save the dataset for use in data validation. This functionality is still in alpha and is optional.
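As a rough sketch, a creator for the custom file offline store might look like the following. Only create_data_source is shown; the base class defines additional abstract methods (for example creating the offline store config and tearing down resources). The import path for DataSourceCreator follows Feast's test-suite layout and may differ in your checkout, the feast_custom_offline_store module is the hypothetical plugin from this guide, and the /tmp destination is just an illustrative choice.
import pandas as pd

from feast.data_source import DataSource
# Base class from Feast's test suite; path assumes sdk/python/tests is importable.
from tests.integration.feature_repos.universal.data_source_creator import (
    DataSourceCreator,
)

from feast_custom_offline_store.file import CustomFileDataSource  # hypothetical module


class CustomFileDataSourceCreator(DataSourceCreator):
    def create_data_source(
        self,
        df: pd.DataFrame,
        destination_name: str,
        timestamp_field: str = "ts",
        created_timestamp_column: str = "created_ts",
        **kwargs,
    ) -> DataSource:
        # Persist the test dataframe where the offline store can read it, then
        # return a data source pointing at that location.
        path = f"/tmp/{destination_name}.parquet"
        df.to_parquet(path)
        return CustomFileDataSource(
            path=path,
            timestamp_field=timestamp_field,
            created_timestamp_column=created_timestamp_column,
        )

    # Other abstract methods from DataSourceCreator (e.g. offline store config,
    # teardown) are omitted from this sketch.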
Make sure that your offline store doesn't break any unit tests first by running:
make test-python
Next, set up your offline store to run the universal integration tests. These are integration tests specifically intended to test offline and online stores against Feast API functionality, to ensure that the Feast APIs work with your offline store.
Feast parametrizes integration tests using the FULL_REPO_CONFIGS variable defined in sdk/python/tests/integration/feature_repos/repo_configuration.py which stores different offline store classes for testing.
To overwrite the default configurations to use your own offline store, you can simply create your own file that contains a FULL_REPO_CONFIGS dictionary, and point Feast to that file by setting the environment variable FULL_REPO_CONFIGS_MODULE to point to that file. The module should add new IntegrationTestRepoConfig classes to the AVAILABLE_OFFLINE_STORES by defining an offline store that you would like Feast to test with.
A sample FULL_REPO_CONFIGS_MODULE looks something like this:
# Should go in sdk/python/feast/infra/offline_stores/contrib/postgres_repo_configuration.py
from feast.infra.offline_stores.contrib.postgres_offline_store.tests.data_source import (
PostgreSQLDataSourceCreator,
)
AVAILABLE_OFFLINE_STORES = [("local", PostgreSQLDataSourceCreator)]You should swap out the FULL_REPO_CONFIGS environment variable and run the integration tests against your offline store. In the example repo, the file that overwrites FULL_REPO_CONFIGS is feast_custom_offline_store/feast_tests.py, so you would run:
export FULL_REPO_CONFIGS_MODULE='feast_custom_offline_store.feast_tests'
make test-python-universal
If the integration tests fail, this indicates that there is a mistake in the implementation of this offline store!
Remember to add your datasource to repo_config.py, similar to how we added spark, trino, etc. to the OFFLINE_STORE_CLASS_FOR_TYPE dictionary. This will allow Feast to load your class from the feature_store.yaml.
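For illustration, the entry maps a short type name to the fully qualified class path, roughly along these lines; the existing entries and your plugin's name will vary by Feast version.
# sdk/python/feast/repo_config.py (illustrative excerpt)
OFFLINE_STORE_CLASS_FOR_TYPE = {
    "file": "feast.infra.offline_stores.file.FileOfflineStore",
    "bigquery": "feast.infra.offline_stores.bigquery.BigQueryOfflineStore",
    # ... other built-in and contrib entries ...
    "my_custom_store": "feast_custom_offline_store.file.CustomFileOfflineStore",
}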
Finally, add a Makefile target to the Makefile to run your datastore-specific tests by setting the FULL_REPO_CONFIGS_MODULE and PYTEST_PLUGINS environment variables. The PYTEST_PLUGINS environment variable allows pytest to load in the DataSourceCreator for your datasource. You can remove certain tests that are not relevant or still do not work for your datastore using the -k option.
test-python-universal-spark:
PYTHONPATH='.' \
FULL_REPO_CONFIGS_MODULE=sdk.python.feast.infra.offline_stores.contrib.spark_repo_configuration \
PYTEST_PLUGINS=feast.infra.offline_stores.contrib.spark_offline_store.tests \
FEAST_USAGE=False IS_TEST=True \
python -m pytest -n 8 --integration \
-k "not test_historical_retrieval_fails_on_validation and \
not test_historical_retrieval_with_validation and \
not test_historical_features_persisting and \
not test_historical_retrieval_fails_on_validation and \
not test_universal_cli and \
not test_go_feature_server and \
not test_feature_logging and \
not test_reorder_columns and \
not test_logged_features_validation and \
not test_lambda_materialization_consistency and \
not test_offline_write and \
not test_push_features_to_offline_store.py and \
not gcs_registry and \
not s3_registry and \
not test_universal_types" \
sdk/python/tests
Add any dependencies for your offline store to our sdk/python/setup.py under a new <OFFLINE_STORE>_REQUIRED list with the packages, and add it to the setup script so that users can install the necessary python packages if your offline store is needed. These packages should be defined as extras so that they are not installed by default. You will need to regenerate our requirements files. To do this, create separate pyenv environments for python 3.8, 3.9, and 3.10. In each environment, run the following commands:
export PYTHON=<version>
make lock-python-ci-dependencies
Remember to add documentation for your offline store.
Add a new markdown file to docs/reference/offline-stores/ and docs/reference/data-sources/. Use these files to document your offline store functionality similar to how the other offline stores are documented.
You should also add a reference in docs/reference/data-sources/README.md and docs/SUMMARY.md to these markdown files.
NOTE: Be sure to document the following things about your offline store:
How to create the datasource and what configuration is needed in the feature_store.yaml file in order to create it.
Make sure to flag that the datasource is in alpha development.
Add some documentation on what the data model is for the specific offline store for more clarity.
Finally, generate the python code docs by running:
make build-sphinx
Feast makes adding support for a new online store (database) easy. Developers can simply implement the OnlineStore interface to add support for a new store (other than the existing stores like Redis, DynamoDB, SQLite, and Datastore).
In this guide, we will show you how to integrate with MySQL as an online store. While we will be implementing a specific store, this guide should be representative for adding support for any new online store.
The full working code for this guide can be found at feast-dev/feast-custom-online-store-demo.
The process of using a custom online store consists of 6 steps:
Defining the OnlineStore class.
Defining the OnlineStoreConfig class.
Referencing the OnlineStore in a feature repo's feature_store.yaml file.
Testing the OnlineStore class.
Updating dependencies.
Adding documentation.
New online stores go in sdk/python/feast/infra/online_stores/contrib/. Contrib plugins are:
Not guaranteed to implement all interface methods
Not guaranteed to be stable.
Should have warnings for users to indicate this is a contrib plugin that is not maintained by the core Feast maintainers.
To move an online store plugin out of contrib, you need:
A GitHub Actions workflow (i.e. make test-python-integration) is set up to run all tests against the online store, and the tests pass.
At least two contributors own the plugin (ideally tracked in our OWNERS / CODEOWNERS file).
The OnlineStore class broadly contains two sets of methods:
One set deals with managing the infrastructure that the online store needs for its operations.
The other set deals with writing data into the store and reading data from the store.
There are two methods that deal with managing infrastructure for online stores: update and teardown.
update is invoked when users run feast apply as a CLI command, or the FeatureStore.apply() sdk method.
The update method should be used to perform any operations necessary before data can be written to or read from the store. The update method can be used to create MySQL tables in preparation for reads and writes to new feature views.
teardown is invoked when users run feast teardown or FeatureStore.teardown().
The teardown method should be used to perform any clean-up operations. teardown can be used to drop MySQL indices and tables corresponding to the feature views being deleted.
# Only prints out runtime warnings once.
warnings.simplefilter("once", RuntimeWarning)
def update(
self,
config: RepoConfig,
tables_to_delete: Sequence[Union[FeatureTable, FeatureView]],
tables_to_keep: Sequence[Union[FeatureTable, FeatureView]],
entities_to_delete: Sequence[Entity],
entities_to_keep: Sequence[Entity],
partial: bool,
):
"""
An example of creating and managing the tables needed for a MySQL-backed online store.
"""
warnings.warn(
"This online store is an experimental feature in alpha development. "
"Some functionality may still be unstable so functionality can change in the future.",
RuntimeWarning,
)
conn = self._get_conn(config)
cur = conn.cursor(buffered=True)
project = config.project
for table in tables_to_keep:
cur.execute(
f"CREATE TABLE IF NOT EXISTS {_table_id(project, table)} (entity_key VARCHAR(512), feature_name VARCHAR(256), value BLOB, event_ts timestamp, created_ts timestamp, PRIMARY KEY(entity_key, feature_name))"
)
cur.execute(
f"CREATE INDEX {_table_id(project, table)}_ek ON {_table_id(project, table)} (entity_key);"
)
for table in tables_to_delete:
cur.execute(
f"DROP INDEX {_table_id(project, table)}_ek ON {_table_id(project, table)};"
)
cur.execute(f"DROP TABLE IF EXISTS {_table_id(project, table)}")
def teardown(
self,
config: RepoConfig,
tables: Sequence[Union[FeatureTable, FeatureView]],
entities: Sequence[Entity],
):
warnings.warn(
"This online store is an experimental feature in alpha development. "
"Some functionality may still be unstable so functionality can change in the future.",
RuntimeWarning,
)
conn = self._get_conn(config)
cur = conn.cursor(buffered=True)
project = config.project
for table in tables:
cur.execute(
f"DROP INDEX {_table_id(project, table)}_ek ON {_table_id(project, table)};"
)
cur.execute(f"DROP TABLE IF EXISTS {_table_id(project, table)}")There are two methods that deal with writing data to and from the online stores.online_write_batch and online_read.
online_write_batch is invoked when running materialization (using the feast materialize or feast materialize-incremental commands, or the corresponding FeatureStore.materialize() method).
online_read is invoked when reading values from the online store using the FeatureStore.get_online_features() method.
# Only prints out runtime warnings once.
warnings.simplefilter("once", RuntimeWarning)
def online_write_batch(
self,
config: RepoConfig,
table: Union[FeatureTable, FeatureView],
data: List[
Tuple[EntityKeyProto, Dict[str, ValueProto], datetime, Optional[datetime]]
],
progress: Optional[Callable[[int], Any]],
) -> None:
warnings.warn(
"This online store is an experimental feature in alpha development. "
"Some functionality may still be unstable so functionality can change in the future.",
RuntimeWarning,
)
conn = self._get_conn(config)
cur = conn.cursor(buffered=True)
project = config.project
for entity_key, values, timestamp, created_ts in data:
entity_key_bin = serialize_entity_key(
entity_key,
entity_key_serialization_version=config.entity_key_serialization_version,
).hex()
timestamp = _to_naive_utc(timestamp)
if created_ts is not None:
created_ts = _to_naive_utc(created_ts)
for feature_name, val in values.items():
self.write_to_table(created_ts, cur, entity_key_bin, feature_name, project, table, timestamp, val)
self._conn.commit()
if progress:
progress(1)
def online_read(
self,
config: RepoConfig,
table: Union[FeatureTable, FeatureView],
entity_keys: List[EntityKeyProto],
requested_features: Optional[List[str]] = None,
) -> List[Tuple[Optional[datetime], Optional[Dict[str, ValueProto]]]]:
warnings.warn(
"This online store is an experimental feature in alpha development. "
"Some functionality may still be unstable so functionality can change in the future.",
RuntimeWarning,
)
conn = self._get_conn(config)
cur = conn.cursor(buffered=True)
result: List[Tuple[Optional[datetime], Optional[Dict[str, ValueProto]]]] = []
project = config.project
for entity_key in entity_keys:
entity_key_bin = serialize_entity_key(
entity_key,
entity_key_serialization_version=config.entity_key_serialization_version,
).hex()
print(f"entity_key_bin: {entity_key_bin}")
cur.execute(
f"SELECT feature_name, value, event_ts FROM {_table_id(project, table)} WHERE entity_key = %s",
(entity_key_bin,),
)
res = {}
res_ts = None
for feature_name, val_bin, ts in cur.fetchall():
val = ValueProto()
val.ParseFromString(val_bin)
res[feature_name] = val
res_ts = ts
if not res:
result.append((None, None))
else:
result.append((res_ts, res))
return resultAdditional configuration may be needed to allow the OnlineStore to talk to the backing store. For example, MySQL may need configuration information like the host at which the MySQL instance is running, credentials for connecting to the database, etc.
To facilitate configuration, all OnlineStore implementations are required to also define a corresponding OnlineStoreConfig class in the same file. This OnlineStoreConfig class should inherit from the FeastConfigBaseModel class, which is defined here.
The FeastConfigBaseModel is a pydantic class, which parses yaml configuration into python objects. Pydantic also allows the model classes to define validators for the config classes, to make sure that the config classes are correctly defined.
This config class must contain a type field, which contains the fully qualified class name of its corresponding OnlineStore class.
Additionally, the name of the config class must be the same as the OnlineStore class, with the Config suffix.
An example of the config class for MySQL :
class MySQLOnlineStoreConfig(FeastConfigBaseModel):
type: Literal["feast_custom_online_store.mysql.MySQLOnlineStore"] = "feast_custom_online_store.mysql.MySQLOnlineStore"
host: Optional[StrictStr] = None
user: Optional[StrictStr] = None
password: Optional[StrictStr] = None
database: Optional[StrictStr] = None
This configuration can be specified in the feature_store.yaml as follows:
online_store:
type: feast_custom_online_store.mysql.MySQLOnlineStore
user: foo
password: bar
This configuration information is available to the methods of the OnlineStore, via the config: RepoConfig parameter which is passed into all the methods of the OnlineStore interface, specifically at the config.online_store field of the config parameter.
def online_write_batch(
self,
config: RepoConfig,
table: Union[FeatureTable, FeatureView],
data: List[
Tuple[EntityKeyProto, Dict[str, ValueProto], datetime, Optional[datetime]]
],
progress: Optional[Callable[[int], Any]],
) -> None:
online_store_config = config.online_store
assert isinstance(online_store_config, MySQLOnlineStoreConfig)
connection = mysql.connector.connect(
host=online_store_config.host or "127.0.0.1",
user=online_store_config.user or "root",
password=online_store_config.password,
database=online_store_config.database or "feast",
autocommit=True
)
After implementing both these classes, the custom online store can be used by referencing it in a feature repo's feature_store.yaml file, specifically in the online_store field. The value specified should be the fully qualified class name of the OnlineStore.
As long as your OnlineStore class is available in your Python environment, it will be imported by Feast dynamically at runtime.
To use our MySQL online store, we can use the following feature_store.yaml:
project: test_custom
registry: data/registry.db
provider: local
online_store:
# Make sure to specify the type as the fully qualified path that Feast can import.
type: feast_custom_online_store.mysql.MySQLOnlineStore
user: foo
password: bar
If additional configuration for the online store is not required, then we can omit the other fields and only specify the type of the online store class as the value for the online_store.
project: test_custom
registry: data/registry.db
provider: local
online_store: feast_custom_online_store.mysql.MySQLOnlineStore
Even if you have created the OnlineStore class in a separate repo, you can still test your implementation against the Feast test suite, as long as you have Feast as a submodule in your repo.
In the Feast submodule, we can run all the unit tests and make sure they pass:
make test-python
The universal tests, which are integration tests specifically intended to test offline and online stores, should be run against Feast to ensure that the Feast APIs work with your online store.
Feast parametrizes integration tests using the FULL_REPO_CONFIGS variable defined in sdk/python/tests/integration/feature_repos/repo_configuration.py which stores different online store classes for testing.
To overwrite these configurations, you can simply create your own file that contains a FULL_REPO_CONFIGS variable, and point Feast to that file by setting the environment variable FULL_REPO_CONFIGS_MODULE to point to that file.
A sample FULL_REPO_CONFIGS_MODULE looks something like this:
from feast.infra.offline_stores.contrib.postgres_offline_store.tests.data_source import (
PostgreSQLDataSourceCreator,
)
AVAILABLE_ONLINE_STORES = {"postgres": (None, PostgreSQLDataSourceCreator)}If you are planning to start the online store up locally(e.g spin up a local Redis Instance) for testing, then the dictionary entry should be something like:
{
"sqlite": ({"type": "sqlite"}, None),
# Specifies sqlite as the online store. The `None` object specifies to not use a containerized docker container.
}
If you are planning instead to use a Dockerized container to run your tests against your online store, you can define an OnlineStoreCreator and replace the None object above with your OnlineStoreCreator class. You should make this class available to pytest through the PYTEST_PLUGINS environment variable.
If you create a containerized docker image for testing, developers who are trying to test with your online store will not have to spin up their own instance of the online store for testing. An example of an OnlineStoreCreator is shown below:
class RedisOnlineStoreCreator(OnlineStoreCreator):
def __init__(self, project_name: str, **kwargs):
super().__init__(project_name)
# Assumption: self.container is created here, e.g. a Redis Docker container
# started via a library such as testcontainers.
def create_online_store(self) -> Dict[str, str]:
self.container.start()
log_string_to_wait_for = "Ready to accept connections"
wait_for_logs(
container=self.container, predicate=log_string_to_wait_for, timeout=10
)
# Return the online store config (e.g. type and connection string) that the
# test suite should use to connect to the containerized store.
def teardown(self):
self.container.stop()
Finally, add a Makefile target to the Makefile to run your datastore-specific tests by setting the FULL_REPO_CONFIGS_MODULE environment variable. Add PYTEST_PLUGINS if pytest is having trouble loading your DataSourceCreator. You can remove certain tests that are not relevant or still do not work for your datastore using the -k option.
test-python-universal-cassandra:
PYTHONPATH='.' \
FULL_REPO_CONFIGS_MODULE=sdk.python.feast.infra.online_stores.contrib.cassandra_repo_configuration \
PYTEST_PLUGINS=sdk.python.tests.integration.feature_repos.universal.online_store.cassandra \
FEAST_USAGE=False \
IS_TEST=True \
python -m pytest -x --integration \
sdk/python/tests
If there are some tests that fail, this indicates that there is a mistake in the implementation of this online store!
Add any dependencies for your online store to our sdk/python/setup.py under a new <ONLINE_STORE>_REQUIRED list with the packages, and add it to the setup script so that users can install the necessary python packages if your online store is needed. These packages should be defined as extras so that they are not installed by default.
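As a sketch, the addition to setup.py might look roughly like this; the package name, version pin, and extra name below are placeholders.
# sdk/python/setup.py (illustrative excerpt)
MYSQL_REQUIRED = [
    "mysqlclient>=2.0.0",  # placeholder dependency for the online store
]

setup(
    # ...
    extras_require={
        # ...existing extras...
        "mysql": MYSQL_REQUIRED,
    },
)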
You will need to regenerate our requirements files. To do this, create separate pyenv environments for python 3.8, 3.9, and 3.10. In each environment, run the following commands:
export PYTHON=<version>
make lock-python-ci-dependencies
Remember to add the documentation for your online store.
Add a new markdown file to docs/reference/online-stores/.
You should also add a reference in docs/reference/online-stores/README.md and docs/SUMMARY.md. Add a new markdown document to document your online store functionality similar to how the other online stores are documented.
NOTE: Be sure to document the following things about your online store:
Be sure to cover how to create the datasource and what configuration is needed in the feature_store.yaml file in order to create the datasource.
Make sure to flag that the online store is in alpha development.
Add some documentation on what the data model is for the specific online store for more clarity.
Finally, generate the python code docs by running:
make build-sphinx