---
title: OpenMPI
---

**OpenMPI** is a widely used implementation of the Message Passing Interface (MPI) standard for distributed computing. On TIR, an OpenMPI deployment provisions multiple workers with identical environments and passwordless SSH configured between them, so you can launch distributed training with a single `mpirun` command from the Master Worker.

---

## Environment

### Cluster Setup

* All nodes share the same TIR-provided container image with **PyTorch**, **OpenMPI**, **CUDA**, and **NCCL** pre-installed.
* Passwordless SSH between workers is configured automatically by TIR.

### Connect to the Master Worker

```bash
ssh $hostname
```

Run all `mpirun` commands from the Master Worker.

### Shared Storage

All nodes access a common storage path. Place training scripts, datasets, and checkpoints here:

```bash
/mnt/shared
```

---

## Training Guide

### Step 1: Install Dependencies

The TIR-provided image includes PyTorch and OpenMPI. To install PyTorch manually in a custom environment:

```bash
pip install torch torchvision
```

---

### Step 2: Write a Training Script

Save the following as `/mnt/shared/train_mpi.py`.
It uses PyTorch's distributed process group with the NCCL backend; the processes themselves are launched by `mpirun`:

```python
import os

import torch
import torch.distributed as dist
import torch.nn as nn
import torch.optim as optim
from torch.nn.parallel import DistributedDataParallel as DDP


def main():
    # Open MPI exports these variables to every process started by mpirun
    rank = int(os.environ.get("OMPI_COMM_WORLD_RANK", 0))
    world_size = int(os.environ.get("OMPI_COMM_WORLD_SIZE", 1))
    # Prefer the per-node rank from Open MPI; fall back to the global rank
    # modulo the GPU count
    local_rank = int(
        os.environ.get("OMPI_COMM_WORLD_LOCAL_RANK", rank % torch.cuda.device_count())
    )

    # Translate the MPI values into the variables PyTorch's env:// rendezvous reads
    os.environ.setdefault("MASTER_ADDR", "127.0.0.1")
    os.environ.setdefault("MASTER_PORT", "29500")
    os.environ["RANK"] = str(rank)
    os.environ["WORLD_SIZE"] = str(world_size)

    # Bind this process to its GPU before NCCL is initialized
    torch.cuda.set_device(local_rank)
    dist.init_process_group(backend="nccl")

    # Model, optimizer, loss
    model = nn.Linear(10, 1).cuda()
    model = DDP(model, device_ids=[local_rank])
    optimizer = optim.SGD(model.parameters(), lr=0.01)
    criterion = nn.MSELoss()

    for epoch in range(10):
        optimizer.zero_grad()
        # Random tensors stand in for a real dataset
        inputs = torch.randn(32, 10).cuda()
        targets = torch.randn(32, 1).cuda()
        outputs = model(inputs)
        loss = criterion(outputs, targets)
        loss.backward()
        optimizer.step()
        if rank == 0:
            print(f"Epoch {epoch} | Loss: {loss.item():.4f}")

    dist.destroy_process_group()


if __name__ == "__main__":
    main()
```

---

### Step 3: Create a Hostfile

List all worker nodes with their slot counts (typically equal to the number of GPUs per node).
If Slurm is available, run this on the Master Worker to discover hostnames; otherwise, read them from `/etc/hosts`:

```bash
scontrol show hostnames "$SLURM_JOB_NODELIST"
```

Then create `/mnt/shared/hostfile`:

```
node1 slots=8
node2 slots=8
```

---

### Step 4: Launch Training

Run `mpirun` from the Master Worker, referencing the hostfile:

```bash
mpirun \
  --hostfile /mnt/shared/hostfile \
  -np 16 \
  -x MASTER_ADDR=node1 \
  -x MASTER_PORT=29500 \
  -x NCCL_DEBUG=INFO \
  python /mnt/shared/train_mpi.py
```

| Flag | Description |
|------|-------------|
| `--hostfile` | Path to the file listing cluster nodes and slot counts |
| `-np` | Total number of processes (nodes × GPUs per node) |
| `-x VAR=VALUE` | Export an environment variable to every launched process |

---

## Data Management

### Import Datasets

Place all datasets in shared storage before launching training:

```bash
/mnt/shared/datasets
```

### Save Checkpoints

Save from rank 0 only to avoid write conflicts:

```python
if dist.get_rank() == 0:
    # model.module is the network wrapped by DDP; saving it keeps parameter names free of the "module." prefix
    torch.save(model.module.state_dict(), "/mnt/shared/checkpoints/model_epoch_10.pt")
```

---

## Monitoring

### TensorBoard

```bash
# On the worker node
tensorboard --logdir /mnt/shared/logs --port 6006

# On your local machine
ssh -L 6006:localhost:6006 $hostname
```

### Weights & Biases

```python
import wandb

wandb.init(project="pytorch-mpi", mode="offline", dir="/mnt/shared/wandb")
wandb.log({"loss": loss.item()})
```

### GPU & System Utilization

```bash
nvidia-smi   # GPU utilization
htop         # CPU and memory usage
```

---

## Troubleshooting

| Issue | Cause | Resolution |
|-------|-------|------------|
| **Permission denied (publickey)** | SSH not set up between nodes | TIR configures passwordless SSH automatically; verify that the worker hostnames in the hostfile are correct |
| **MPI connection refused** | Port blocked by security group | Ensure the security group allows inter-node communication |
| **NCCL timeout** | Network issue between nodes | Set `NCCL_DEBUG=INFO` for verbose output; verify all workers are reachable |
| **Process group init failure** | Wrong `MASTER_ADDR` or `MASTER_PORT` | Pass the master node's hostname via `-x MASTER_ADDR` in `mpirun` |
| **Disk full** | Checkpoints or logs filling `/mnt/shared` | Delete old files or increase storage quota |

---

## FAQ

**Q: How do I find the hostnames of my workers?**

From the Master Worker, run:

```bash
cat /etc/hosts
```

Or use `scontrol show hostnames` if Slurm is available.

**Q: How do I ensure identical environments across all nodes?**

TIR provisions all workers from the same container image, so environments are identical by default. If you install additional packages, run the installation on every worker or include it in a Lifecycle Configuration Script at deployment creation.

**Q: Where are logs stored?**

Write logs to `/mnt/shared/logs` to keep them accessible from all nodes after the training job completes.

---
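
**Q: How can I check that `-np` matches my hostfile?**

The `-np` value in Step 4 is described as nodes × GPUs per node, which is the sum of the `slots=` entries in the hostfile from Step 3. The sketch below shows one way to compute that sum before launching. It is illustrative only: `total_slots` is a hypothetical helper, not part of Open MPI or TIR, and it assumes every hostfile line carries an explicit `slots=` entry.

```python
def total_slots(hostfile_text: str) -> int:
    """Sum the slots=N entries of an Open MPI style hostfile.

    Assumption: every host line has an explicit slots= entry; lines
    without one raise ValueError instead of guessing a default.
    """
    total = 0
    for raw_line in hostfile_text.splitlines():
        line = raw_line.split("#", 1)[0].strip()  # drop comments and whitespace
        if not line:
            continue
        slots = None
        for field in line.split()[1:]:  # fields after the hostname
            key, _, value = field.partition("=")
            if key == "slots":
                slots = int(value)
        if slots is None:
            raise ValueError(f"no slots= entry on line: {raw_line!r}")
        total += slots
    return total


if __name__ == "__main__":
    example = "node1 slots=8\nnode2 slots=8\n"
    print(total_slots(example))  # prints 16
```

For the two-node hostfile shown in Step 3 this gives 16, which matches `-np 16` in the launch command.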