Distributed Multi-Node Jobs#
SkyPilot supports provisioning multi-node clusters and running distributed jobs on them.
For example, here is a simple task that trains a GPT-like model (inspired by Karpathy’s minGPT) across 2 nodes with Distributed Data Parallel (DDP) in PyTorch.
name: minGPT-ddp

resources:
  accelerators: A100:8

num_nodes: 2

setup: |
  git clone --depth 1 https://github.com/pytorch/examples || true
  cd examples
  git filter-branch --prune-empty --subdirectory-filter distributed/minGPT-ddp
  # SkyPilot's default image on AWS/GCP has CUDA 11.6 (Azure 11.5).
  uv pip install -r requirements.txt "numpy<2" "torch==1.12.1+cu113" --extra-index-url https://download.pytorch.org/whl/cu113

run: |
  cd examples/mingpt
  export LOGLEVEL=INFO

  MASTER_ADDR=$(echo "$SKYPILOT_NODE_IPS" | head -n1)
  echo "Starting distributed training, head node: $MASTER_ADDR"

  torchrun \
    --nnodes=$SKYPILOT_NUM_NODES \
    --nproc_per_node=$SKYPILOT_NUM_GPUS_PER_NODE \
    --master_addr=$MASTER_ADDR \
    --node_rank=${SKYPILOT_NODE_RANK} \
    --master_port=8008 \
    main.py
In the above, num_nodes: 2 specifies that this job is to be run on 2 nodes, with each node having 8 A100s. The run section uses several environment variables that are useful for launching distributed training, explained below.
Note
If you encounter the error [Errno 24] Too many open files, your process has exceeded the maximum number of open file descriptors allowed by the system. This often occurs in high-load scenarios, e.g., when launching a large number of nodes, such as 100.
To resolve this issue, run the following command and try again:
ulimit -n 65535
You can find more distributed training examples (including one using the rdzv backend for PyTorch) in our GitHub repository.
Unless otherwise specified, descriptions below apply to both cluster jobs and managed jobs.
Environment variables#
SkyPilot exposes these environment variables that can be accessed in a job’s run commands:

- SKYPILOT_NODE_RANK: rank (an integer ID from 0 to num_nodes - 1) of the node executing the job.
- SKYPILOT_NODE_IPS: a string of IP addresses of the nodes reserved to execute the job, where each line contains one IP address.
- SKYPILOT_NUM_NODES: number of nodes reserved for the job, which can be specified by num_nodes: <n>. Same value as echo "$SKYPILOT_NODE_IPS" | wc -l.
- SKYPILOT_NUM_GPUS_PER_NODE: number of GPUs reserved on each node to execute the job; the same as the count in accelerators: <name>:<count> (rounded up if a fraction).
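For example, a run section can derive per-node addresses from these variables. A minimal sketch (assuming, as in the example above, that the first line of SKYPILOT_NODE_IPS is the head node, and that line i+1 corresponds to the node of rank i):

run: |
  # Head node address: the first line of SKYPILOT_NODE_IPS.
  HEAD_IP=$(echo "$SKYPILOT_NODE_IPS" | head -n1)
  # This node's own address: line (rank + 1) of the list (assumed ordering).
  MY_IP=$(echo "$SKYPILOT_NODE_IPS" | sed -n "$((SKYPILOT_NODE_RANK + 1))p")
  echo "Node $SKYPILOT_NODE_RANK of $SKYPILOT_NUM_NODES at $MY_IP (head: $HEAD_IP)"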
See SkyPilot environment variables for more details.
Launching a multi-node job (new cluster)#
When using sky launch to launch a multi-node job on a new cluster, the following happens in sequence:

1. Nodes are provisioned. (barrier)
2. Workdir/file_mounts are synced to all nodes. (barrier)
3. setup commands are executed on all nodes. (barrier)
4. run commands are executed on all nodes.
When using sky jobs launch to launch a managed multi-node job, the same behavior applies.
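For example, assuming the task above is saved as task.yaml (the cluster name mycluster is illustrative):

$ # Provision a new cluster named mycluster and run the task on it.
$ sky launch -c mycluster task.yaml

$ # Or run the same task as a managed job.
$ sky jobs launch task.yaml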
Launching a multi-node job (existing cluster)#
When using sky launch to launch a multi-node job on an existing cluster, the cluster may have more nodes than the current job’s num_nodes requirement. The following happens in sequence:

1. SkyPilot checks that the runtime on all nodes is up-to-date. (barrier)
2. Workdir/file_mounts are synced to all nodes. (barrier)
3. setup commands are executed on all nodes of the cluster. (barrier)
4. run commands are executed on the subset of nodes scheduled to execute the job, which may be fewer than the cluster size.
Tip
To skip rerunning the setup commands, use either sky launch --no-setup ... (performs steps 1, 2, and 4 above) or sky exec (performs step 2 (workdir only) and step 4).
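For example (again with illustrative names):

$ # Reuse the cluster, skipping setup: performs steps 1, 2, and 4.
$ sky launch --no-setup -c mycluster task.yaml

$ # Sync the workdir and run only: performs steps 2 (workdir only) and 4.
$ sky exec mycluster task.yaml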
Executing a job on the head node only#
To execute a job on the head node only (a common scenario for tools like mpirun), use the SKYPILOT_NODE_RANK environment variable as follows:
...

num_nodes: <n>

run: |
  if [ "${SKYPILOT_NODE_RANK}" == "0" ]; then
    # Launch the head-only command here.
  fi
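As a concrete sketch of the mpirun case (my_program is a hypothetical placeholder, and this assumes MPI is installed on the nodes and that they can reach each other over SSH):

num_nodes: 2

run: |
  if [ "${SKYPILOT_NODE_RANK}" == "0" ]; then
    # Build a comma-separated host list from the newline-separated IPs.
    HOSTS=$(echo "$SKYPILOT_NODE_IPS" | paste -sd, -)
    # Launch one process per node from the head; my_program is a placeholder.
    mpirun -np $SKYPILOT_NUM_NODES --host "$HOSTS" ./my_program
  fi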
SSH into worker nodes#
In addition to the head node, the SSH configurations for the worker nodes of a multi-node cluster are also added to ~/.ssh/config as <cluster_name>-worker<n>. This allows you to directly SSH into the worker nodes, if required.
# Assuming 3 nodes in a cluster named mycluster
# Head node.
$ ssh mycluster
# Worker nodes.
$ ssh mycluster-worker1
$ ssh mycluster-worker2
SSH access is only available for clusters (designed for interactive development), not for managed jobs (designed for production, scale-out runs).
Executing a distributed Ray program#
To execute a distributed Ray program on many nodes, you can download the training script and launch the task YAML:
$ wget https://raw.githubusercontent.com/skypilot-org/skypilot/master/examples/distributed_ray_train/train.py
$ # Use a cluster (ideal for interactive development)
$ sky launch ray_train.yaml
$ # Use a managed job (ideal for production, scale-out runs)
$ sky jobs launch ray_train.yaml
The ray_train.yaml used above:

resources:
  accelerators: L4:2
  memory: 64+

num_nodes: 2

workdir: .

setup: |
  conda activate ray
  if [ $? -ne 0 ]; then
    conda create -n ray python=3.10 -y
    conda activate ray
  fi

  pip install "ray[train]"
  pip install tqdm
  pip install torch torchvision torchaudio --index-url https://download.pytorch.org/whl/cu118

run: |
  sudo chmod 777 -R /var/tmp
  HEAD_IP=$(echo "$SKYPILOT_NODE_IPS" | head -n1)
  if [ "$SKYPILOT_NODE_RANK" == "0" ]; then
    ps aux | grep ray | grep 6379 &> /dev/null || ray start --head --disable-usage-stats --port 6379
    sleep 5
    python train.py --num-workers $SKYPILOT_NUM_NODES
  else
    sleep 5
    ps aux | grep ray | grep 6379 &> /dev/null || ray start --address $HEAD_IP:6379 --disable-usage-stats
    # Sleep after `ray start` to give Ray enough time to daemonize.
    sleep 5
  fi
Warning
When using Ray, avoid calling ray stop, as that will also stop the SkyPilot runtime.