Data sources

Please see Data Source for a conceptual explanation of data sources.

This page covers the following data sources: Overview, File, Snowflake, BigQuery, Redshift, Push, Kafka, Kinesis, Spark (contrib), PostgreSQL (contrib), Trino (contrib), and Azure Synapse + Azure SQL (contrib).

File

Description

File data sources are files on disk or on S3. Currently only Parquet files are supported.

FileSource is meant for development purposes only and is not optimized for production use.

Example

from feast import FileSource
from feast.data_format import ParquetFormat

parquet_file_source = FileSource(
    file_format=ParquetFormat(),
    path="file:///feast/customer.parquet",
)

The full set of configuration options is available here.

Supported Types

File data sources support all eight primitive types and their corresponding array types. For a comparison against other batch data sources, please see here.
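Since file sources can also live on S3, the same pattern works with an S3 path; a minimal sketch, where the bucket and object key are hypothetical:

from feast import FileSource
from feast.data_format import ParquetFormat

# Hypothetical S3 location, shown only to illustrate the path format;
# FileSource accepts s3:// URIs as well as local file paths.
s3_parquet_source = FileSource(
    file_format=ParquetFormat(),
    path="s3://my-feast-bucket/customer.parquet",
)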

Kafka

Warning: This is an experimental feature. It's intended for early testing and feedback, and could change without warning in future releases.

Description

Kafka sources allow users to register Kafka streams as data sources. Feast currently does not launch or monitor jobs to ingest data from Kafka. Users are responsible for launching and monitoring their own ingestion jobs, which should write feature values to the online store through FeatureStore.write_to_online_store. An example of how to launch such a job with Spark can be found here. Feast also provides functionality to write to the offline store via write_to_offline_store.

Kafka sources must have a batch source specified. The batch source will be used for retrieving historical features. Thus users are also responsible for writing data from their Kafka streams to a batch data source such as a data warehouse table. When using a Kafka source as a stream source in the definition of a feature view, a batch source doesn't need to be specified in the feature view definition explicitly.
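For instance, once an ingestion job has consumed records from Kafka into a pandas DataFrame, it can write them to the online store; a minimal sketch, assuming a feature repo in the current directory and the driver_hourly_stats feature view defined later on this page (both assumptions for illustration):

from datetime import datetime

import pandas as pd
from feast import FeatureStore

# Assumes feature_store.yaml lives in the current directory.
store = FeatureStore(repo_path=".")

# In a real job these rows would come from your Kafka consumer;
# the columns mirror the stream schema used in the example below.
df = pd.DataFrame(
    {
        "driver_id": [1001],
        "event_timestamp": [datetime.utcnow()],
        "created": [datetime.utcnow()],
        "conv_rate": [0.85],
        "acc_rate": [0.91],
    }
)

# Write the rows directly to the online store for low-latency serving.
store.write_to_online_store(feature_view_name="driver_hourly_stats", df=df)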

Stream sources

Streaming data sources are important sources of feature values. A typical setup with streaming data looks like:

1. Raw events come in (stream 1)

2. Streaming transformations applied (e.g. generating features like last_N_purchased_categories) (stream 2)

3. Write stream 2 values to an offline store as a historical log for training (optional)

4. Write stream 2 values to an online store for low latency feature serving

5. Periodically materialize feature values from the offline store into the online store for decreased training-serving skew and improved model performance (see the sketch after this list)
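Step 5 is typically handled by Feast's materialization APIs; a minimal sketch using the Python SDK, where the repo path is an assumption:

from datetime import datetime

from feast import FeatureStore

store = FeatureStore(repo_path=".")  # assumes a feature repo in the current directory

# Materialize all feature views incrementally, from the end of the last
# materialization window up to now.
store.materialize_incremental(end_date=datetime.utcnow())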

Example

Defining a Kafka source

Note that the Kafka source has a batch source.

from datetime import timedelta

from feast import Field, FileSource, KafkaSource, stream_feature_view
from feast.data_format import JsonFormat
from feast.types import Float32

driver_stats_batch_source = FileSource(
    name="driver_stats_source",
    path="data/driver_stats.parquet",
    timestamp_field="event_timestamp",
)

driver_stats_stream_source = KafkaSource(
    name="driver_stats_stream",
    kafka_bootstrap_servers="localhost:9092",
    topic="drivers",
    timestamp_field="event_timestamp",
    batch_source=driver_stats_batch_source,
    message_format=JsonFormat(
        schema_json="driver_id integer, event_timestamp timestamp, conv_rate double, acc_rate double, created timestamp"
    ),
    watermark_delay_threshold=timedelta(minutes=5),
)

Using the Kafka source in a stream feature view

The Kafka source can be used in a stream feature view.

from pyspark.sql import DataFrame

# `driver` is an Entity assumed to be defined elsewhere in the feature repo.
@stream_feature_view(
    entities=[driver],
    ttl=timedelta(seconds=8640000000),
    mode="spark",
    schema=[
        Field(name="conv_percentage", dtype=Float32),
        Field(name="acc_percentage", dtype=Float32),
    ],
    timestamp_field="event_timestamp",
    online=True,
    source=driver_stats_stream_source,
)
def driver_hourly_stats_stream(df: DataFrame):
    from pyspark.sql.functions import col

    return (
        df.withColumn("conv_percentage", col("conv_rate") * 100.0)
        .withColumn("acc_percentage", col("acc_rate") * 100.0)
        .drop("conv_rate", "acc_rate")
    )

Ingesting data

See here for an example of how to ingest data from a Kafka source into Feast.

Snowflake

Description

Snowflake data sources are Snowflake tables or views. These can be specified either by a table reference or a SQL query.

Examples

Using a table reference:

from feast import SnowflakeSource

my_snowflake_source = SnowflakeSource(
    database="FEAST",
    schema="PUBLIC",
    table="FEATURE_TABLE",
)

Using a query:

from feast import SnowflakeSource

my_snowflake_source = SnowflakeSource(
    query="""
    SELECT
        timestamp_column AS "ts",
        "created",
        "f1",
        "f2"
    FROM
        `FEAST.PUBLIC.FEATURE_TABLE`
      """,
)

Be careful about how Snowflake handles table and column name conventions. In particular, you can read more about quote identifiers here.

The full set of configuration options is available here.

Supported Types

Snowflake data sources support all eight primitive types, but currently do not support array types. For a comparison against other batch data sources, please see here.

PostgreSQL (contrib)

Description

PostgreSQL data sources are PostgreSQL tables or views. These can be specified either by a table reference or a SQL query.

Disclaimer

The PostgreSQL data source does not achieve full test coverage. Please do not assume complete stability.

Examples

Defining a Postgres source:

from feast.infra.offline_stores.contrib.postgres_offline_store.postgres_source import (
    PostgreSQLSource,
)

driver_stats_source = PostgreSQLSource(
    name="feast_driver_hourly_stats",
    query="SELECT * FROM feast_driver_hourly_stats",
    timestamp_field="event_timestamp",
    created_timestamp_column="created",
)

The full set of configuration options is available here.

Supported Types

PostgreSQL data sources support all eight primitive types and their corresponding array types. For a comparison against other batch data sources, please see here.

Spark (contrib)

Description

Spark data sources are tables or files that can be loaded from some Spark store (e.g. Hive or in-memory). They can also be specified by a SQL query.

Disclaimer

The Spark data source does not achieve full test coverage. Please do not assume complete stability.

Examples

Using a table reference from SparkSession (for example, either in-memory or a Hive Metastore):

from feast.infra.offline_stores.contrib.spark_offline_store.spark_source import (
    SparkSource,
)

my_spark_source = SparkSource(
    table="FEATURE_TABLE",
)

Using a query:

from feast.infra.offline_stores.contrib.spark_offline_store.spark_source import (
    SparkSource,
)

my_spark_source = SparkSource(
    query="SELECT timestamp as ts, created, f1, f2 "
          "FROM spark_table",
)

Using a file reference:

from feast.infra.offline_stores.contrib.spark_offline_store.spark_source import (
    SparkSource,
)

my_spark_source = SparkSource(
    path=f"{CURRENT_DIR}/data/driver_hourly_stats",
    file_format="parquet",
    timestamp_field="event_timestamp",
    created_timestamp_column="created",
)

The full set of configuration options is available here.

Supported Types

Spark data sources support all eight primitive types and their corresponding array types. For a comparison against other batch data sources, please see here.

Redshift

Description

Redshift data sources are Redshift tables or views. These can be specified either by a table reference or a SQL query. However, no performance guarantees can be provided for SQL query-based sources, so table references are recommended.

Examples

Using a table name:

from feast import RedshiftSource

my_redshift_source = RedshiftSource(
    table="redshift_table",
)

Using a query:

from feast import RedshiftSource

my_redshift_source = RedshiftSource(
    query="SELECT timestamp as ts, created, f1, f2 "
          "FROM redshift_table",
)

The full set of configuration options is available here.

Supported Types

Redshift data sources support all eight primitive types, but currently do not support array types. For a comparison against other batch data sources, please see here.

BigQuery

Description

BigQuery data sources are BigQuery tables or views. These can be specified either by a table reference or a SQL query. However, no performance guarantees can be provided for SQL query-based sources, so table references are recommended.

Examples

Using a table reference:

from feast import BigQuerySource

my_bigquery_source = BigQuerySource(
    table_ref="gcp_project:bq_dataset.bq_table",
)

Using a query:

from feast import BigQuerySource

BigQuerySource(
    query="SELECT timestamp as ts, created, f1, f2 "
          "FROM `my_project.my_dataset.my_features`",
)

The full set of configuration options is available here.

Supported Types

BigQuery data sources support all eight primitive types and their corresponding array types. For a comparison against other batch data sources, please see here.

Trino (contrib)

Description

Trino data sources are Trino tables or views. These can be specified either by a table reference or a SQL query.

Disclaimer

The Trino data source does not achieve full test coverage. Please do not assume complete stability.

Examples

Defining a Trino source:

from feast.infra.offline_stores.contrib.trino_offline_store.trino_source import (
    TrinoSource,
)

driver_hourly_stats = TrinoSource(
    event_timestamp_column="event_timestamp",
    table_ref="feast.driver_stats",
    created_timestamp_column="created",
)

The full set of configuration options is available here.

Supported Types

Trino data sources support all eight primitive types, but currently do not support array types. For a comparison against other batch data sources, please see here.

Azure Synapse + Azure SQL (contrib)

Description

MsSQL data sources are Microsoft SQL table sources. These can be specified either by a table reference or a SQL query.

Disclaimer

The MsSQL data source does not achieve full test coverage. Please do not assume complete stability.

Examples

Defining a MsSQL source:

from feast.infra.offline_stores.contrib.mssql_offline_store.mssqlserver_source import (
    MsSqlServerSource,
)

driver_hourly_table = "driver_hourly"

driver_source = MsSqlServerSource(
    table_ref=driver_hourly_table,
    event_timestamp_column="datetime",
    created_timestamp_column="created",
)

Kinesis

Warning: This is an experimental feature. It's intended for early testing and feedback, and could change without warning in future releases.

Description

Kinesis sources allow users to register Kinesis streams as data sources. Feast currently does not launch or monitor jobs to ingest data from Kinesis. Users are responsible for launching and monitoring their own ingestion jobs, which should write feature values to the online store through FeatureStore.write_to_online_store. An example of how to launch such a job with Spark to ingest from Kafka can be found here; by using a different plugin, the example can be adapted to Kinesis. Feast also provides functionality to write to the offline store via write_to_offline_store.

Kinesis sources must have a batch source specified. The batch source will be used for retrieving historical features. Thus users are also responsible for writing data from their Kinesis streams to a batch data source such as a data warehouse table. When using a Kinesis source as a stream source in the definition of a feature view, a batch source doesn't need to be specified in the feature view definition explicitly.
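For example, batches of records consumed from a Kinesis stream can also be appended to the feature view's batch source through the offline-store write path; a minimal sketch, assuming a feature repo in the current directory and the driver_hourly_stats feature view from the example below (both assumptions for illustration):

from datetime import datetime

import pandas as pd
from feast import FeatureStore

store = FeatureStore(repo_path=".")  # assumes a feature repo in the current directory

# Rows consumed from the Kinesis stream, flattened into a DataFrame;
# the columns mirror the record schema used in the example below.
df = pd.DataFrame(
    {
        "driver_id": [1001],
        "event_timestamp": [datetime.utcnow()],
        "created": [datetime.utcnow()],
        "conv_rate": [0.85],
        "acc_rate": [0.91],
    }
)

# Append the rows to the feature view's batch source in the offline store.
store.write_to_offline_store(feature_view_name="driver_hourly_stats", df=df)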

Stream sources

Streaming data sources are important sources of feature values. A typical setup with streaming data looks like:

1. Raw events come in (stream 1)

2. Streaming transformations applied (e.g. generating features like last_N_purchased_categories) (stream 2)

3. Write stream 2 values to an offline store as a historical log for training (optional)

4. Write stream 2 values to an online store for low latency feature serving

5. Periodically materialize feature values from the offline store into the online store for decreased training-serving skew and improved model performance

Example

Defining a Kinesis source

Note that the Kinesis source has a batch source.

from datetime import timedelta

from feast import Field, FileSource, KinesisSource, stream_feature_view
from feast.data_format import JsonFormat
from feast.types import Float32

driver_stats_batch_source = FileSource(
    name="driver_stats_source",
    path="data/driver_stats.parquet",
    timestamp_field="event_timestamp",
)

driver_stats_stream_source = KinesisSource(
    name="driver_stats_stream",
    stream_name="drivers",
    timestamp_field="event_timestamp",
    batch_source=driver_stats_batch_source,
    record_format=JsonFormat(
        schema_json="driver_id integer, event_timestamp timestamp, conv_rate double, acc_rate double, created timestamp"
    ),
    watermark_delay_threshold=timedelta(minutes=5),
)

Using the Kinesis source in a stream feature view

The Kinesis source can be used in a stream feature view.

from pyspark.sql import DataFrame

# `driver` is an Entity assumed to be defined elsewhere in the feature repo.
@stream_feature_view(
    entities=[driver],
    ttl=timedelta(seconds=8640000000),
    mode="spark",
    schema=[
        Field(name="conv_percentage", dtype=Float32),
        Field(name="acc_percentage", dtype=Float32),
    ],
    timestamp_field="event_timestamp",
    online=True,
    source=driver_stats_stream_source,
)
def driver_hourly_stats_stream(df: DataFrame):
    from pyspark.sql.functions import col

    return (
        df.withColumn("conv_percentage", col("conv_rate") * 100.0)
        .withColumn("acc_percentage", col("acc_rate") * 100.0)
        .drop("conv_rate", "acc_rate")
    )

Ingesting data

See here for an example of how to ingest data from a Kafka source into Feast. The approach used in the tutorial can be easily adapted to work for Kinesis as well.

Push

Description

Push sources allow feature values to be pushed to the online store and offline store in real time. This allows fresh feature values to be made available to applications. Push sources supersede FeatureStore.write_to_online_store.

Push sources can be used by multiple feature views. When data is pushed to a push source, Feast propagates the feature values to all the consuming feature views.

Push sources must have a batch source specified. The batch source will be used for retrieving historical features. Thus users are also responsible for pushing data to a batch data source such as a data warehouse table. When using a push source as a stream source in the definition of a feature view, a batch source doesn't need to be specified in the feature view definition explicitly.

Stream sources

Streaming data sources are important sources of feature values. A typical setup with streaming data looks like:

1. Raw events come in (stream 1)

2. Streaming transformations applied (e.g. generating features like last_N_purchased_categories) (stream 2)

3. Write stream 2 values to an offline store as a historical log for training (optional)

4. Write stream 2 values to an online store for low latency feature serving

5. Periodically materialize feature values from the offline store into the online store for decreased training-serving skew and improved model performance

Feast allows users to push features previously registered in a feature view to the online store for fresher features. It also allows users to push batches of stream data to the offline store by specifying that the push be directed to the offline store. This will push the data to the offline store declared in the repository configuration used to initialize the feature store.

Example (basic)

Defining a push source

Note that the push schema needs to also include the entity.

from feast import Entity, PushSource, ValueType, BigQuerySource, FeatureView, Feature, Field
from feast.types import Int64

push_source = PushSource(
    name="push_source",
    batch_source=BigQuerySource(table="test.test"),
)

user = Entity(name="user", join_keys=["user_id"])

fv = FeatureView(
    name="feature view",
    entities=[user],
    schema=[Field(name="life_time_value", dtype=Int64)],
    source=push_source,
)

Pushing data

Note that the to parameter is optional and defaults to online, but we can specify these options: PushMode.ONLINE, PushMode.OFFLINE, or PushMode.ONLINE_AND_OFFLINE.

from feast import FeatureStore
import pandas as pd
from feast.data_source import PushMode

fs = FeatureStore(...)
feature_data_frame = pd.DataFrame()
fs.push("push_source_name", feature_data_frame, to=PushMode.ONLINE_AND_OFFLINE)

See also Python feature server for instructions on how to push data to a deployed feature server.

Example (Spark Streaming)

The default option to write features from a stream is to add the Python SDK into your existing PySpark pipeline.

from feast import FeatureStore
from pyspark.sql import SparkSession

store = FeatureStore(...)

spark = SparkSession.builder.getOrCreate()

streamingDF = spark.readStream.format(...).load()

# foreachBatch passes each micro-batch DataFrame along with a batch id.
def feast_writer(spark_df, batch_id):
    pandas_df = spark_df.toPandas()
    store.push("driver_hourly_stats", pandas_df)

streamingDF.writeStream.foreachBatch(feast_writer).start()

This can also be used under the hood by a contrib stream processor (see Tutorial: Building streaming features).

Overview

Functionality

In Feast, each batch data source is associated with a corresponding offline store. For example, a SnowflakeSource can only be processed by the Snowflake offline store. Otherwise, the primary difference between batch data sources is the set of supported types. Feast has an internal type system, and aims to support eight primitive types (bytes, string, int32, int64, float32, float64, bool, and timestamp) along with the corresponding array types. However, not every batch data source supports all of these types.

For more details on the Feast type system, see here.
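As a rough sketch of how those types appear in Python, a schema touching each primitive type plus one array type might look like the following; the Array wrapper is available in recent Feast releases, so treat the exact import surface as an assumption for your version:

from feast import Field
from feast.types import (
    Array,
    Bool,
    Bytes,
    Float32,
    Float64,
    Int32,
    Int64,
    String,
    UnixTimestamp,
)

# The eight primitive types, plus one array type as an example.
fields = [
    Field(name="raw_payload", dtype=Bytes),
    Field(name="driver_name", dtype=String),
    Field(name="trip_count", dtype=Int32),
    Field(name="total_miles", dtype=Int64),
    Field(name="conv_rate", dtype=Float32),
    Field(name="acc_rate", dtype=Float64),
    Field(name="is_active", dtype=Bool),
    Field(name="last_seen", dtype=UnixTimestamp),
    Field(name="recent_trip_ids", dtype=Array(Int64)),  # array types, where the source supports them
]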

Functionality Matrix

There are currently four core batch data source implementations: FileSource, BigQuerySource, SnowflakeSource, and RedshiftSource. There are several additional implementations contributed by the Feast community (PostgreSQLSource, SparkSource, and TrinoSource), which are not guaranteed to be stable or to match the functionality of the core implementations. Details for each specific data source can be found here.

Below is a matrix indicating which data sources support which types.

| Type        | File | BigQuery | Snowflake | Redshift | Postgres | Spark | Trino |
|-------------|------|----------|-----------|----------|----------|-------|-------|
| bytes       | yes  | yes      | yes       | yes      | yes      | yes   | yes   |
| string      | yes  | yes      | yes       | yes      | yes      | yes   | yes   |
| int32       | yes  | yes      | yes       | yes      | yes      | yes   | yes   |
| int64       | yes  | yes      | yes       | yes      | yes      | yes   | yes   |
| float32     | yes  | yes      | yes       | yes      | yes      | yes   | yes   |
| float64     | yes  | yes      | yes       | yes      | yes      | yes   | yes   |
| bool        | yes  | yes      | yes       | yes      | yes      | yes   | yes   |
| timestamp   | yes  | yes      | yes       | yes      | yes      | yes   | yes   |
| array types | yes  | yes      | no        | no       | yes      | yes   | no    |