
Kafka


Last updated 2 years ago


Warning: This is an experimental feature. It is intended for early testing and feedback, and could change without warning in future releases.

Description

Kafka sources allow users to register Kafka streams as data sources. Feast currently does not launch or monitor jobs to ingest data from Kafka. Users are responsible for launching and monitoring their own ingestion jobs, which should write feature values to the online store through FeatureStore.write_to_online_store. An example of how to launch such a job with Spark can be found here. Feast can also write to the offline store through write_to_offline_store.

Kafka sources must have a batch source specified. The batch source is used for retrieving historical features, so users are also responsible for writing data from their Kafka streams to a batch data source such as a data warehouse table. When a Kafka source is used as the stream source of a feature view, the batch source does not need to be specified again in the feature view definition.

Stream sources

Streaming data sources are important sources of feature values. A typical setup with streaming data looks like:

  1. Raw events come in (stream 1).

  2. Streaming transformations are applied, e.g. to generate features like last_N_purchased_categories (stream 2).

  3. Stream 2 values are written to an offline store as a historical log for training (optional).

  4. Stream 2 values are written to an online store for low-latency feature serving.

  5. Feature values are periodically materialized from the offline store into the online store to reduce training-serving skew and improve model performance.
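
The five steps above can be sketched with a toy in-memory model. This is only an illustration of the data flow, not Feast code: the event fields (user_id, ts, category), the offline_log list, and the online_store dict are all assumptions made for the example.

```python
from collections import defaultdict

offline_log = []    # step 3: append-only historical log (hypothetical stand-in)
online_store = {}   # step 4: latest feature row per entity key (hypothetical stand-in)


def transform(raw_events, n=3):
    """Step 2: derive last_n_purchased_categories per user.

    Assumes raw_events (step 1) arrive ordered by their "ts" field.
    """
    recent = defaultdict(list)
    for event in raw_events:
        categories = recent[event["user_id"]]
        categories.append(event["category"])
        del categories[:-n]  # keep only the n most recent categories
        yield {
            "user_id": event["user_id"],
            "ts": event["ts"],
            "last_n_purchased_categories": list(categories),
        }


def ingest(raw_events):
    """Steps 3 and 4: write each transformed row to both stores."""
    for row in transform(raw_events):
        offline_log.append(row)
        online_store[row["user_id"]] = row


def materialize():
    """Step 5: overwrite the online store with the latest offline row per user."""
    latest = {}
    for row in offline_log:
        current = latest.get(row["user_id"])
        if current is None or row["ts"] >= current["ts"]:
            latest[row["user_id"]] = row
    online_store.update(latest)


ingest(
    [
        {"user_id": "u1", "ts": 1, "category": "books"},
        {"user_id": "u1", "ts": 2, "category": "games"},
        {"user_id": "u1", "ts": 3, "category": "music"},
        {"user_id": "u1", "ts": 4, "category": "shoes"},
    ]
)
materialize()
```

In this sketch, step 5 guarantees that the online store agrees with the log that training data is built from, which is the point of periodic materialization.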

Example

Defining a Kafka source

Note that the Kafka source has a batch source.

from datetime import timedelta

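from feast import Field, FileSource, KafkaSource
from feast.data_format import JsonFormat
# Import the decorator from its module so the name is the callable, not the submodule.
from feast.stream_feature_view import stream_feature_view
from feast.types import Float32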

driver_stats_batch_source = FileSource(
    name="driver_stats_source",
    path="data/driver_stats.parquet",
    timestamp_field="event_timestamp",
)

driver_stats_stream_source = KafkaSource(
    name="driver_stats_stream",
    kafka_bootstrap_servers="localhost:9092",
    topic="drivers",
    timestamp_field="event_timestamp",
    batch_source=driver_stats_batch_source,
    message_format=JsonFormat(
        schema_json="driver_id integer, event_timestamp timestamp, conv_rate double, acc_rate double, created timestamp"
    ),
    watermark_delay_threshold=timedelta(minutes=5),
)
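
To make the schema_json above concrete, here is a minimal sketch of a producer-side helper that serializes one event with the five declared fields. The helper name make_driver_message and the sample values are hypothetical, and encoding timestamps as ISO-8601 strings is an assumption about what the consuming job expects rather than something the source specifies.

```python
import json
from datetime import datetime, timezone


def make_driver_message(driver_id, conv_rate, acc_rate, event_time):
    """Serialize one event with the fields named in schema_json (hypothetical helper)."""
    # Assumption: timestamps travel as ISO-8601 strings in UTC.
    timestamp = event_time.astimezone(timezone.utc).isoformat()
    payload = {
        "driver_id": int(driver_id),
        "event_timestamp": timestamp,
        "conv_rate": float(conv_rate),
        "acc_rate": float(acc_rate),
        "created": timestamp,
    }
    return json.dumps(payload).encode("utf-8")


message = make_driver_message(
    1001, 0.42, 0.91, datetime(2022, 8, 1, 12, 0, tzinfo=timezone.utc)
)
```

The resulting bytes are what would be published to the drivers topic by whichever Kafka client the ingestion pipeline uses.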

Using the Kafka source in a stream feature view

The Kafka source can be used in a stream feature view.

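from feast import Entity
from pyspark.sql import DataFrame

# Assumed entity definition (not shown in the original); the join key matches
# the driver_id field in the Kafka message schema.
driver = Entity(name="driver", join_keys=["driver_id"])

@stream_feature_view(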
    entities=[driver],
    ttl=timedelta(seconds=8640000000),
    mode="spark",
    schema=[
        Field(name="conv_percentage", dtype=Float32),
        Field(name="acc_percentage", dtype=Float32),
    ],
    timestamp_field="event_timestamp",
    online=True,
    source=driver_stats_stream_source,
)
def driver_hourly_stats_stream(df: DataFrame):
    from pyspark.sql.functions import col

    return (
        df.withColumn("conv_percentage", col("conv_rate") * 100.0)
        .withColumn("acc_percentage", col("acc_rate") * 100.0)
        .drop("conv_rate", "acc_rate")
    )

Ingesting data

See here for an example of how to ingest data from a Kafka source into Feast.
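
As a rough sketch of what a user-managed ingestion job does with each message, the code below parses a JSON payload, applies the same percentage transformation as the stream feature view, and hands the rows to FeatureStore.write_to_online_store. The helper names to_feature_row and write_rows are hypothetical, the ISO-8601 timestamp encoding is an assumption, and consuming from Kafka itself is left out.

```python
import json
from datetime import datetime


def to_feature_row(raw: bytes) -> dict:
    """Turn one Kafka message into a row with the stream feature view's columns."""
    event = json.loads(raw)
    return {
        "driver_id": event["driver_id"],
        # Assumption: timestamps were produced as ISO-8601 strings.
        "event_timestamp": datetime.fromisoformat(event["event_timestamp"]),
        "created": datetime.fromisoformat(event["created"]),
        "conv_percentage": event["conv_rate"] * 100.0,
        "acc_percentage": event["acc_rate"] * 100.0,
    }


def write_rows(store, rows):
    """Write parsed rows to the online store.

    `store` is expected to be a feast.FeatureStore, e.g. FeatureStore(repo_path=".").
    """
    import pandas as pd  # imported lazily so the parsing helper has no extra dependency

    store.write_to_online_store("driver_hourly_stats_stream", pd.DataFrame(rows))


row = to_feature_row(
    b'{"driver_id": 1001, "event_timestamp": "2022-08-01T12:00:00+00:00", '
    b'"conv_rate": 0.5, "acc_rate": 0.25, "created": "2022-08-01T12:00:00+00:00"}'
)
```

A real job would batch rows from the consumer loop and call write_rows periodically; it would also need to write the same events to the batch source so historical retrieval stays consistent.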
