🧠 Compute Engines

The ComputeEngine is Feast's pluggable abstraction for executing feature pipelines (transformations, aggregations, joins, materialization, and historical retrieval) on a backend of your choice (e.g., Spark, PyArrow, Pandas, Ray).

It powers both:

- materialize() – for batch and stream generation of features to offline/online stores
- get_historical_features() – for point-in-time correct training dataset retrieval

This system builds and executes DAGs (Directed Acyclic Graphs) of typed operations, enabling modular and scalable workflows.
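Both entry points are invoked through the FeatureStore API. A minimal sketch (the feature reference and entity dataframe below are illustrative):

```python
from datetime import datetime, timedelta

import pandas as pd
from feast import FeatureStore

store = FeatureStore(repo_path=".")

# materialize(): the configured compute engine computes features for the
# given window and writes them to the online/offline stores.
store.materialize(
    start_date=datetime.now() - timedelta(days=1),
    end_date=datetime.now(),
)

# get_historical_features(): the engine performs the point-in-time
# correct join against the entity dataframe.
entity_df = pd.DataFrame(
    {
        "driver_id": [1001, 1002],
        "event_timestamp": [datetime(2024, 1, 1), datetime(2024, 1, 2)],
    }
)
training_df = store.get_historical_features(
    entity_df=entity_df,
    features=["daily_driver_stats:trips"],  # illustrative feature reference
).to_df()
```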
🧠 Core Concepts
Feature resolver and builder

The FeatureBuilder initializes a FeatureResolver that extracts a DAG from the FeatureView definitions, resolving dependencies and ensuring the correct execution order.

A FeatureView represents a logical data source, while a DataSource represents the physical data source (e.g., BigQuery, Spark, etc.). When defining a FeatureView, its source can be a physical DataSource, a derived FeatureView, or a list of FeatureViews. The FeatureResolver walks these sources, topologically sorts the DAG nodes based on their dependencies, and returns a head node that represents the final output of the DAG.
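For example, the hourly/daily chain used in the built output below can be declared roughly like this (a minimal sketch: constructor arguments vary across Feast versions, and the paths, columns, and Aggregation spec are illustrative):

```python
from datetime import timedelta

from feast import Entity, FileSource
from feast.aggregation import Aggregation
from feast.batch_feature_view import BatchFeatureView

driver = Entity(name="driver", join_keys=["driver_id"])

# Physical source: a raw table of hourly driver statistics.
hourly_source = FileSource(
    name="hourly_driver_stats_source",
    path="data/hourly_driver_stats.parquet",
    timestamp_field="event_timestamp",
)

# FeatureView backed by a physical DataSource.
hourly_driver_stats = BatchFeatureView(
    name="hourly_driver_stats",
    entities=[driver],
    ttl=timedelta(days=1),
    source=hourly_source,
)

# Derived FeatureView: its source is another FeatureView, so the resolver
# adds a dependency edge and orders hourly before daily in the DAG.
daily_driver_stats = BatchFeatureView(
    name="daily_driver_stats",
    entities=[driver],
    ttl=timedelta(days=7),
    source=hourly_driver_stats,
    aggregations=[
        Aggregation(column="trips", function="sum", time_window=timedelta(days=1)),
    ],
)
```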
Subsequently, the FeatureBuilder builds the DAG nodes from the resolved head node, creating a DAGNode for each operation (read, join, filter, aggregate, etc.). An example of built output from FeatureBuilder:
```
- Output(Agg(daily_driver_stats))
  - Agg(daily_driver_stats)
    - Filter(daily_driver_stats)
      - Transform(daily_driver_stats)
        - Agg(hourly_driver_stats)
          - Filter(hourly_driver_stats)
            - Transform(hourly_driver_stats)
              - Source(hourly_driver_stats)
```
✨ Available Engines
🔥 SparkComputeEngine (contrib)

- Distributed DAG execution via Apache Spark
- Supports point-in-time joins and large-scale materialization
- Integrates with SparkOfflineStore and SparkMaterializationJob
🧪 LocalComputeEngine
https://github.com/feast-dev/feast/blob/master/docs/reference/compute-engine/local.mdRuns on Arrow + Specified backend (e.g., Pandas, Polars)
Designed for local dev, testing, or lightweight feature generation
Supports
LocalMaterializationJob
andLocalHistoricalRetrievalJob
🧊 SnowflakeComputeEngine

- Runs entirely in Snowflake
- Supports Snowflake SQL for feature transformations and aggregations
- Integrates with SnowflakeOfflineStore and SnowflakeMaterializationJob
LambdaComputeEngine

- Runs on AWS Lambda (alpha)

🛠️ Feature Builder Flow
```
SourceReadNode
      |
      v
TransformationNode (If feature_transformation is defined) | JoinNode (default behavior for multiple sources)
      |
      v
FilterNode (Always included; applies TTL or user-defined filters)
      |
      v
AggregationNode (If aggregations are defined in FeatureView)
      |
      v
DeduplicationNode (If no aggregation is defined for get_historical_features)
      |
      v
ValidationNode (If enable_validation = True)
      |
      v
Output
 ├──> RetrievalOutput (For get_historical_features)
 └──> OnlineStoreWrite / OfflineStoreWrite (For materialize)
```
Each step is implemented as a DAGNode. An ExecutionPlan executes these nodes in topological order, caching DAGValue outputs.
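To make the pattern concrete, here is a deliberately simplified stand-in for these classes (hypothetical names and signatures, not the actual feast.infra.compute_engines API):

```python
# Simplified stand-ins for DAGValue/DAGNode; the real Feast classes differ.
from typing import Callable, Dict, List

import pyarrow as pa


class DAGValue:
    """Wraps a node's intermediate output (here, an Arrow table)."""

    def __init__(self, data: pa.Table):
        self.data = data


class DAGNode:
    def __init__(self, name: str, inputs: List["DAGNode"] = None):
        self.name = name
        self.inputs = inputs or []

    def execute(self, context: Dict[str, DAGValue]) -> DAGValue:
        raise NotImplementedError


class FilterNode(DAGNode):
    def __init__(self, name: str, input_node: DAGNode,
                 predicate: Callable[[pa.Table], pa.Table]):
        super().__init__(name, [input_node])
        self.predicate = predicate

    def execute(self, context: Dict[str, DAGValue]) -> DAGValue:
        # Read the upstream value from the shared context cache, apply the
        # filter, and re-wrap it so downstream nodes see a uniform type.
        upstream = context[self.inputs[0].name]
        return DAGValue(self.predicate(upstream.data))
```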
🧩 Implementing a Custom Compute Engine

To create your own compute engine:

1. Implement the interface
```python
from typing import Sequence, Union

from feast.batch_feature_view import BatchFeatureView
from feast.entity import Entity
from feast.feature_view import FeatureView
from feast.infra.common.materialization_job import (
    MaterializationJob,
    MaterializationTask,
)
from feast.infra.common.retrieval_task import HistoricalRetrievalTask
from feast.infra.compute_engines.base import ComputeEngine
from feast.infra.offline_stores.offline_store import RetrievalJob
from feast.infra.registry.base_registry import BaseRegistry
from feast.on_demand_feature_view import OnDemandFeatureView
from feast.stream_feature_view import StreamFeatureView


class MyComputeEngine(ComputeEngine):
    def update(
        self,
        project: str,
        views_to_delete: Sequence[
            Union[BatchFeatureView, StreamFeatureView, FeatureView]
        ],
        views_to_keep: Sequence[
            Union[BatchFeatureView, StreamFeatureView, FeatureView, OnDemandFeatureView]
        ],
        entities_to_delete: Sequence[Entity],
        entities_to_keep: Sequence[Entity],
    ):
        ...

    def _materialize_one(
        self,
        registry: BaseRegistry,
        task: MaterializationTask,
        **kwargs,
    ) -> MaterializationJob:
        ...

    def get_historical_features(self, task: HistoricalRetrievalTask) -> RetrievalJob:
        ...
```
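To use the engine, wire it into your repo's feature_store.yaml. In current Feast versions the batch_engine key selects the compute engine (e.g., batch_engine: spark.engine), and a custom engine can typically be referenced by its full class path, mirroring how custom offline/online stores are configured; check the configuration reference for your version.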
2. Create a FeatureBuilder

```python
from feast.infra.compute_engines.feature_builder import FeatureBuilder


class CustomFeatureBuilder(FeatureBuilder):
    def build_source_node(self): ...
    def build_aggregation_node(self, input_node): ...
    def build_join_node(self, input_node): ...
    def build_filter_node(self, input_node): ...
    def build_dedup_node(self, input_node): ...
    def build_transformation_node(self, input_node): ...
    def build_output_nodes(self, input_node): ...
    def build_validation_node(self, input_node): ...
```
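Each build_* hook translates one logical operation into a backend-native DAGNode; the ordering between the nodes is still driven by the resolved DAG, so a builder subclass only has to express each operation in its backend's terms.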
3. Define DAGNode subclasses

- ReadNode, AggregationNode, JoinNode, WriteNode, etc.
- Each DAGNode implements execute(context) -> DAGValue

4. Return an ExecutionPlan

- The ExecutionPlan stores DAG nodes in topological order
- It automatically handles intermediate value caching
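Reusing the DAGValue/DAGNode stand-ins from the earlier sketch, the execution loop amounts to:

```python
from typing import Dict, List


class ExecutionPlan:
    """Runs DAG nodes in topological order, caching every DAGValue so a
    shared upstream node (e.g., one Source feeding two views) executes once."""

    def __init__(self, nodes_in_topological_order: List[DAGNode]):
        self.nodes = nodes_in_topological_order

    def execute(self) -> DAGValue:
        cache: Dict[str, DAGValue] = {}
        for node in self.nodes:
            # Every input is already in the cache because the node list
            # is topologically sorted.
            cache[node.name] = node.execute(cache)
        # The final node is the DAG's head/output.
        return cache[self.nodes[-1].name]
```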
🚧 Roadmap