Source: examples/tensorflow_distributed
Distributed TensorFlow
This example shows how to launch a distributed TensorFlow training job with SkyPilot.
Included files
tf_distributed.yaml
# Distributed TensorFlow training on GCP with 2 V100 GPUs
#
# Uses MultiWorkerMirroredStrategy to distribute training across multiple nodes with SkyPilot.
#
# Usage:
# sky launch -c myclus tf_distributed.yaml
# sky down myclus
resources:
  cloud: gcp
  accelerators: V100:1  # Provision 1 V100 GPU per node

# Provision 2 nodes, giving us a total of 2 GPUs in the cluster
num_nodes: 2

# Copy files (train.py) from the current working directory to all nodes on the cluster
workdir: .

# Install dependencies on all nodes during the setup phase
setup: |
  pip install tensorflow==2.11.0
# The following shell commands are run on all nodes at execution
run: |
  # Port the TensorFlow workers listen on. Can be any unused port.
  # It is substituted into the Python snippet below, so this is the only
  # place it needs to be changed.
  PORT=2222

  # ======== Construct TF_CONFIG ========
  # Since we are using MultiWorkerMirroredStrategy, we need to construct a TF_CONFIG
  # environment variable that contains the list of all worker addresses and the
  # current node's rank.
  # SkyPilot provides the SKYPILOT_NODE_IPS and SKYPILOT_NODE_RANK environment
  # variables to get this information.
  #
  # Examples of envvars:
  #   SKYPILOT_NODE_IPS="192.168.0.1 192.168.0.2"
  #   SKYPILOT_NODE_RANK="1"
  #
  # The heredoc delimiter is intentionally unquoted so that the shell expands
  # ${PORT} inside the Python source before Python runs it.
  python -u <<- EOF
  import json
  import os

  port = ${PORT}

  # split() with no argument accepts IPs separated by newlines or spaces.
  node_ips = os.environ['SKYPILOT_NODE_IPS'].split()

  # The task index is written as an integer, matching the TF_CONFIG examples
  # in train.py.
  node_rank = int(os.environ['SKYPILOT_NODE_RANK'])

  tf_config = {
      'cluster': {
          'worker': [f'{ip}:{port}' for ip in node_ips],
      },
      'task': {
          'type': 'worker',
          'index': node_rank,
      },
  }

  # Hand the JSON back to the shell through a per-task file in /tmp.
  with open(f'/tmp/{os.environ["SKYPILOT_TASK_ID"]}', 'w') as f:
      json.dump(tf_config, f)
  EOF

  # Read and set TF_CONFIG from file
  export TF_CONFIG="$(cat "/tmp/${SKYPILOT_TASK_ID}")"
  echo "$TF_CONFIG"

  # ======== Run the training script ========
  python train.py
# Example script that uses Keras and tf.distribute.MultiWorkerMirroredStrategy to train a model on multiple workers.
#
# Usage (two separate processes):
# TF_CONFIG='{"cluster":{"worker":["localhost:12345","localhost:23456"]},"task":{"type":"worker","index":0}}' python train.py
# TF_CONFIG='{"cluster":{"worker":["localhost:12345","localhost:23456"]},"task":{"type":"worker","index":1}}' python train.py
import json
import os
import numpy as np
import tensorflow as tf
def mnist_dataset(batch_size):
    """Build a shuffled, endlessly repeating, batched MNIST training dataset.

    Args:
        batch_size: Number of examples in each emitted batch.

    Returns:
        A `tf.data.Dataset` yielding `(images, labels)` batches built from
        the MNIST training split; the test split is discarded.
    """
    (images, labels), _ = tf.keras.datasets.mnist.load_data()

    # Pixel values arrive as uint8 in [0, 255]; rescale them to float32
    # values in [0, 1].
    images = images / np.float32(255)
    labels = labels.astype(np.int64)

    dataset = tf.data.Dataset.from_tensor_slices((images, labels))
    dataset = dataset.shuffle(60000)
    # repeat() makes the dataset infinite, so the caller must bound each
    # epoch itself (e.g. via steps_per_epoch).
    dataset = dataset.repeat()
    return dataset.batch(batch_size)
def build_and_compile_cnn_model():
    """Create and compile a small CNN classifier for 28x28 inputs.

    Returns:
        A compiled `tf.keras.Sequential` model that emits 10 logits per
        example, trained with SGD on sparse categorical cross-entropy and
        reporting accuracy.
    """
    layers = [
        tf.keras.layers.InputLayer(input_shape=(28, 28)),
        # Conv2D needs an explicit channel axis, so add one of size 1.
        tf.keras.layers.Reshape(target_shape=(28, 28, 1)),
        tf.keras.layers.Conv2D(32, 3, activation='relu'),
        tf.keras.layers.Flatten(),
        tf.keras.layers.Dense(128, activation='relu'),
        # No activation here: the loss below is configured for raw logits.
        tf.keras.layers.Dense(10),
    ]
    cnn = tf.keras.Sequential(layers)

    loss_fn = tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True)
    sgd = tf.keras.optimizers.SGD(learning_rate=0.001)
    cnn.compile(loss=loss_fn, optimizer=sgd, metrics=['accuracy'])
    return cnn
# Batch size handled by each individual worker per training step.
per_worker_batch_size = 64

# TF_CONFIG lists every worker in the cluster; the length of that list is the
# number of workers taking part in training.
tf_config = json.loads(os.environ['TF_CONFIG'])
num_workers = len(tf_config['cluster']['worker'])

# The dataset is batched at the global size, i.e. the per-worker size times
# the number of workers.
global_batch_size = per_worker_batch_size * num_workers

# Create the strategy before building the dataset or the model.
strategy = tf.distribute.MultiWorkerMirroredStrategy()

multi_worker_dataset = mnist_dataset(global_batch_size)

# Only model construction and compilation happen inside the strategy scope.
with strategy.scope():
    multi_worker_model = build_and_compile_cnn_model()

# The dataset repeats forever, so steps_per_epoch defines the epoch length.
multi_worker_model.fit(
    multi_worker_dataset,
    epochs=10,
    steps_per_epoch=70,
)