From 712ae255b29949114ca0126fc6a2ad8037e526db Mon Sep 17 00:00:00 2001 From: Andrew Sy Kim Date: Fri, 26 Apr 2024 19:55:12 +0000 Subject: [PATCH 1/8] add sample YAML for pytorch distributed training with GCSFuse for checkpointing Signed-off-by: Andrew Sy Kim --- .../fine-tune-pytorch-text-classifier.py | 7 ++ ...v1alpha1.pytorch-distributed-training.yaml | 4 + ....pytorch-distributed-training-gcsfuse.yaml | 108 ++++++++++++++++++ .../ray-job.pytorch-distributed-training.yaml | 4 + 4 files changed, 123 insertions(+) create mode 100644 ray-operator/config/samples/pytorch-text-classifier/ray-job.pytorch-distributed-training-gcsfuse.yaml diff --git a/ray-operator/config/samples/pytorch-text-classifier/fine-tune-pytorch-text-classifier.py b/ray-operator/config/samples/pytorch-text-classifier/fine-tune-pytorch-text-classifier.py index 0617deb03cb..b003853b74f 100644 --- a/ray-operator/config/samples/pytorch-text-classifier/fine-tune-pytorch-text-classifier.py +++ b/ray-operator/config/samples/pytorch-text-classifier/fine-tune-pytorch-text-classifier.py @@ -101,6 +101,12 @@ def train_func(config): # Model model = SentimentModel(lr=lr, eps=eps) + # load model from latest checkpoint if it already exists + checkpoint = ray.train.get_checkpoint() + if checkpoint: + with checkpoint.as_directory() as checkpoint_dir: + model.load_from_checkpoint(os.path.join(checkpoint_dir, "checkpoint.ckpt")) + trainer = pl.Trainer( max_epochs=max_epochs, accelerator="auto", @@ -131,6 +137,7 @@ def train_func(config): # The checkpoints and metrics are reported by `RayTrainReportCallback` run_config = RunConfig( name="ptl-sent-classification", + storage_path="/mnt/cluster_storage", checkpoint_config=CheckpointConfig( num_to_keep=2, checkpoint_score_attribute="matthews_correlation", diff --git a/ray-operator/config/samples/pytorch-text-classifier/ray-job-v1alpha1.pytorch-distributed-training.yaml b/ray-operator/config/samples/pytorch-text-classifier/ray-job-v1alpha1.pytorch-distributed-training.yaml index 94e52fd1b6b..ec27c52d61c 100644 --- a/ray-operator/config/samples/pytorch-text-classifier/ray-job-v1alpha1.pytorch-distributed-training.yaml +++ b/ray-operator/config/samples/pytorch-text-classifier/ray-job-v1alpha1.pytorch-distributed-training.yaml @@ -36,6 +36,10 @@ spec: volumeMounts: - mountPath: /tmp/ray name: ray-logs + - mountPath: /data + name: data volumes: - name: ray-logs emptyDir: {} + - name: data + emptyDir: {} diff --git a/ray-operator/config/samples/pytorch-text-classifier/ray-job.pytorch-distributed-training-gcsfuse.yaml b/ray-operator/config/samples/pytorch-text-classifier/ray-job.pytorch-distributed-training-gcsfuse.yaml new file mode 100644 index 00000000000..7d7008df7fe --- /dev/null +++ b/ray-operator/config/samples/pytorch-text-classifier/ray-job.pytorch-distributed-training-gcsfuse.yaml @@ -0,0 +1,108 @@ +# This RayJob is based on the "Fine-tune a PyTorch Lightning Text Classifier with Ray Data" example in the Ray documentation. +# See https://docs.ray.io/en/master/train/examples/lightning/lightning_cola_advanced.html for more details. 
+apiVersion: ray.io/v1 +kind: RayJob +metadata: + generateName: pytorch-text-classifier- +spec: + shutdownAfterJobFinishes: true + entrypoint: python ray-operator/config/samples/pytorch-text-classifier/fine-tune-pytorch-text-classifier.py + runtimeEnvYAML: | + pip: + - numpy + - datasets + - transformers>=4.19.1 + - pytorch-lightning==1.6.5 + working_dir: "https://github.com/andrewsykim/kuberay/archive/pytorch-lightning-image-classifier.zip" + rayClusterSpec: + rayVersion: '2.9.0' + headGroupSpec: + rayStartParams: + dashboard-host: '0.0.0.0' + template: + metadata: + annotations: + gke-gcsfuse/volumes: "true" + gke-gcsfuse/cpu-limit: "0" + gke-gcsfuse/memory-limit: 5Gi + gke-gcsfuse/ephemeral-storage-limit: 10Gi + spec: + serviceAccountName: pytorch-distributed-training + containers: + - name: ray-head + image: rayproject/ray:2.9.0 + ports: + - containerPort: 6379 + name: gcs-server + - containerPort: 8265 + name: dashboard + - containerPort: 10001 + name: client + resources: + limits: + cpu: "1" + memory: "8G" + requests: + cpu: "1" + memory: "8G" + volumeMounts: + - mountPath: /tmp/ray + name: ray-logs + - mountPath: /mnt/cluster_storage + name: cluster-storage + volumes: + - name: ray-logs + emptyDir: {} + - name: cluster-storage + csi: + driver: gcsfuse.csi.storage.gke.io + volumeAttributes: + bucketName: andrewsy-gke-bucket + mountOptions: "implicit-dirs,uid=1000,gid=100" + workerGroupSpecs: + - replicas: 2 + groupName: gpu-group + rayStartParams: + dashboard-host: '0.0.0.0' + template: + metadata: + annotations: + gke-gcsfuse/volumes: "true" + gke-gcsfuse/cpu-limit: "0" + gke-gcsfuse/memory-limit: 5Gi + gke-gcsfuse/ephemeral-storage-limit: 10Gi + spec: + serviceAccountName: pytorch-distributed-training + tolerations: + - key: "nvidia.com/gpu" + operator: "Exists" + effect: "NoSchedule" + containers: + - name: ray-worker + image: rayproject/ray-ml:2.9.0-gpu + lifecycle: + preStop: + exec: + command: [ "/bin/sh","-c","ray stop" ] + resources: + limits: + memory: "8G" + nvidia.com/gpu: "1" + requests: + cpu: "1" + memory: "8G" + nvidia.com/gpu: "1" + volumeMounts: + - mountPath: /tmp/ray + name: ray-logs + - mountPath: /mnt/cluster_storage + name: cluster-storage + volumes: + - name: ray-logs + emptyDir: {} + - name: cluster-storage + csi: + driver: gcsfuse.csi.storage.gke.io + volumeAttributes: + bucketName: andrewsy-gke-bucket + mountOptions: "implicit-dirs,uid=1000,gid=100" diff --git a/ray-operator/config/samples/pytorch-text-classifier/ray-job.pytorch-distributed-training.yaml b/ray-operator/config/samples/pytorch-text-classifier/ray-job.pytorch-distributed-training.yaml index b21a25a401b..d7a35a9e9c3 100644 --- a/ray-operator/config/samples/pytorch-text-classifier/ray-job.pytorch-distributed-training.yaml +++ b/ray-operator/config/samples/pytorch-text-classifier/ray-job.pytorch-distributed-training.yaml @@ -47,6 +47,10 @@ spec: volumeMounts: - mountPath: /tmp/ray name: ray-logs + - mountPath: /mnt/cluster_storage + name: cluster-storage volumes: - name: ray-logs emptyDir: {} + - name: cluster-storage + emptyDir: {} From 06eb03bd48ed3370267352275816ff41c4dbda66 Mon Sep 17 00:00:00 2001 From: Andrew Sy Kim Date: Mon, 29 Apr 2024 02:10:56 +0000 Subject: [PATCH 2/8] add resume from checkpoint Signed-off-by: Andrew Sy Kim --- .../fine-tune-pytorch-text-classifier.py | 22 ++++++++++--------- ....pytorch-distributed-training-gcsfuse.yaml | 7 ++++++ 2 files changed, 19 insertions(+), 10 deletions(-) diff --git 
a/ray-operator/config/samples/pytorch-text-classifier/fine-tune-pytorch-text-classifier.py b/ray-operator/config/samples/pytorch-text-classifier/fine-tune-pytorch-text-classifier.py index b003853b74f..0df1c0abec5 100644 --- a/ray-operator/config/samples/pytorch-text-classifier/fine-tune-pytorch-text-classifier.py +++ b/ray-operator/config/samples/pytorch-text-classifier/fine-tune-pytorch-text-classifier.py @@ -1,5 +1,6 @@ import ray import torch +import os import numpy as np import pytorch_lightning as pl import torch.nn.functional as F @@ -14,7 +15,7 @@ RayTrainReportCallback, ) from ray.train.torch import TorchTrainer -from ray.train import RunConfig, ScalingConfig, CheckpointConfig, DataConfig +from ray.train import RunConfig, ScalingConfig, CheckpointConfig, DataConfig, Checkpoint class SentimentModel(pl.LightningModule): def __init__(self, lr=2e-5, eps=1e-8): @@ -101,12 +102,6 @@ def train_func(config): # Model model = SentimentModel(lr=lr, eps=eps) - # load model from latest checkpoint if it already exists - checkpoint = ray.train.get_checkpoint() - if checkpoint: - with checkpoint.as_directory() as checkpoint_dir: - model.load_from_checkpoint(os.path.join(checkpoint_dir, "checkpoint.ckpt")) - trainer = pl.Trainer( max_epochs=max_epochs, accelerator="auto", @@ -119,7 +114,14 @@ def train_func(config): trainer = prepare_trainer(trainer) - trainer.fit(model, train_dataloaders=train_ds_loader, val_dataloaders=val_ds_loader) + # load model from latest checkpoint if it already exists + checkpoint = ray.train.get_checkpoint() + if checkpoint: + with checkpoint.as_directory() as ckpt_dir: + ckpt_path = os.path.join(ckpt_dir, "checkpoint.ckpt") + trainer.fit(model, train_dataloaders=train_ds_loader, val_dataloaders=val_ds_loader, ckpt_path=ckpt_path) + else: + trainer.fit(model, train_dataloaders=train_ds_loader, val_dataloaders=val_ds_loader) if __name__ == "__main__": @@ -145,8 +147,8 @@ def train_func(config): ), ) - # Schedule 2 workers for DDP training (1 GPU/worker by default) - scaling_config = ScalingConfig(num_workers=1, use_gpu=True) + num_workers = int(os.environ.get("NUM_WORKERS", "1")) + scaling_config = ScalingConfig(num_workers=num_workers, use_gpu=True) trainer = TorchTrainer( train_loop_per_worker=train_func, diff --git a/ray-operator/config/samples/pytorch-text-classifier/ray-job.pytorch-distributed-training-gcsfuse.yaml b/ray-operator/config/samples/pytorch-text-classifier/ray-job.pytorch-distributed-training-gcsfuse.yaml index 7d7008df7fe..fc0624a1ed8 100644 --- a/ray-operator/config/samples/pytorch-text-classifier/ray-job.pytorch-distributed-training-gcsfuse.yaml +++ b/ray-operator/config/samples/pytorch-text-classifier/ray-job.pytorch-distributed-training-gcsfuse.yaml @@ -31,6 +31,13 @@ spec: containers: - name: ray-head image: rayproject/ray:2.9.0 + env: + - name: NUM_WORKERS + value: "2" + # Set CHECKPOINT_DIR to reload checkpoint from a previous run. 
+ # Example: /mnt/cluster_storage/ptl-sent-classification/TorchTrainer_bf43d_00000_0_2024-04-28_19-54-07/checkpoint_000004 + # - name: CHECKPOINT_DIR + # value: "" ports: - containerPort: 6379 name: gcs-server From 9377db677c472b0f33faf33c00da4d11b194826b Mon Sep 17 00:00:00 2001 From: Andrew Sy Kim Date: Mon, 29 Apr 2024 15:36:11 +0000 Subject: [PATCH 3/8] add pytorch resnet image classifier example Signed-off-by: Andrew Sy Kim --- ...ne-tune-pytorch-resnet-image-classifier.py | 230 ++++++++++++++++++ .../ray-job.pytorch-image-classifier.yaml | 116 +++++++++ .../fine-tune-pytorch-text-classifier.py | 10 +- ....pytorch-distributed-training-gcsfuse.yaml | 2 - 4 files changed, 347 insertions(+), 11 deletions(-) create mode 100644 ray-operator/config/samples/pytorch-resnet-image-classifier/fine-tune-pytorch-resnet-image-classifier.py create mode 100644 ray-operator/config/samples/pytorch-resnet-image-classifier/ray-job.pytorch-image-classifier.yaml diff --git a/ray-operator/config/samples/pytorch-resnet-image-classifier/fine-tune-pytorch-resnet-image-classifier.py b/ray-operator/config/samples/pytorch-resnet-image-classifier/fine-tune-pytorch-resnet-image-classifier.py new file mode 100644 index 00000000000..7b275d3c8fd --- /dev/null +++ b/ray-operator/config/samples/pytorch-resnet-image-classifier/fine-tune-pytorch-resnet-image-classifier.py @@ -0,0 +1,230 @@ +import os +from tempfile import TemporaryDirectory + +import torch +import torch.nn as nn +import torch.optim as optim +from torch.utils.data import DataLoader +from torchvision import datasets, models, transforms +import numpy as np + +import ray.train as train +from ray.train.torch import TorchTrainer +from ray.train import ScalingConfig, RunConfig, CheckpointConfig, Checkpoint + +# Data augmentation and normalization for training +# Just normalization for validation +data_transforms = { + "train": transforms.Compose( + [ + transforms.RandomResizedCrop(224), + transforms.RandomHorizontalFlip(), + transforms.ToTensor(), + transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225]), + ] + ), + "val": transforms.Compose( + [ + transforms.Resize(224), + transforms.CenterCrop(224), + transforms.ToTensor(), + transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225]), + ] + ), +} + +def download_datasets(): + os.system( + "wget https://download.pytorch.org/tutorial/hymenoptera_data.zip >/dev/null 2>&1" + ) + os.system("unzip hymenoptera_data.zip >/dev/null 2>&1") + +# Download and build torch datasets +def build_datasets(): + torch_datasets = {} + for split in ["train", "val"]: + torch_datasets[split] = datasets.ImageFolder( + os.path.join("./hymenoptera_data", split), data_transforms[split] + ) + return torch_datasets + +def initialize_model(): + # Load pretrained model params + model = models.resnet50(pretrained=True) + + # Replace the original classifier with a new Linear layer + num_features = model.fc.in_features + model.fc = nn.Linear(num_features, 2) + + # Ensure all params get updated during finetuning + for param in model.parameters(): + param.requires_grad = True + return model + + +def initialize_model_from_checkpoint(checkpoint: Checkpoint): + with checkpoint.as_directory() as tmpdir: + state_dict = torch.load(os.path.join(tmpdir, "checkpoint.pt")) + resnet50 = initialize_model() + resnet50.load_state_dict(state_dict["model"]) + return resnet50 + +def evaluate(logits, labels): + _, preds = torch.max(logits, 1) + corrects = torch.sum(preds == labels).item() + return corrects + +train_loop_config = { + 
"input_size": 224, # Input image size (224 x 224) + "batch_size": 32, # Batch size for training + "num_epochs": 10, # Number of epochs to train for + "lr": 0.001, # Learning Rate + "momentum": 0.9, # SGD optimizer momentum +} + +def train_loop_per_worker(configs): + import warnings + + warnings.filterwarnings("ignore") + + # Calculate the batch size for a single worker + worker_batch_size = configs["batch_size"] + + # Download dataset once on local rank 0 worker + if train.get_context().get_local_rank() == 0: + download_datasets() + torch.distributed.barrier() + + # Build datasets on each worker + torch_datasets = build_datasets() + + # Prepare dataloader for each worker + dataloaders = dict() + dataloaders["train"] = DataLoader( + torch_datasets["train"], batch_size=worker_batch_size, shuffle=True + ) + dataloaders["val"] = DataLoader( + torch_datasets["val"], batch_size=worker_batch_size, shuffle=False + ) + + # Distribute + dataloaders["train"] = train.torch.prepare_data_loader(dataloaders["train"]) + dataloaders["val"] = train.torch.prepare_data_loader(dataloaders["val"]) + + device = train.torch.get_device() + + # Prepare DDP Model, optimizer, and loss function. + model = initialize_model() + + # Reload from checkpoint if exists. + start_epoch = 0 + checkpoint = train.get_checkpoint() + if checkpoint: + with checkpoint.as_directory() as tmpdir: + state_dict = torch.load(os.path.join(tmpdir, "checkpoint.pt")) + model.load_state_dict(state_dict['model']) + + start_epoch = state_dict['epoch'] + 1 + + model = train.torch.prepare_model(model) + + optimizer = optim.SGD( + model.parameters(), lr=configs["lr"], momentum=configs["momentum"] + ) + criterion = nn.CrossEntropyLoss() + + # Start training loops + for epoch in range(start_epoch, configs["num_epochs"]): + # Each epoch has a training and validation phase + for phase in ["train", "val"]: + if phase == "train": + model.train() # Set model to training mode + else: + model.eval() # Set model to evaluate mode + + running_loss = 0.0 + running_corrects = 0 + + if train.get_context().get_world_size() > 1: + dataloaders[phase].sampler.set_epoch(epoch) + + for inputs, labels in dataloaders[phase]: + inputs = inputs.to(device) + labels = labels.to(device) + + # zero the parameter gradients + optimizer.zero_grad() + + # forward + with torch.set_grad_enabled(phase == "train"): + # Get model outputs and calculate loss + outputs = model(inputs) + loss = criterion(outputs, labels) + + # backward + optimize only if in training phase + if phase == "train": + loss.backward() + optimizer.step() + + # calculate statistics + running_loss += loss.item() * inputs.size(0) + running_corrects += evaluate(outputs, labels) + + size = len(torch_datasets[phase]) + epoch_loss = running_loss / size + epoch_acc = running_corrects / size + + if train.get_context().get_world_rank() == 0: + print( + "Epoch {}-{} Loss: {:.4f} Acc: {:.4f}".format( + epoch, phase, epoch_loss, epoch_acc + ) + ) + + # Report metrics and checkpoint every epoch + if phase == "val": + with TemporaryDirectory() as tmpdir: + state_dict = { + "epoch": epoch, + "model": model.module.state_dict(), + "optimizer_state_dict": optimizer.state_dict(), + } + torch.save(state_dict, os.path.join(tmpdir, "checkpoint.pt")) + train.report( + metrics={"loss": epoch_loss, "acc": epoch_acc}, + checkpoint=Checkpoint.from_directory(tmpdir), + ) + +if __name__ == "__main__": + num_workers = int(os.environ.get("NUM_WORKERS", "4")) + scaling_config = ScalingConfig( + num_workers=num_workers, use_gpu=True, 
resources_per_worker={"CPU": 1, "GPU": 1} + ) + + checkpoint_config = CheckpointConfig(num_to_keep=1) + run_config = RunConfig( + name="finetune-resnet", + storage_path="/mnt/cluster_storage", + checkpoint_config=checkpoint_config, + ) + + checkpoint_dir = os.environ.get('CHECKPOINT_DIR') + if checkpoint_dir is not None: + checkpoint = Checkpoint.from_directory(checkpoint_dir) + trainer = TorchTrainer( + train_loop_per_worker=train_loop_per_worker, + train_loop_config=train_loop_config, + scaling_config=scaling_config, + run_config=run_config, + resume_from_checkpoint=checkpoint, + ) + else: + trainer = TorchTrainer( + train_loop_per_worker=train_loop_per_worker, + train_loop_config=train_loop_config, + scaling_config=scaling_config, + run_config=run_config, + ) + + result = trainer.fit() + print(result) \ No newline at end of file diff --git a/ray-operator/config/samples/pytorch-resnet-image-classifier/ray-job.pytorch-image-classifier.yaml b/ray-operator/config/samples/pytorch-resnet-image-classifier/ray-job.pytorch-image-classifier.yaml new file mode 100644 index 00000000000..571a0c05316 --- /dev/null +++ b/ray-operator/config/samples/pytorch-resnet-image-classifier/ray-job.pytorch-image-classifier.yaml @@ -0,0 +1,116 @@ +# This RayJob is based on the "Fine-tune a PyTorch Lightning Text Classifier with Ray Data" example in the Ray documentation. +# See https://docs.ray.io/en/master/train/examples/lightning/lightning_cola_advanced.html for more details. +apiVersion: ray.io/v1 +kind: RayJob +metadata: + generateName: pytorch-image-classifier- +spec: + shutdownAfterJobFinishes: true + entrypoint: python ray-operator/config/samples/pytorch-resnet-image-classifier/fine-tune-pytorch-resnet-image-classifier.py + runtimeEnvYAML: | + pip: + - numpy + - datasets + - torch + - torchvision + - transformers>=4.19.1 + working_dir: "https://github.com/andrewsykim/kuberay/archive/pytorch-lightning-image-classifier.zip" + rayClusterSpec: + rayVersion: '2.9.0' + headGroupSpec: + rayStartParams: + dashboard-host: '0.0.0.0' + template: + metadata: + annotations: + gke-gcsfuse/volumes: "true" + gke-gcsfuse/cpu-limit: "0" + gke-gcsfuse/memory-limit: 5Gi + gke-gcsfuse/ephemeral-storage-limit: 10Gi + spec: + serviceAccountName: pytorch-distributed-training + containers: + - name: ray-head + image: rayproject/ray:2.9.0 + env: + - name: NUM_WORKERS + value: "4" + # Set CHECKPOINT_DIR to reload checkpoint from a previous run. 
+ # Example: /mnt/cluster_storage/ptl-sent-classification/TorchTrainer_bf43d_00000_0_2024-04-28_19-54-07/checkpoint_000004 + - name: CHECKPOINT_DIR + value: "/mnt/cluster_storage/finetune-resnet/TorchTrainer_cbb82_00000_0_2024-04-29_10-20-37/checkpoint_000009" + ports: + - containerPort: 6379 + name: gcs-server + - containerPort: 8265 + name: dashboard + - containerPort: 10001 + name: client + resources: + limits: + cpu: "1" + memory: "8G" + requests: + cpu: "1" + memory: "8G" + volumeMounts: + - mountPath: /tmp/ray + name: ray-logs + - mountPath: /mnt/cluster_storage + name: cluster-storage + volumes: + - name: ray-logs + emptyDir: {} + - name: cluster-storage + csi: + driver: gcsfuse.csi.storage.gke.io + volumeAttributes: + bucketName: andrewsy-gke-bucket + mountOptions: "implicit-dirs,uid=1000,gid=100" + workerGroupSpecs: + - replicas: 4 + groupName: gpu-group + rayStartParams: + dashboard-host: '0.0.0.0' + template: + metadata: + annotations: + gke-gcsfuse/volumes: "true" + gke-gcsfuse/cpu-limit: "0" + gke-gcsfuse/memory-limit: 5Gi + gke-gcsfuse/ephemeral-storage-limit: 10Gi + spec: + serviceAccountName: pytorch-distributed-training + tolerations: + - key: "nvidia.com/gpu" + operator: "Exists" + effect: "NoSchedule" + containers: + - name: ray-worker + image: rayproject/ray-ml:2.9.0-gpu + lifecycle: + preStop: + exec: + command: [ "/bin/sh","-c","ray stop" ] + resources: + limits: + memory: "8G" + nvidia.com/gpu: "1" + requests: + cpu: "1" + memory: "8G" + nvidia.com/gpu: "1" + volumeMounts: + - mountPath: /tmp/ray + name: ray-logs + - mountPath: /mnt/cluster_storage + name: cluster-storage + volumes: + - name: ray-logs + emptyDir: {} + - name: cluster-storage + csi: + driver: gcsfuse.csi.storage.gke.io + volumeAttributes: + bucketName: andrewsy-gke-bucket + mountOptions: "implicit-dirs,uid=1000,gid=100" diff --git a/ray-operator/config/samples/pytorch-text-classifier/fine-tune-pytorch-text-classifier.py b/ray-operator/config/samples/pytorch-text-classifier/fine-tune-pytorch-text-classifier.py index 0df1c0abec5..56077fef088 100644 --- a/ray-operator/config/samples/pytorch-text-classifier/fine-tune-pytorch-text-classifier.py +++ b/ray-operator/config/samples/pytorch-text-classifier/fine-tune-pytorch-text-classifier.py @@ -113,15 +113,7 @@ def train_func(config): ) trainer = prepare_trainer(trainer) - - # load model from latest checkpoint if it already exists - checkpoint = ray.train.get_checkpoint() - if checkpoint: - with checkpoint.as_directory() as ckpt_dir: - ckpt_path = os.path.join(ckpt_dir, "checkpoint.ckpt") - trainer.fit(model, train_dataloaders=train_ds_loader, val_dataloaders=val_ds_loader, ckpt_path=ckpt_path) - else: - trainer.fit(model, train_dataloaders=train_ds_loader, val_dataloaders=val_ds_loader) + trainer.fit(model, train_dataloaders=train_ds_loader, val_dataloaders=val_ds_loader) if __name__ == "__main__": diff --git a/ray-operator/config/samples/pytorch-text-classifier/ray-job.pytorch-distributed-training-gcsfuse.yaml b/ray-operator/config/samples/pytorch-text-classifier/ray-job.pytorch-distributed-training-gcsfuse.yaml index fc0624a1ed8..c2c1be8bdee 100644 --- a/ray-operator/config/samples/pytorch-text-classifier/ray-job.pytorch-distributed-training-gcsfuse.yaml +++ b/ray-operator/config/samples/pytorch-text-classifier/ray-job.pytorch-distributed-training-gcsfuse.yaml @@ -32,8 +32,6 @@ spec: - name: ray-head image: rayproject/ray:2.9.0 env: - - name: NUM_WORKERS - value: "2" # Set CHECKPOINT_DIR to reload checkpoint from a previous run. 
# Example: /mnt/cluster_storage/ptl-sent-classification/TorchTrainer_bf43d_00000_0_2024-04-28_19-54-07/checkpoint_000004 # - name: CHECKPOINT_DIR From a948b8f6563066c4d0d61d2a94cbf3389d7d5570 Mon Sep 17 00:00:00 2001 From: Andrew Sy Kim Date: Mon, 29 Apr 2024 22:40:49 +0000 Subject: [PATCH 4/8] use TorchTrainer.can_restore Signed-off-by: Andrew Sy Kim --- ...ne-tune-pytorch-resnet-image-classifier.py | 30 ++++++++----------- .../ray-job.pytorch-image-classifier.yaml | 6 +--- ....pytorch-distributed-training-gcsfuse.yaml | 6 +--- 3 files changed, 15 insertions(+), 27 deletions(-) diff --git a/ray-operator/config/samples/pytorch-resnet-image-classifier/fine-tune-pytorch-resnet-image-classifier.py b/ray-operator/config/samples/pytorch-resnet-image-classifier/fine-tune-pytorch-resnet-image-classifier.py index 7b275d3c8fd..966f228b5d1 100644 --- a/ray-operator/config/samples/pytorch-resnet-image-classifier/fine-tune-pytorch-resnet-image-classifier.py +++ b/ray-operator/config/samples/pytorch-resnet-image-classifier/fine-tune-pytorch-resnet-image-classifier.py @@ -62,13 +62,6 @@ def initialize_model(): return model -def initialize_model_from_checkpoint(checkpoint: Checkpoint): - with checkpoint.as_directory() as tmpdir: - state_dict = torch.load(os.path.join(tmpdir, "checkpoint.pt")) - resnet50 = initialize_model() - resnet50.load_state_dict(state_dict["model"]) - return resnet50 - def evaluate(logits, labels): _, preds = torch.max(logits, 1) corrects = torch.sum(preds == labels).item() @@ -120,8 +113,8 @@ def train_loop_per_worker(configs): start_epoch = 0 checkpoint = train.get_checkpoint() if checkpoint: - with checkpoint.as_directory() as tmpdir: - state_dict = torch.load(os.path.join(tmpdir, "checkpoint.pt")) + with checkpoint.as_directory() as checkpoint_dir: + state_dict = torch.load(os.path.join(checkpoint_dir, "checkpoint.pt")) model.load_state_dict(state_dict['model']) start_epoch = state_dict['epoch'] + 1 @@ -189,7 +182,12 @@ def train_loop_per_worker(configs): "model": model.module.state_dict(), "optimizer_state_dict": optimizer.state_dict(), } - torch.save(state_dict, os.path.join(tmpdir, "checkpoint.pt")) + + # In standard DDP training, where the model is the same across all ranks, + # only the global rank 0 worker needs to save and report the checkpoint + if train.get_context().get_world_rank() == 0: + torch.save(state_dict, os.path.join(tmpdir, "checkpoint.pt")) + train.report( metrics={"loss": epoch_loss, "acc": epoch_acc}, checkpoint=Checkpoint.from_directory(tmpdir), @@ -201,22 +199,20 @@ def train_loop_per_worker(configs): num_workers=num_workers, use_gpu=True, resources_per_worker={"CPU": 1, "GPU": 1} ) - checkpoint_config = CheckpointConfig(num_to_keep=1) + checkpoint_config = CheckpointConfig(num_to_keep=3) run_config = RunConfig( name="finetune-resnet", storage_path="/mnt/cluster_storage", checkpoint_config=checkpoint_config, ) - checkpoint_dir = os.environ.get('CHECKPOINT_DIR') - if checkpoint_dir is not None: - checkpoint = Checkpoint.from_directory(checkpoint_dir) - trainer = TorchTrainer( + experiment_path = os.path.expanduser("/mnt/cluster_storage/finetune-resnet") + if TorchTrainer.can_restore(experiment_path): + trainer = TorchTrainer.restore(experiment_path, train_loop_per_worker=train_loop_per_worker, train_loop_config=train_loop_config, scaling_config=scaling_config, run_config=run_config, - resume_from_checkpoint=checkpoint, ) else: trainer = TorchTrainer( @@ -227,4 +223,4 @@ def train_loop_per_worker(configs): ) result = trainer.fit() - print(result) \ No 
newline at end of file + print(result) diff --git a/ray-operator/config/samples/pytorch-resnet-image-classifier/ray-job.pytorch-image-classifier.yaml b/ray-operator/config/samples/pytorch-resnet-image-classifier/ray-job.pytorch-image-classifier.yaml index 571a0c05316..c3492d8d2b7 100644 --- a/ray-operator/config/samples/pytorch-resnet-image-classifier/ray-job.pytorch-image-classifier.yaml +++ b/ray-operator/config/samples/pytorch-resnet-image-classifier/ray-job.pytorch-image-classifier.yaml @@ -14,7 +14,7 @@ spec: - torch - torchvision - transformers>=4.19.1 - working_dir: "https://github.com/andrewsykim/kuberay/archive/pytorch-lightning-image-classifier.zip" + working_dir: "https://github.com/ray-project/kuberay/archive/master.zip" rayClusterSpec: rayVersion: '2.9.0' headGroupSpec: @@ -35,10 +35,6 @@ spec: env: - name: NUM_WORKERS value: "4" - # Set CHECKPOINT_DIR to reload checkpoint from a previous run. - # Example: /mnt/cluster_storage/ptl-sent-classification/TorchTrainer_bf43d_00000_0_2024-04-28_19-54-07/checkpoint_000004 - - name: CHECKPOINT_DIR - value: "/mnt/cluster_storage/finetune-resnet/TorchTrainer_cbb82_00000_0_2024-04-29_10-20-37/checkpoint_000009" ports: - containerPort: 6379 name: gcs-server diff --git a/ray-operator/config/samples/pytorch-text-classifier/ray-job.pytorch-distributed-training-gcsfuse.yaml b/ray-operator/config/samples/pytorch-text-classifier/ray-job.pytorch-distributed-training-gcsfuse.yaml index c2c1be8bdee..d296716c4aa 100644 --- a/ray-operator/config/samples/pytorch-text-classifier/ray-job.pytorch-distributed-training-gcsfuse.yaml +++ b/ray-operator/config/samples/pytorch-text-classifier/ray-job.pytorch-distributed-training-gcsfuse.yaml @@ -13,7 +13,7 @@ spec: - datasets - transformers>=4.19.1 - pytorch-lightning==1.6.5 - working_dir: "https://github.com/andrewsykim/kuberay/archive/pytorch-lightning-image-classifier.zip" + working_dir: "https://github.com/ray-project/kuberay/archive/master.zip" rayClusterSpec: rayVersion: '2.9.0' headGroupSpec: @@ -32,10 +32,6 @@ spec: - name: ray-head image: rayproject/ray:2.9.0 env: - # Set CHECKPOINT_DIR to reload checkpoint from a previous run. 
- # Example: /mnt/cluster_storage/ptl-sent-classification/TorchTrainer_bf43d_00000_0_2024-04-28_19-54-07/checkpoint_000004 - # - name: CHECKPOINT_DIR - # value: "" ports: - containerPort: 6379 name: gcs-server From 03cf52db67c62882e71c56814da696c35bbc53be Mon Sep 17 00:00:00 2001 From: Andrew Sy Kim Date: Tue, 30 Apr 2024 01:41:19 +0000 Subject: [PATCH 5/8] update placeholders Signed-off-by: Andrew Sy Kim --- .../fine-tune-pytorch-resnet-image-classifier.py | 3 +-- .../ray-job.pytorch-image-classifier.yaml | 8 ++++---- .../ray-job.pytorch-distributed-training-gcsfuse.yaml | 4 ++-- 3 files changed, 7 insertions(+), 8 deletions(-) diff --git a/ray-operator/config/samples/pytorch-resnet-image-classifier/fine-tune-pytorch-resnet-image-classifier.py b/ray-operator/config/samples/pytorch-resnet-image-classifier/fine-tune-pytorch-resnet-image-classifier.py index 966f228b5d1..f43e7d66087 100644 --- a/ray-operator/config/samples/pytorch-resnet-image-classifier/fine-tune-pytorch-resnet-image-classifier.py +++ b/ray-operator/config/samples/pytorch-resnet-image-classifier/fine-tune-pytorch-resnet-image-classifier.py @@ -1,4 +1,5 @@ import os +import warnings from tempfile import TemporaryDirectory import torch @@ -76,8 +77,6 @@ def evaluate(logits, labels): } def train_loop_per_worker(configs): - import warnings - warnings.filterwarnings("ignore") # Calculate the batch size for a single worker diff --git a/ray-operator/config/samples/pytorch-resnet-image-classifier/ray-job.pytorch-image-classifier.yaml b/ray-operator/config/samples/pytorch-resnet-image-classifier/ray-job.pytorch-image-classifier.yaml index c3492d8d2b7..85d34421305 100644 --- a/ray-operator/config/samples/pytorch-resnet-image-classifier/ray-job.pytorch-image-classifier.yaml +++ b/ray-operator/config/samples/pytorch-resnet-image-classifier/ray-job.pytorch-image-classifier.yaml @@ -1,5 +1,5 @@ -# This RayJob is based on the "Fine-tune a PyTorch Lightning Text Classifier with Ray Data" example in the Ray documentation. -# See https://docs.ray.io/en/master/train/examples/lightning/lightning_cola_advanced.html for more details. +# This RayJob is based on the "Finetuning a Pytorch Image Classifier with Ray Train" example in the Ray documentation. +# See https://docs.ray.io/en/latest/train/examples/pytorch/pytorch_resnet_finetune.html for more details. 
apiVersion: ray.io/v1 kind: RayJob metadata: @@ -61,7 +61,7 @@ spec: csi: driver: gcsfuse.csi.storage.gke.io volumeAttributes: - bucketName: andrewsy-gke-bucket + bucketName: GCS_BUCKET mountOptions: "implicit-dirs,uid=1000,gid=100" workerGroupSpecs: - replicas: 4 @@ -108,5 +108,5 @@ spec: csi: driver: gcsfuse.csi.storage.gke.io volumeAttributes: - bucketName: andrewsy-gke-bucket + bucketName: GCS_BUCKET mountOptions: "implicit-dirs,uid=1000,gid=100" diff --git a/ray-operator/config/samples/pytorch-text-classifier/ray-job.pytorch-distributed-training-gcsfuse.yaml b/ray-operator/config/samples/pytorch-text-classifier/ray-job.pytorch-distributed-training-gcsfuse.yaml index d296716c4aa..fc8e3b41be4 100644 --- a/ray-operator/config/samples/pytorch-text-classifier/ray-job.pytorch-distributed-training-gcsfuse.yaml +++ b/ray-operator/config/samples/pytorch-text-classifier/ray-job.pytorch-distributed-training-gcsfuse.yaml @@ -58,7 +58,7 @@ spec: csi: driver: gcsfuse.csi.storage.gke.io volumeAttributes: - bucketName: andrewsy-gke-bucket + bucketName: GCS_BUCKET mountOptions: "implicit-dirs,uid=1000,gid=100" workerGroupSpecs: - replicas: 2 @@ -105,5 +105,5 @@ spec: csi: driver: gcsfuse.csi.storage.gke.io volumeAttributes: - bucketName: andrewsy-gke-bucket + bucketName: GCS_BUCKET mountOptions: "implicit-dirs,uid=1000,gid=100" From 7b9759d6d3c38e20497f6d1c495b8d7f4ba2e028 Mon Sep 17 00:00:00 2001 From: Andrew Sy Kim Date: Thu, 16 May 2024 16:49:59 +0000 Subject: [PATCH 6/8] address comments from Kai-Hsun Signed-off-by: Andrew Sy Kim --- .../ray-job.pytorch-image-classifier.yaml | 4 - ...v1alpha1.pytorch-distributed-training.yaml | 4 - ....pytorch-distributed-training-gcsfuse.yaml | 109 ------------------ .../ray-job.pytorch-distributed-training.yaml | 4 - 4 files changed, 121 deletions(-) delete mode 100644 ray-operator/config/samples/pytorch-text-classifier/ray-job.pytorch-distributed-training-gcsfuse.yaml diff --git a/ray-operator/config/samples/pytorch-resnet-image-classifier/ray-job.pytorch-image-classifier.yaml b/ray-operator/config/samples/pytorch-resnet-image-classifier/ray-job.pytorch-image-classifier.yaml index 85d34421305..db76535e3d1 100644 --- a/ray-operator/config/samples/pytorch-resnet-image-classifier/ray-job.pytorch-image-classifier.yaml +++ b/ray-operator/config/samples/pytorch-resnet-image-classifier/ray-job.pytorch-image-classifier.yaml @@ -84,10 +84,6 @@ spec: containers: - name: ray-worker image: rayproject/ray-ml:2.9.0-gpu - lifecycle: - preStop: - exec: - command: [ "/bin/sh","-c","ray stop" ] resources: limits: memory: "8G" diff --git a/ray-operator/config/samples/pytorch-text-classifier/ray-job-v1alpha1.pytorch-distributed-training.yaml b/ray-operator/config/samples/pytorch-text-classifier/ray-job-v1alpha1.pytorch-distributed-training.yaml index ec27c52d61c..94e52fd1b6b 100644 --- a/ray-operator/config/samples/pytorch-text-classifier/ray-job-v1alpha1.pytorch-distributed-training.yaml +++ b/ray-operator/config/samples/pytorch-text-classifier/ray-job-v1alpha1.pytorch-distributed-training.yaml @@ -36,10 +36,6 @@ spec: volumeMounts: - mountPath: /tmp/ray name: ray-logs - - mountPath: /data - name: data volumes: - name: ray-logs emptyDir: {} - - name: data - emptyDir: {} diff --git a/ray-operator/config/samples/pytorch-text-classifier/ray-job.pytorch-distributed-training-gcsfuse.yaml b/ray-operator/config/samples/pytorch-text-classifier/ray-job.pytorch-distributed-training-gcsfuse.yaml deleted file mode 100644 index fc8e3b41be4..00000000000 --- 
a/ray-operator/config/samples/pytorch-text-classifier/ray-job.pytorch-distributed-training-gcsfuse.yaml +++ /dev/null @@ -1,109 +0,0 @@ -# This RayJob is based on the "Fine-tune a PyTorch Lightning Text Classifier with Ray Data" example in the Ray documentation. -# See https://docs.ray.io/en/master/train/examples/lightning/lightning_cola_advanced.html for more details. -apiVersion: ray.io/v1 -kind: RayJob -metadata: - generateName: pytorch-text-classifier- -spec: - shutdownAfterJobFinishes: true - entrypoint: python ray-operator/config/samples/pytorch-text-classifier/fine-tune-pytorch-text-classifier.py - runtimeEnvYAML: | - pip: - - numpy - - datasets - - transformers>=4.19.1 - - pytorch-lightning==1.6.5 - working_dir: "https://github.com/ray-project/kuberay/archive/master.zip" - rayClusterSpec: - rayVersion: '2.9.0' - headGroupSpec: - rayStartParams: - dashboard-host: '0.0.0.0' - template: - metadata: - annotations: - gke-gcsfuse/volumes: "true" - gke-gcsfuse/cpu-limit: "0" - gke-gcsfuse/memory-limit: 5Gi - gke-gcsfuse/ephemeral-storage-limit: 10Gi - spec: - serviceAccountName: pytorch-distributed-training - containers: - - name: ray-head - image: rayproject/ray:2.9.0 - env: - ports: - - containerPort: 6379 - name: gcs-server - - containerPort: 8265 - name: dashboard - - containerPort: 10001 - name: client - resources: - limits: - cpu: "1" - memory: "8G" - requests: - cpu: "1" - memory: "8G" - volumeMounts: - - mountPath: /tmp/ray - name: ray-logs - - mountPath: /mnt/cluster_storage - name: cluster-storage - volumes: - - name: ray-logs - emptyDir: {} - - name: cluster-storage - csi: - driver: gcsfuse.csi.storage.gke.io - volumeAttributes: - bucketName: GCS_BUCKET - mountOptions: "implicit-dirs,uid=1000,gid=100" - workerGroupSpecs: - - replicas: 2 - groupName: gpu-group - rayStartParams: - dashboard-host: '0.0.0.0' - template: - metadata: - annotations: - gke-gcsfuse/volumes: "true" - gke-gcsfuse/cpu-limit: "0" - gke-gcsfuse/memory-limit: 5Gi - gke-gcsfuse/ephemeral-storage-limit: 10Gi - spec: - serviceAccountName: pytorch-distributed-training - tolerations: - - key: "nvidia.com/gpu" - operator: "Exists" - effect: "NoSchedule" - containers: - - name: ray-worker - image: rayproject/ray-ml:2.9.0-gpu - lifecycle: - preStop: - exec: - command: [ "/bin/sh","-c","ray stop" ] - resources: - limits: - memory: "8G" - nvidia.com/gpu: "1" - requests: - cpu: "1" - memory: "8G" - nvidia.com/gpu: "1" - volumeMounts: - - mountPath: /tmp/ray - name: ray-logs - - mountPath: /mnt/cluster_storage - name: cluster-storage - volumes: - - name: ray-logs - emptyDir: {} - - name: cluster-storage - csi: - driver: gcsfuse.csi.storage.gke.io - volumeAttributes: - bucketName: GCS_BUCKET - mountOptions: "implicit-dirs,uid=1000,gid=100" diff --git a/ray-operator/config/samples/pytorch-text-classifier/ray-job.pytorch-distributed-training.yaml b/ray-operator/config/samples/pytorch-text-classifier/ray-job.pytorch-distributed-training.yaml index d7a35a9e9c3..b21a25a401b 100644 --- a/ray-operator/config/samples/pytorch-text-classifier/ray-job.pytorch-distributed-training.yaml +++ b/ray-operator/config/samples/pytorch-text-classifier/ray-job.pytorch-distributed-training.yaml @@ -47,10 +47,6 @@ spec: volumeMounts: - mountPath: /tmp/ray name: ray-logs - - mountPath: /mnt/cluster_storage - name: cluster-storage volumes: - name: ray-logs emptyDir: {} - - name: cluster-storage - emptyDir: {} From d8799fc29b9dd2f6f733a5ce952a9bcbc27b1862 Mon Sep 17 00:00:00 2001 From: Andrew Sy Kim Date: Thu, 16 May 2024 16:52:53 +0000 Subject: 
[PATCH 7/8] fix missing minReplicas and maxReplicas Signed-off-by: Andrew Sy Kim --- .../ray-job.pytorch-image-classifier.yaml | 2 ++ 1 file changed, 2 insertions(+) diff --git a/ray-operator/config/samples/pytorch-resnet-image-classifier/ray-job.pytorch-image-classifier.yaml b/ray-operator/config/samples/pytorch-resnet-image-classifier/ray-job.pytorch-image-classifier.yaml index db76535e3d1..67fb7727e2a 100644 --- a/ray-operator/config/samples/pytorch-resnet-image-classifier/ray-job.pytorch-image-classifier.yaml +++ b/ray-operator/config/samples/pytorch-resnet-image-classifier/ray-job.pytorch-image-classifier.yaml @@ -65,6 +65,8 @@ spec: mountOptions: "implicit-dirs,uid=1000,gid=100" workerGroupSpecs: - replicas: 4 + minReplicas: 4 + maxReplicas: 4 groupName: gpu-group rayStartParams: dashboard-host: '0.0.0.0' From b13a91b656fe4139aaaff60dc9f084f0046c80c2 Mon Sep 17 00:00:00 2001 From: Andrew Sy Kim Date: Thu, 16 May 2024 17:12:16 +0000 Subject: [PATCH 8/8] revert changes to text classifier example Signed-off-by: Andrew Sy Kim --- .../fine-tune-pytorch-text-classifier.py | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/ray-operator/config/samples/pytorch-text-classifier/fine-tune-pytorch-text-classifier.py b/ray-operator/config/samples/pytorch-text-classifier/fine-tune-pytorch-text-classifier.py index 56077fef088..0617deb03cb 100644 --- a/ray-operator/config/samples/pytorch-text-classifier/fine-tune-pytorch-text-classifier.py +++ b/ray-operator/config/samples/pytorch-text-classifier/fine-tune-pytorch-text-classifier.py @@ -1,6 +1,5 @@ import ray import torch -import os import numpy as np import pytorch_lightning as pl import torch.nn.functional as F @@ -15,7 +14,7 @@ RayTrainReportCallback, ) from ray.train.torch import TorchTrainer -from ray.train import RunConfig, ScalingConfig, CheckpointConfig, DataConfig, Checkpoint +from ray.train import RunConfig, ScalingConfig, CheckpointConfig, DataConfig class SentimentModel(pl.LightningModule): def __init__(self, lr=2e-5, eps=1e-8): @@ -113,6 +112,7 @@ def train_func(config): ) trainer = prepare_trainer(trainer) + trainer.fit(model, train_dataloaders=train_ds_loader, val_dataloaders=val_ds_loader) @@ -131,7 +131,6 @@ def train_func(config): # The checkpoints and metrics are reported by `RayTrainReportCallback` run_config = RunConfig( name="ptl-sent-classification", - storage_path="/mnt/cluster_storage", checkpoint_config=CheckpointConfig( num_to_keep=2, checkpoint_score_attribute="matthews_correlation", @@ -139,8 +138,8 @@ def train_func(config): ), ) - num_workers = int(os.environ.get("NUM_WORKERS", "1")) - scaling_config = ScalingConfig(num_workers=num_workers, use_gpu=True) + # Schedule 2 workers for DDP training (1 GPU/worker by default) + scaling_config = ScalingConfig(num_workers=1, use_gpu=True) trainer = TorchTrainer( train_loop_per_worker=train_func,