from denormalized import Context, FeastDataStream
from denormalized.datafusion import col, functions as f
from feast import FeatureStore
sample_event = {
"occurred_at_ms": 100,
"sensor_name": "foo",
"reading": 0.0,
}
# Create a stream from your Kafka topic
ds = FeastDataStream(Context().from_topic("temperature", json.dumps(sample_event), "localhost:9092", "occurred_at_ms"))
# Define your feature computations
ds = ds.window(
[col("sensor_name")], # Group by sensor
[
f.count(col("reading")).alias("count"),
f.min(col("reading")).alias("min"),
f.max(col("reading")).alias("max"),
f.avg(col("reading")).alias("average"),
],
1000, # Window size in ms
None # Slide interval (None = tumbling window)
)
feature_store = FeatureStore(repo_path="feature_repo/")
# This single line connects Denormalized to Feast!
ds.write_feast_feature(feature_store, "push_sensor_statistics")