Ray (contrib)

The Ray compute engine is a distributed implementation of Feast's compute engine interface that uses Ray to execute feature pipelines, including transformations, aggregations, joins, and materializations. It provides scalable, efficient distributed processing for both materialize() and get_historical_features() operations.

Overview

The Ray compute engine provides:

  • Distributed DAG Execution: Executes feature computation DAGs across Ray clusters

  • Intelligent Join Strategies: Automatic selection between broadcast and distributed joins

  • Lazy Evaluation: Deferred execution for optimal performance

  • Resource Management: Automatic scaling and resource optimization

  • Point-in-Time Joins: Efficient temporal joins for historical feature retrieval

Architecture

The Ray compute engine follows Feast's DAG-based architecture:

EntityDF → RayReadNode → RayJoinNode → RayFilterNode → RayAggregationNode → RayTransformationNode → Output

Core Components

| Component | Description |
| --- | --- |
| RayComputeEngine | Main engine implementing the ComputeEngine interface |
| RayFeatureBuilder | Constructs the DAG from Feature View definitions |
| RayDAGNode | Ray-specific DAG node implementations |
| RayDAGRetrievalJob | Executes retrieval plans and returns results |
| RayMaterializationJob | Tracks materialization job status |

Configuration

Configure the Ray compute engine in your feature_store.yaml:
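
A minimal sketch (the option names come from the table below; batch_engine is Feast's standard key for compute engine configuration, and the values shown are illustrative):

```yaml
project: my_project
registry: data/registry.db
provider: local
offline_store:
  type: ray
batch_engine:
  type: ray.engine
  max_workers: 8
  broadcast_join_threshold_mb: 100
  target_partition_size_mb: 64
```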

Configuration Options

| Option | Type | Default | Description |
| --- | --- | --- | --- |
| type | string | "ray.engine" | Must be ray.engine |
| max_workers | int | None (uses all cores) | Maximum number of Ray workers |
| enable_optimization | boolean | true | Enable performance optimizations |
| broadcast_join_threshold_mb | int | 100 | Size threshold for broadcast joins (MB) |
| max_parallelism_multiplier | int | 2 | Parallelism as a multiple of CPU cores |
| target_partition_size_mb | int | 64 | Target partition size (MB) |
| window_size_for_joins | string | "1H" | Time window for distributed joins |
| ray_address | string | None | Ray cluster address (None = local Ray) |
| enable_distributed_joins | boolean | true | Enable distributed joins for large datasets |
| staging_location | string | None | Remote path for batch materialization jobs |
| ray_conf | dict | None | Ray configuration parameters |
| execution_timeout_seconds | int | None | Timeout for job execution in seconds |

Usage Examples

Basic Historical Feature Retrieval
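
Retrieval goes through the standard Feast API; with batch_engine set to ray.engine, the retrieval DAG runs on Ray. A minimal sketch (the driver_hourly_stats feature view and entity columns are hypothetical):

```python
from datetime import datetime

import pandas as pd
from feast import FeatureStore

store = FeatureStore(repo_path=".")

# Entity dataframe: join keys plus the event timestamp for point-in-time joins
entity_df = pd.DataFrame(
    {
        "driver_id": [1001, 1002, 1003],
        "event_timestamp": [datetime(2024, 1, 1, 12, 0)] * 3,
    }
)

training_df = store.get_historical_features(
    entity_df=entity_df,
    features=[
        "driver_hourly_stats:conv_rate",
        "driver_hourly_stats:avg_daily_trips",
    ],
).to_df()
```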

Batch Materialization
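
Materialization also uses the usual FeatureStore methods; the engine distributes the work across Ray workers. A sketch:

```python
from datetime import datetime, timedelta

from feast import FeatureStore

store = FeatureStore(repo_path=".")

# Materialize the last 7 days of feature values into the online store
end = datetime.utcnow()
store.materialize(start_date=end - timedelta(days=7), end_date=end)

# Or materialize everything since the last run
store.materialize_incremental(end_date=end)
```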

Large-Scale Feature Retrieval
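
For large entity sets the API is unchanged; the engine switches to a distributed windowed join once the feature data exceeds broadcast_join_threshold_mb. A sketch (the Parquet path is hypothetical):

```python
import pandas as pd
from feast import FeatureStore

store = FeatureStore(repo_path=".")

# Millions of entity rows read from Parquet
entity_df = pd.read_parquet("s3://my-bucket/entities/2024-01-01.parquet")

job = store.get_historical_features(
    entity_df=entity_df,
    features=["driver_hourly_stats:conv_rate"],
)

# Execution is deferred until results are requested
training_df = job.to_df()
```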

Advanced Configuration
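
A sketch combining the remaining options from the table above (the cluster address, staging path, and ray_conf values are illustrative):

```yaml
batch_engine:
  type: ray.engine
  ray_address: "ray://head-node:10001"  # attach to an existing cluster
  max_workers: 32
  max_parallelism_multiplier: 4
  target_partition_size_mb: 128
  window_size_for_joins: "30min"
  enable_distributed_joins: true
  staging_location: s3://my-bucket/feast-staging
  execution_timeout_seconds: 3600
  ray_conf:
    num_cpus: 64
    object_store_memory: 10000000000
```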

Complete Example Configuration

Here's a complete example configuration showing how to use the Ray offline store together with the Ray compute engine:
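
The values below are illustrative:

```yaml
project: driver_features
registry: data/registry.db
provider: local
offline_store:
  type: ray
online_store:
  type: sqlite
  path: data/online_store.db
batch_engine:
  type: ray.engine
  max_workers: 16
  enable_optimization: true
  broadcast_join_threshold_mb: 100
  enable_distributed_joins: true
  window_size_for_joins: "1H"
```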

DAG Node Types

The Ray compute engine implements several specialized DAG nodes:

RayReadNode

Reads data from Ray-compatible sources:

  • Supports Parquet, CSV, and other formats

  • Handles partitioning and schema inference

  • Applies field mappings and filters

RayJoinNode

Performs distributed joins:

  • Broadcast Join: For small datasets (<100MB)

  • Distributed Join: For large datasets with time-based windowing

  • Automatic Strategy Selection: Based on dataset size and cluster resources

RayFilterNode

Applies filters and time-based constraints:

  • TTL-based filtering

  • Timestamp range filtering

  • Custom predicate filtering

RayAggregationNode

Handles feature aggregations:

  • Windowed aggregations

  • Grouped aggregations

  • Custom aggregation functions

RayTransformationNode

Applies feature transformations:

  • Row-level transformations

  • Column-level transformations

  • Custom transformation functions

RayWriteNode

Writes results to various targets:

  • Online stores

  • Offline stores

  • Temporary storage

Join Strategies

The Ray compute engine automatically selects optimal join strategies:

Broadcast Join

Used for small feature datasets:

  • Automatically selected when feature data is below broadcast_join_threshold_mb (default 100MB)

  • Features are cached in Ray's object store

  • Entities are distributed across cluster

  • Each worker gets a copy of feature data

Distributed Windowed Join

Used for large feature datasets:

  • Automatically selected when feature data exceeds broadcast_join_threshold_mb (default 100MB)

  • Data is partitioned by time windows

  • Point-in-time joins within each window

  • Results are combined across windows

Strategy Selection Logic
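
The rule reduces to a size check against broadcast_join_threshold_mb. A simplified sketch of the logic (illustrative, not the engine's actual source):

```python
def select_join_strategy(feature_size_mb: float, threshold_mb: float = 100.0) -> str:
    """Pick a join strategy from the feature dataset's size, per the rules above."""
    if feature_size_mb < threshold_mb:
        # Small feature table: cache it in Ray's object store and
        # ship a copy to every worker
        return "broadcast"
    # Large feature table: partition both sides into time windows and
    # run point-in-time joins within each window
    return "distributed_windowed"
```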

Performance Optimization

Automatic Optimization

The Ray compute engine includes several automatic optimizations:

  1. Partition Optimization: Automatically determines optimal partition sizes

  2. Join Strategy Selection: Chooses between broadcast and distributed joins

  3. Resource Allocation: Scales workers based on available resources

  4. Memory Management: Handles out-of-core processing for large datasets

Manual Tuning

For specific workloads, you can fine-tune performance:
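
For example, a throughput-oriented batch job might raise parallelism and partition sizes (the values are illustrative starting points, not recommendations):

```yaml
batch_engine:
  type: ray.engine
  max_parallelism_multiplier: 4     # more tasks per CPU core
  target_partition_size_mb: 256     # fewer, larger partitions
  broadcast_join_threshold_mb: 250  # broadcast more aggressively
```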

Monitoring and Metrics

Monitor Ray compute engine performance:
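
The engine runs on a plain Ray cluster, so standard Ray tooling applies. A sketch using Ray's public APIs:

```python
import ray

# Attach to the cluster the engine is using
ray.init(address="auto")

# Total and currently free resources across the cluster
print(ray.cluster_resources())
print(ray.available_resources())

# The Ray dashboard (http://127.0.0.1:8265 by default) shows per-task
# timelines, object store usage, and worker logs for engine jobs.
```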

Integration Examples

With Spark Offline Store
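
The compute engine is independent of the offline store, so a Spark-backed offline store can feed the Ray engine. A sketch (the Spark settings are illustrative):

```yaml
offline_store:
  type: spark
  spark_conf:
    spark.master: "local[*]"
    spark.sql.session.timeZone: "UTC"
batch_engine:
  type: ray.engine
```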

With Cloud Storage
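
Remote staging lets materialization jobs exchange data through object storage; a sketch assuming S3 (the bucket and cluster address are hypothetical):

```yaml
batch_engine:
  type: ray.engine
  ray_address: "ray://head-node:10001"
  staging_location: s3://my-bucket/feast-staging
```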

With Feature Transformations
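
Transformations attached to feature views are executed by RayTransformationNode during retrieval and materialization. A sketch assuming a pandas UDF on a BatchFeatureView (the view, source, and field names are hypothetical, and the transformation parameters may differ by Feast version, so treat this as illustrative):

```python
from datetime import timedelta

import pandas as pd
from feast import BatchFeatureView, Entity, Field, FileSource
from feast.types import Float64

driver = Entity(name="driver", join_keys=["driver_id"])

stats_source = FileSource(
    path="data/driver_stats.parquet",
    timestamp_field="event_timestamp",
)

def scale_conv_rate(df: pd.DataFrame) -> pd.DataFrame:
    # Row-level transformation run distributed by the Ray engine
    df["conv_rate"] = df["conv_rate"] * 100.0
    return df

driver_stats_scaled = BatchFeatureView(
    name="driver_stats_scaled",
    entities=[driver],
    ttl=timedelta(days=1),
    schema=[Field(name="conv_rate", dtype=Float64)],
    source=stats_source,
    udf=scale_conv_rate,
    mode="pandas",
)
```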

For more information, see the Ray documentation and Ray Data guide.
