Ray (contrib)
The Ray compute engine is a distributed compute implementation that uses Ray to execute feature pipelines, including transformations, aggregations, joins, and materializations. It provides scalable, efficient distributed processing for both materialize() and get_historical_features() operations.
Quick Start with Ray Template
Ray RAG Template - Batch Embedding at Scale
For RAG (Retrieval-Augmented Generation) applications with distributed embedding generation:
```bash
feast init -t ray_rag my_rag_project
cd my_rag_project/feature_repo
```
The Ray RAG template demonstrates:
Parallel Embedding Generation: Uses Ray compute engine to generate embeddings across multiple workers
Vector Search Integration: Works with Milvus for semantic similarity search
Complete RAG Pipeline: Data → Embeddings → Search workflow
The Ray compute engine automatically distributes the embedding generation across available workers, making it ideal for processing large datasets efficiently.
Overview
The Ray compute engine provides:
Distributed DAG Execution: Executes feature computation DAGs across Ray clusters
Intelligent Join Strategies: Automatic selection between broadcast and distributed joins
Lazy Evaluation: Deferred execution for optimal performance
Resource Management: Automatic scaling and resource optimization
Point-in-Time Joins: Efficient temporal joins for historical feature retrieval
Architecture
The Ray compute engine follows Feast's DAG-based architecture:
Core Components
RayComputeEngine: Main engine implementing the ComputeEngine interface
RayFeatureBuilder: Constructs the DAG from Feature View definitions
RayDAGNode: Ray-specific DAG node implementations
RayDAGRetrievalJob: Executes retrieval plans and returns results
RayMaterializationJob: Handles materialization job tracking
Configuration
Configure the Ray compute engine in your feature_store.yaml:
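A minimal feature_store.yaml sketch is shown below, assuming the engine is configured under the batch_engine key as with Feast's other compute engines; the project name, registry path, and online store are placeholders:

```yaml
project: my_project
registry: data/registry.db
provider: local
online_store:
  type: sqlite
batch_engine:
  type: ray.engine          # selects the Ray compute engine
  max_workers: 4
  enable_optimization: true
```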
Configuration Options
| Option | Type | Default | Description |
| --- | --- | --- | --- |
| type | string | "ray.engine" | Must be ray.engine |
| max_workers | int | None (uses all cores) | Maximum number of Ray workers |
| enable_optimization | boolean | true | Enable performance optimizations |
| broadcast_join_threshold_mb | int | 100 | Size threshold for broadcast joins (MB) |
| max_parallelism_multiplier | int | 2 | Parallelism as a multiple of CPU cores |
| target_partition_size_mb | int | 64 | Target partition size (MB) |
| window_size_for_joins | string | "1H" | Time window for distributed joins |
| ray_address | string | None | Ray cluster address (triggers remote mode) |
| use_kuberay | boolean | None | Enable KubeRay mode (overrides ray_address) |
| kuberay_conf | dict | None | KubeRay configuration dict with keys: cluster_name (required), namespace (default: "default"), auth_token, auth_server, skip_tls (default: false) |
| enable_ray_logging | boolean | false | Enable Ray progress bars and logging |
| enable_distributed_joins | boolean | true | Enable distributed joins for large datasets |
| staging_location | string | None | Remote path for batch materialization jobs |
| ray_conf | dict | None | Ray configuration parameters (memory, CPU limits) |
Mode Detection Precedence
The Ray compute engine automatically detects the execution mode:
1. Environment variables → KubeRay mode (if FEAST_RAY_USE_KUBERAY=true)
2. Config kuberay_conf → KubeRay mode
3. Config ray_address → Remote mode
4. Default → Local mode
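As a sketch, the three modes map to configuration like the following (addresses, cluster names, and namespaces are placeholders):

```yaml
# Local mode (default): nothing to configure, Ray runs in-process
batch_engine:
  type: ray.engine

# Remote mode: connect to an existing Ray cluster
# batch_engine:
#   type: ray.engine
#   ray_address: "ray://ray-head:10001"

# KubeRay mode: submit work through a KubeRay-managed cluster
# batch_engine:
#   type: ray.engine
#   use_kuberay: true
#   kuberay_conf:
#     cluster_name: feast-ray-cluster
#     namespace: feast
```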
Usage Examples
Basic Historical Feature Retrieval
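A minimal sketch of historical retrieval with the engine enabled; the feature view, feature names, and entity keys follow the standard Feast driver-stats example and are placeholders:

```python
from datetime import datetime

import pandas as pd
from feast import FeatureStore

store = FeatureStore(repo_path=".")

# Entity rows to join features onto (entity keys + event timestamps)
entity_df = pd.DataFrame(
    {
        "driver_id": [1001, 1002, 1003],
        "event_timestamp": [datetime(2024, 1, 1, 12, 0)] * 3,
    }
)

# The Ray compute engine plans and executes the point-in-time join
training_df = store.get_historical_features(
    entity_df=entity_df,
    features=[
        "driver_hourly_stats:conv_rate",
        "driver_hourly_stats:acc_rate",
    ],
).to_df()

print(training_df.head())
```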
Batch Materialization
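Materialization uses the regular Feast API; with ray.engine configured, the read/transform/write pipeline runs as a distributed job on the Ray cluster. A minimal sketch:

```python
from datetime import datetime, timedelta

from feast import FeatureStore

store = FeatureStore(repo_path=".")

# Write the last 7 days of feature values to the online store
store.materialize(
    start_date=datetime.utcnow() - timedelta(days=7),
    end_date=datetime.utcnow(),
)
```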
Large-Scale Feature Retrieval
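For large entity dataframes, the same API applies; the engine partitions the join by time window and spreads it across workers. The Parquet path and feature service name below are placeholders:

```python
import pandas as pd
from feast import FeatureStore

store = FeatureStore(repo_path=".")

# A large entity dataframe, e.g. millions of rows read from Parquet
entity_df = pd.read_parquet("data/entity_df.parquet")  # placeholder path

job = store.get_historical_features(
    entity_df=entity_df,
    features=store.get_feature_service("driver_activity"),  # placeholder service
)

# Materialize the lazily executed result and persist the training set
job.to_df().to_parquet("training_data.parquet")
```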
Advanced Configuration
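A hedged example of a remote-cluster configuration; the cluster address, staging bucket, and the exact keys accepted inside ray_conf are assumptions:

```yaml
batch_engine:
  type: ray.engine
  ray_address: "ray://ray-head:10001"        # connect to an existing cluster
  staging_location: s3://my-bucket/feast-staging
  enable_ray_logging: true
  ray_conf:                                  # passed to Ray at initialization
    num_cpus: 16
    object_store_memory: 4000000000
```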
Complete Example Configuration
Here's a complete example configuration showing how to use the Ray offline store together with the Ray compute engine:
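A sketch of such a configuration; the offline store type string (assumed here to be ray for the contrib Ray offline store), paths, and bucket are placeholders:

```yaml
project: ray_example
registry: data/registry.db
provider: local
offline_store:
  type: ray                # Ray offline store (contrib); type string assumed
online_store:
  type: sqlite
  path: data/online_store.db
batch_engine:
  type: ray.engine
  max_workers: 8
  broadcast_join_threshold_mb: 100
  target_partition_size_mb: 64
  enable_distributed_joins: true
  staging_location: s3://my-bucket/feast-staging
```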
DAG Node Types
The Ray compute engine implements several specialized DAG nodes:
RayReadNode
Reads data from Ray-compatible sources:
Supports Parquet, CSV, and other formats
Handles partitioning and schema inference
Applies field mappings and filters
RayJoinNode
Performs distributed joins:
Broadcast Join: For small datasets (<100MB)
Distributed Join: For large datasets with time-based windowing
Automatic Strategy Selection: Based on dataset size and cluster resources
RayFilterNode
Applies filters and time-based constraints:
TTL-based filtering
Timestamp range filtering
Custom predicate filtering
RayAggregationNode
Handles feature aggregations:
Windowed aggregations
Grouped aggregations
Custom aggregation functions
RayTransformationNode
Applies feature transformations:
Row-level transformations
Column-level transformations
Custom transformation functions
RayWriteNode
Writes results to various targets:
Online stores
Offline stores
Temporary storage
Join Strategies
The Ray compute engine automatically selects optimal join strategies:
Broadcast Join
Used for small feature datasets:
Automatically selected when feature data < 100MB
Features are cached in Ray's object store
Entities are distributed across cluster
Each worker gets a copy of feature data
Distributed Windowed Join
Used for large feature datasets:
Automatically selected when feature data > 100MB
Data is partitioned by time windows
Point-in-time joins within each window
Results are combined across windows
Strategy Selection Logic
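An illustrative sketch of the rule described above, not the engine's actual source code; the threshold mirrors broadcast_join_threshold_mb:

```python
BROADCAST_JOIN_THRESHOLD_MB = 100

def choose_join_strategy(
    feature_dataset_size_mb: float,
    enable_distributed_joins: bool = True,
) -> str:
    if feature_dataset_size_mb < BROADCAST_JOIN_THRESHOLD_MB:
        # Small feature data: cache in Ray's object store, copy to every worker
        return "broadcast"
    if enable_distributed_joins:
        # Large feature data: partition by time window, join window by window
        return "distributed_windowed"
    # Distributed joins disabled: fall back to a broadcast join
    return "broadcast"
```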
Performance Optimization
Automatic Optimization
The Ray compute engine includes several automatic optimizations:
Partition Optimization: Automatically determines optimal partition sizes
Join Strategy Selection: Chooses between broadcast and distributed joins
Resource Allocation: Scales workers based on available resources
Memory Management: Handles out-of-core processing for large datasets
Manual Tuning
For specific workloads, you can fine-tune performance:
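For example, a memory-constrained workload with wide temporal joins might be tuned as follows (values are illustrative, not recommendations):

```yaml
batch_engine:
  type: ray.engine
  # Memory-bound workloads: smaller partitions, capped worker count
  target_partition_size_mb: 32
  max_workers: 8
  # Wide temporal joins: larger windows mean fewer shuffle rounds
  window_size_for_joins: "6H"
  # Prefer distributed joins even for mid-sized feature tables
  broadcast_join_threshold_mb: 50
```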
Monitoring and Metrics
Monitor Ray compute engine performance:
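Setting enable_ray_logging: true surfaces Ray progress bars and log output during jobs, and the standard Ray dashboard (http://localhost:8265 by default for a local cluster) shows task and resource usage. From a driver attached to the same cluster you can also inspect resources directly; a small sketch:

```python
import ray

# Assumes the driver process can reach the same Ray cluster the engine uses
ray.init(address="auto", ignore_reinit_error=True)

print(ray.cluster_resources())    # total CPUs, memory, object store capacity
print(ray.available_resources())  # what is currently free while jobs run
```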
Integration Examples
With Spark Offline Store
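A hedged configuration sketch combining the Spark offline store with the Ray compute engine; the Spark settings and staging bucket are placeholders:

```yaml
offline_store:
  type: spark
  spark_conf:
    spark.master: "local[*]"
    spark.sql.session.timeZone: "UTC"
batch_engine:
  type: ray.engine
  staging_location: s3://my-bucket/feast-staging
```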
With Cloud Storage
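When data sources and staging live in object storage, point staging_location at a bucket path; the bucket below is a placeholder:

```yaml
offline_store:
  type: file          # sources may point at s3:// or gs:// paths
online_store:
  type: sqlite
batch_engine:
  type: ray.engine
  # Ray workers stage intermediate data and materialization output here
  staging_location: s3://my-bucket/feast-staging
```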
With Feature Transformations
On-Demand Transformations
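On-demand transformations use Feast's standard @on_demand_feature_view decorator and run at retrieval time; a self-contained sketch with illustrative source and feature names:

```python
import pandas as pd
from feast import Field, RequestSource
from feast.on_demand_feature_view import on_demand_feature_view
from feast.types import Float64

# Request-time input for the transformation (names are illustrative)
conv_rate_request = RequestSource(
    name="conv_rate_input",
    schema=[Field(name="conv_rate", dtype=Float64)],
)

@on_demand_feature_view(
    sources=[conv_rate_request],
    schema=[Field(name="conv_rate_adjusted", dtype=Float64)],
)
def conv_rate_adjusted(inputs: pd.DataFrame) -> pd.DataFrame:
    # Row-level transformation applied at retrieval time
    out = pd.DataFrame()
    out["conv_rate_adjusted"] = inputs["conv_rate"] * 1.1
    return out
```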
Ray Native Transformations
For distributed transformations that leverage Ray's dataset and parallel processing capabilities, use mode="ray" in your BatchFeatureView:
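A hedged sketch of a Ray-native transformation; the udf parameter and the Ray Dataset in/out signature of the UDF are assumptions based on the description above, and the entity, source path, and field names are placeholders:

```python
from datetime import timedelta

import ray.data
from feast import BatchFeatureView, Entity, Field, FileSource
from feast.types import Float64

driver = Entity(name="driver", join_keys=["driver_id"])

driver_stats_source = FileSource(
    path="data/driver_stats.parquet",        # placeholder path
    timestamp_field="event_timestamp",
)

def scale_conv_rate(ds: ray.data.Dataset) -> ray.data.Dataset:
    # map_batches runs in parallel over dataset blocks on the Ray cluster
    def _scale(batch):
        batch["conv_rate"] = batch["conv_rate"] * 100.0
        return batch
    return ds.map_batches(_scale)

driver_stats_ray = BatchFeatureView(
    name="driver_stats_ray",
    entities=[driver],
    mode="ray",                              # run the UDF on Ray Datasets
    udf=scale_conv_rate,
    schema=[Field(name="conv_rate", dtype=Float64)],
    source=driver_stats_source,
    ttl=timedelta(days=1),
)
```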
For more information, see the Ray documentation and Ray Data guide.