Workload Management & Orchestration Series: Ray by Anyscale
If you've worked in high-performance computing (HPC), machine learning operations (MLOps), data science, data management or any scenario that involves vending shared compute resources to a user community, you've probably used various workload management and/or orchestration tools. This series explores some of these at a high level to help you understand each tool's characteristics beyond the marketing hype.
Ray: A scalable system for AI Python workloads
Ray by Anyscale is a flexible, high-performance distributed computing framework designed to scale Python and machine learning (ML) applications from a single laptop to a multi-node cluster. It was originally developed at UC Berkeley's RISELab and has gained widespread adoption across industry and research, particularly for building scalable AI workloads, reinforcement learning systems, hyperparameter tuning, and distributed training pipelines.
As a distributed computing framework and runtime that lets you write and scale Python code seamlessly across multiple nodes, it is not directly comparable with workload managers such as Slurm or Run:ai. Those solutions run "outside" the job, whereas Ray provides a programmatic, Python-native API for expressing parallelism (via tasks, actors, etc.) and provides tools for building distributed AI/ML pipelines (e.g., Ray Train, Tune, RLlib, Serve); it also includes a scheduler, object store, autoscaler, and runtime services.
Architecture
Ray uses a cluster-based architecture with a centralized control plane and a set of worker nodes. Each Ray cluster has a single head node that coordinates scheduling, object management and task distribution, while worker nodes execute tasks and manage distributed memory. Clusters can be started manually, deployed on Kubernetes, or launched on on-prem machines and major cloud providers with the Ray Cluster Launcher.
In Ray, a distributed computing API provides primitives like remote functions (@ray.remote) and actors, allowing users to parallelize Python code with minimal changes. For structured AI workflows, Ray includes built-in libraries such as Ray Tune (hyperparameter tuning), Ray Train (distributed model training), Ray Serve (scalable model serving), and RLlib (reinforcement learning). These modules integrate natively with frameworks like PyTorch, TensorFlow and XGBoost.
Ray is not a traditional workload manager
Ray does not use job scripts in the traditional HPC sense (as in Slurm). Instead, tasks and actors are defined programmatically in Python. A user launches a Ray cluster ("ray start --head") and runs a Python driver script, which schedules tasks across the available nodes. Because Ray operates within the Python runtime, users benefit from interactive development tools such as Jupyter or IPython while still being able to scale out to multiple GPUs and nodes.
Ray's architecture uses multiple internal services to maintain system state and data consistency. Communication between nodes is done over gRPC or TCP, and the control plane ensures lineage-based fault recovery without centralized bottlenecks. Ray does not require SSH or shared filesystems between nodes.
Using Ray with workload and workflow managers
Ray integrates natively with container runtimes and orchestration platforms, and it can even run within containers. Users can compose Ray jobs with containerized environments to ensure consistency across platforms or for isolation purposes. When used with Docker or Kubernetes, each Ray pod or container runs the Ray runtime, and resource limits are enforced through relevant mechanisms of the container platform. When using Kubernetes, Ray operators can manage dynamic scaling of clusters and deploy Ray services alongside other workloads with fine-grained resource constraints.
Although Ray does not enforce a specific workflow engine, it integrates well with higher-level orchestration tools like Flyte, Prefect or Airflow, where Ray serves as the execution backend. It's common to use Ray in conjunction with tools like DVC or MLflow to manage ML lifecycle metadata.
Ray also supports pluggable backends for autoscaling, logging and monitoring via Prometheus, Grafana or Datadog. Its fault-tolerant design means failed tasks can be retried transparently, and long-running actors can be checkpointed for durability.
Ray in action
Here is a simple Ray program:
import ray
ray.init(address="auto") # connects to the running Ray cluster
@ray.remote
def f(x):
    return x * x
futures = [f.remote(i) for i in range(4)]
results = ray.get(futures)
print(results) # [0, 1, 4, 9]
Ray handles resource management internally through a decentralized, fine-grained, resource-aware scheduling system. Each remote function or actor can be explicitly annotated with resource requirements such as num_cpus, num_gpus, or memory, or with custom resources such as TPUs or user-defined tags (e.g., {"accelerator": 1}). These annotations guide the Ray scheduler in placing tasks on nodes that satisfy the specified requirements, giving users explicit control over task placement and enabling heterogeneous workloads across a cluster.
Importantly, Ray supports fractional resource requirements, allowing tasks or actors to request a fraction of a resource. This is useful for fine-grained resource allocation and maximizing cluster utilization.
@ray.remote(num_cpus=2, num_gpus=0.5, resources={"accelerator": 1})
def my_function():
    # Function implementation
    pass
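To make the effect of fractional requests concrete, here is a toy model of resource-aware placement in plain Python. It is a simplified first-fit sketch, not Ray's scheduler: the place function, the node names and the request names are all hypothetical, and exact fractions are used only to sidestep floating-point rounding.

```python
from fractions import Fraction


def place(requests, nodes):
    """Greedy first-fit placement (toy model, not Ray's algorithm).

    Returns a dict mapping each request name to a node name,
    or to None when no node can currently satisfy the request.
    """
    # Track free capacity per node as exact fractions.
    free = {
        node: {res: Fraction(str(qty)) for res, qty in capacity.items()}
        for node, capacity in nodes.items()
    }
    placement = {}
    for task, demand in requests.items():
        placement[task] = None
        for node, available in free.items():
            fits = all(
                available.get(res, 0) >= Fraction(str(qty))
                for res, qty in demand.items()
            )
            if fits:
                for res, qty in demand.items():
                    available[res] -= Fraction(str(qty))
                placement[task] = node
                break
    return placement


# Hypothetical two-node cluster: only node-a has a GPU.
nodes = {"node-a": {"CPU": 4, "GPU": 1}, "node-b": {"CPU": 4, "GPU": 0}}
requests = {
    "infer-1": {"CPU": 1, "GPU": 0.5},
    "infer-2": {"CPU": 1, "GPU": 0.5},
    "infer-3": {"CPU": 1, "GPU": 0.5},
    "prep": {"CPU": 2},
}
placement = place(requests, nodes)
print(placement)
```

In this model the first two half-GPU requests share the single GPU on node-a, and the third finds no capacity. In a real cluster such a request would typically wait until resources are freed or the autoscaler adds a node.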
Ray's resource tracking is dynamic and hierarchical: It monitors per-node resource availability and aggregates this information at the cluster level via the Ray GCS (Global Control Store). This allows it to make rapid, decentralized scheduling decisions without needing to serialize all scheduling through a central bottleneck. The system supports both task-based (stateless) and actor-based (stateful) concurrency models, making it well-suited for workloads that combine parallel data processing with long-running services, such as parameter servers or reinforcement learning environments.
Ray provides various scheduling strategies to control how tasks and actors are distributed across the cluster:
- Default strategy: Ray schedules tasks based on resource availability and load balancing.
- SPREAD strategy: Tries to spread tasks or actors evenly across the available nodes, on a best-effort basis.
- Placement groups: Allow tasks and actors to be grouped with specific placement constraints.
@ray.remote(scheduling_strategy="SPREAD")
def spread_task():
    # Task implementation
    pass
Unlike Slurm, which launches jobs as isolated OS processes and often relies on a shared filesystem or explicit communication (e.g., MPI), Ray includes a global distributed object store that supports in-memory data sharing across processes and nodes. This object store uses a shared memory segment within each node to store immutable objects, and transmits them efficiently between nodes via zero-copy serialization and asynchronous pipelining. As a result, data transfer between Ray tasks and actors is low-latency and avoids repeated disk or network I/O, significantly improving performance for data-intensive applications such as distributed training, model inference pipelines and simulation.
This tight integration of compute and memory management allows Ray to support complex, multi-stage workflows with interdependent tasks while maintaining high utilization and low coordination overhead.
More on the global distributed object store
The global distributed object store is a foundational component of the Ray architecture, enabling efficient, high-throughput data sharing across tasks and actors in a distributed setting. It is tightly integrated with Ray's task execution and scheduling systems, minimizing serialization, copying and communication overhead. Below is a deeper dive into its design and implementation. The object store's design is based on:
- Zero-copy local reads via shared memory.
- Efficient network transfers between nodes.
- Immutable object semantics for consistency and fault recovery.
- Integration with the scheduler to manage object lineage and availability.
- Object pinning: Objects can be pinned in memory to prevent eviction.
- Task locality optimization: Scheduler uses object locations to place tasks on nodes where required data is already available.
Ray uses a fork of Apache Arrow's Plasma to provide a shared-memory object store for immutable objects. Objects are serialized using pickle protocol 5, which supports an efficient in-memory layout and zero-copy reads for any object that supports the out-of-band buffer protocol. On each node, the object store process creates a shared memory segment, and all Ray worker processes on that node map this memory into their address space. When a task puts data into the object store (e.g., by calling ray.put() or returning a value from a remote function), the data is serialized once into this memory pool.
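The out-of-band buffer mechanism of pickle protocol 5 can be tried with the Python standard library alone. The sketch below does not use Ray and says nothing about how Ray wires this into its object store; it only shows that a large buffer can stay out of the pickle stream and be handed back without a copy. The bytearray is a stand-in for array data.

```python
import pickle

# A large, buffer-backed payload (a stand-in for an array's raw data).
data = bytearray(b"\x00" * 1_000_000)

# Serialize with protocol 5. The callback collects the large buffer
# out-of-band instead of copying it into the pickle stream.
out_of_band = []
stream = pickle.dumps(
    pickle.PickleBuffer(data),
    protocol=5,
    buffer_callback=out_of_band.append,
)

# Deserialize by handing the same buffers back to the unpickler.
restored = pickle.loads(stream, buffers=out_of_band)
restored_view = memoryview(restored)

stream_size = len(stream)                  # a few bytes of metadata only
shares_memory = restored_view.obj is data  # the original memory is reused

print(stream_size, len(out_of_band), shares_memory)
```

The pickle stream carries only metadata, while the megabyte of payload travels separately as a buffer. A system can therefore place that buffer wherever it likes, such as a shared memory segment, without an intermediate copy.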
For data backed by such buffers (NumPy arrays, for example), reading from the store involves no memory copying as long as the reader is on the same node; other Python objects are still deserialized on read.
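As an analogy for same-node reads, the following sketch uses Python's standard multiprocessing.shared_memory module. It is not Ray's object store, and for brevity both the "store" and the "worker" handles live in one process; in practice the reader would be a separate process attaching to the segment by name.

```python
from multiprocessing import shared_memory

payload = b"immutable object bytes"

# "Object store" side: create a named segment and write the object once.
segment = shared_memory.SharedMemory(create=True, size=len(payload))
segment.buf[:len(payload)] = payload

# "Worker" side: attach to the same segment by name. The memoryview
# below points straight at the shared memory; nothing is copied.
reader = shared_memory.SharedMemory(name=segment.name)
view = reader.buf[:len(payload)]

# Copy the bytes out only so the result can be checked after cleanup.
read_back = view.tobytes()

# Release views before closing, then remove the segment.
view.release()
reader.close()
segment.close()
segment.unlink()

print(read_back)
```

The segment is sliced by the payload length because some platforms round the segment size up to a whole number of memory pages.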
The Raylet, Ray's per-node agent, coordinates with the Global Control Store (GCS) to track object ownership and availability. When a task on Node A needs an object created on Node B, the local Raylet requests metadata from GCS, and then initiates a direct object transfer over TCP. While Ray can operate over InfiniBand networks using IPoIB, it does not currently support native RDMA for object transfers. Users seeking to maximize performance on RDMA-capable networks may need to consider alternative solutions or contribute to ongoing efforts to integrate RDMA support into Ray. The object store uses object spilling to disk (or cloud storage) if memory usage exceeds a threshold, and can prefetch or reconstruct objects on demand.
Paraphrasing community discussion of RDMA support in Ray, the benefits of RDMA are greatest when the network topology is more complex than what Ray currently supports, for example when multiple RDMA-capable links are located adjacent to GPU cores (a feature of NVIDIA's most advanced architectures).
Ray uses efficient, type-aware serialization via:
- Apache Arrow for binary-compatible formats (e.g., NumPy, Pandas).
- Pickle (with enhancements) for arbitrary Python objects.
- A fallback to cloudpickle for objects that standard pickle cannot serialize, such as lambdas and nested functions.
For large objects, serialization and deserialization are done out-of-band in background threads to avoid blocking the task executor. Ray tracks lineage metadata for all objects. If the task that produces an object fails, or the object itself is lost or evicted, Ray can re-execute the tasks in its lineage to reconstruct it. This model supports fault tolerance without requiring full data replication.
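The idea behind lineage-based reconstruction can be illustrated with a toy in-memory store. This is a conceptual sketch only: the LineageStore class and its methods are hypothetical and do not mirror Ray's internals, and the approach assumes tasks are deterministic so that re-running them yields the same values.

```python
class LineageStore:
    """Toy object store that remembers how each object was produced."""

    def __init__(self):
        self._values = {}    # object_id -> materialized value
        self._lineage = {}   # object_id -> (function, dependency ids)
        self.recomputations = 0

    def submit(self, object_id, func, *dep_ids):
        """Run func on its dependencies and record the lineage."""
        self._lineage[object_id] = (func, dep_ids)
        self._values[object_id] = func(*(self.get(d) for d in dep_ids))
        return object_id

    def evict(self, object_id):
        """Drop the value (as if memory were reclaimed) but keep the lineage."""
        self._values.pop(object_id, None)

    def get(self, object_id):
        """Return the value, re-executing its lineage if it was lost."""
        if object_id not in self._values:
            func, dep_ids = self._lineage[object_id]
            self._values[object_id] = func(*(self.get(d) for d in dep_ids))
            self.recomputations += 1
        return self._values[object_id]


store = LineageStore()
store.submit("raw", lambda: [1, 2, 3, 4])
store.submit("squared", lambda xs: [x * x for x in xs], "raw")
store.submit("total", lambda xs: sum(xs), "squared")

# Lose two intermediate results, then ask for the final one again.
store.evict("squared")
store.evict("total")
result = store.get("total")

print(result, store.recomputations)
```

Only the lost objects are recomputed, and the surviving "raw" object is reused as is. That is the trade-off the lineage model makes: a small amount of metadata per object in exchange for not having to replicate the data itself.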
See the Ray Architecture Whitepapers for more in-depth information about Ray's architecture.
Summary
In summary, Ray is optimized for modern AI and Python-based workflows, offering a programming model that emphasizes flexibility, autoscaling and composability. It trades the declarative job script model of traditional HPC for a fully programmable interface while still supporting high-throughput, parallel workloads, GPU acceleration and distributed memory management across clusters.