OpenMPI

OpenMPI is a widely used implementation of the Message Passing Interface (MPI) standard for distributed computing. On TIR, an OpenMPI deployment provisions multiple workers with identical environments and passwordless SSH configured between them — you launch distributed training with a single mpirun command from the Master Worker.


Environment

Cluster Setup

  • All nodes share the same TIR-provided container image with PyTorch, OpenMPI, CUDA, and NCCL pre-installed.
  • Passwordless SSH between workers is configured automatically by TIR.

Connect to the Master Worker

ssh $hostname

Run all mpirun commands from the Master Worker.

Shared Storage

All nodes access a common storage path. Place training scripts, datasets, and checkpoints here:

/mnt/shared

Training Guide

Step 1: Install Dependencies

The TIR-provided image includes PyTorch and OpenMPI. To install manually in a custom environment:

pip install torch torchvision
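
After installation, it can be worth confirming that PyTorch was built with CUDA support and can see the node's GPUs. A quick check (a minimal sketch, run on each worker):

import torch

# Verify CUDA support and list the GPUs visible on this node
print("CUDA available:", torch.cuda.is_available())
print("GPU count:", torch.cuda.device_count())
for i in range(torch.cuda.device_count()):
    print(f"GPU {i}: {torch.cuda.get_device_name(i)}")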

Step 2: Write a Training Script

Save the following as /mnt/shared/train_mpi.py. It uses PyTorch's distributed process group with NCCL, launched via MPI:

import os
import torch
import torch.nn as nn
import torch.optim as optim
import torch.distributed as dist
from torch.nn.parallel import DistributedDataParallel as DDP


def main():
    # MPI sets OMPI_COMM_WORLD_RANK and OMPI_COMM_WORLD_SIZE
    rank = int(os.environ.get("OMPI_COMM_WORLD_RANK", 0))
    world_size = int(os.environ.get("OMPI_COMM_WORLD_SIZE", 1))
    local_rank = rank % torch.cuda.device_count()

    os.environ["MASTER_ADDR"] = os.environ.get("MASTER_ADDR", "127.0.0.1")
    os.environ["MASTER_PORT"] = os.environ.get("MASTER_PORT", "29500")
    os.environ["RANK"] = str(rank)
    os.environ["WORLD_SIZE"] = str(world_size)

    dist.init_process_group(backend="nccl")
    torch.cuda.set_device(local_rank)

    # Model, optimizer, loss
    model = nn.Linear(10, 1).cuda()
    model = DDP(model, device_ids=[local_rank])

    optimizer = optim.SGD(model.parameters(), lr=0.01)
    criterion = nn.MSELoss()

    for epoch in range(10):
        optimizer.zero_grad()
        inputs = torch.randn(32, 10).cuda()
        targets = torch.randn(32, 1).cuda()
        outputs = model(inputs)
        loss = criterion(outputs, targets)
        loss.backward()
        optimizer.step()

        if rank == 0:
            print(f"Epoch {epoch} | Loss: {loss.item():.4f}")

    dist.destroy_process_group()


if __name__ == "__main__":
    main()

Step 3: Create a Hostfile

List all worker nodes with their slot counts (typically equal to the number of GPUs per node). If Slurm is available on the deployment, you can discover hostnames by running this on the Master Worker (otherwise check /etc/hosts, as noted in the FAQ):

scontrol show hostnames "$SLURM_JOB_NODELIST"

Then create /mnt/shared/hostfile:

node1 slots=8
node2 slots=8

Step 4: Launch Training

Run mpirun from the Master Worker, referencing the hostfile:

mpirun \
  --hostfile /mnt/shared/hostfile \
  -np 16 \
  -x MASTER_ADDR=node1 \
  -x MASTER_PORT=29500 \
  -x NCCL_DEBUG=INFO \
  python /mnt/shared/train_mpi.py
  • --hostfile: Path to the file listing cluster nodes and slot counts
  • -np: Total number of processes (nodes × GPUs per node)
  • -x VAR=VALUE: Pass environment variables to all workers
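
Before launching a long run, it can help to confirm that NCCL can actually communicate across all nodes. The script below is a minimal smoke test (the file name check_allreduce.py is illustrative): it bootstraps the process group the same way as train_mpi.py and performs a single all_reduce. Launch it with the same mpirun command, substituting the script path.

import os
import torch
import torch.distributed as dist

# Same environment bootstrapping as train_mpi.py
rank = int(os.environ.get("OMPI_COMM_WORLD_RANK", 0))
world_size = int(os.environ.get("OMPI_COMM_WORLD_SIZE", 1))
local_rank = rank % torch.cuda.device_count()

os.environ.setdefault("MASTER_ADDR", "127.0.0.1")
os.environ.setdefault("MASTER_PORT", "29500")
os.environ["RANK"] = str(rank)
os.environ["WORLD_SIZE"] = str(world_size)

torch.cuda.set_device(local_rank)
dist.init_process_group(backend="nccl")

# Each rank contributes a tensor of ones; after all_reduce every rank
# should hold the sum, which equals the world size.
t = torch.ones(1, device="cuda")
dist.all_reduce(t)
if rank == 0:
    print(f"all_reduce result: {t.item()} (expected {world_size})")

dist.destroy_process_group()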

Data Management

Import Datasets

Place all datasets in shared storage before launching training:

/mnt/shared/datasets
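
When every rank reads the same dataset from shared storage, wrap it in a DistributedSampler so each process trains on a distinct shard instead of duplicating work. A minimal sketch, with random tensors standing in for files under /mnt/shared/datasets (requires an initialized process group, as in train_mpi.py):

import torch
from torch.utils.data import DataLoader, TensorDataset
from torch.utils.data.distributed import DistributedSampler

# Stand-in data; in practice, build the dataset from /mnt/shared/datasets
dataset = TensorDataset(torch.randn(1024, 10), torch.randn(1024, 1))

sampler = DistributedSampler(dataset)  # splits indices across ranks
loader = DataLoader(dataset, batch_size=32, sampler=sampler)

for epoch in range(10):
    sampler.set_epoch(epoch)  # reshuffle with a different seed each epoch
    for inputs, targets in loader:
        inputs, targets = inputs.cuda(), targets.cuda()
        # forward / backward / step as in train_mpi.py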

Save Checkpoints

Save from rank 0 only to avoid write conflicts:

if dist.get_rank() == 0:
    torch.save(model.state_dict(), "/mnt/shared/checkpoints/model_epoch_10.pt")
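
To resume, every rank loads the checkpoint that rank 0 wrote. A minimal sketch, assuming the model, local_rank, and process group from train_mpi.py; the path mirrors the example above:

import torch
import torch.distributed as dist

ckpt_path = "/mnt/shared/checkpoints/model_epoch_10.pt"

dist.barrier()  # make sure rank 0 has finished writing before anyone reads
state = torch.load(ckpt_path, map_location=f"cuda:{local_rank}")
# Keys match directly because the state dict was saved from the DDP-wrapped model
model.load_state_dict(state)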

Monitoring

TensorBoard

# On the worker node
tensorboard --logdir /mnt/shared/logs --port 6006

# On your local machine
ssh -L 6006:localhost:6006 $hostname
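
To produce the event files that TensorBoard reads, write scalars from rank 0 into /mnt/shared/logs. A minimal sketch using PyTorch's SummaryWriter (loss and epoch come from the training loop in train_mpi.py):

from torch.utils.tensorboard import SummaryWriter
import torch.distributed as dist

# Only rank 0 writes, so a single set of event files lands in shared storage
writer = SummaryWriter(log_dir="/mnt/shared/logs") if dist.get_rank() == 0 else None

# Inside the training loop:
if writer is not None:
    writer.add_scalar("train/loss", loss.item(), epoch)

# After training:
if writer is not None:
    writer.close()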

Weights & Biases

import wandb
wandb.init(project="pytorch-mpi", mode="offline", dir="/mnt/shared/wandb")
wandb.log({"loss": loss.item()})
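
As with TensorBoard, initialize and log from rank 0 only so a single run is recorded rather than one per process; for example:

import wandb
import torch.distributed as dist

if dist.get_rank() == 0:
    wandb.init(project="pytorch-mpi", mode="offline", dir="/mnt/shared/wandb")

# Inside the training loop, log from rank 0 only
if dist.get_rank() == 0:
    wandb.log({"loss": loss.item()})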

GPU & System Utilization

nvidia-smi    # GPU utilization
htop          # CPU and memory usage

Troubleshooting

  • Permission denied (publickey): SSH not set up between nodes. TIR configures passwordless SSH automatically; verify that the worker hostnames in the hostfile are correct.
  • MPI connection refused: port blocked by a security group. Ensure the security group allows inter-node communication.
  • NCCL timeout: network issue between nodes. Set NCCL_DEBUG=INFO for verbose output and verify that all workers are reachable.
  • Process group init failure: wrong MASTER_ADDR or MASTER_PORT. Pass the master node's hostname via -x MASTER_ADDR in mpirun.
  • Disk full: checkpoints or logs filling /mnt/shared. Delete old files or increase the storage quota.

FAQ

Q: How do I find the hostnames of my workers?

From the Master Worker, run:

cat /etc/hosts

Or use scontrol show hostnames if Slurm is available.

Q: How do I ensure identical environments across all nodes?

TIR provisions all workers from the same container image, so environments are identical by default. If you install additional packages, run the installation on every worker or include it in a Lifecycle Configuration Script at deployment creation.
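
One quick way to check consistency is to print the key library versions from every rank and compare the lines. A small sketch (the file name check_env.py is illustrative), launched with the same mpirun and hostfile as in Step 4:

import os
import socket
import torch

# Each MPI rank reports its host and library versions; any mismatch between
# lines points to a node whose environment has drifted.
rank = os.environ.get("OMPI_COMM_WORLD_RANK", "0")
print(f"rank {rank} | host {socket.gethostname()} | "
      f"torch {torch.__version__} | CUDA {torch.version.cuda}")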

Q: Where are logs stored?

Write logs to /mnt/shared/logs to keep them accessible from all nodes after the training job completes.