Added Ray Train & PyTorch Lightning demo #559
base: main
@@ -0,0 +1,174 @@ | ||
{ | ||
"cells": [ | ||
{ | ||
"cell_type": "markdown", | ||
"metadata": {}, | ||
"source": [ | ||
"In this notebook we are going to run a Ray Train & Pytorch Lightning script using the CodeFlare SDK and Ray Job Submission.\n", | ||
"\n", | ||
"NOTE: For distributed training an external persistent storage option should be set in the `run_config`.\n", | ||
"You can find examples in the `pytorch_lightning.py` script." | ||
] | ||
}, | ||
{ | ||
"cell_type": "code", | ||
"execution_count": null, | ||
"metadata": {}, | ||
"outputs": [], | ||
"source": [ | ||
"# Import pieces from codeflare-sdk\n", | ||
"from codeflare_sdk import Cluster, ClusterConfiguration, TokenAuthentication" | ||
] | ||
}, | ||
{ | ||
"cell_type": "code", | ||
"execution_count": null, | ||
"metadata": {}, | ||
"outputs": [], | ||
"source": [ | ||
"# Create authentication object for user permissions\n", | ||
"# IF unused, SDK will automatically check for default kubeconfig, then in-cluster config\n", | ||
"# KubeConfigFileAuthentication can also be used to specify kubeconfig path manually\n", | ||
"auth = TokenAuthentication(\n", | ||
" token = \"XXXXX\",\n", | ||
" server = \"XXXXX\",\n", | ||
" skip_tls=False\n", | ||
")\n", | ||
"auth.login()" | ||
] | ||
}, | ||
{ | ||
"cell_type": "markdown", | ||
"metadata": {}, | ||
"source": [ | ||
"Once again, let's start by running through the same cluster setup as before:\n", | ||
"\n", | ||
"NOTE: We must specify the `image` which will be used in our RayCluster, we recommend you bring your own image which suits your purposes. \n", | ||
"The example here is a community image." | ||
] | ||
}, | ||
{ | ||
"cell_type": "code", | ||
"execution_count": null, | ||
"metadata": {}, | ||
"outputs": [], | ||
"source": [ | ||
"# Create and configure our cluster object\n", | ||
"# The SDK will try to find the name of your default local queue based on the annotation \"kueue.x-k8s.io/default-queue\": \"true\" unless you specify the local queue manually below\n", | ||
"cluster = Cluster(ClusterConfiguration(\n", | ||
" name='raytest',\n", | ||
" namespace='default', # Update to your namespace\n", | ||
" num_workers=2,\n", | ||
" min_cpus=2,\n", | ||
" max_cpus=2,\n", | ||
" min_memory=8,\n", | ||
" max_memory=8,\n", | ||
" num_gpus=1,\n", | ||
" head_gpus=1,\n", | ||
" image=\"quay.io/project-codeflare/ray:2.20.0-py39-cu118\",\n", | ||
" write_to_file=True, # When enabled Ray Cluster yaml files are written to /HOME/.codeflare/resources \n", | ||
" # local_queue=\"local-queue-name\" # Specify the local queue manually\n", | ||
"))" | ||
] | ||
}, | ||
{ | ||
"cell_type": "code", | ||
"execution_count": null, | ||
"metadata": {}, | ||
"outputs": [], | ||
"source": [ | ||
"# Bring up the cluster\n", | ||
"cluster.up()\n", | ||
"cluster.wait_ready()" | ||
] | ||
}, | ||
{ | ||
"cell_type": "code", | ||
"execution_count": null, | ||
"metadata": {}, | ||
"outputs": [], | ||
"source": [ | ||
"cluster.details()" | ||
] | ||
}, | ||
{ | ||
"cell_type": "markdown", | ||
"metadata": {}, | ||
"source": [ | ||
"Note: For this example external S3 compatible storage is required. Please refer to our [documentation](https://github.com/project-codeflare/codeflare-sdk/blob/main/docs/s3-compatible-storage.md) for steps on how to configure this training script." | ||
] | ||
}, | ||
{ | ||
"cell_type": "code", | ||
"execution_count": null, | ||
"metadata": {}, | ||
"outputs": [], | ||
"source": [ | ||
"# Initialize the Job Submission Client\n", | ||
"\"\"\"\n", | ||
"The SDK will automatically gather the dashboard address and authenticate using the Ray Job Submission Client\n", | ||
"\"\"\"\n", | ||
"client = cluster.job_client" | ||
] | ||
}, | ||
{ | ||
"cell_type": "code", | ||
"execution_count": null, | ||
"metadata": {}, | ||
"outputs": [], | ||
"source": [ | ||
"# Submit an example mnist job using the Job Submission Client\n", | ||
"submission_id = client.submit_job(\n", | ||
" entrypoint=\"python pytorch_lightning.py\",\n", | ||
" runtime_env={\"working_dir\": \"./\",\"pip\": \"requirements_lightning.txt\"},\n", | ||
")\n", | ||
"print(submission_id)" | ||
] | ||
}, | ||
{ | ||
"cell_type": "code", | ||
"execution_count": null, | ||
"metadata": {}, | ||
"outputs": [], | ||
"source": [ | ||
"# Get the job's logs\n", | ||
"client.get_job_logs(submission_id)" | ||
] | ||
}, | ||
{ | ||
"cell_type": "code", | ||
"execution_count": null, | ||
"metadata": {}, | ||
"outputs": [], | ||
"source": [ | ||
"# Get the job's status\n", | ||
"client.get_job_status(submission_id)" | ||
] | ||
}, | ||
{ | ||
"cell_type": "code", | ||
"execution_count": null, | ||
"metadata": {}, | ||
"outputs": [], | ||
"source": [ | ||
"cluster.down()" | ||
] | ||
}, | ||
{ | ||
"cell_type": "code", | ||
"execution_count": null, | ||
"metadata": {}, | ||
"outputs": [], | ||
"source": [ | ||
"auth.logout()" | ||
] | ||
} | ||
], | ||
"metadata": { | ||
"language_info": { | ||
"name": "python" | ||
} | ||
}, | ||
"nbformat": 4, | ||
"nbformat_minor": 2 | ||
} |
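Because the training script checkpoints to external S3-compatible storage, the Ray workers also need credentials for that bucket. The sketch below is one possible way to pass them (not part of this PR): the standard AWS environment variables are forwarded through the `env_vars` field of the job's `runtime_env`. The variable names and the idea of sourcing them from the notebook's environment are assumptions; the linked s3-compatible-storage documentation remains the authoritative setup guide.

```python
# Hypothetical sketch: forward S3 credentials to the training workers via the
# job's runtime_env. Values are read from the notebook's own environment here;
# substitute whatever credential source your cluster uses.
import os

submission_id = client.submit_job(
    entrypoint="python pytorch_lightning.py",
    runtime_env={
        "working_dir": "./",
        "pip": "requirements_lightning.txt",
        "env_vars": {
            "AWS_ACCESS_KEY_ID": os.environ.get("AWS_ACCESS_KEY_ID", ""),
            "AWS_SECRET_ACCESS_KEY": os.environ.get("AWS_SECRET_ACCESS_KEY", ""),
            "AWS_DEFAULT_REGION": os.environ.get("AWS_DEFAULT_REGION", ""),
        },
    },
)
print(submission_id)
```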
pytorch_lightning.py
@@ -0,0 +1,98 @@ | ||
import os | ||
import tempfile | ||
|
||
import torch | ||
from torch.utils.data import DataLoader, DistributedSampler | ||
from torchvision.models import resnet18 | ||
from torchvision.datasets import FashionMNIST | ||
from torchvision.transforms import ToTensor, Normalize, Compose | ||
import lightning.pytorch as pl | ||
|
||
import ray.train.lightning | ||
from ray.train.torch import TorchTrainer | ||
|
||
# Based on https://docs.ray.io/en/latest/train/getting-started-pytorch-lightning.html | ||
|
||
""" | ||
Note: This example requires an S3-compatible storage bucket for distributed training. Please visit our documentation for more information -> https://github.com/project-codeflare/codeflare-sdk/blob/main/docs/s3-compatible-storage.md | ||
""" | ||
|
||
|
||
# Model, Loss, Optimizer | ||
class ImageClassifier(pl.LightningModule): | ||
def __init__(self): | ||
super(ImageClassifier, self).__init__() | ||
self.model = resnet18(num_classes=10) | ||
self.model.conv1 = torch.nn.Conv2d( | ||
1, 64, kernel_size=(7, 7), stride=(2, 2), padding=(3, 3), bias=False | ||
) | ||
self.criterion = torch.nn.CrossEntropyLoss() | ||
|
||
def forward(self, x): | ||
return self.model(x) | ||
|
||
def training_step(self, batch, batch_idx): | ||
x, y = batch | ||
outputs = self.forward(x) | ||
loss = self.criterion(outputs, y) | ||
self.log("loss", loss, on_step=True, prog_bar=True) | ||
return loss | ||
|
||
def configure_optimizers(self): | ||
return torch.optim.Adam(self.model.parameters(), lr=0.001) | ||
|
||
|
||
def train_func(): | ||
# Data | ||
transform = Compose([ToTensor(), Normalize((0.5,), (0.5,))]) | ||
data_dir = os.path.join(tempfile.gettempdir(), "data") | ||
train_data = FashionMNIST( | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Are we sure the data is shared across workers? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Looked into this and I would say probably not after finding out that the DistributedSampler exists. I will update this script and the llama2 one to make use of the DistrbutedSampler 👍 |
||
root=data_dir, train=True, download=True, transform=transform | ||
) | ||
|
||
# Training | ||
model = ImageClassifier() | ||
|
||
sampler = DistributedSampler( | ||
train_data, | ||
num_replicas=ray.train.get_context().get_world_size(), | ||
rank=ray.train.get_context().get_world_rank(), | ||
) | ||
|
||
train_dataloader = DataLoader( | ||
train_data, batch_size=128, shuffle=False, sampler=sampler | ||
) | ||
# [1] Configure PyTorch Lightning Trainer. | ||
trainer = pl.Trainer( | ||
max_epochs=10, | ||
devices="auto", | ||
accelerator="auto", | ||
strategy=ray.train.lightning.RayDDPStrategy(), | ||
plugins=[ray.train.lightning.RayLightningEnvironment()], | ||
callbacks=[ray.train.lightning.RayTrainReportCallback()], | ||
# [1a] Optionally, disable the default checkpointing behavior | ||
# in favor of the `RayTrainReportCallback` above. | ||
enable_checkpointing=False, | ||
) | ||
trainer = ray.train.lightning.prepare_trainer(trainer) | ||
trainer.fit(model, train_dataloaders=train_dataloader) | ||
|
||
|
||
# [2] Configure scaling and resource requirements. Set the number of workers to the total number of GPUs on your Ray Cluster. | ||
scaling_config = ray.train.ScalingConfig(num_workers=3, use_gpu=True) | ||
|
||
# [3] Launch distributed training job. | ||
trainer = TorchTrainer( | ||
train_func, | ||
scaling_config=scaling_config, | ||
) | ||
result: ray.train.Result = trainer.fit() | ||
|
||
# [4] Load the trained model. | ||
with result.checkpoint.as_directory() as checkpoint_dir: | ||
model = ImageClassifier.load_from_checkpoint( | ||
os.path.join( | ||
checkpoint_dir, | ||
ray.train.lightning.RayTrainReportCallback.CHECKPOINT_NAME, | ||
), | ||
) |
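As a quick sanity check after training (not part of the PR), the reloaded model could be run on a single FashionMNIST test image. This sketch assumes it is appended to the script above and reuses its imports:

```python
# Illustrative sketch: run one held-out FashionMNIST image through the reloaded model.
model.eval()
device = next(model.parameters()).device  # stay on whatever device the checkpoint loaded to

test_data = FashionMNIST(
    root=os.path.join(tempfile.gettempdir(), "data"),
    train=False,
    download=True,
    transform=Compose([ToTensor(), Normalize((0.5,), (0.5,))]),
)
image, label = test_data[0]
with torch.no_grad():
    predicted = model(image.unsqueeze(0).to(device)).argmax(dim=1).item()
print(f"predicted class: {predicted}, actual class: {label}")
```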
requirements_lightning.txt
@@ -0,0 +1,5 @@ | ||
torch==2.3.0 | ||
torchvision==0.18.0 | ||
lightning==2.2.5 | ||
ray[train]==2.20.0 | ||
s3fs==2024.6.0 |
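These pins are installed on each Ray worker through the `pip` field of the job's `runtime_env` (see the notebook above). Purely as an illustration, the same environment could also be declared inline as a list instead of a requirements file path:

```python
# Illustrative only: inline equivalent of requirements_lightning.txt in the runtime_env.
runtime_env = {
    "working_dir": "./",
    "pip": [
        "torch==2.3.0",
        "torchvision==0.18.0",
        "lightning==2.2.5",
        "ray[train]==2.20.0",
        "s3fs==2024.6.0",
    ],
}
```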
> Review comment: How do I configure what path to actually use within the bucket for the distributed training?

> Reply: I created my own bucket with its own path via AWS and gathered the URI using the UI.
> s3://mark-bucket/data/
> I was not aware we had a shared bucket but you could create a new folder within it and then copy the URI from there.
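For context on the question above: in Ray Train, the path inside the bucket is whatever URI is set as `RunConfig.storage_path`, and each run's checkpoints are written under `<storage_path>/<name>`. A minimal sketch using the bucket URI from the reply (the run name is a placeholder, and the full wiring is covered in the s3-compatible-storage documentation):

```python
import ray.train
from ray.train.torch import TorchTrainer

# Assumes train_func and scaling_config are defined as in pytorch_lightning.py above.
# Checkpoints for this run would land under s3://mark-bucket/data/pytorch-lightning-demo/
# (the run name is a placeholder).
run_config = ray.train.RunConfig(
    storage_path="s3://mark-bucket/data/",
    name="pytorch-lightning-demo",
)

trainer = TorchTrainer(
    train_func,
    scaling_config=scaling_config,
    run_config=run_config,
)
```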