OpenMPI
OpenMPI is a widely used implementation of the Message Passing Interface (MPI) standard for distributed computing. On TIR, an OpenMPI deployment provisions multiple workers with identical environments and passwordless SSH configured between them — you launch distributed training with a single mpirun command from the Master Worker.
Environment
Cluster Setup
- All nodes share the same TIR-provided container image with PyTorch, OpenMPI, CUDA, and NCCL pre-installed.
- Passwordless SSH between workers is configured automatically by TIR.
Connect to the Master Worker
ssh $hostname
Run all mpirun commands from the Master Worker.
Shared Storage
All nodes access a common storage path. Place training scripts, datasets, and checkpoints here:
/mnt/shared
Training Guide
Step 1: Install Dependencies
The TIR-provided image includes PyTorch and OpenMPI. To install manually in a custom environment:
pip install torch torchvision
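To confirm the stack is in place before launching a multi-node job, a quick check along these lines can be run on any worker (a minimal sketch that only prints versions and availability flags):

import torch

# Versions and capabilities the TIR image is expected to ship
print("PyTorch:", torch.__version__)
print("CUDA runtime:", torch.version.cuda)
print("CUDA available:", torch.cuda.is_available())
print("NCCL available:", torch.distributed.is_nccl_available())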
Step 2: Write a Training Script
Save the following as /mnt/shared/train_mpi.py. It uses PyTorch's distributed process group with NCCL, launched via MPI:
import os
import torch
import torch.nn as nn
import torch.optim as optim
import torch.distributed as dist
from torch.nn.parallel import DistributedDataParallel as DDP

def main():
    # mpirun sets OMPI_COMM_WORLD_RANK and OMPI_COMM_WORLD_SIZE for each process
    rank = int(os.environ.get("OMPI_COMM_WORLD_RANK", 0))
    world_size = int(os.environ.get("OMPI_COMM_WORLD_SIZE", 1))
    local_rank = rank % torch.cuda.device_count()

    # Translate the MPI environment into the variables torch.distributed expects
    os.environ["MASTER_ADDR"] = os.environ.get("MASTER_ADDR", "127.0.0.1")
    os.environ["MASTER_PORT"] = os.environ.get("MASTER_PORT", "29500")
    os.environ["RANK"] = str(rank)
    os.environ["WORLD_SIZE"] = str(world_size)

    dist.init_process_group(backend="nccl")
    torch.cuda.set_device(local_rank)

    # Model, optimizer, loss
    model = nn.Linear(10, 1).cuda()
    model = DDP(model, device_ids=[local_rank])
    optimizer = optim.SGD(model.parameters(), lr=0.01)
    criterion = nn.MSELoss()

    for epoch in range(10):
        optimizer.zero_grad()
        inputs = torch.randn(32, 10).cuda()
        targets = torch.randn(32, 1).cuda()
        outputs = model(inputs)
        loss = criterion(outputs, targets)
        loss.backward()
        optimizer.step()
        if rank == 0:
            print(f"Epoch {epoch} | Loss: {loss.item():.4f}")

    dist.destroy_process_group()

if __name__ == "__main__":
    main()
Step 3: Create a Hostfile
List all worker nodes with their slot counts (typically equal to the number of GPUs per node). If Slurm is available, run this on the Master Worker to discover the hostnames; otherwise see the FAQ below:
scontrol show hostnames "$SLURM_JOB_NODELIST"
Then create /mnt/shared/hostfile:
node1 slots=8
node2 slots=8
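If you prefer to generate the hostfile rather than write it by hand, a small script along these lines can do it. This is a sketch that assumes Slurm is available (as in the scontrol command above) and that every node exposes the same number of GPUs; GPUS_PER_NODE is a placeholder to adjust.

# generate_hostfile.py - sketch: build /mnt/shared/hostfile from the Slurm node list
import subprocess

GPUS_PER_NODE = 8  # adjust to match your node type

# With no argument, scontrol expands $SLURM_JOB_NODELIST from the environment
nodes = subprocess.run(
    ["scontrol", "show", "hostnames"],
    capture_output=True, text=True, check=True,
).stdout.split()

with open("/mnt/shared/hostfile", "w") as f:
    for node in nodes:
        f.write(f"{node} slots={GPUS_PER_NODE}\n")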
Step 4: Launch Training
Run mpirun from the Master Worker, referencing the hostfile:
mpirun \
  --hostfile /mnt/shared/hostfile \
  -np 16 \
  -x MASTER_ADDR=node1 \
  -x MASTER_PORT=29500 \
  -x NCCL_DEBUG=INFO \
  python /mnt/shared/train_mpi.py
| Flag | Description |
|---|---|
| --hostfile | Path to the file listing cluster nodes and slot counts |
| -np | Total number of processes (nodes × GPUs per node) |
| -x VAR=VALUE | Pass environment variables to all workers |
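As a sanity check, the value passed to -np should equal the sum of the slot counts in the hostfile. A small helper (hypothetical, for illustration only) that computes it:

# Sum the slots= values in the hostfile; pass the result to mpirun -np
total = 0
with open("/mnt/shared/hostfile") as f:
    for line in f:
        if "slots=" in line:
            total += int(line.strip().split("slots=")[1].split()[0])
print(f"-np {total}")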
Data Management
Import Datasets
Place all datasets in shared storage before launching training:
/mnt/shared/datasets
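For datasets that are downloaded at runtime, fetching from rank 0 only avoids every process writing the same files. The sketch below (using MNIST purely as an example) assumes it runs inside train_mpi.py after the process group is initialized, so rank and dist are already defined:

import torchvision

# Rank 0 downloads into shared storage; the other ranks wait, then read the same files
if rank == 0:
    torchvision.datasets.MNIST("/mnt/shared/datasets", download=True)
dist.barrier()
dataset = torchvision.datasets.MNIST("/mnt/shared/datasets", download=False)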
Save Checkpoints
Save from rank 0 only to avoid write conflicts:
if dist.get_rank() == 0:
    torch.save(model.state_dict(), "/mnt/shared/checkpoints/model_epoch_10.pt")
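To resume, every rank can load the same file onto its own GPU once rank 0 has finished writing it. A minimal sketch, reusing model, local_rank, and dist from the training script above (the file name is the one from the save example):

ckpt_path = "/mnt/shared/checkpoints/model_epoch_10.pt"
dist.barrier()  # make sure rank 0 has finished writing the checkpoint
state = torch.load(ckpt_path, map_location=f"cuda:{local_rank}")
model.load_state_dict(state)  # keys carry the DDP "module." prefix, matching the save above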
Monitoring
TensorBoard
# On the worker node
tensorboard --logdir /mnt/shared/logs --port 6006
# On your local machine
ssh -L 6006:localhost:6006 $hostname
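To give TensorBoard something to read, events can be written from rank 0 of the training script to the shared log directory. A sketch of the additions to train_mpi.py (rank, epoch, and loss are the variables already defined there):

from torch.utils.tensorboard import SummaryWriter

# Only rank 0 writes events; the directory is on shared storage, so
# TensorBoard started on any node can read it
writer = SummaryWriter("/mnt/shared/logs") if rank == 0 else None

# Inside the epoch loop, after the loss is computed:
if writer is not None:
    writer.add_scalar("train/loss", loss.item(), epoch)

# After training, before destroy_process_group():
if writer is not None:
    writer.close()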
Weights & Biases
import wandb

# Initialize from rank 0 only so a single offline run is written to shared storage
if rank == 0:
    wandb.init(project="pytorch-mpi", mode="offline", dir="/mnt/shared/wandb")
    wandb.log({"loss": loss.item()})
GPU & System Utilization
nvidia-smi # GPU utilization
htop # CPU and memory usage
Troubleshooting
| Issue | Cause | Resolution |
|---|---|---|
| Permission denied (publickey) | SSH not set up between nodes | TIR configures passwordless SSH automatically — verify worker hostnames are correct in the hostfile |
| MPI connection refused | Port blocked by security group | Ensure the security group allows inter-node communication |
| NCCL timeout | Network issue between nodes | Set NCCL_DEBUG=INFO for verbose output; verify all workers are reachable |
| Process group init failure | Wrong MASTER_ADDR or MASTER_PORT | Pass the master node's hostname via -x MASTER_ADDR in mpirun |
| Disk full | Checkpoints or logs filling /mnt/shared | Delete old files or increase storage quota |
FAQ
Q: How do I find the hostnames of my workers?
From the Master Worker, run:
cat /etc/hosts
Or use scontrol show hostnames if Slurm is available.
Q: How do I ensure identical environments across all nodes?
TIR provisions all workers from the same container image, so environments are identical by default. If you install additional packages, run the installation on every worker or include it in a Lifecycle Configuration Script at deployment creation.
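To verify this at runtime, a quick check inside the training script (after init_process_group) can gather the framework version from every rank and compare. This sketch reuses rank, world_size, and dist from Step 2:

# Gather torch.__version__ from every rank and report any mismatch from rank 0
versions = [None] * world_size
dist.all_gather_object(versions, torch.__version__)
if rank == 0:
    if len(set(versions)) == 1:
        print(f"All {world_size} ranks report torch {versions[0]}")
    else:
        print(f"Version mismatch across ranks: {versions}")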
Q: Where are logs stored?
Write logs to /mnt/shared/logs to keep them accessible from all nodes after the training job completes.