Ray (contrib)
The Ray compute engine is a distributed compute implementation that uses Ray to execute feature pipelines, including transformations, aggregations, joins, and materializations. It distributes the processing behind both materialize() and get_historical_features() operations.
Quick Start with Ray Template
Ray RAG Template - Batch Embedding at Scale
For RAG (Retrieval-Augmented Generation) applications with distributed embedding generation:
feast init -t ray_rag my_rag_project
cd my_rag_project/feature_repo
The Ray RAG template demonstrates:
Parallel Embedding Generation: Uses the Ray compute engine to generate embeddings across multiple workers
Vector Search Integration: Works with Milvus for semantic similarity search
Complete RAG Pipeline: Data → Embeddings → Search workflow
The Ray compute engine automatically distributes embedding generation across the available workers, which makes it well suited to processing large datasets.
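The batch fan-out behind the template (split rows into batches, embed each batch on its own worker, then reassemble the results in order) can be sketched locally with the standard library. This is an illustrative analogue, not Feast or Ray code: toy_embed is a hypothetical stand-in for a real embedding model, and a thread pool plays the role of the Ray workers.

```python
from concurrent.futures import ThreadPoolExecutor
from typing import List


def toy_embed(text: str, dim: int = 4) -> List[float]:
    """Hypothetical 'embedding': deterministic character statistics, not a real model."""
    vec = [0.0] * dim
    for i, ch in enumerate(text):
        vec[i % dim] += ord(ch) / 1000.0
    return vec


def embed_batch(batch: List[str]) -> List[List[float]]:
    # One worker handles a whole batch, mirroring batch-at-a-time execution.
    return [toy_embed(t) for t in batch]


def parallel_embed(texts: List[str], batch_size: int = 2, workers: int = 4) -> List[List[float]]:
    # Split the input rows into fixed-size batches.
    batches = [texts[i:i + batch_size] for i in range(0, len(texts), batch_size)]
    # executor.map yields results in input order, so vectors line up with rows.
    with ThreadPoolExecutor(max_workers=workers) as pool:
        results = pool.map(embed_batch, batches)
    return [vec for batch_vecs in results for vec in batch_vecs]


docs = ["feast", "ray", "milvus", "rag", "embeddings"]
vectors = parallel_embed(docs)
print(len(vectors), len(vectors[0]))  # 5 4
```

Because executor.map returns results in input order, each vector stays aligned with its source row.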
Overview
The Ray compute engine provides:
Distributed DAG Execution: Executes feature computation DAGs across Ray clusters
Intelligent Join Strategies: Automatic selection between broadcast and distributed joins
Lazy Evaluation: Computation is deferred until results are requested
Resource Management: Automatic scaling and resource optimization
Point-in-Time Joins: Efficient temporal joins for historical feature retrieval
GPU Support: Schedule transformation workers on GPU nodes via the num_gpus config (supported in all modes, including KubeRay)
Architecture
The Ray compute engine follows Feast's DAG-based architecture.
Core Components
| Component | Description |
|---|---|
| RayComputeEngine | Main engine implementing the ComputeEngine interface |
| RayFeatureBuilder | Constructs the DAG from Feature View definitions |
| RayDAGNode | Ray-specific DAG node implementations |
| RayDAGRetrievalJob | Executes retrieval plans and returns results |
| RayMaterializationJob | Handles materialization job tracking |
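To make the DAG idea concrete, here is a toy executor that runs node functions in dependency order. It is a single-process sketch, not the Feast implementation: the node names and the execute helper are hypothetical, and where the real engine builds Ray-specific nodes from a Feature View and distributes them across a cluster, this version simply calls Python functions one after another. It uses graphlib, available from Python 3.9.

```python
from graphlib import TopologicalSorter
from typing import Callable, Dict, List

Rows = List[dict]


# Hypothetical node functions standing in for read, filter, and transformation nodes.
# Each receives the outputs of its upstream nodes and returns a list of rows.
def read(_inputs: List[Rows]) -> Rows:
    return [
        {"driver_id": 1, "trips": 10},
        {"driver_id": 2, "trips": 0},
        {"driver_id": 3, "trips": 7},
    ]


def filter_active(inputs: List[Rows]) -> Rows:
    return [r for r in inputs[0] if r["trips"] > 0]


def transform(inputs: List[Rows]) -> Rows:
    return [{**r, "trips_x2": r["trips"] * 2} for r in inputs[0]]


nodes: Dict[str, Callable[[List[Rows]], Rows]] = {
    "read": read,
    "filter": filter_active,
    "transform": transform,
}
# Each node maps to the set of nodes it depends on.
deps = {"read": set(), "filter": {"read"}, "transform": {"filter"}}


def execute(nodes: Dict[str, Callable[[List[Rows]], Rows]], deps: Dict[str, set]) -> Dict[str, Rows]:
    outputs: Dict[str, Rows] = {}
    # static_order() yields each node only after all of its dependencies.
    for name in TopologicalSorter(deps).static_order():
        upstream = [outputs[d] for d in sorted(deps[name])]
        outputs[name] = nodes[name](upstream)
    return outputs


result = execute(nodes, deps)["transform"]
print(result)
```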
Configuration
Configure the Ray compute engine in your feature_store.yaml:
Configuration Options
| Option | Type | Default | Description |
|---|---|---|---|
| type | string | "ray.engine" | Must be ray.engine |
| max_workers | int | None (uses all cores) | Maximum number of Ray workers |
| enable_optimization | boolean | true | Enable performance optimizations |
| broadcast_join_threshold_mb | int | 100 | Size threshold for broadcast joins (MB) |
| max_parallelism_multiplier | int | 2 | Parallelism as a multiple of CPU cores |
| target_partition_size_mb | int | 64 | Target partition size (MB) |
| window_size_for_joins | string | "1H" | Time window for distributed joins |
| ray_address | string | None | Ray cluster address (triggers REMOTE mode) |
| use_kuberay | boolean | None | Enable KubeRay mode (overrides ray_address) |
| kuberay_conf | dict | None | KubeRay configuration dict with keys: cluster_name (required), namespace (default: "default"), auth_token, auth_server, skip_tls (default: false) |
| enable_ray_logging | boolean | false | Enable Ray progress bars and logging |
| enable_distributed_joins | boolean | true | Enable distributed joins for large datasets |
| staging_location | string | None | Remote path for batch materialization jobs |
| ray_conf | dict | None | Ray configuration parameters (memory, CPU limits) |
| num_gpus | float | None | Number of GPUs to request per worker task. Requires GPU nodes in the Ray cluster. Fractional values (e.g. 0.5) are supported. Supported in all modes, including KubeRay. |
| gpu_batch_format | string | "pandas" | Batch format for map_batches when num_gpus is set. Use "numpy" or "pyarrow" for GPU-native libraries (e.g. cuDF, PyTorch). |
| worker_task_options | dict | None | Arbitrary Ray .options() kwargs applied to every worker task. See Worker Resource Scheduling for the full reference. |
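To see how the documented defaults combine with user overrides, the options table can be mirrored as a plain Python dictionary. This is an illustrative sketch only: DEFAULTS, KUBERAY_DEFAULTS, and resolve_config are hypothetical names, not Feast internals, and Feast reads these values from feature_store.yaml itself.

```python
from typing import Any, Dict

# Documented defaults from the options table; None means "unset".
DEFAULTS: Dict[str, Any] = {
    "type": "ray.engine",
    "max_workers": None,  # None -> use all cores
    "enable_optimization": True,
    "broadcast_join_threshold_mb": 100,
    "max_parallelism_multiplier": 2,
    "target_partition_size_mb": 64,
    "window_size_for_joins": "1H",
    "ray_address": None,
    "use_kuberay": None,
    "kuberay_conf": None,
    "enable_ray_logging": False,
    "enable_distributed_joins": True,
    "staging_location": None,
    "ray_conf": None,
    "num_gpus": None,
    "gpu_batch_format": "pandas",
    "worker_task_options": None,
}

# Documented defaults for optional kuberay_conf keys.
KUBERAY_DEFAULTS: Dict[str, Any] = {"namespace": "default", "skip_tls": False}


def resolve_config(overrides: Dict[str, Any]) -> Dict[str, Any]:
    """Hypothetical helper: merge overrides onto defaults and check a few rules."""
    unknown = set(overrides) - set(DEFAULTS)
    if unknown:
        raise ValueError(f"unknown options: {sorted(unknown)}")
    cfg = {**DEFAULTS, **overrides}
    if cfg["type"] != "ray.engine":
        raise ValueError("type must be ray.engine")
    if cfg["kuberay_conf"] is not None:
        if "cluster_name" not in cfg["kuberay_conf"]:
            raise ValueError("kuberay_conf requires cluster_name")
        cfg["kuberay_conf"] = {**KUBERAY_DEFAULTS, **cfg["kuberay_conf"]}
    return cfg


cfg = resolve_config({"max_workers": 8, "kuberay_conf": {"cluster_name": "feast-ray"}})
print(cfg["broadcast_join_threshold_mb"], cfg["kuberay_conf"]["namespace"])  # 100 default
```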
Mode Detection Precedence
The Ray compute engine automatically detects the execution mode, using the first rule that matches:
1. Environment variable → KubeRay mode (if FEAST_RAY_USE_KUBERAY=true)
2. Config kuberay_conf → KubeRay mode
3. Config ray_address → Remote mode
4. Default → Local mode
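The precedence rules can be restated as a small function. detect_mode is a hypothetical helper that mirrors the four documented rules, not Feast's internal code; it does not model the use_kuberay option, because the precedence list does not say where that option falls.

```python
import os
from typing import Any, Mapping, Optional


def detect_mode(config: Mapping[str, Any], env: Optional[Mapping[str, str]] = None) -> str:
    """Hypothetical restatement of the documented mode precedence."""
    env = os.environ if env is None else env
    # 1. The environment variable takes priority over any config.
    if env.get("FEAST_RAY_USE_KUBERAY") == "true":
        return "kuberay"
    # 2. A kuberay_conf block selects KubeRay mode.
    if config.get("kuberay_conf"):
        return "kuberay"
    # 3. A ray_address selects remote mode.
    if config.get("ray_address"):
        return "remote"
    # 4. Otherwise fall back to a local Ray instance.
    return "local"


print(detect_mode({"ray_address": "ray://head:10001"}, env={}))  # remote
```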
Usage Examples
Basic Historical Feature Retrieval
Batch Materialization
Large-Scale Feature Retrieval
Advanced Configuration
Complete Example Configuration
Here's a complete example configuration showing how to use the Ray offline store with the Ray compute engine:
DAG Node Types
The Ray compute engine implements several specialized DAG nodes:
RayReadNode
Reads data from Ray-compatible sources:
Supports Parquet, CSV, and other formats
Handles partitioning and schema inference
Applies field mappings and filters
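A read step of this kind can be sketched with the standard csv module: rename source columns through a field mapping, then keep only rows in a timestamp range. read_with_mapping and its parameters are hypothetical. A real RayReadNode reads through Ray and infers column types, whereas this sketch leaves every value as a string except the parsed timestamp.

```python
import csv
import io
from datetime import datetime
from typing import Dict, Iterable, List


def read_with_mapping(
    source: Iterable[str],
    field_mapping: Dict[str, str],
    start: datetime,
    end: datetime,
    timestamp_field: str = "event_timestamp",
) -> List[dict]:
    """Hypothetical reader: apply a field mapping, then a [start, end) time filter."""
    rows = []
    for raw in csv.DictReader(source):
        # Rename source columns to the feature view's field names.
        row = {field_mapping.get(k, k): v for k, v in raw.items()}
        ts = datetime.fromisoformat(row[timestamp_field])
        # Keep only rows inside the requested time range.
        if start <= ts < end:
            row[timestamp_field] = ts
            rows.append(row)
    return rows


data = io.StringIO(
    "drv,ts,conv_rate\n"
    "1,2024-01-01T00:00:00,0.5\n"
    "2,2024-01-02T00:00:00,0.7\n"
)
rows = read_with_mapping(
    data,
    field_mapping={"drv": "driver_id", "ts": "event_timestamp"},
    start=datetime(2024, 1, 1),
    end=datetime(2024, 1, 2),
)
print(rows)
```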
RayJoinNode
Performs distributed joins:
Broadcast Join: For small datasets (<100MB)
Distributed Join: For large datasets with time-based windowing
Automatic Strategy Selection: Based on dataset size and cluster resources
RayFilterNode
Applies filters and time-based constraints:
TTL-based filtering
Timestamp range filtering
Custom predicate filtering
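TTL filtering keeps a feature row only if it is recent enough relative to a reference time and not from the future. A minimal sketch, assuming rows are dicts with datetime values under event_timestamp; ttl_filter is a hypothetical helper.

```python
from datetime import datetime, timedelta
from typing import List


def ttl_filter(rows: List[dict], as_of: datetime, ttl: timedelta,
               ts_field: str = "event_timestamp") -> List[dict]:
    """Keep rows whose timestamp lies in [as_of - ttl, as_of]."""
    lower = as_of - ttl
    # Rows newer than as_of would leak future data; rows older than lower have expired.
    return [r for r in rows if lower <= r[ts_field] <= as_of]


rows = [
    {"driver_id": 1, "event_timestamp": datetime(2024, 1, 1, 6)},   # expired
    {"driver_id": 1, "event_timestamp": datetime(2024, 1, 1, 11)},  # kept
    {"driver_id": 1, "event_timestamp": datetime(2024, 1, 1, 13)},  # in the future
]
kept = ttl_filter(rows, as_of=datetime(2024, 1, 1, 12), ttl=timedelta(hours=4))
print(kept)
```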
RayAggregationNode
Handles feature aggregations:
Windowed aggregations
Grouped aggregations
Custom aggregation functions
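A windowed, grouped aggregation assigns each row to a fixed-size (tumbling) time window and then aggregates per (entity, window) pair. The sketch below sums a column per driver per hour. windowed_sum and window_start are hypothetical helpers, and tumbling windows aligned to the Unix epoch are an assumption made for illustration.

```python
from collections import defaultdict
from datetime import datetime, timedelta
from typing import Dict, List, Tuple

EPOCH = datetime(1970, 1, 1)


def window_start(ts: datetime, size: timedelta) -> datetime:
    # Align a timestamp to the start of its fixed-size window.
    return EPOCH + ((ts - EPOCH) // size) * size


def windowed_sum(rows: List[dict], key: str, value: str,
                 size: timedelta) -> Dict[Tuple[object, datetime], float]:
    acc: Dict[Tuple[object, datetime], float] = defaultdict(float)
    for r in rows:
        # Group by (entity key, window start) and sum the value column.
        acc[(r[key], window_start(r["event_timestamp"], size))] += r[value]
    return dict(acc)


rows = [
    {"driver_id": 1, "event_timestamp": datetime(2024, 1, 1, 0, 10), "trips": 2},
    {"driver_id": 1, "event_timestamp": datetime(2024, 1, 1, 0, 50), "trips": 3},
    {"driver_id": 1, "event_timestamp": datetime(2024, 1, 1, 1, 5), "trips": 4},
    {"driver_id": 2, "event_timestamp": datetime(2024, 1, 1, 0, 20), "trips": 1},
]
sums = windowed_sum(rows, "driver_id", "trips", timedelta(hours=1))
print(sums)
```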
RayTransformationNode
Applies feature transformations:
Row-level transformations
Column-level transformations
Custom transformation functions
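The difference between row-level and column-level transformations shows up clearly on a column-oriented batch (a dict mapping column names to lists). Both functions below are hypothetical examples, not Feast APIs.

```python
from typing import Dict, List

Batch = Dict[str, List[float]]


def row_level(batch: Batch) -> Batch:
    # Each output value depends only on the same row's inputs.
    out = dict(batch)
    out["trip_ratio"] = [
        c / t if t else 0.0 for c, t in zip(batch["completed"], batch["total"])
    ]
    return out


def column_level(batch: Batch) -> Batch:
    # Each output value depends on a statistic of the whole column (min-max scaling).
    col = batch["total"]
    lo, hi = min(col), max(col)
    out = dict(batch)
    out["total_scaled"] = [(v - lo) / (hi - lo) if hi > lo else 0.0 for v in col]
    return out


batch = {"completed": [1, 3, 0], "total": [2, 4, 0]}
result = column_level(row_level(batch))
print(result)
```

Row-level transformations give the same result however the data is partitioned. Column-level ones that depend on batch statistics, such as the min-max scaling above, do not: each batch is scaled by its own minimum and maximum, so compute global statistics first when consistency across batches matters.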
RayWriteNode
Writes results to various targets:
Online stores
Offline stores
Temporary storage
Join Strategies
The Ray compute engine automatically selects a join strategy:
Broadcast Join
Used for small feature datasets:
Automatically selected when feature data < 100MB
Features are cached in Ray's object store
Entities are distributed across the cluster
Each worker gets a copy of feature data
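A broadcast point-in-time join can be sketched as follows: build one small lookup table from the feature data, share it with every entity partition, and let each partition find the latest feature value at or before each entity timestamp with a binary search. build_index and join_partition are hypothetical helpers; TTL handling is omitted for brevity, and a plain Python object stands in for the copy Ray would keep in its object store.

```python
from bisect import bisect_right
from collections import defaultdict
from datetime import datetime
from typing import Dict, List, Tuple

Index = Dict[int, Tuple[List[datetime], List[float]]]


def build_index(features: List[dict]) -> Index:
    # The "broadcast" copy: per entity, timestamps sorted ascending plus values.
    grouped = defaultdict(list)
    for f in features:
        grouped[f["driver_id"]].append((f["event_timestamp"], f["conv_rate"]))
    index: Index = {}
    for key, pairs in grouped.items():
        pairs.sort()
        index[key] = ([t for t, _ in pairs], [v for _, v in pairs])
    return index


def join_partition(index: Index, partition: List[dict]) -> List[dict]:
    out = []
    for e in partition:
        times, values = index.get(e["driver_id"], ([], []))
        # Position just past the last feature timestamp <= the entity timestamp.
        pos = bisect_right(times, e["event_timestamp"])
        out.append({**e, "conv_rate": values[pos - 1] if pos else None})
    return out


features = [
    {"driver_id": 1, "event_timestamp": datetime(2024, 1, 1, 8), "conv_rate": 0.5},
    {"driver_id": 1, "event_timestamp": datetime(2024, 1, 1, 12), "conv_rate": 0.6},
    {"driver_id": 2, "event_timestamp": datetime(2024, 1, 1, 9), "conv_rate": 0.9},
]
entity_partitions = [
    [{"driver_id": 1, "event_timestamp": datetime(2024, 1, 1, 10)}],
    [{"driver_id": 1, "event_timestamp": datetime(2024, 1, 1, 13)},
     {"driver_id": 2, "event_timestamp": datetime(2024, 1, 1, 7)}],
]
index = build_index(features)  # built once, shared by every partition
joined = [row for part in entity_partitions for row in join_partition(index, part)]
print(joined)
```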
Distributed Windowed Join
Used for large feature datasets:
Automatically selected when feature data > 100MB
Data is partitioned by time windows
Point-in-time joins within each window
Results are combined across windows
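The windowed variant can be sketched the same way. Entities are bucketed into fixed time windows, each window scans only the feature rows from its own time span plus a lookback, and the per-window results are concatenated. Two details are assumptions rather than documented behavior: the lookback equals the TTL (without it, an entity early in a window would miss features from the previous window), and windows are aligned to the Unix epoch. windowed_pit_join is a hypothetical helper.

```python
from collections import defaultdict
from datetime import datetime, timedelta
from typing import List

EPOCH = datetime(1970, 1, 1)


def window_of(ts: datetime, size: timedelta) -> datetime:
    return EPOCH + ((ts - EPOCH) // size) * size


def windowed_pit_join(entities: List[dict], features: List[dict],
                      size: timedelta, ttl: timedelta) -> List[dict]:
    # Partition entity rows by time window.
    by_window = defaultdict(list)
    for e in entities:
        by_window[window_of(e["event_timestamp"], size)].append(e)
    results = []
    for ws, ents in sorted(by_window.items()):
        we = ws + size
        # Only features in [ws - ttl, we) can match an entity in this window.
        candidates = [f for f in features if ws - ttl <= f["event_timestamp"] < we]
        for e in ents:
            best = None
            for f in candidates:
                ok = (f["driver_id"] == e["driver_id"]
                      and e["event_timestamp"] - ttl <= f["event_timestamp"] <= e["event_timestamp"])
                if ok and (best is None or f["event_timestamp"] > best["event_timestamp"]):
                    best = f
            results.append({**e, "conv_rate": best["conv_rate"] if best else None})
    # Per-window results are concatenated in window order.
    return results


features = [
    {"driver_id": 1, "event_timestamp": datetime(2024, 1, 1, 9, 50), "conv_rate": 0.4},
    {"driver_id": 1, "event_timestamp": datetime(2024, 1, 1, 11, 0), "conv_rate": 0.6},
    {"driver_id": 2, "event_timestamp": datetime(2024, 1, 1, 8, 0), "conv_rate": 0.9},
]
entities = [
    {"driver_id": 1, "event_timestamp": datetime(2024, 1, 1, 10, 30)},
    {"driver_id": 1, "event_timestamp": datetime(2024, 1, 1, 11, 15)},
    {"driver_id": 2, "event_timestamp": datetime(2024, 1, 1, 11, 20)},
]
joined = windowed_pit_join(entities, features, timedelta(hours=1), timedelta(hours=2))
print([r["conv_rate"] for r in joined])
```

In this example the 10:30 entity matches the 9:50 feature row from the previous window, which is exactly the case the lookback exists for.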
Strategy Selection Logic
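One plausible reading of the selection rule, using the documented broadcast_join_threshold_mb and enable_distributed_joins options, is sketched below. select_join_strategy is hypothetical: how Feast treats a size exactly at the threshold, what it does when distributed joins are disabled, and how cluster resources factor in are not specified here, so the sketch makes explicit guesses.

```python
def select_join_strategy(feature_size_mb: float,
                         broadcast_join_threshold_mb: float = 100,
                         enable_distributed_joins: bool = True) -> str:
    """Hypothetical selection rule based on the documented options."""
    # Small feature data: ship a copy to every worker.
    if feature_size_mb < broadcast_join_threshold_mb:
        return "broadcast"
    # Large feature data: partition by time windows (guess: ties go here too).
    if enable_distributed_joins:
        return "distributed_windowed"
    # Guess: with distributed joins disabled, fall back to broadcasting.
    return "broadcast"


print(select_join_strategy(50), select_join_strategy(500))  # broadcast distributed_windowed
```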
Performance Optimization
Automatic Optimization
The Ray compute engine includes several automatic optimizations:
Partition Optimization: Automatically determines optimal partition sizes
Join Strategy Selection: Chooses between broadcast and distributed joins
Resource Allocation: Scales workers based on available resources
Memory Management: Handles out-of-core processing for large datasets
Manual Tuning
For specific workloads, you can fine-tune performance:
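The tuning knobs plausibly interact as follows: target_partition_size_mb suggests a partition count from the data size, and max_parallelism_multiplier times the CPU count bounds it. plan_partitions is a hypothetical heuristic that combines them that way; Feast's actual formula may differ.

```python
import math
import os
from typing import Optional


def plan_partitions(dataset_size_mb: float,
                    target_partition_size_mb: float = 64,
                    max_parallelism_multiplier: int = 2,
                    num_cpus: Optional[int] = None) -> int:
    """Hypothetical heuristic: size-based count, capped by cores x multiplier."""
    cpus = num_cpus or os.cpu_count() or 1
    by_size = math.ceil(dataset_size_mb / target_partition_size_mb)
    cap = cpus * max_parallelism_multiplier
    # Always at least one partition, never more than the parallelism cap.
    return max(1, min(by_size, cap))


print(plan_partitions(1000, num_cpus=4))  # 8: 16 by size, capped at 4 * 2
```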
Monitoring and Metrics
Monitor Ray compute engine performance:
Worker Resource Scheduling
worker_task_options is a passthrough dict of Ray .options() kwargs applied to every worker task that Feast dispatches. It complements ray_conf: ray_conf holds cluster-level ray.init options, while worker_task_options targets individual worker tasks. Options are forwarded at two levels so that Ray schedules correctly:
On the @ray.remote orchestration task via .options(**worker_task_options), which controls node selection.
Inside map_batches for the scheduling-relevant subset (num_gpus, num_cpus, accelerator_type, resources), which controls which nodes run the data workers.
This is supported across all execution modes: local, remote, and KubeRay.
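The two-level forwarding can be sketched as a function that passes the full dictionary to the orchestration task and repeats only the scheduling-relevant keys for map_batches. split_task_options is a hypothetical helper; its key set matches the subset named in the list of forwarding levels, and the option values are illustrative.

```python
from typing import Any, Dict, Optional, Tuple

# Scheduling-relevant keys that are also forwarded into map_batches.
SCHEDULING_KEYS = {"num_gpus", "num_cpus", "accelerator_type", "resources"}


def split_task_options(
    worker_task_options: Optional[Dict[str, Any]],
) -> Tuple[Dict[str, Any], Dict[str, Any]]:
    """Hypothetical helper: (orchestration .options() kwargs, map_batches kwargs)."""
    opts = dict(worker_task_options or {})
    # Level 1: every option goes to the @ray.remote orchestration task.
    orchestration = dict(opts)
    # Level 2: only scheduling keys are repeated for the data workers.
    map_batches_kwargs = {k: v for k, v in opts.items() if k in SCHEDULING_KEYS}
    return orchestration, map_batches_kwargs


options = {
    "num_cpus": 2,
    "memory": 8 * 1024**3,  # 8 GB in bytes = 8589934592
    "accelerator_type": "A100",
    "runtime_env": {"pip": ["sentence-transformers"]},
    "max_retries": 3,
}
orchestration, map_kwargs = split_task_options(options)
print(map_kwargs)  # {'num_cpus': 2, 'accelerator_type': 'A100'}
```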
Common worker_task_options keys
| Key | Type | Description |
|---|---|---|
| num_cpus | float | CPUs per task (default: 1). Fractional values supported. |
| memory | int | Heap memory in bytes (e.g. 8589934592 for 8 GB). |
| accelerator_type | string | Pin tasks to a specific GPU model: "A100", "T4", "V100", etc. Useful on KubeRay clusters with mixed GPU node pools. |
| resources | dict | Custom/Kubernetes extended resource labels, e.g. {"intel.com/gpu": 1}. |
| runtime_env | dict | Per-task Ray runtime environment: pip, conda, env_vars, working_dir, etc. For KubeRay, use this to install packages on worker pods without rebuilding images. |
| max_retries | int | Task retry count on worker failure (default: 3). |
| scheduling_strategy | string | "DEFAULT", "SPREAD", or a placement group strategy. |
For the full list of supported keys see the Ray RemoteFunction.options() API docs.
GPU support
num_gpus is the only first-class GPU field because it also drives gpu_batch_format selection inside Feast. Set it directly rather than inside worker_task_options:
When num_gpus is set, your transformation UDF runs on a GPU worker:
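Ray Data's "numpy" batch format hands a UDF a dict that maps column names to arrays. The sketch below imitates that shape with plain lists so it runs without Ray or a GPU. resolve_batch_format encodes the documented rule that gpu_batch_format applies once num_gpus is set; returning "pandas" otherwise is an assumption. normalize_embeddings is a hypothetical UDF; a real GPU UDF would move the arrays onto the device with a library such as PyTorch or cuDF.

```python
import math
from typing import Dict, List, Optional


def resolve_batch_format(num_gpus: Optional[float], gpu_batch_format: str = "pandas") -> str:
    # gpu_batch_format only takes effect when GPU workers are requested.
    if num_gpus:
        return gpu_batch_format
    return "pandas"  # assumption: non-GPU default


def normalize_embeddings(batch: Dict[str, List[List[float]]]) -> Dict[str, List[List[float]]]:
    """Hypothetical UDF: L2-normalize each vector in a column-oriented batch."""
    out = dict(batch)
    normed = []
    for vec in batch["embedding"]:
        norm = math.sqrt(sum(x * x for x in vec)) or 1.0  # avoid dividing by zero
        normed.append([x / norm for x in vec])
    out["embedding"] = normed
    return out


fmt = resolve_batch_format(num_gpus=0.5, gpu_batch_format="numpy")
result = normalize_embeddings({"doc_id": [1, 2], "embedding": [[3.0, 4.0], [0.0, 0.0]]})
print(fmt, result["embedding"])
```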
Full example: KubeRay with GPU and all common options
Checking cluster resources
Integration Examples
With Spark Offline Store
With Cloud Storage
With Feature Transformations
On-Demand Transformations
Ray Native Transformations
For distributed transformations that leverage Ray's dataset and parallel processing capabilities, use mode="ray" in your BatchFeatureView:
For more information, see the Ray documentation and Ray Data guide.