Push

Description

Push sources allow feature values to be pushed to the online store and offline store in real time. This allows fresh feature values to be made available to applications. Push sources supersede FeatureStore.write_to_online_store.

Push sources can be used by multiple feature views. When data is pushed to a push source, Feast propagates the feature values to all the consuming feature views.

Push sources must have a batch source specified. The batch source is used for retrieving historical features, so users are also responsible for pushing data to a batch data source such as a data warehouse table. When a push source is used as the stream source of a feature view, the feature view does not need to specify a batch source explicitly.

Stream sources

Streaming data sources are important sources of feature values. A typical setup with streaming data looks like:

  1. Raw events come in (stream 1)

  2. Streaming transformations applied (e.g. generating features like last_N_purchased_categories) (stream 2)

  3. Write stream 2 values to an offline store as a historical log for training (optional)

  4. Write stream 2 values to an online store for low latency feature serving

  5. Periodically materialize feature values from the offline store into the online store for decreased training-serving skew and improved model performance
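
For step 5, here is a minimal sketch of incremental materialization with the Feast SDK (the CLI equivalent is feast materialize-incremental); the repository configuration passed to FeatureStore is assumed to already exist:

from datetime import datetime
from feast import FeatureStore

store = FeatureStore(...)

# Load feature values from each feature view's batch source into the online
# store, up to the current time.
store.materialize_incremental(end_date=datetime.utcnow())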

Feast allows users to push features previously registered in a feature view to the online store, so that serving reflects the freshest feature values. It also allows users to push batches of stream data to the offline store by directing the push there; the data is written to the offline store declared in the repository configuration used to initialize the feature store.

Example (basic)

Defining a push source

Note that the schema of the pushed data also needs to include the entity (the join key user_id in the example below).

from feast import Entity, PushSource, BigQuerySource, FeatureView, Field
from feast.types import Int64

# A push source must be backed by a batch source, which is used for
# historical (offline) retrieval of the same features.
push_source = PushSource(
    name="push_source",
    batch_source=BigQuerySource(table="test.test"),
)

user = Entity(name="user", join_keys=["user_id"])

# The feature view consumes the push source directly; no separate batch
# source needs to be declared on the feature view itself.
fv = FeatureView(
    name="feature view",
    entities=[user],
    schema=[Field(name="life_time_value", dtype=Int64)],
    source=push_source,
)

Pushing data

Note that the to parameter is optional and defaults to PushMode.ONLINE; the available options are PushMode.ONLINE, PushMode.OFFLINE, and PushMode.ONLINE_AND_OFFLINE.

from feast import FeatureStore
import pandas as pd
from feast.data_source import PushMode

fs = FeatureStore(...)
# The dataframe must contain the entity join key, the feature columns, and an
# event timestamp column.
feature_data_frame = pd.DataFrame()
# "push_source" is the name of the PushSource defined above.
fs.push("push_source", feature_data_frame, to=PushMode.ONLINE_AND_OFFLINE)
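
Once values have been pushed with PushMode.ONLINE or PushMode.ONLINE_AND_OFFLINE, they can be read back through the regular online retrieval API. A minimal sketch, assuming the entity and feature view from the basic example above and a hypothetical user_id of 1001:

from feast import FeatureStore

fs = FeatureStore(...)

# Retrieve the freshly pushed value from the online store.
online_features = fs.get_online_features(
    features=["feature view:life_time_value"],
    entity_rows=[{"user_id": 1001}],
).to_dict()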

Example (Spark Streaming)

The default way to write features from a stream is to use the Feast Python SDK inside your existing PySpark pipeline.

from feast import FeatureStore
from pyspark.sql import SparkSession

store = FeatureStore(...)

spark = SparkSession.builder.getOrCreate()

streamingDF = spark.readStream.format(...).load()

# foreachBatch hands each micro-batch to this function as a Spark DataFrame
# together with a batch id.
def feast_writer(spark_df, batch_id):
    pandas_df = spark_df.toPandas()
    store.push("driver_hourly_stats", pandas_df)

streamingDF.writeStream.foreachBatch(feast_writer).start()

See also the Python feature server reference for instructions on how to push data to a deployed feature server.
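
For illustration, a minimal sketch of pushing over HTTP to a Python feature server, assuming a server started locally with feast serve on its default port 6566 and the push source from the basic example; the user_id, value, and timestamp are illustrative:

import requests

payload = {
    "push_source_name": "push_source",
    "df": {
        "user_id": [1001],
        "life_time_value": [300],
        "event_timestamp": ["2023-01-01 10:00:00"],
    },
}
# The server's /push endpoint writes the rows to the online store by default.
requests.post("http://localhost:6566/push", json=payload).raise_for_status()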

This can also be used under the hood by a contrib stream processor (see Tutorial: Building streaming features).
