# [Alpha] Streaming feature computation with Denormalized
Denormalized makes it easy to compute real-time features and write them directly to your Feast online store. This guide walks you through setting up a streaming pipeline that computes feature aggregations and pushes them to Feast in real time.
## Prerequisites
- Python 3.12+
- Kafka cluster (local or remote), or Docker installed
For a full working demo, check out the feast-example repo.
## Quick Start
First, create a new Python project or use our template:
```bash
mkdir my-feature-project
cd my-feature-project
python -m venv .venv
source .venv/bin/activate  # or `.venv\Scripts\activate` on Windows
pip install "denormalized[feast]" feast
```
Next, start the demo environment:

```bash
docker run --rm -p 9092:9092 emgeee/kafka_emit_measurements:latest
```

This spins up a Docker container that runs a Kafka instance and a simple script that emits fake data to two topics.
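If you want to confirm data is flowing before building the pipeline, you can peek at a topic with a throwaway consumer. This sketch assumes the `kafka-python` package (`pip install kafka-python`), which is not part of this guide's dependencies; the event fields mirror the `sample_event` used later in the guide:

```python
import json

from kafka import KafkaConsumer

# Throwaway consumer to inspect the demo topic.
consumer = KafkaConsumer(
    "temperature",
    bootstrap_servers="localhost:9092",
    auto_offset_reset="earliest",
    value_deserializer=lambda m: json.loads(m.decode("utf-8")),
)

for message in consumer:
    # Expect events like {"occurred_at_ms": 100, "sensor_name": "foo", "reading": 0.0}
    print(message.value)
    break  # one event is enough for a sanity check
```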
## Define Your Features
In `feature_repo/sensor_data.py`, define your feature view and entity:
```python
from feast import Entity, FeatureView, PushSource, Field
from feast.types import Float64, String

# Define your entity
sensor = Entity(
    name="sensor",
    join_keys=["sensor_name"],
)

# Create a push source for real-time features
source = PushSource(
    name="push_sensor_statistics",
    batch_source=your_batch_source,  # Define your batch source
)

# Define your feature view
stats_view = FeatureView(
    name="sensor_statistics",
    entities=[sensor],
    schema=ds.get_feast_schema(),  # Denormalized handles this for you!
    source=source,
    online=True,
)
```
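The `batch_source` above is left for you to define. As one hypothetical option (the path and timestamp column below are placeholders, not part of the demo), a file-based source could look like this:

```python
from feast import FileSource

# Hypothetical batch source backing the push source; point it at
# wherever your offline feature data lives.
your_batch_source = FileSource(
    path="data/sensor_statistics.parquet",  # placeholder path
    timestamp_field="event_timestamp",  # placeholder timestamp column
)
```

Once the entity, source, and feature view are defined, register them by running `feast apply` from the `feature_repo/` directory (this assumes a standard `feature_store.yaml` is already in place).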
## Create Your Streaming Pipeline
In `stream_job.py`, define your streaming computations:
```python
import json

from denormalized import Context, FeastDataStream
from denormalized.datafusion import col, functions as f
from feast import FeatureStore

sample_event = {
    "occurred_at_ms": 100,
    "sensor_name": "foo",
    "reading": 0.0,
}

# Create a stream from your Kafka topic
ds = FeastDataStream(
    Context().from_topic(
        "temperature",
        json.dumps(sample_event),
        "localhost:9092",
        "occurred_at_ms",
    )
)

# Define your feature computations
ds = ds.window(
    [col("sensor_name")],  # Group by sensor
    [
        f.count(col("reading")).alias("count"),
        f.min(col("reading")).alias("min"),
        f.max(col("reading")).alias("max"),
        f.avg(col("reading")).alias("average"),
    ],
    1000,  # Window size in ms
    None,  # Slide interval (None = tumbling window)
)

feature_store = FeatureStore(repo_path="feature_repo/")

# This single line connects Denormalized to Feast!
ds.write_feast_feature(feature_store, "push_sensor_statistics")
```
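With the stream job running, the aggregates land in your online store and can be read back with standard Feast retrieval. A minimal sketch, run from a separate process, using the feature names from the `sensor_statistics` view above:

```python
from feast import FeatureStore

store = FeatureStore(repo_path="feature_repo/")

# Fetch the latest windowed aggregates for a single sensor.
features = store.get_online_features(
    features=[
        "sensor_statistics:count",
        "sensor_statistics:max",
        "sensor_statistics:average",
    ],
    entity_rows=[{"sensor_name": "foo"}],
)
print(features.to_dict())
```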