Ray (contrib)

The Ray compute engine is a distributed compute implementation that leverages Ray for executing feature pipelines including transformations, aggregations, joins, and materializations. It provides scalable and efficient distributed processing for both materialize() and get_historical_features() operations.

Quick Start with Ray Template

Ray RAG Template - Batch Embedding at Scale

For RAG (Retrieval-Augmented Generation) applications with distributed embedding generation:

feast init -t ray_rag my_rag_project
cd my_rag_project/feature_repo

The Ray RAG template demonstrates:

  • Parallel Embedding Generation: Uses Ray compute engine to generate embeddings across multiple workers

  • Vector Search Integration: Works with Milvus for semantic similarity search

  • Complete RAG Pipeline: Data → Embeddings → Search workflow

The Ray compute engine automatically distributes embedding generation across the available workers, so large datasets are embedded in parallel rather than in a single process.

Overview

The Ray compute engine provides:

  • Distributed DAG Execution: Executes feature computation DAGs across Ray clusters

  • Intelligent Join Strategies: Automatic selection between broadcast and distributed joins

  • Lazy Evaluation: Deferred execution for optimal performance

  • Resource Management: Automatic scaling and resource optimization

  • Point-in-Time Joins: Efficient temporal joins for historical feature retrieval

  • GPU Support: Schedule transformation workers on GPU nodes via num_gpus config (all modes including KubeRay)

Architecture

The Ray compute engine follows Feast's DAG-based architecture. Its core components are listed below.

Core Components

| Component | Description |
| --- | --- |
| RayComputeEngine | Main engine implementing the ComputeEngine interface |
| RayFeatureBuilder | Constructs the DAG from Feature View definitions |
| RayDAGNode | Ray-specific DAG node implementations |
| RayDAGRetrievalJob | Executes retrieval plans and returns results |
| RayMaterializationJob | Handles materialization job tracking |

Configuration

Configure the Ray compute engine in your feature_store.yaml using the options below.

Configuration Options

| Option | Type | Default | Description |
| --- | --- | --- | --- |
| type | string | "ray.engine" | Must be ray.engine |
| max_workers | int | None (uses all cores) | Maximum number of Ray workers |
| enable_optimization | boolean | true | Enable performance optimizations |
| broadcast_join_threshold_mb | int | 100 | Size threshold for broadcast joins (MB) |
| max_parallelism_multiplier | int | 2 | Parallelism as a multiple of CPU cores |
| target_partition_size_mb | int | 64 | Target partition size (MB) |
| window_size_for_joins | string | "1H" | Time window for distributed joins |
| ray_address | string | None | Ray cluster address (triggers REMOTE mode) |
| use_kuberay | boolean | None | Enable KubeRay mode (overrides ray_address) |
| kuberay_conf | dict | None | KubeRay configuration dict with keys: cluster_name (required), namespace (default: "default"), auth_token, auth_server, skip_tls (default: false) |
| enable_ray_logging | boolean | false | Enable Ray progress bars and logging |
| enable_distributed_joins | boolean | true | Enable distributed joins for large datasets |
| staging_location | string | None | Remote path for batch materialization jobs |
| ray_conf | dict | None | Ray configuration parameters (memory, CPU limits) |
| num_gpus | float | None | Number of GPUs to request per worker task. Requires GPU nodes in the Ray cluster. Fractional values (e.g. 0.5) are supported. Supported in all modes including KubeRay. |
| gpu_batch_format | string | "pandas" | Batch format for map_batches when num_gpus is set. Use "numpy" or "pyarrow" for GPU-native libraries (e.g. cuDF, PyTorch). |
| worker_task_options | dict | None | Arbitrary Ray .options() kwargs applied to every worker task. See Worker Resource Scheduling for the full reference. |
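
To show how a handful of these options sit together, the sketch below builds an engine block as a Python dict and renders it as YAML-style text. The option names and defaults are taken from the table; the batch_engine top-level key, the render_yaml helper, and the chosen values are assumptions made for illustration and are not produced by Feast.

```python
def render_yaml(mapping, indent=0):
    """Render a nested dict of scalars as YAML-style lines (illustrative helper)."""
    lines = []
    pad = " " * indent
    for key, value in mapping.items():
        if isinstance(value, dict):
            lines.append(f"{pad}{key}:")
            lines.extend(render_yaml(value, indent + 2))
        elif isinstance(value, bool):
            # YAML booleans are lowercase.
            lines.append(f"{pad}{key}: {str(value).lower()}")
        else:
            lines.append(f"{pad}{key}: {value}")
    return lines


ray_engine_config = {
    "batch_engine": {  # assumed top-level key in feature_store.yaml
        "type": "ray.engine",
        "max_workers": 4,
        "enable_optimization": True,
        "broadcast_join_threshold_mb": 100,
        "target_partition_size_mb": 64,
        "window_size_for_joins": "1H",
    }
}

yaml_lines = render_yaml(ray_engine_config)
print("\n".join(yaml_lines))
```

Options left out of the block keep the defaults listed in the table.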

Mode Detection Precedence

The Ray compute engine automatically detects the execution mode:

  1. Environment Variables → KubeRay mode (if FEAST_RAY_USE_KUBERAY=true)

  2. Config kuberay_conf → KubeRay mode

  3. Config ray_address → Remote mode

  4. Default → Local mode
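
The precedence above can be sketched as a small function. This is an illustration of the documented order only, not Feast's actual detection code; the function name, the plain-dict config, and the case-insensitive comparison of the environment variable are assumptions.

```python
def detect_execution_mode(config, environ):
    """Return "kuberay", "remote", or "local" following the documented precedence."""
    # 1. The environment variable wins over anything in the config.
    if environ.get("FEAST_RAY_USE_KUBERAY", "").lower() == "true":
        return "kuberay"
    # 2. A kuberay_conf block selects KubeRay mode.
    if config.get("kuberay_conf"):
        return "kuberay"
    # 3. A ray_address selects remote mode.
    if config.get("ray_address"):
        return "remote"
    # 4. Otherwise fall back to local mode.
    return "local"


# The address and cluster name below are hypothetical placeholders.
both = {
    "ray_address": "ray://example-head:10001",
    "kuberay_conf": {"cluster_name": "example-cluster"},
}
print(detect_execution_mode(both, {}))
print(detect_execution_mode({"ray_address": "ray://example-head:10001"}, {}))
print(detect_execution_mode({}, {}))
```

When both kuberay_conf and ray_address are present, the sketch resolves to KubeRay mode, matching steps 2 and 3 of the list.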

Usage Examples

Basic Historical Feature Retrieval
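
A minimal sketch of retrieval through the standard FeatureStore API, assuming a feature repository whose feature_store.yaml selects the Ray engine. The feature view name driver_hourly_stats, its feature names, and both helper functions are hypothetical placeholders.

```python
def feature_refs(view_name, feature_names):
    """Build "view:feature" reference strings in Feast's reference format."""
    return [f"{view_name}:{name}" for name in feature_names]


def get_training_frame(repo_path, entity_df, refs):
    """Fetch historical features for entity_df and return them as a DataFrame."""
    from feast import FeatureStore  # imported lazily: requires feast to be installed

    store = FeatureStore(repo_path=repo_path)
    job = store.get_historical_features(entity_df=entity_df, features=refs)
    return job.to_df()


# Hypothetical feature view and feature names.
refs = feature_refs("driver_hourly_stats", ["conv_rate", "acc_rate"])
print(refs)
```

Here entity_df is expected to be a pandas DataFrame holding the entity keys and an event_timestamp column; calling get_training_frame(".", entity_df, refs) from the feature repository directory would run the retrieval.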

Batch Materialization
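
A minimal sketch of materialization through the standard FeatureStore API, assuming the repository is configured with the Ray engine. The helper names and the one-day window are illustrative choices.

```python
from datetime import datetime, timedelta, timezone


def materialization_window(end, days=1):
    """Return a (start, end) pair covering the `days` before `end`."""
    return end - timedelta(days=days), end


def run_materialization(repo_path, start, end):
    """Materialize the repo's feature views into the online store for the window."""
    from feast import FeatureStore  # imported lazily: requires feast to be installed

    store = FeatureStore(repo_path=repo_path)
    store.materialize(start_date=start, end_date=end)


start, end = materialization_window(datetime(2024, 1, 2, tzinfo=timezone.utc))
print(start.isoformat(), end.isoformat())
```

Calling run_materialization(".", start, end) from the feature repository directory would then launch the job.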

Large-Scale Feature Retrieval

Advanced Configuration

Complete Example Configuration

Here's a complete example configuration showing how to use the Ray offline store with the Ray compute engine:

DAG Node Types

The Ray compute engine implements several specialized DAG nodes:

RayReadNode

Reads data from Ray-compatible sources:

  • Supports Parquet, CSV, and other formats

  • Handles partitioning and schema inference

  • Applies field mappings and filters

RayJoinNode

Performs distributed joins:

  • Broadcast Join: For small feature datasets (below broadcast_join_threshold_mb, 100 MB by default)

  • Distributed Join: For large datasets with time-based windowing

  • Automatic Strategy Selection: Based on dataset size and cluster resources

RayFilterNode

Applies filters and time-based constraints:

  • TTL-based filtering

  • Timestamp range filtering

  • Custom predicate filtering

RayAggregationNode

Handles feature aggregations:

  • Windowed aggregations

  • Grouped aggregations

  • Custom aggregation functions

RayTransformationNode

Applies feature transformations:

  • Row-level transformations

  • Column-level transformations

  • Custom transformation functions

RayWriteNode

Writes results to various targets:

  • Online stores

  • Offline stores

  • Temporary storage

Join Strategies

The Ray compute engine automatically selects optimal join strategies:

Broadcast Join

Used for small feature datasets:

  • Automatically selected when feature data is smaller than broadcast_join_threshold_mb (default: 100 MB)

  • Features are cached in Ray's object store

  • Entities are distributed across cluster

  • Each worker gets a copy of feature data

Distributed Windowed Join

Used for large feature datasets:

  • Automatically selected when feature data exceeds broadcast_join_threshold_mb (default: 100 MB)

  • Data is partitioned by time windows (window_size_for_joins, default: "1H")

  • Point-in-time joins within each window

  • Results are combined across windows
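
The point-in-time semantics that each window has to preserve can be illustrated in a single process. The sketch below is not the engine's distributed implementation: the function name, the tuple layout, and the inclusive TTL boundary are assumptions. For every entity row it picks the latest feature value observed at or before the entity timestamp and drops it if it is older than the TTL.

```python
from bisect import bisect_right
from datetime import datetime, timedelta


def point_in_time_join(entity_rows, feature_rows, ttl):
    """entity_rows: (key, ts); feature_rows: (key, ts, value).
    Returns (key, ts, value_or_None) for each entity row."""
    # Group feature rows by entity key, sorted by timestamp.
    by_key = {}
    for key, ts, value in sorted(feature_rows, key=lambda r: (r[0], r[1])):
        timestamps, values = by_key.setdefault(key, ([], []))
        timestamps.append(ts)
        values.append(value)

    joined = []
    for key, ts in entity_rows:
        timestamps, values = by_key.get(key, ([], []))
        idx = bisect_right(timestamps, ts) - 1  # last feature row at or before ts
        if idx >= 0 and ts - timestamps[idx] <= ttl:
            joined.append((key, ts, values[idx]))
        else:
            joined.append((key, ts, None))  # no value, or value older than the TTL
    return joined


features = [
    (1, datetime(2024, 1, 1, 10), 0.5),
    (1, datetime(2024, 1, 1, 12), 0.7),
    (2, datetime(2024, 1, 1, 9), 0.1),
]
entities = [
    (1, datetime(2024, 1, 1, 11)),
    (1, datetime(2024, 1, 1, 13)),
    (2, datetime(2024, 1, 1, 12)),
]
result = point_in_time_join(entities, features, ttl=timedelta(hours=2))
print([value for _, _, value in result])
```

The third entity row gets no value because its only candidate feature is three hours old, which exceeds the two-hour TTL.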

Strategy Selection Logic
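
A size-only sketch of the selection rule described in the two subsections above. The function name and return labels are hypothetical, treating a dataset exactly at the threshold as distributed is an assumption, and the real engine may also weigh cluster resources.

```python
def select_join_strategy(feature_size_mb, broadcast_join_threshold_mb=100):
    """Pick a join strategy from the estimated feature data size in MB."""
    if feature_size_mb < broadcast_join_threshold_mb:
        # Small enough to copy to every worker.
        return "broadcast"
    # Otherwise partition by time window and join within each window.
    return "distributed_windowed"


print(select_join_strategy(10))
print(select_join_strategy(500))
print(select_join_strategy(150, broadcast_join_threshold_mb=200))
```

Raising broadcast_join_threshold_mb, as in the last call, keeps mid-sized feature data on the broadcast path.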

Performance Optimization

Automatic Optimization

The Ray compute engine includes several automatic optimizations:

  1. Partition Optimization: Automatically determines optimal partition sizes

  2. Join Strategy Selection: Chooses between broadcast and distributed joins

  3. Resource Allocation: Scales workers based on available resources

  4. Memory Management: Handles out-of-core processing for large datasets

Manual Tuning

For specific workloads, you can fine-tune performance:
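
One way to reason about target_partition_size_mb and max_parallelism_multiplier before changing them is to estimate the partition count they would produce together. The heuristic below is a hypothetical back-of-the-envelope model, not Feast's internal logic: it sizes partitions by the target size and caps the count at CPU cores times the multiplier.

```python
import math


def estimate_partitions(dataset_size_mb, cpu_cores,
                        target_partition_size_mb=64,
                        max_parallelism_multiplier=2):
    """Estimate a partition count under the assumed size-then-cap heuristic."""
    by_size = math.ceil(dataset_size_mb / target_partition_size_mb)
    cap = cpu_cores * max_parallelism_multiplier
    return max(1, min(by_size, cap))


print(estimate_partitions(100, cpu_cores=8))
print(estimate_partitions(10_000, cpu_cores=8))
print(estimate_partitions(10_000, cpu_cores=8, max_parallelism_multiplier=4))
```

Under this model, a large dataset on a small cluster is limited by the multiplier rather than by the partition size, so raising the multiplier is the knob that increases parallelism.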

Monitoring and Metrics

Monitor Ray compute engine performance:

Worker Resource Scheduling

worker_task_options is a passthrough dict of Ray .options() kwargs applied to every worker task Feast dispatches. It complements ray_conf: ray_conf holds cluster-level ray.init options, while worker_task_options targets individual worker tasks. Options are forwarded at two levels so Ray schedules correctly:

  1. On the @ray.remote orchestration task via .options(**worker_task_options) — controls node selection.

  2. Inside map_batches for the scheduling-relevant subset (num_gpus, num_cpus, accelerator_type, resources) — controls which nodes run the data workers.

This is supported across all execution modes: local, remote, and KubeRay.
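
The two forwarding levels can be pictured as a simple split of the dict. This sketch only illustrates which keys reach which level according to the description above; the function and constant names are hypothetical and the code does not call Ray.

```python
# Scheduling-relevant subset named in the description above.
SCHEDULING_KEYS = ("num_gpus", "num_cpus", "accelerator_type", "resources")


def split_worker_task_options(worker_task_options):
    """Return (task_options, map_batches_options) for the two forwarding levels."""
    # Level 1: the orchestration task receives every option.
    task_options = dict(worker_task_options)
    # Level 2: map_batches receives only the scheduling-relevant subset.
    map_batches_options = {
        key: value
        for key, value in worker_task_options.items()
        if key in SCHEDULING_KEYS
    }
    return task_options, map_batches_options


options = {
    "num_cpus": 2,
    "memory": 8 * 1024**3,  # 8 GB expressed in bytes
    "accelerator_type": "A100",
    "max_retries": 3,
}
task_opts, batch_opts = split_worker_task_options(options)
print(batch_opts)
```

Keys such as memory and max_retries therefore affect only the orchestration task in this model, while num_cpus and accelerator_type shape both levels.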

Common worker_task_options keys

| Key | Type | Description |
| --- | --- | --- |
| num_cpus | float | CPUs per task (default: 1). Fractional values supported. |
| memory | int | Heap memory in bytes (e.g. 8589934592 for 8 GB). |
| accelerator_type | string | Pin tasks to a specific GPU model: "A100", "T4", "V100", etc. Useful on KubeRay clusters with mixed GPU node pools. |
| resources | dict | Custom/Kubernetes extended resource labels, e.g. {"intel.com/gpu": 1}. |
| runtime_env | dict | Per-task Ray runtime environment: pip, conda, env_vars, working_dir, etc. For KubeRay, use this to install packages on worker pods without rebuilding images. |
| max_retries | int | Task retry count on worker failure (default: 3). |
| scheduling_strategy | string | "DEFAULT", "SPREAD", or a placement group strategy. |

For the full list of supported keys see the Ray RemoteFunction.options() API docs.

GPU support

num_gpus is the only first-class GPU field because it also drives gpu_batch_format selection inside Feast. Set it directly rather than inside worker_task_options:

When num_gpus is set, your transformation UDF runs on a GPU worker:

Full example — KubeRay with GPU + all common options

Checking cluster resources

Integration Examples

With Spark Offline Store

With Cloud Storage

With Feature Transformations

On-Demand Transformations

Ray Native Transformations

For distributed transformations that leverage Ray's dataset and parallel processing capabilities, use mode="ray" in your BatchFeatureView:

For more information, see the Ray documentation and Ray Data guide.
