# Ray (contrib)
The Ray compute engine is a distributed implementation of Feast's `ComputeEngine` interface that uses Ray to execute feature pipelines, including transformations, aggregations, joins, and materializations. It provides scalable, efficient distributed processing for both `materialize()` and `get_historical_features()` operations.
## Overview
The Ray compute engine provides:

- **Distributed DAG Execution**: executes feature computation DAGs across Ray clusters
- **Intelligent Join Strategies**: automatically selects between broadcast and distributed joins
- **Lazy Evaluation**: defers execution for optimal performance
- **Resource Management**: scales and allocates resources automatically
- **Point-in-Time Joins**: performs efficient temporal joins for historical feature retrieval
## Architecture
The Ray compute engine follows Feast's DAG-based architecture:

```
EntityDF → RayReadNode → RayJoinNode → RayFilterNode → RayAggregationNode → RayTransformationNode → Output
```

### Core Components
| Component | Description |
| --- | --- |
| `RayComputeEngine` | Main engine implementing the `ComputeEngine` interface |
| `RayFeatureBuilder` | Constructs the DAG from Feature View definitions |
| `RayDAGNode` | Ray-specific DAG node implementations |
| `RayDAGRetrievalJob` | Executes retrieval plans and returns results |
| `RayMaterializationJob` | Handles materialization job tracking |
## Configuration
Configure the Ray compute engine in your `feature_store.yaml`:
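A minimal sketch is shown below, assuming the engine is configured under the `batch_engine` key; the project name and paths are illustrative:

```yaml
project: my_project
registry: data/registry.db
provider: local
offline_store:
  type: ray            # Ray offline store (see the complete example below)
batch_engine:
  type: ray.engine     # required: selects the Ray compute engine
  max_workers: 4       # omit to use all available cores
  broadcast_join_threshold_mb: 100
  target_partition_size_mb: 64
```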
### Configuration Options

| Option | Type | Default | Description |
| --- | --- | --- | --- |
| `type` | string | `"ray.engine"` | Must be `ray.engine` |
| `max_workers` | int | None (uses all cores) | Maximum number of Ray workers |
| `enable_optimization` | boolean | `true` | Enable performance optimizations |
| `broadcast_join_threshold_mb` | int | 100 | Size threshold for broadcast joins (MB) |
| `max_parallelism_multiplier` | int | 2 | Parallelism as a multiple of CPU cores |
| `target_partition_size_mb` | int | 64 | Target partition size (MB) |
| `window_size_for_joins` | string | `"1H"` | Time window for distributed joins |
| `ray_address` | string | None | Ray cluster address (None = local Ray) |
| `enable_distributed_joins` | boolean | `true` | Enable distributed joins for large datasets |
| `staging_location` | string | None | Remote path for batch materialization jobs |
| `ray_conf` | dict | None | Ray configuration parameters |
| `execution_timeout_seconds` | int | None | Timeout for job execution in seconds |
## Usage Examples
### Basic Historical Feature Retrieval
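A minimal sketch, assuming a registered `driver_hourly_stats` feature view with `conv_rate` and `acc_rate` features (all names are illustrative):

```python
from datetime import datetime

import pandas as pd
from feast import FeatureStore

store = FeatureStore(repo_path=".")

# Entity rows to fetch features for: entity keys plus event timestamps
entity_df = pd.DataFrame(
    {
        "driver_id": [1001, 1002, 1003],
        "event_timestamp": [datetime(2024, 1, 1, 12, 0)] * 3,
    }
)

# The configured Ray compute engine executes the point-in-time join
training_df = store.get_historical_features(
    entity_df=entity_df,
    features=[
        "driver_hourly_stats:conv_rate",
        "driver_hourly_stats:acc_rate",
    ],
).to_df()

print(training_df.head())
```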
### Batch Materialization
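A sketch of materializing a recent window of feature data into the online store; the engine runs the materialization DAG for every feature view in range:

```python
from datetime import datetime, timedelta

from feast import FeatureStore

store = FeatureStore(repo_path=".")

# Materialize the last 7 days of features into the online store
end = datetime.utcnow()
store.materialize(start_date=end - timedelta(days=7), end_date=end)
```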
### Large-Scale Feature Retrieval
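For very large entity sets, one pattern is to load entities from columnar storage and let the engine distribute the join; the bucket path below is hypothetical:

```python
import pandas as pd
from feast import FeatureStore

store = FeatureStore(repo_path=".")

# Millions of entity rows; the engine partitions the point-in-time join
# into time windows (window_size_for_joins) across the cluster.
entity_df = pd.read_parquet("s3://my-bucket/entities/2024-01-01.parquet")

training_df = store.get_historical_features(
    entity_df=entity_df,
    features=["driver_hourly_stats:conv_rate"],
).to_df()
```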
### Advanced Configuration
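A sketch of a remote-cluster configuration; the address, sizes, and bucket are illustrative, not recommendations:

```yaml
batch_engine:
  type: ray.engine
  ray_address: ray://head-node:10001   # connect to an existing Ray cluster
  max_workers: 32
  max_parallelism_multiplier: 4        # 4 tasks per CPU core
  broadcast_join_threshold_mb: 250     # broadcast larger feature tables
  target_partition_size_mb: 128
  window_size_for_joins: "30min"
  execution_timeout_seconds: 3600
  staging_location: s3://my-bucket/feast-staging
```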
### Complete Example Configuration
Here's a complete example configuration showing how to use the Ray offline store together with the Ray compute engine:
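The sketch below assumes the contrib Ray offline store; keys under `offline_store` other than `type` are illustrative:

```yaml
project: my_project
registry: data/registry.db
provider: local

offline_store:
  type: ray
  storage_path: s3://my-bucket/feast-data   # illustrative option name

batch_engine:
  type: ray.engine
  ray_address: null            # null = start a local Ray instance
  max_workers: 8
  enable_optimization: true
  enable_distributed_joins: true
  broadcast_join_threshold_mb: 100
  target_partition_size_mb: 64
  window_size_for_joins: "1H"
  staging_location: s3://my-bucket/feast-staging

online_store:
  type: sqlite
  path: data/online_store.db
```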
## DAG Node Types
The Ray compute engine implements several specialized DAG nodes:
### RayReadNode

Reads data from Ray-compatible sources:

- Supports Parquet, CSV, and other formats
- Handles partitioning and schema inference
- Applies field mappings and filters
### RayJoinNode

Performs distributed joins:

- **Broadcast Join**: for small feature datasets (below `broadcast_join_threshold_mb`, 100 MB by default)
- **Distributed Join**: for large datasets, using time-based windowing
- **Automatic Strategy Selection**: based on dataset size and cluster resources
### RayFilterNode

Applies filters and time-based constraints:

- TTL-based filtering
- Timestamp range filtering
- Custom predicate filtering
### RayAggregationNode

Handles feature aggregations:

- Windowed aggregations
- Grouped aggregations
- Custom aggregation functions
### RayTransformationNode

Applies feature transformations:

- Row-level transformations
- Column-level transformations
- Custom transformation functions
### RayWriteNode

Writes results to various targets:

- Online stores
- Offline stores
- Temporary storage
## Join Strategies
The Ray compute engine automatically selects optimal join strategies:
### Broadcast Join

Used for small feature datasets:

- Automatically selected when feature data is smaller than `broadcast_join_threshold_mb` (100 MB by default)
- Feature data is cached in Ray's object store
- Entities are distributed across the cluster
- Each worker receives a copy of the feature data
### Distributed Windowed Join

Used for large feature datasets:

- Automatically selected when feature data exceeds the broadcast threshold
- Data is partitioned into time windows (`window_size_for_joins`)
- Point-in-time joins run within each window
- Results are combined across windows
### Strategy Selection Logic
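The sketch below captures the size-based rule described above; the real engine also weighs cluster resources, and the function name is illustrative:

```python
def choose_join_strategy(
    feature_data_size_mb: float,
    broadcast_join_threshold_mb: float = 100.0,
) -> str:
    """Simplified, size-based join strategy selection."""
    if feature_data_size_mb < broadcast_join_threshold_mb:
        # Small feature data: cache it in Ray's object store and
        # ship a copy to every worker (broadcast join).
        return "broadcast"
    # Large feature data: partition by time window and run a
    # point-in-time join within each window.
    return "distributed_windowed"
```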
## Performance Optimization
### Automatic Optimization
The Ray compute engine includes several automatic optimizations:

- **Partition Optimization**: automatically determines optimal partition sizes
- **Join Strategy Selection**: chooses between broadcast and distributed joins
- **Resource Allocation**: scales workers based on available resources
- **Memory Management**: handles out-of-core processing for large datasets
### Manual Tuning
For specific workloads, you can fine-tune performance:
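For example, a memory-constrained, I/O-bound workload might trade partition size for parallelism (values are illustrative starting points):

```yaml
batch_engine:
  type: ray.engine
  target_partition_size_mb: 32      # smaller partitions, less memory per task
  max_parallelism_multiplier: 4     # more tasks per core for I/O-bound work
  broadcast_join_threshold_mb: 200  # broadcast more aggressively if RAM allows
```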
## Monitoring and Metrics
Monitor Ray compute engine performance:
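Since the engine runs on an ordinary Ray cluster, Ray's own tooling applies. A minimal sketch using Ray's cluster APIs:

```python
import ray

# Attach to the running cluster (same address the engine was configured with)
ray.init(address="auto", ignore_reinit_error=True)

# Compare total capacity against what is currently free
print("Cluster resources:  ", ray.cluster_resources())
print("Available resources:", ray.available_resources())
```

The Ray dashboard (served at `http://localhost:8265` by default) additionally shows per-task timelines, object store usage, and worker logs for running jobs.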
## Integration Examples
### With Spark Offline Store
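A sketch pairing the Spark offline store with the Ray compute engine; the `spark_conf` entries are illustrative:

```yaml
offline_store:
  type: spark
  spark_conf:
    spark.master: "local[*]"

batch_engine:
  type: ray.engine
```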
### With Cloud Storage
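Source data and staging can live in object storage; the bucket paths are hypothetical and `storage_path` is an illustrative option name:

```yaml
offline_store:
  type: ray
  storage_path: s3://my-bucket/feast-data

batch_engine:
  type: ray.engine
  staging_location: s3://my-bucket/feast-staging
```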
### With Feature Transformations
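One way to define a transformation the engine can execute is a pandas UDF; the sketch below uses Feast's `@on_demand_feature_view` decorator, with all view, field, and path names assumed for illustration:

```python
import pandas as pd
from feast import Field, FeatureView, FileSource
from feast.on_demand_feature_view import on_demand_feature_view
from feast.types import Float64

# Assumed upstream feature view; the Parquet path is illustrative
driver_stats_source = FileSource(
    path="data/driver_stats.parquet",
    timestamp_field="event_timestamp",
)

driver_hourly_stats = FeatureView(
    name="driver_hourly_stats",
    entities=[],  # entity definitions elided for brevity
    schema=[
        Field(name="conv_rate", dtype=Float64),
        Field(name="acc_rate", dtype=Float64),
    ],
    source=driver_stats_source,
)

@on_demand_feature_view(
    sources=[driver_hourly_stats],
    schema=[Field(name="conv_rate_plus_acc", dtype=Float64)],
)
def conv_rate_plus_acc(inputs: pd.DataFrame) -> pd.DataFrame:
    # Row-level transformation over the upstream feature values
    out = pd.DataFrame()
    out["conv_rate_plus_acc"] = inputs["conv_rate"] + inputs["acc_rate"]
    return out
```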
For more information, see the Ray documentation and Ray Data guide.