[Alpha] Streaming feature computation with Denormalized

Denormalized makes it easy to compute real-time features and write them directly to your Feast online store. This guide walks you through setting up a streaming pipeline that computes feature aggregations and pushes them to Feast in real time.

Prerequisites

  • Python 3.12+

  • A Kafka cluster (local or remote), or Docker installed

Quick Start

  1. First, create a new Python project or use our template:

mkdir my-feature-project
cd my-feature-project
python -m venv .venv
source .venv/bin/activate  # or `.venv\Scripts\activate` on Windows
pip install "denormalized[feast]" feast  # quotes prevent zsh from expanding the brackets
  2. Set up your Feast feature repository:

feast init feature_repo

Project Structure

Your project should look something like this:

my-feature-project/
├── feature_repo/
│   ├── feature_store.yaml
│   └── sensor_data.py        # Feature definitions
├── stream_job.py             # Denormalized pipeline
└── main.py                   # Pipeline runner
  3. Run a test Kafka instance in Docker:

docker run --rm -p 9092:9092 emgeee/kafka_emit_measurements:latest

This spins up a Docker container that runs a Kafka instance alongside a simple script that emits fake data to two topics.
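
To confirm the test container is emitting data before wiring up the pipeline, you can peek at one of the topics. The snippet below is a minimal sketch, assuming the "temperature" topic used later in this guide and the third-party kafka-python package (pip install kafka-python):

# peek_topic.py: quick sanity check that events are flowing
import json

from kafka import KafkaConsumer

consumer = KafkaConsumer(
    "temperature",
    bootstrap_servers="localhost:9092",
    auto_offset_reset="earliest",
    value_deserializer=lambda v: json.loads(v.decode("utf-8")),
)

# Print the first message and stop
for message in consumer:
    print(message.value)
    break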

Define Your Features

In feature_repo/sensor_data.py, define your feature view and entity:

from feast import Entity, FeatureView, PushSource, Field
from feast.types import Float64, String

# Define your entity
sensor = Entity(
    name="sensor",
    join_keys=["sensor_name"],
)

# Create a push source for real-time features
source = PushSource(
    name="push_sensor_statistics",
    batch_source=your_batch_source  # Define your batch source
)

# Define your feature view
stats_view = FeatureView(
    name="sensor_statistics",
    entities=[sensor],
    # `ds` is the FeastDataStream created in stream_job.py (import it here,
    # or define the stream in this module); Denormalized derives the schema for you
    schema=ds.get_feast_schema(),
    source=source,
    online=True,
)
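
Before the streaming job can push data, the entity and feature view need to be registered in Feast's registry. Here is a minimal sketch, assuming `sensor` and `stats_view` can be imported from your feature definitions module (for example because sensor_data.py also creates or imports the `ds` stream); adjust the import path to match your project:

# register_features.py: registers the definitions above with the Feast registry
from feast import FeatureStore

from feature_repo.sensor_data import sensor, stats_view  # hypothetical import path

store = FeatureStore(repo_path="feature_repo/")
store.apply([sensor, stats_view])  # writes the entity and feature view to the registry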

Create Your Streaming Pipeline

In stream_job.py, define your streaming computations:

import json

from denormalized import Context, FeastDataStream
from denormalized.datafusion import col, functions as f
from feast import FeatureStore

sample_event = {
    "occurred_at_ms": 100,
    "sensor_name": "foo",
    "reading": 0.0,
}

# Create a stream from your Kafka topic
ds = FeastDataStream(
    Context().from_topic(
        "temperature",              # Kafka topic to read from
        json.dumps(sample_event),   # sample event (JSON) describing the message shape
        "localhost:9092",           # Kafka bootstrap servers
        "occurred_at_ms",           # timestamp column
    )
)

# Define your feature computations
ds = ds.window(
    [col("sensor_name")],  # Group by sensor
    [
        f.count(col("reading")).alias("count"),
        f.min(col("reading")).alias("min"),
        f.max(col("reading")).alias("max"),
        f.avg(col("reading")).alias("average"),
    ],
    1000,  # Window size in ms
    None   # Slide interval (None = tumbling window)
)

feature_store = FeatureStore(repo_path="feature_repo/")

# This single line connects Denormalized to Feast!
ds.write_feast_feature(feature_store, "push_sensor_statistics")
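
Once the definitions are registered and the pipeline is running, you can sanity-check that aggregates are landing in the online store using Feast's standard online retrieval API. A minimal sketch follows; the feature references assume the `sensor_statistics` view and the aliases defined in the window above:

# check_features.py: read back the latest aggregates for one sensor
from feast import FeatureStore

store = FeatureStore(repo_path="feature_repo/")

online = store.get_online_features(
    features=[
        "sensor_statistics:count",
        "sensor_statistics:average",
    ],
    entity_rows=[{"sensor_name": "foo"}],  # "foo" matches the sample event above
).to_dict()

print(online)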

Need Help?

  • Email us at hello@denormalized.io

For a full working demo, check out the repo.

Check out more examples in our feast-example repo on GitHub.

[Denormalized/Feast integration diagram]