Warning: This is an experimental feature. It is intended for early testing and feedback, and may change without warning in future releases.
Kinesis sources allow users to register Kinesis streams as data sources. Feast currently does not launch or monitor jobs to ingest data from Kinesis. Users are responsible for launching and monitoring their own ingestion jobs, which should write feature values to the online store through FeatureStore.write_to_online_store. An example of how to launch such a job with Spark to ingest from Kafka can be found here; by using a different plugin, the example can be adapted to Kinesis. Feast also provides functionality to write to the offline store using the
Kinesis sources must have a batch source specified. The batch source is used for retrieving historical features, so users are also responsible for writing data from their Kinesis streams to a batch data source such as a data warehouse table. When a Kinesis source is used as the stream source of a feature view, the feature view definition does not need its own batch source, because the one attached to the Kinesis source is used.
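As a rough illustration of the user-managed ingestion job described above, the following sketch decodes a batch of Kinesis records and hands the rows to a writer. It assumes each record carries a UTF-8 JSON payload as bytes under "Data" (the shape boto3's get_records returns). The helper names decode_records, ingest_batch and make_feast_writer are hypothetical, as are the repo_path and feature_view_name arguments. Only FeatureStore.write_to_online_store comes from the text above, and the timestamp conversion is an assumption about what the online write needs.

```python
import json
from typing import Callable, Dict, Iterable, List


def decode_records(records: Iterable[dict]) -> List[Dict]:
    """Turn Kinesis-style records into feature rows (assumes JSON bytes under "Data")."""
    return [json.loads(record["Data"].decode("utf-8")) for record in records]


def ingest_batch(records: Iterable[dict], write_rows: Callable[[List[Dict]], None]) -> int:
    """Decode one batch and pass it to write_rows; returns the number of rows written."""
    rows = decode_records(records)
    if rows:
        write_rows(rows)
    return len(rows)


def make_feast_writer(repo_path: str, feature_view_name: str) -> Callable[[List[Dict]], None]:
    """Build a writer backed by FeatureStore.write_to_online_store.

    Imports are local so the decoding logic above can be exercised without Feast installed.
    """
    import pandas as pd
    from feast import FeatureStore

    store = FeatureStore(repo_path=repo_path)

    def write_rows(rows: List[Dict]) -> None:
        df = pd.DataFrame(rows)
        # Assumption: timestamps arrive as strings in the JSON payload and
        # should be converted to datetimes before the online write.
        df["event_timestamp"] = pd.to_datetime(df["event_timestamp"])
        store.write_to_online_store(feature_view_name, df)

    return write_rows
```

In a real job, ingest_batch would be called in a loop over batches read from the stream, with make_feast_writer(...) supplying write_rows. Keeping the writer injectable makes the decoding step easy to test in isolation.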
Streaming data sources are important sources of feature values. A typical setup with streaming data looks like:
1. Raw events come in (stream 1)
2. Streaming transformations applied (e.g. generating features like last_N_purchased_categories) (stream 2)
3. Write stream 2 values to an offline store as a historical log for training (optional)
4. Write stream 2 values to an online store for low latency feature serving
5. Periodically materialize feature values from the offline store into the online store for decreased training-serving skew and improved model performance
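The five steps above can be sketched with in-memory stand-ins for the stream, the offline store and the online store. This is a toy model rather than Feast code: the event fields (user_id, category, event_timestamp), the window size N and every function name are hypothetical, chosen only to make the data flow concrete.

```python
from collections import defaultdict, deque
from typing import Deque, Dict, List

N = 3  # hypothetical window size for last_N_purchased_categories

recent: Dict[str, Deque[str]] = defaultdict(lambda: deque(maxlen=N))
offline_log: List[dict] = []        # step 3: append-only history for training
online_store: Dict[str, dict] = {}  # step 4: latest value per user for serving


def process_event(event: dict) -> dict:
    """Steps 2-4 for one raw purchase event (step 1 is the caller feeding events in)."""
    window = recent[event["user_id"]]
    window.append(event["category"])  # the deque drops the oldest entry beyond N
    feature_row = {
        "user_id": event["user_id"],
        "event_timestamp": event["event_timestamp"],
        "last_N_purchased_categories": list(window),
    }
    offline_log.append(feature_row)
    online_store[event["user_id"]] = feature_row
    return feature_row


def materialize() -> None:
    """Step 5: refresh the online store from the offline log, newest row per user wins."""
    for row in offline_log:
        current = online_store.get(row["user_id"])
        if current is None or row["event_timestamp"] >= current["event_timestamp"]:
            online_store[row["user_id"]] = row
```

Because materialize reads from the same log used for training, the online values can always be rebuilt from the training history, which is the point of step 5.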
Note that the Kinesis source has a batch source.
from datetime import timedelta
from feast import Field, FileSource, KinesisSource, stream_feature_view
from feast.data_format import JsonFormat
from feast.types import Float32
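driver_stats_batch_source = FileSource(
    # arguments are missing from this listing
)
driver_stats_stream_source = KinesisSource(
    # remaining arguments are missing from this listing
    batch_source=driver_stats_batch_source,
    record_format=JsonFormat(
        schema_json="driver_id integer, event_timestamp timestamp, conv_rate double, acc_rate double, created timestamp"
    ),
)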
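To make the schema_json string more concrete, the sketch below splits it into name/type pairs and checks a sample JSON payload for missing fields. The helper names and the sample values, including the timestamp format, are hypothetical. The check only illustrates what a matching record might look like and is not how Feast or Spark validate records.

```python
import json
from typing import List, Tuple

SCHEMA_JSON = (
    "driver_id integer, event_timestamp timestamp, conv_rate double, "
    "acc_rate double, created timestamp"
)


def parse_schema(schema: str) -> List[Tuple[str, str]]:
    """Split a 'name type, name type' string into (name, type) pairs."""
    fields = []
    for part in schema.split(","):
        name, type_name = part.split()
        fields.append((name, type_name))
    return fields


def missing_fields(payload: bytes, schema: str = SCHEMA_JSON) -> List[str]:
    """Return the schema fields that are absent from a JSON-encoded record."""
    record = json.loads(payload)
    return [name for name, _ in parse_schema(schema) if name not in record]


# A hypothetical record that a producer could put on the stream.
sample = json.dumps({
    "driver_id": 1001,
    "event_timestamp": "2024-01-01T12:00:00",
    "conv_rate": 0.42,
    "acc_rate": 0.91,
    "created": "2024-01-01T12:00:01",
}).encode("utf-8")
```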
The Kinesis source can be used in a stream feature view.
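# The @stream_feature_view(...) decorator that registers this function is not shown in this listing.
from pyspark.sql import DataFrame

def driver_hourly_stats_stream(df: DataFrame):
    from pyspark.sql.functions import col

    # Return the transformed DataFrame; without the return the result is discarded.
    return (
        df.withColumn("conv_percentage", col("conv_rate") * 100.0)
        .withColumn("acc_percentage", col("acc_rate") * 100.0)
    )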
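The transformation's arithmetic can be checked without a Spark session using a plain-Python, row-level equivalent. The function name to_percentages is hypothetical and this is not part of the Feast API. It mirrors the two withColumn calls, which add the percentage columns while leaving conv_rate and acc_rate in place.

```python
def to_percentages(row: dict) -> dict:
    """Row-level equivalent of the two withColumn calls: add *_percentage fields."""
    out = dict(row)  # copy so the input row is left unchanged
    out["conv_percentage"] = row["conv_rate"] * 100.0
    out["acc_percentage"] = row["acc_rate"] * 100.0
    return out
```

For example, a row with conv_rate 0.5 and acc_rate 0.25 gains conv_percentage 50.0 and acc_percentage 25.0 alongside the original fields.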