Adding a custom batch materialization engine
Overview
Guide
Step 1: Define an Engine class
from typing import Any, Callable, Dict, List, Optional, Sequence, Tuple, Union
from feast.entity import Entity
from feast.feature_view import FeatureView
from feast.batch_feature_view import BatchFeatureView
from feast.stream_feature_view import StreamFeatureView
from feast.infra.materialization import LocalMaterializationEngine, LocalMaterializationJob, MaterializationTask
from feast.infra.offline_stores.offline_store import OfflineStore
from feast.infra.online_stores.online_store import OnlineStore
from feast.repo_config import RepoConfig
class MyCustomEngine(LocalMaterializationEngine):
def __init__(
self,
*,
repo_config: RepoConfig,
offline_store: OfflineStore,
online_store: OnlineStore,
**kwargs,
):
super().__init__(
repo_config=repo_config,
offline_store=offline_store,
online_store=online_store,
**kwargs,
)
def update(
self,
project: str,
views_to_delete: Sequence[
Union[BatchFeatureView, StreamFeatureView, FeatureView]
],
views_to_keep: Sequence[
Union[BatchFeatureView, StreamFeatureView, FeatureView]
],
entities_to_delete: Sequence[Entity],
entities_to_keep: Sequence[Entity],
):
print("Creating new infrastructure is easy here!")
pass
def materialize(
self, registry, tasks: List[MaterializationTask]
) -> List[LocalMaterializationJob]:
print("Launching custom batch jobs or multithreading things is pretty easy...")
return [
self._materialize_one(
registry,
task.feature_view,
task.start_time,
task.end_time,
task.project,
task.tqdm_builder,
)
for task in tasks
]Step 2: Configuring Feast to use the engine
Step 3: Using the engine
Last updated
Was this helpful?