In Feast, each batch data source is associated with corresponding offline stores. For example, a SnowflakeSource can only be processed by the Snowflake offline store, while a FileSource can be processed by both File and DuckDB offline stores. Otherwise, the primary difference between batch data sources is the set of supported types. Feast has an internal type system, and aims to support eight primitive types (bytes, string, int32, int64, float32, float64, bool, and timestamp) along with the corresponding array types. However, not every batch data source supports all of these types.
For more details on the Feast type system, see .
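As a quick illustration of these types (the field names below are hypothetical), a feature view schema declares them explicitly via feast.types, with array types expressed by wrapping a primitive type in Array:
from feast import Field
from feast.types import Array, Bool, Float32, Float64, Int64, String, UnixTimestamp

# Hypothetical schema mixing primitive and array types from Feast's type system
schema = [
    Field(name="driver_name", dtype=String),
    Field(name="trip_count", dtype=Int64),
    Field(name="avg_rating", dtype=Float32),
    Field(name="is_active", dtype=Bool),
    Field(name="last_updated", dtype=UnixTimestamp),
    Field(name="recent_trip_distances", dtype=Array(Float64)),
]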
There are currently four core batch data source implementations: FileSource, BigQuerySource, SnowflakeSource, and RedshiftSource. There are several additional implementations contributed by the Feast community (PostgreSQLSource, SparkSource, and TrinoSource), which are not guaranteed to be stable or to match the functionality of the core implementations. Details for each specific data source can be found .
Below is a matrix indicating which data sources support which types.
| | File | BigQuery | Snowflake | Redshift | Postgres | Spark | Trino |
| --- | --- | --- | --- | --- | --- | --- | --- |
| bytes | yes | yes | yes | yes | yes | yes | yes |
| string | yes | yes | yes | yes | yes | yes | yes |
| int32 | yes | yes | yes | yes | yes | yes | yes |
| int64 | yes | yes | yes | yes | yes | yes | yes |
| float32 | yes | yes | yes | yes | yes | yes | yes |
| float64 | yes | yes | yes | yes | yes | yes | yes |
| bool | yes | yes | yes | yes | yes | yes | yes |
| timestamp | yes | yes | yes | yes | yes | yes | yes |
| array types | yes | yes | yes | no | yes | yes | yes |
Please see for a conceptual explanation of data sources.
Snowflake data sources are Snowflake tables or views. These can be specified either by a table reference or a SQL query.
Using a table reference:
from feast import SnowflakeSource
my_snowflake_source = SnowflakeSource(
database="FEAST",
schema="PUBLIC",
table="FEATURE_TABLE",
)
Using a query:
Be careful about how Snowflake handles table and column name conventions. In particular, you can read more about quote identifiers .
The full set of configuration options is available .
Snowflake data sources support all eight primitive types. Array types are also supported but not with type inference. For a comparison against other batch data sources, please see .
BigQuery data sources are BigQuery tables or views. These can be specified either by a table reference or a SQL query. However, no performance guarantees can be provided for SQL query-based sources, so table references are recommended.
Using a table reference:
Using a query:
The full set of configuration options is available .
BigQuery data sources support all eight primitive types and their corresponding array types. For a comparison against other batch data sources, please see .
Redshift data sources are Redshift tables or views. These can be specified either by a table reference or a SQL query. However, no performance guarantees can be provided for SQL query-based sources, so table references are recommended.
Using a table name:
Using a query:
The full set of configuration options is available .
Redshift data sources support all eight primitive types, but currently do not support array types. For a comparison against other batch data sources, please see .
from feast import BigQuerySource
my_bigquery_source = BigQuerySource(
table_ref="gcp_project:bq_dataset.bq_table",
)
from feast import BigQuerySource
BigQuerySource(
query="SELECT timestamp as ts, created, f1, f2 "
"FROM `my_project.my_dataset.my_features`",
)
from feast import RedshiftSource
my_redshift_source = RedshiftSource(
table="redshift_table",
)
from feast import RedshiftSource
my_redshift_source = RedshiftSource(
query="SELECT timestamp as ts, created, f1, f2 "
"FROM redshift_table",
)
Defining a MsSQL source:
from feast.infra.offline_stores.contrib.mssql_offline_store.mssqlserver_source import (
MsSqlServerSource,
)
driver_hourly_table = "driver_hourly"
driver_source = MsSqlServerSource(
table_ref=driver_hourly_table,
event_timestamp_column="datetime",
created_timestamp_column="created",
)
Defining a Trino source:
The full set of configuration options is available here.
Trino data sources support all eight primitive types and their corresponding array types. For a comparison against other batch data sources, please see here.
from feast.infra.offline_stores.contrib.trino_offline_store.trino_source import (
TrinoSource,
)
driver_hourly_stats = TrinoSource(
event_timestamp_column="event_timestamp",
table_ref="feast.driver_stats",
created_timestamp_column="created",
)
from feast import SnowflakeSource
my_snowflake_source = SnowflakeSource(
query="""
SELECT
timestamp_column AS "ts",
"created",
"f1",
"f2"
FROM
FEAST.PUBLIC.FEATURE_TABLE
""",
)
Couchbase Columnar data sources are Couchbase Capella Columnar collections that can be used as a source for feature data. Note that Couchbase Columnar is available through Couchbase Capella.
The Couchbase Columnar data source does not achieve full test coverage. Please do not assume complete stability.
Defining a Couchbase Columnar source:
The full set of configuration options is available .
Couchbase Capella Columnar data sources support BOOLEAN, STRING, BIGINT, and DOUBLE primitive types. For a comparison against other batch data sources, please see .
File data sources support all eight primitive types and their corresponding array types. For a comparison against other batch data sources, please see here.
from feast import FileSource
from feast.data_format import ParquetFormat
parquet_file_source = FileSource(
file_format=ParquetFormat(),
path="file:///feast/customer.parquet",
)
Using a table reference from SparkSession (for example, either in-memory or a Hive Metastore):
Using a query:
Using a file reference:
The full set of configuration options is available here.
Spark data sources support all eight primitive types and their corresponding array types. For a comparison against other batch data sources, please see here.
from feast.infra.offline_stores.contrib.spark_offline_store.spark_source import (
SparkSource,
)
my_spark_source = SparkSource(
table="FEATURE_TABLE",
)
from feast.infra.offline_stores.contrib.couchbase_offline_store.couchbase_source import (
CouchbaseColumnarSource,
)
driver_stats_source = CouchbaseColumnarSource(
name="driver_hourly_stats_source",
query="SELECT * FROM Default.Default.`feast_driver_hourly_stats`",
database="Default",
scope="Default",
collection="feast_driver_hourly_stats",
timestamp_field="event_timestamp",
created_timestamp_column="created",
)
from feast.infra.offline_stores.contrib.spark_offline_store.spark_source import (
SparkSource,
)
my_spark_source = SparkSource(
query="SELECT timestamp as ts, created, f1, f2 "
"FROM spark_table",
)from feast.infra.offline_stores.contrib.spark_offline_store.spark_source import (
SparkSource,
)
my_spark_source = SparkSource(
path=f"{CURRENT_DIR}/data/driver_hourly_stats",
file_format="parquet",
timestamp_field="event_timestamp",
created_timestamp_column="created",
)
Push sources allow feature values to be pushed to the online store and offline store in real time. This allows fresh feature values to be made available to applications. Push sources supersede the need to call FeatureStore.write_to_online_store directly.
Push sources can be used by multiple feature views. When data is pushed to a push source, Feast propagates the feature values to all the consuming feature views.
Push sources can optionally have a batch_source specified. If provided, it enables retrieval of historical features and supports materialization from the offline store to the online store. However, if your features are generated post-training or are only needed online (e.g., embeddings), you can omit the batch_source.
When a batch_source is used, users are responsible for ensuring that data is also pushed to a batch data source, such as a data warehouse. Note that when a push source is used as a stream source in a feature view definition, a batch_source does not need to be explicitly specified in the feature view itself.
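For example, a push source meant only for online serving can be declared without a batch_source (a minimal sketch; the name is hypothetical):
from feast import PushSource

# No batch_source: pushed values are served online only, so historical
# retrieval and materialization are not available for this source.
embeddings_push_source = PushSource(name="embeddings_push_source")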
Streaming data sources are important sources of feature values. A typical setup with streaming data looks like:
Raw events come in (stream 1)
Streaming transformations applied (e.g. generating features like last_N_purchased_categories) (stream 2)
Write stream 2 values to an offline store as a historical log for training (optional)
Write stream 2 values to an online store for low latency feature serving
Feast allows users to push features previously registered in a feature view to the online store for fresher features. It also allows users to push batches of stream data to the offline store by specifying that the push be directed to the offline store. This will push the data to the offline store declared in the repository configuration used to initialize the feature store.
Note that the push schema needs to also include the entity.
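For example, using the push source and feature view defined later in this section, each pushed row would carry the user_id join key alongside the feature and timestamp columns (a sketch with made-up values):
from datetime import datetime
import pandas as pd

# Rows pushed to "push_source" must include the entity join key ("user_id")
# in addition to the feature and timestamp columns.
event_df = pd.DataFrame(
    {
        "user_id": [1001],
        "life_time_value": [150],
        "event_timestamp": [datetime.utcnow()],
    }
)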
Note that the to parameter is optional and defaults to online, but you can explicitly pass PushMode.ONLINE, PushMode.OFFLINE, or PushMode.ONLINE_AND_OFFLINE.
See also for instructions on how to push data to a deployed feature server.
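As a sketch, assuming a feature server is running locally on its default port (6566), the same push can be issued over HTTP against the /push endpoint:
import json
from datetime import datetime

import pandas as pd
import requests

event_df = pd.DataFrame(
    {
        "user_id": [1001],
        "life_time_value": [150],
        "event_timestamp": [str(datetime.utcnow())],
    }
)

# Push to both the online and offline stores through a deployed feature server
requests.post(
    "http://localhost:6566/push",
    data=json.dumps(
        {
            "push_source_name": "push_source",
            "df": event_df.to_dict(),
            "to": "online_and_offline",
        }
    ),
)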
The default option to write features from a stream is to add the Python SDK into your existing PySpark pipeline.
This can also be used under the hood by a contrib stream processor (see ).
PostgreSQL data sources are PostgreSQL tables or views. These can be specified either by a table reference or a SQL query.
The PostgreSQL data source does not achieve full test coverage. Please do not assume complete stability.
Defining a Postgres source:
The full set of configuration options is available .
PostgreSQL data sources support all eight primitive types and their corresponding array types. For a comparison against other batch data sources, please see .
Warning: This is an experimental feature. It's intended for early testing and feedback, and could change without warnings in future releases.
Kinesis sources allow users to register Kinesis streams as data sources. Feast currently does not launch or monitor jobs to ingest data from Kinesis. Users are responsible for launching and monitoring their own ingestion jobs, which should write feature values to the online store through FeatureStore.write_to_online_store. An example of how to launch such a job with Spark to ingest from Kafka can be found ; by using a different plugin, the example can be adapted to Kinesis. Feast also provides functionality to write to the offline store using the write_to_offline_store functionality.
Kinesis sources must have a batch source specified. The batch source will be used for retrieving historical features. Thus users are also responsible for writing data from their Kinesis streams to a batch data source such as a data warehouse table. When using a Kinesis source as a stream source in the definition of a feature view, a batch source doesn't need to be specified in the feature view definition explicitly.
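As a hedged sketch of what such an ingestion job ultimately calls (assuming a feature view named driver_hourly_stats and a pandas DataFrame of freshly computed rows):
import pandas as pd
from feast import FeatureStore

store = FeatureStore(repo_path=".")
fresh_rows = pd.DataFrame()  # rows produced by your stream consumer

# Write the latest values for low-latency serving
store.write_to_online_store(feature_view_name="driver_hourly_stats", df=fresh_rows)

# Optionally append the same rows to the offline store as a historical log
store.write_to_offline_store(feature_view_name="driver_hourly_stats", df=fresh_rows)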
from feast.infra.offline_stores.contrib.postgres_offline_store.postgres_source import (
PostgreSQLSource,
)
driver_stats_source = PostgreSQLSource(
name="feast_driver_hourly_stats",
query="SELECT * FROM feast_driver_hourly_stats",
timestamp_field="event_timestamp",
created_timestamp_column="created",
)
Streaming data sources are important sources of feature values. A typical setup with streaming data looks like:
Raw events come in (stream 1)
Streaming transformations applied (e.g. generating features like last_N_purchased_categories) (stream 2)
Write stream 2 values to an offline store as a historical log for training (optional)
Write stream 2 values to an online store for low latency feature serving
Periodically materialize feature values from the offline store into the online store for decreased training-serving skew and improved model performance
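The materialization step above can be run from the Python SDK; a minimal sketch:
from datetime import datetime
from feast import FeatureStore

store = FeatureStore(repo_path=".")

# Incrementally materialize all registered feature views from the offline
# store into the online store, up to the current time.
store.materialize_incremental(end_date=datetime.utcnow())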
Note that the Kinesis source has a batch source.
The Kinesis source can be used in a stream feature view.
See here for an example of how to ingest data from a Kafka source into Feast. The approach used in the tutorial can be easily adapted to work for Kinesis as well.
from feast import Entity, PushSource, ValueType, BigQuerySource, FeatureView, Feature, Field
from feast.types import Int64
push_source = PushSource(
name="push_source",
batch_source=BigQuerySource(table="test.test"),
)
user = Entity(name="user", join_keys=["user_id"])
fv = FeatureView(
name="feature view",
entities=[user],
schema=[Field(name="life_time_value", dtype=Int64)],
source=push_source,
)
from feast import FeatureStore
import pandas as pd
from feast.data_source import PushMode
fs = FeatureStore(...)
feature_data_frame = pd.DataFrame()
fs.push("push_source_name", feature_data_frame, to=PushMode.ONLINE_AND_OFFLINE)
from feast import FeatureStore
from pyspark.sql import SparkSession
store = FeatureStore(...)
spark = SparkSession.builder.getOrCreate()
streamingDF = spark.readStream.format(...).load()
def feast_writer(spark_df, epoch_id):
    # foreachBatch passes each micro-batch as a Spark DataFrame plus a batch id
    pandas_df = spark_df.toPandas()
    store.push("driver_hourly_stats", pandas_df)
streamingDF.writeStream.foreachBatch(feast_writer).start()
from datetime import timedelta
from feast import Field, FileSource, KinesisSource, stream_feature_view
from feast.data_format import JsonFormat
from feast.types import Float32
from pyspark.sql import DataFrame
driver_stats_batch_source = FileSource(
name="driver_stats_source",
path="data/driver_stats.parquet",
timestamp_field="event_timestamp",
)
driver_stats_stream_source = KinesisSource(
name="driver_stats_stream",
stream_name="drivers",
timestamp_field="event_timestamp",
batch_source=driver_stats_batch_source,
record_format=JsonFormat(
schema_json="driver_id integer, event_timestamp timestamp, conv_rate double, acc_rate double, created timestamp"
),
watermark_delay_threshold=timedelta(minutes=5),
)
# `driver` below is an Entity assumed to be defined elsewhere in the feature repository.
@stream_feature_view(
entities=[driver],
ttl=timedelta(seconds=8640000000),
mode="spark",
schema=[
Field(name="conv_percentage", dtype=Float32),
Field(name="acc_percentage", dtype=Float32),
],
timestamp_field="event_timestamp",
online=True,
source=driver_stats_stream_source,
)
def driver_hourly_stats_stream(df: DataFrame):
    from pyspark.sql.functions import col

    return (
        df.withColumn("conv_percentage", col("conv_rate") * 100.0)
        .withColumn("acc_percentage", col("acc_rate") * 100.0)
        .drop("conv_rate", "acc_rate")
    )
Warning: This is an experimental feature. It's intended for early testing and feedback, and could change without warnings in future releases.
Kafka sources allow users to register Kafka streams as data sources. Feast currently does not launch or monitor jobs to ingest data from Kafka. Users are responsible for launching and monitoring their own ingestion jobs, which should write feature values to the online store through FeatureStore.write_to_online_store. An example of how to launch such a job with Spark can be found here. Feast also provides functionality to write to the offline store using the write_to_offline_store functionality.
Kafka sources must have a batch source specified. The batch source will be used for retrieving historical features. Thus users are also responsible for writing data from their Kafka streams to a batch data source such as a data warehouse table. When using a Kafka source as a stream source in the definition of a feature view, a batch source doesn't need to be specified in the feature view definition explicitly.
Streaming data sources are important sources of feature values. A typical setup with streaming data looks like:
Raw events come in (stream 1)
Streaming transformations applied (e.g. generating features like last_N_purchased_categories) (stream 2)
Write stream 2 values to an offline store as a historical log for training (optional)
Write stream 2 values to an online store for low latency feature serving
Periodically materialize feature values from the offline store into the online store for decreased training-serving skew and improved model performance
Note that the Kafka source has a batch source.
The Kafka source can be used in a stream feature view.
See for an example of how to ingest data from a Kafka source into Feast.
from datetime import timedelta
from feast import Field, FileSource, KafkaSource, stream_feature_view
from feast.data_format import JsonFormat
from feast.types import Float32
from pyspark.sql import DataFrame
driver_stats_batch_source = FileSource(
name="driver_stats_source",
path="data/driver_stats.parquet",
timestamp_field="event_timestamp",
)
driver_stats_stream_source = KafkaSource(
name="driver_stats_stream",
kafka_bootstrap_servers="localhost:9092",
topic="drivers",
timestamp_field="event_timestamp",
batch_source=driver_stats_batch_source,
message_format=JsonFormat(
schema_json="driver_id integer, event_timestamp timestamp, conv_rate double, acc_rate double, created timestamp"
),
watermark_delay_threshold=timedelta(minutes=5),
)
# `driver` below is an Entity assumed to be defined elsewhere in the feature repository.
@stream_feature_view(
entities=[driver],
ttl=timedelta(seconds=8640000000),
mode="spark",
schema=[
Field(name="conv_percentage", dtype=Float32),
Field(name="acc_percentage", dtype=Float32),
],
timestamp_field="event_timestamp",
online=True,
source=driver_stats_stream_source,
)
def driver_hourly_stats_stream(df: DataFrame):
    from pyspark.sql.functions import col

    return (
        df.withColumn("conv_percentage", col("conv_rate") * 100.0)
        .withColumn("acc_percentage", col("acc_rate") * 100.0)
        .drop("conv_rate", "acc_rate")
    )