Spark (contrib)

Description

The Spark compute engine provides distributed execution for batch materialization operations (materialize and materialize-incremental) and historical retrieval operations (get_historical_features).

It is designed to handle large-scale data processing and can be used with various offline stores, such as Snowflake, BigQuery, and Spark SQL.

Design

The Spark compute engine is implemented as a subclass of feast.infra.compute_engine.ComputeEngine. The offline store is used to read and write data, while the Spark engine performs transformations and aggregations on that data. The engine supports the following features:

  • Support for reading different data sources, such as Spark SQL, BigQuery, and Snowflake (see the sketch after this list).

  • Distributed execution of feature transformations and aggregations.

  • Support for custom transformations using Spark SQL or UDFs.

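For example, a feature view can read from a Spark SQL query through a SparkSource and then be materialized or queried by the engine. The snippet below is a minimal sketch; the driver_hourly_stats table, the driver entity, and the field names are illustrative and assume a Spark table or view with those columns already exists.

from datetime import timedelta

from feast import Entity, FeatureView, Field
from feast.types import Float32
from feast.infra.offline_stores.contrib.spark_offline_store.spark_source import SparkSource

# Illustrative source: a Spark SQL query over an existing table or view.
driver_stats_source = SparkSource(
    name="driver_hourly_stats_source",
    query="SELECT driver_id, conv_rate, acc_rate, event_timestamp, created FROM driver_hourly_stats",
    timestamp_field="event_timestamp",
    created_timestamp_column="created",
)

driver = Entity(name="driver", join_keys=["driver_id"])

driver_hourly_stats = FeatureView(
    name="driver_hourly_stats",
    entities=[driver],
    ttl=timedelta(days=1),
    schema=[
        Field(name="conv_rate", dtype=Float32),
        Field(name="acc_rate", dtype=Float32),
    ],
    source=driver_stats_source,
)
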
Example

feature_store.yaml
...
offline_store:
  type: snowflake.offline
...
batch_engine:
  type: spark.engine
  partitions: 10 # number of partitions when writing to the online or offline store
  spark_conf:
    spark.master: "local[*]"
    spark.app.name: "Feast Spark Engine"
    spark.sql.shuffle.partitions: 100
    spark.executor.memory: "4g"

Example in Python

feature_store.py
from feast import FeatureStore, RepoConfig
from feast.infra.online_stores.dynamodb import DynamoDBOnlineStoreConfig
from feast.infra.offline_stores.contrib.spark_offline_store.spark import SparkOfflineStoreConfig

repo_config = RepoConfig(
    registry="s3://[YOUR_BUCKET]/feast-registry.db",
    project="feast_repo",
    provider="aws",
    offline_store=SparkOfflineStoreConfig(
      spark_conf={
        "spark.ui.enabled": "false",
        "spark.eventLog.enabled": "false",
        "spark.sql.catalogImplementation": "hive",
        "spark.sql.parser.quotedRegexColumnNames": "true",
        "spark.sql.session.timeZone": "UTC"
      }
    ),
    batch_engine={
      "type": "spark.engine",
      "partitions": 10
    },
    online_store=DynamoDBOnlineStoreConfig(region="us-west-1"),
    entity_key_serialization_version=3
)

store = FeatureStore(config=repo_config)
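
With the store configured, materialization and historical retrieval run on the Spark engine. The calls below are a minimal usage sketch and assume a registered feature view named driver_hourly_stats with a driver_id join key, as in the earlier sketch.

from datetime import datetime, timedelta

import pandas as pd

# Materialize the last day of data into the online store; the Spark batch
# engine executes the materialization job.
store.materialize(
    start_date=datetime.utcnow() - timedelta(days=1),
    end_date=datetime.utcnow(),
)

# Historical retrieval is also executed by the Spark engine.
entity_df = pd.DataFrame(
    {
        "driver_id": [1001, 1002],
        "event_timestamp": [datetime.utcnow(), datetime.utcnow()],
    }
)
training_df = store.get_historical_features(
    entity_df=entity_df,
    features=["driver_hourly_stats:conv_rate", "driver_hourly_stats:acc_rate"],
).to_df()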
