Source: examples/pools_batch_inference
Batch Text Classification with vLLM and SkyPilot Pools#
This example demonstrates how to use SkyPilot’s Pools for efficient offline batch inference. We’ll classify sentiment from movie reviews using gpt-oss-20b running on vLLM.
What is a Pool?#
A pool is a collection of pre-configured workers that can process multiple jobs without cold starts. Key benefits:
Fast job execution: Workers are pre-provisioned with models loaded
No setup overhead: Each job starts immediately without reinstalling dependencies or downloading models
Simple parallelism: Submit multiple jobs with a single command using --num-jobs. Concurrency is limited to the number of workers in the pool, and SkyPilot handles the queueing.
Using Pools for Batch Inference#
This example:
Creates a pool of workers with environments pre-configured for batch inference
Submits 10 classification jobs that process the IMDB movie reviews dataset in parallel
Each job uses vLLM’s Python SDK to classify reviews in batches
Results are saved to a cloud storage bucket
Files in this example:
pool.yaml: Pool configuration with vLLM setup
classify.yaml: Job definition that runs on pool workers
classify.py: Python script that performs the classification using vLLM’s Python SDK
Step 1: Create the worker pool#
Create a pool named text-classify with 2 workers:
# Set your unique bucket name for the results (must be globally unique)
export BUCKET_NAME=batch-inference-results-${USER}
# Create the pool
sky jobs pool apply --env BUCKET_NAME -p text-classify pool.yaml
This will:
Launch 2 workers with H100 GPUs
Install vLLM and dependencies on each worker
Mount your cloud storage bucket at /results
Download and cache gpt-oss-20b
Check pool status with:
sky jobs pool status text-classify
Example output
Pools
NAME           VERSION  UPTIME  STATUS  WORKERS
text-classify  1        5m 39s  READY   2/2

Pool Workers
POOL_NAME      ID  VERSION  LAUNCHED    INFRA       RESOURCES                             STATUS  USED_BY
text-classify  1   1        6 mins ago  Kubernetes  1x(gpus=H200:1, cpus=4, mem=16, ...)  READY   -
text-classify  2   1        6 mins ago  Kubernetes  1x(gpus=H200:1, cpus=4, mem=16, ...)  READY   -
Step 2: Submit text classification jobs#
Submit 10 parallel jobs to process the IMDB movie reviews dataset:
sky jobs launch -p text-classify --num-jobs 10 classify.yaml
This command:
Submits 10 jobs to the pool
Each job gets a unique $SKYPILOT_JOB_RANK (0-9)
Each job processes a partition of the dataset based on the job rank
Results are saved to your configured cloud storage bucket
Note: You can adjust the number of jobs with --num-jobs N. More jobs means a smaller partition per job, but at most as many jobs run concurrently as there are workers in the pool; the rest wait in the queue.
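The rank-based partitioning described above can be sketched in a few lines. The function below mirrors calculate_partition from classify.py; the dataset size of 1003 and the job count of 10 are hypothetical values chosen only to show how a remainder is spread across the first few ranks.

```python
def calculate_partition(total_items: int, job_rank: int,
                        num_jobs: int) -> tuple[int, int]:
    """Return the half-open [start, end) index range for one job."""
    items_per_job, remainder = divmod(total_items, num_jobs)
    # The first `remainder` jobs each take one extra item.
    start = job_rank * items_per_job + min(job_rank, remainder)
    end = start + items_per_job + (1 if job_rank < remainder else 0)
    return start, end


# Hypothetical sizes for illustration; in a real job the rank and job count
# come from $SKYPILOT_JOB_RANK and $SKYPILOT_NUM_JOBS.
TOTAL_ITEMS = 1003
NUM_JOBS = 10

partitions = [
    calculate_partition(TOTAL_ITEMS, rank, NUM_JOBS)
    for rank in range(NUM_JOBS)
]
for rank, (start, end) in enumerate(partitions):
    print(f'rank {rank}: [{start}, {end}) -> {end - start} items')
```

With these numbers, ranks 0 to 2 each receive 101 items and ranks 3 to 9 each receive 100, so the partitions are contiguous and cover every index exactly once.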
Step 3: Monitor progress#
View all jobs:
sky jobs queue
Example output
Fetching managed job statuses...
Managed jobs
In progress tasks: 4 PENDING, 2 RUNNING
ID  TASK  NAME            REQUESTED   SUBMITTED   TOT. DURATION  JOB DURATION  #RECOVERIES  STATUS     POOL
10  -     batch-classify  1x[H200:1]  2 mins ago  2m 35s         -             0            PENDING    text-classify
9   -     batch-classify  1x[H200:1]  2 mins ago  2m 35s         -             0            PENDING    text-classify
8   -     batch-classify  1x[H200:1]  2 mins ago  2m 37s         -             0            PENDING    text-classify
7   -     batch-classify  1x[H200:1]  2 mins ago  2m 10s         49s           0            SUCCEEDED  text-classify
6   -     batch-classify  1x[H200:1]  2 mins ago  2m 37s         17s           0            RUNNING    text-classify (worker=1)
5   -     batch-classify  1x[H200:1]  2 mins ago  2m 42s         14s           0            RUNNING    text-classify (worker=2)
4   -     batch-classify  1x[H200:1]  2 mins ago  2m 17s         49s           0            SUCCEEDED  text-classify
3   -     batch-classify  1x[H200:1]  2 mins ago  2m 45s         -             0            PENDING    text-classify
2   -     batch-classify  1x[H200:1]  2 mins ago  1m 19s         1m 18s        0            SUCCEEDED  text-classify
1   -     batch-classify  1x[H200:1]  2 mins ago  1m 20s         1m 19s        0            SUCCEEDED  text-classify
Stream logs from a specific job:
sky jobs logs <job-id>
Or use the SkyPilot dashboard to view the progress of the jobs:
sky dashboard
Step 4: View Results#
Once jobs complete, results are in the cloud storage bucket. Each job creates two files:
results_rank_N.jsonl: Detailed predictions for each review
summary_rank_N.json: Accuracy and performance metrics
View results using cloud CLI (replace BUCKET_NAME with your bucket name):
BUCKET_NAME=batch-inference-results-${USER} # Replace with your bucket name
# AWS
aws s3 ls s3://${BUCKET_NAME}/
aws s3 cp s3://${BUCKET_NAME}/summary_rank_0.json -
# GCP
gsutil ls gs://${BUCKET_NAME}/
gsutil cat gs://${BUCKET_NAME}/summary_rank_0.json
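Because each job writes its own summary file, an overall accuracy figure has to be computed across all of them. The following is a minimal sketch, assuming the summary_rank_N.json files have already been copied to a local directory; the helper name aggregate_summaries is hypothetical, while the field names (job_rank, total_processed, correct_predictions) are the ones classify.py writes.

```python
import json
from pathlib import Path


def aggregate_summaries(results_dir: str) -> dict:
    """Combine every summary_rank_*.json in results_dir into overall totals."""
    total = 0
    correct = 0
    ranks = []
    for path in Path(results_dir).glob('summary_rank_*.json'):
        with open(path, encoding='utf-8') as f:
            summary = json.load(f)
        ranks.append(summary['job_rank'])
        total += summary['total_processed']
        correct += summary['correct_predictions']
    # Weight by review count instead of averaging per-job accuracies,
    # since partitions can differ in size.
    accuracy = correct / total * 100 if total else 0.0
    return {
        'ranks_found': sorted(ranks),
        'total_processed': total,
        'correct_predictions': correct,
        'accuracy': accuracy,
    }
```

Calling aggregate_summaries on the download directory returns the combined counts; comparing ranks_found against the expected range of ranks is a quick way to spot a job whose summary is missing.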
Step 5: Clean Up#
When done, terminate the pool to stop incurring costs:
sky jobs pool down text-classify
This will stop all workers in the pool and clean up all cloud resources. Results are preserved in the storage bucket.
Appendix: Running Without Cloud Buckets#
Don’t have access to cloud storage buckets? You can use local worker storage instead.
In pool.yaml, comment out the entire file_mounts section
In classify.yaml, change the --output-dir from /results to ~/sky_workdir
Included files#
classify.py
#!/usr/bin/env python3
"""Batch text classification script using vLLM for sentiment analysis.

This script processes a partition of the IMDB dataset based on job rank.
"""
import argparse
import json
from pathlib import Path
import sys
import time

from datasets import load_dataset
from tqdm import tqdm
from vllm import LLM
from vllm import SamplingParams

CLASSIFICATION_PROMPT = (
    'You are a sentiment classifier. Classify the following movie review '
    'as either "positive", "negative", or "neutral". '
    'Output ONLY the label, nothing else.\n\n'
    'Review: {text}\n'
    'Classification:')


def calculate_partition(total_items: int, job_rank: int,
                        num_jobs: int) -> tuple:
    """Calculate the start and end indices for this job's partition."""
    items_per_job = total_items // num_jobs
    remainder = total_items % num_jobs

    # Distribute remainder across first few jobs
    start_idx = job_rank * items_per_job + min(job_rank, remainder)
    end_idx = start_idx + items_per_job + (1 if job_rank < remainder else 0)

    return start_idx, end_idx


def parse_classification(text: str) -> str:
    """Parse the classification result from the model output.

    Args:
        text: Raw text output from the model

    Returns:
        Classification label (positive/negative/neutral)
    """
    text = text.strip().lower()

    # Normalize the result
    if 'positive' in text:
        return 'positive'
    elif 'negative' in text:
        return 'negative'
    elif 'neutral' in text:
        return 'neutral'
    else:
        # Default to the raw result if it doesn't match expected labels
        return text[:20]  # Truncate to avoid long error messages


def main():
    parser = argparse.ArgumentParser(
        description='Batch text classification with vLLM')
    parser.add_argument('--job-rank',
                        type=int,
                        required=True,
                        help='Rank of this job (0-indexed)')
    parser.add_argument('--num-jobs',
                        type=int,
                        required=True,
                        help='Total number of jobs')
    parser.add_argument('--output-dir',
                        type=str,
                        required=True,
                        help='Directory to save results')
    parser.add_argument('--model-path',
                        type=str,
                        required=True,
                        help='Path to the vLLM model')
    parser.add_argument(
        '--dataset-size',
        type=int,
        default=5000,
        help='Total number of reviews to process across all jobs')
    parser.add_argument('--batch-size',
                        type=int,
                        default=32,
                        help='Number of texts to process in each batch')
    args = parser.parse_args()

    print('=' * 80)
    print(
        f'Batch Text Classification - Job Rank {args.job_rank}/{args.num_jobs}')
    print('=' * 80)
    print(f'Model Path: {args.model_path}')
    print(f'Output Directory: {args.output_dir}')
    print(f'Batch Size: {args.batch_size}')
    print()

    # Load IMDB dataset
    print('Loading IMDB dataset...')
    try:
        dataset = load_dataset('imdb', split='test')
        print(f'✓ Loaded {len(dataset)} reviews from IMDB dataset')
    except Exception as e:  # pylint: disable=broad-except
        print(f'✗ Failed to load dataset: {e}')
        sys.exit(1)

    # Calculate partition for this job, capping the total at --dataset-size
    # so the flag takes effect
    total_items = min(args.dataset_size, len(dataset))
    start_idx, end_idx = calculate_partition(total_items, args.job_rank,
                                             args.num_jobs)
    partition_size = end_idx - start_idx

    print(f'Total Dataset Size: {total_items}')
    print(f'\nProcessing partition: {start_idx} to {end_idx} '
          f'({partition_size} reviews)')
    print()

    # Initialize vLLM
    print('Initializing vLLM...')
    try:
        sampling_params = SamplingParams(
            temperature=0.0,  # Deterministic for classification
            top_p=1.0,
            max_tokens=10,
        )

        llm = LLM(
            model=args.model_path,
            dtype='auto',
            max_model_len=2048,
        )
        print('✓ vLLM initialized successfully')
    except Exception as e:  # pylint: disable=broad-except
        print(f'✗ Failed to initialize vLLM: {e}')
        sys.exit(1)
    print()

    # Prepare all reviews and prompts for this partition
    print('Preparing prompts...')
    reviews = []
    true_labels = []
    prompts = []

    for idx in range(start_idx, end_idx):
        review = dataset[idx]
        text = review['text']
        true_label = 'positive' if review['label'] == 1 else 'negative'

        reviews.append({
            'index': idx,
            'text': text,
            'true_label': true_label,
        })
        true_labels.append(true_label)

        # Truncate review text to avoid very long prompts
        truncated_text = text[:1000] if len(text) > 1000 else text
        prompt = CLASSIFICATION_PROMPT.format(text=truncated_text)
        prompts.append(prompt)

    print(f'✓ Prepared {len(prompts)} prompts')
    print()

    # Run inference over all prompts in a single call
    # (vLLM handles batching internally)
    print('Running batch inference...')
    start_time = time.time()
    all_results = []

    try:
        outputs = llm.generate(prompts, sampling_params)

        # Parse results
        for i, output in enumerate(tqdm(outputs, desc=f'Job {args.job_rank}')):
            generated_text = output.outputs[0].text
            predicted_label = parse_classification(generated_text)

            review_text = reviews[i]['text']
            truncated_review = (review_text[:200] + '...'
                                if len(review_text) > 200 else review_text)
            result = {
                'index': reviews[i]['index'],
                'text': truncated_review,
                'true_label': reviews[i]['true_label'],
                'predicted_label': predicted_label,
                'correct': predicted_label == reviews[i]['true_label'],
            }
            all_results.append(result)

    except Exception as e:  # pylint: disable=broad-except
        print(f'✗ Error during inference: {e}')
        sys.exit(1)

    elapsed_time = time.time() - start_time

    # Calculate statistics
    correct_predictions = sum(1 for r in all_results if r['correct'])
    accuracy = (correct_predictions / len(all_results) *
                100) if all_results else 0
    throughput = len(all_results) / elapsed_time if elapsed_time > 0 else 0

    print()
    print('=' * 80)
    print(f'Job {args.job_rank} Complete!')
    print('=' * 80)
    print(f'Processed: {len(all_results)} reviews')
    print(
        f'Accuracy: {accuracy:.1f}% ({correct_predictions}/{len(all_results)})')
    print(f'Time: {elapsed_time:.1f}s')
    print(f'Throughput: {throughput:.2f} reviews/sec')
    print()

    # Save results
    output_path = Path(args.output_dir) / f'results_rank_{args.job_rank}.jsonl'
    output_path.parent.mkdir(parents=True, exist_ok=True)

    with open(output_path, 'w', encoding='utf-8') as f:
        for result in all_results:
            f.write(json.dumps(result) + '\n')

    # Also save summary statistics
    summary_path = Path(args.output_dir) / f'summary_rank_{args.job_rank}.json'
    summary = {
        'job_rank': args.job_rank,
        'num_jobs': args.num_jobs,
        'start_idx': start_idx,
        'end_idx': end_idx,
        'total_processed': len(all_results),
        'correct_predictions': correct_predictions,
        'accuracy': accuracy,
        'elapsed_time_seconds': elapsed_time,
        'throughput_reviews_per_sec': throughput,
    }

    with open(summary_path, 'w', encoding='utf-8') as f:
        json.dump(summary, f, indent=2)

    print(f'✓ Results saved to {output_path}')
    print(f'✓ Summary saved to {summary_path}')
    print()


if __name__ == '__main__':
    main()
classify.yaml
# Job configuration for batch text classification
# This runs on the pool workers to classify a partition of the dataset
#
# Usage:
#   sky jobs launch -p text-classify --num-jobs 10 classify.yaml

resources:
  accelerators: H100:1

name: batch-classify

run: |
  echo "Starting classification job (rank ${SKYPILOT_JOB_RANK} of ${SKYPILOT_NUM_JOBS})"
  source .venv/bin/activate

  # Run the classification script
  # Note: If you don't have cloud storage, change --output-dir to ~/sky_workdir
  python classify.py \
    --job-rank ${SKYPILOT_JOB_RANK} \
    --num-jobs ${SKYPILOT_NUM_JOBS} \
    --output-dir /results \
    --model-path /tmp/model

  echo "Classification complete! Results saved to S3 bucket mounted at /results/results_rank_${SKYPILOT_JOB_RANK}.jsonl"
pool.yaml
# Pool configuration for batch text classification
# This creates a pool of workers with openai/gpt-oss-20b pre-downloaded
#
# Usage:
#   sky jobs pool apply -p text-classify pool.yaml

workdir: .

envs:
  MODEL_NAME: openai/gpt-oss-20b
  BUCKET_NAME: # Set your unique bucket name (must be globally unique)

# Cloud storage bucket for results (optional)
# If you don't have access to cloud storage buckets:
#   1. Comment out this entire file_mounts section
#   2. Update --output-dir in classify.yaml to use local storage (e.g., ~/sky_workdir)
file_mounts:
  /results:
    name: ${BUCKET_NAME}
    mode: MOUNT

resources:
  accelerators: H100:1
  disk_size: 100

setup: |
  # Set up virtual env
  uv venv --python 3.10 --seed
  source .venv/bin/activate

  # Install vLLM and dependencies
  uv pip install vllm==0.11.0 transformers==4.57.1

  # Install dependencies for the classification script
  uv pip install datasets==2.18.0 tqdm==4.66.1

  # Download the model to cache it for all jobs
  echo "Downloading model ${MODEL_NAME}..."
  huggingface-cli download ${MODEL_NAME} --local-dir /tmp/model

  echo "Pool worker setup complete!"

pool:
  # Number of workers in the pool
  workers: 2