
Ray Cluster

Ray is a flexible, high-performance distributed computing framework. This guide helps you set up and run distributed training with PyTorch on a Ray cluster.


Topology

Cluster Setup

  • Environment: Ensure all nodes run identical software environments, with matching PyTorch, Ray, CUDA, and NCCL versions (a quick version check is sketched below).
  • Communication: Passwordless SSH is required for seamless communication between nodes.
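
To confirm the environment point above, you can run a small Ray task on every node and compare what each one reports. This is only a sketch: it assumes the cluster is already running (`ray start` on each node) and that PyTorch was built with CUDA; it pins one task per node via Ray's built-in node:<ip> resources.

import ray
import torch

ray.init(address="auto")

@ray.remote
def report_versions():
    # NCCL version information is only available when PyTorch has CUDA support.
    return {
        "ray": ray.__version__,
        "torch": torch.__version__,
        "cuda": torch.version.cuda,
        "nccl": torch.cuda.nccl.version() if torch.cuda.is_available() else None,
    }

# Pin one task to each alive node via its node:<ip> resource and compare results.
refs = [
    report_versions.options(resources={f"node:{n['NodeManagerAddress']}": 0.01}).remote()
    for n in ray.nodes() if n["Alive"]
]
for report in ray.get(refs):
    print(report)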

Connect

ssh $hostname   # replace $hostname with the target node's hostname or IP

Shared Storage

  • Use NFS or Dataset storage systems accessible by all nodes in the cluster (e.g., mounted at /mnt/shared) to store datasets, logs, and checkpoints.
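
A quick way to verify that the mount is visible cluster-wide is to have a few Ray tasks write marker files into it and list what they see. This is only a sketch; /mnt/shared and the sub-directory name are assumptions, and the tasks may need to be pinned per node (as in the version check above) to cover every machine.

import pathlib
import socket
import ray

ray.init(address="auto")

@ray.remote
def touch_and_list(shared_dir="/mnt/shared"):
    # Each task writes a marker named after its host, then lists all markers it can see.
    marker_dir = pathlib.Path(shared_dir) / "storage_check"
    marker_dir.mkdir(parents=True, exist_ok=True)
    (marker_dir / socket.gethostname()).write_text("ok")
    return sorted(p.name for p in marker_dir.iterdir())

print(ray.get([touch_and_list.remote() for _ in range(4)]))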

Training Guide

Getting Started

  1. Install Ray and PyTorch on all nodes:
pip install ray[default] torch torchvision
  2. Start a Ray cluster:
  • Head Node:
ray start --head --port=6379
  • Worker Nodes:
ray start --address='head_node_ip:6379'
  3. Write a distributed training script using Ray. For example:
import ray
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, TensorDataset
from ray.util.sgd.torch import TorchTrainer
from ray.util.sgd.torch import TrainingOperator   # legacy RaySGD API (Ray 1.x)

# Define the training logic
class MyTrainingOperator(TrainingOperator):
    def setup(self, config):
        # Build the model, optimizer, and loss, then register them so the
        # trainer can set up distributed training around them.
        model = nn.Linear(10, 1).to("cuda")
        optimizer = optim.SGD(model.parameters(), lr=0.01)
        criterion = nn.MSELoss()
        self.model, self.optimizer = self.register(models=model, optimizers=optimizer)
        self.criterion = criterion

        # Register a data loader (here over a synthetic regression dataset)
        # so the trainer has batches to iterate over.
        dataset = TensorDataset(torch.randn(1024, 10), torch.randn(1024, 1))
        train_loader = DataLoader(dataset, batch_size=config["batch_size"])
        self.register_data(train_loader=train_loader, validation_loader=None)

    def train_batch(self, batch, batch_idx):
        # One optimization step on a single batch.
        inputs, targets = batch
        inputs, targets = inputs.to("cuda"), targets.to("cuda")
        outputs = self.model(inputs)
        loss = self.criterion(outputs, targets)
        self.optimizer.zero_grad()
        loss.backward()
        self.optimizer.step()
        return {"loss": loss.item()}

# Initialize Ray and attach to the running cluster
ray.init(address="auto")

# Configure the trainer: one worker process per GPU
trainer = TorchTrainer(
    training_operator_cls=MyTrainingOperator,
    num_workers=4,
    use_gpu=True,
    config={"batch_size": 32},
)

# Run training, then release the workers
print(trainer.train())
trainer.shutdown()

Import/Export Data

  • Import: Place datasets in shared storage (/mnt/shared) or use Ray’s object store.
  • Export: Save model checkpoints and logs to the shared file system for easy access across nodes.
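
Both paths can be exercised with a few lines. The sketch below uses a random tensor as a stand-in dataset and an illustrative checkpoint path under /mnt/shared; neither is part of any fixed layout.

import os
import ray
import torch

ray.init(address="auto")

# Import: place a dataset in the object store once; tasks receive it by reference.
dataset = torch.randn(1000, 10)            # stands in for a real dataset
dataset_ref = ray.put(dataset)

@ray.remote
def column_means(data):
    # Ray resolves the ObjectRef argument, so the task receives the tensor itself.
    return data.mean(dim=0)

print(ray.get(column_means.remote(dataset_ref)))

# Export: write checkpoints to shared storage so any node can load them.
checkpoint_dir = "/mnt/shared/checkpoints"
os.makedirs(checkpoint_dir, exist_ok=True)
model = torch.nn.Linear(10, 1)
torch.save(model.state_dict(), os.path.join(checkpoint_dir, "model_latest.pt"))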

Training Metrics

You can integrate monitoring tools like TensorBoard or Weights & Biases for real-time insights into your training process.

TensorBoard

  • Write logs to the shared file system (/mnt/shared) so metrics from all workers end up in one place (see the logging snippet after these commands).
  • Monitor training progress by forwarding the TensorBoard port and pointing TensorBoard at the shared log directory:
ssh -L 6006:localhost:6006 $hostname
tensorboard --logdir /mnt/shared/logs
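
Inside the training code, each worker can write its own event files under the shared log directory so the single tensorboard process above picks them all up. The per-worker sub-directory scheme below is just one possible convention, and the RANK environment variable is assumed to be set by the launcher.

import os
from torch.utils.tensorboard import SummaryWriter

# One sub-directory per worker keeps the event files from clobbering each other.
rank = int(os.environ.get("RANK", 0))
writer = SummaryWriter(log_dir=f"/mnt/shared/logs/worker_{rank}")

for step in range(100):
    loss = 1.0 / (step + 1)                # stands in for the real training loss
    writer.add_scalar("train/loss", loss, step)

writer.close()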

Weights & Biases

  • For offline mode, store the run files in /mnt/shared so the metrics can be inspected, or synced to the W&B server later, from any node. This keeps runs accessible and reproducible across the cluster.
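
A minimal sketch of an offline W&B run whose files live on the shared mount; the project name is illustrative. The run can be uploaded later from any node with the wandb sync command.

import wandb

run = wandb.init(
    project="ray-distributed-training",   # illustrative project name
    mode="offline",                       # no network access needed during training
    dir="/mnt/shared/wandb",              # keep run files on the shared mount
)

for step in range(100):
    wandb.log({"train/loss": 1.0 / (step + 1)}, step=step)

run.finish()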

Administration Guide

  1. User Management: Ensure all users have passwordless SSH set up and identical environments on all nodes.
  2. Cluster Health Monitoring: Use ray status to monitor cluster health and resource usage (a Python equivalent is shown below).
  3. Storage Management: Regularly check disk usage on /mnt/shared to ensure enough space for logs and checkpoints.
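
For items 2 and 3, the same information is available from Python, which is convenient for cron jobs or dashboards; /mnt/shared is again assumed to be the shared mount.

import shutil
import ray

ray.init(address="auto")

# Cluster health: total and currently free resources across all nodes.
print("Cluster resources:  ", ray.cluster_resources())
print("Available resources:", ray.available_resources())

# Storage: free space on the shared mount used for logs and checkpoints.
usage = shutil.disk_usage("/mnt/shared")
print(f"/mnt/shared: {usage.free / 1e9:.1f} GB free of {usage.total / 1e9:.1f} GB")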

Troubleshooting Guide

Common Issues and Solutions

Ray Initialization Errors

  • Error: "Ray cannot connect to the head node."
  • Solution: Verify that head_node_ip is correct, that port 6379 is open in the firewall, and that the worker nodes can reach it.
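
Before retrying ray start, it can help to confirm from the worker that the head node's port is reachable at all. In the sketch below, head_node_ip is a placeholder for the real address.

import socket

head_node_ip, port = "head_node_ip", 6379      # replace with the real head node address

with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
    s.settimeout(5)
    try:
        s.connect((head_node_ip, port))
        print(f"{head_node_ip}:{port} is reachable")
    except OSError as err:
        print(f"cannot reach {head_node_ip}:{port}: {err}")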

NCCL Initialization Errors

  • Error: "NCCL connection timed out."
  • Solution: Verify that all nodes can reach each other over the network interface NCCL uses and that firewalls allow the traffic; the environment variables below help narrow down the failure.
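
NCCL's own logging usually points at the failing link. A minimal sketch, assuming the nodes talk to each other over an interface such as eth0 (adjust to your network):

import os

# Must be set before the process group / trainer is initialized.
os.environ["NCCL_DEBUG"] = "INFO"           # print NCCL's setup and error messages
os.environ["NCCL_SOCKET_IFNAME"] = "eth0"   # restrict NCCL to a reachable interface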

Disk Space Errors

  • Error: "No space left on device."
  • Solution: Clean up old checkpoints and logs from /mnt/shared.
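
A small cleanup sketch that keeps only the most recent checkpoints; the directory and the *.pt pattern are assumptions to adapt to your layout.

import pathlib

KEEP = 3   # number of most recent checkpoints to retain
checkpoints = sorted(
    pathlib.Path("/mnt/shared/checkpoints").glob("*.pt"),
    key=lambda p: p.stat().st_mtime,
)
for old in checkpoints[:-KEEP]:
    old.unlink()
    print(f"removed {old}")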

FAQ

Q: How do I ensure the same environment across nodes?

A: Use Docker containers or environment management tools like Conda to replicate the setup across all nodes.

Q: How do I configure Ray for distributed training?

A: Install Ray on all nodes, start the head node with ray start --head, and connect worker nodes using ray start --address.

Q: How do I access logs stored on shared storage?

A: Mount the /mnt/shared file system on any node or access it directly via SSH.


With this guide, you’re ready to leverage Ray for scalable and efficient PyTorch distributed training on your cluster!