
Commit

update to latest ray doc
Signed-off-by: Zhi Lin <[email protected]>
kira-lin committed Apr 10, 2024
1 parent 0d86b0e commit eff6b64
Showing 4 changed files with 44 additions and 12 deletions.
11 changes: 5 additions & 6 deletions python/raydp/tests/test_tf.py
@@ -34,12 +34,11 @@ def test_tf_estimator(spark_on_ray_small, use_fs_directory):
     spark = spark_on_ray_small

     # ---------------- data process with Spark ------------
-    # calculate z = 3 * x + 4 * y + 5
+    # calculate y = 3 * x + 4
     df: pyspark.sql.DataFrame = spark.range(0, 100000)
     df = df.withColumn("x", rand() * 100)  # add x column
-    df = df.withColumn("y", rand() * 1000)  # ad y column
-    df = df.withColumn("z", df.x * 3 + df.y * 4 + rand() + 5)  # ad z column
-    df = df.select(df.x, df.y, df.z)
+    df = df.withColumn("y", df.x * 3 + rand() + 4)  # add y column
+    df = df.select(df.x, df.y)

     train_df, test_df = random_split(df, [0.7, 0.3])

@@ -59,8 +58,8 @@ def test_tf_estimator(spark_on_ray_small, use_fs_directory):
                             optimizer=optimizer,
                             loss=loss,
                             metrics=["accuracy", "mse"],
-                            feature_columns=["x", "y"],
-                            label_columns="z",
+                            feature_columns=["x"],
+                            label_columns="y",
                             batch_size=1000,
                             num_epochs=2,
                             use_gpu=False)
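For context, the test now fits a single-feature linear target (y = 3x + 4 plus uniform noise) instead of the old two-feature target z. A minimal sketch of the same data preparation with a plain SparkSession (the real test gets spark from the spark_on_ray_small fixture and splits with raydp's random_split):

# Sketch only: mirrors the test's Spark data prep outside the fixture.
from pyspark.sql import SparkSession
from pyspark.sql.functions import rand

spark = SparkSession.builder.master("local[2]").getOrCreate()

df = spark.range(0, 100000)
df = df.withColumn("x", rand() * 100)           # feature
df = df.withColumn("y", df.x * 3 + rand() + 4)  # label: 3x + 4 + U(0, 1) noise
df = df.select(df.x, df.y)

# Plain Spark's randomSplit stands in for raydp.utils.random_split here.
train_df, test_df = df.randomSplit([0.7, 0.3])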
15 changes: 13 additions & 2 deletions python/raydp/tf/estimator.py
@@ -15,13 +15,17 @@
 # limitations under the License.
 #

+import json
+import os
+import tempfile
 from typing import Any, List, NoReturn, Optional, Union, Dict

 import tensorflow as tf
 import tensorflow.keras as keras
 from tensorflow import DType, TensorShape
 from tensorflow.keras.callbacks import Callback

+from ray.train import Checkpoint
 from ray.train.tensorflow import TensorflowTrainer, TensorflowCheckpoint, prepare_dataset_shard
 from ray.air import session
 from ray.air.config import ScalingConfig, RunConfig, FailureConfig
@@ -43,7 +47,7 @@ def __init__(self,
                  metrics: Union[List[keras.metrics.Metric], List[str]] = None,
                  feature_columns: Union[str, List[str]] = None,
                  label_columns: Union[str, List[str]] = None,
-                 merge_feature_columns: bool = True,
+                 merge_feature_columns: bool = False,
                  batch_size: int = 128,
                  drop_last: bool = False,
                  num_epochs: int = 1,
@@ -184,7 +188,14 @@ def train_func(config):
             if config["evaluate"]:
                 test_history = multi_worker_model.evaluate(eval_tf_dataset, callbacks=callbacks)
                 results.append(test_history)
-            session.report({}, checkpoint=TensorflowCheckpoint.from_model(multi_worker_model))
+            with tempfile.TemporaryDirectory() as temp_checkpoint_dir:
+                multi_worker_model.save(os.path.join(temp_checkpoint_dir, "model.keras"))
+                checkpoint_dict = os.path.join(temp_checkpoint_dir, "checkpoint.json")
+                with open(checkpoint_dict, "w") as f:
+                    json.dump({"epoch": config["num_epochs"]}, f)
+                checkpoint = Checkpoint.from_directory(temp_checkpoint_dir)
+
+                session.report({}, checkpoint=checkpoint)

     def fit(self,
             train_ds: Dataset,
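Since the reported checkpoint is now a plain directory checkpoint rather than a TensorflowCheckpoint, a caller might restore the model by reading model.keras back from that directory. A hedged sketch of the loading side, assuming result is the ray.train.Result produced by fit (this helper is not part of the commit):

# Sketch: restore the Keras model and metadata from the directory checkpoint.
import json
import os
import tensorflow as tf

with result.checkpoint.as_directory() as checkpoint_dir:
    model = tf.keras.models.load_model(os.path.join(checkpoint_dir, "model.keras"))
    with open(os.path.join(checkpoint_dir, "checkpoint.json")) as f:
        meta = json.load(f)  # e.g. {"epoch": 2}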
16 changes: 15 additions & 1 deletion python/raydp/torch/estimator.py
@@ -15,6 +15,8 @@
 # limitations under the License.
 #

+import os
+import tempfile
 import inspect
 from typing import Any, Callable, List, NoReturn, Optional, Union, Dict

@@ -30,6 +32,7 @@

 import ray
 from ray import train
+from ray.train import Checkpoint
 from ray.train.torch import TorchTrainer, TorchCheckpoint
 from ray.air.config import ScalingConfig, RunConfig, FailureConfig
 from ray.air import session
@@ -254,7 +257,18 @@ def train_func(config):
             else:
                 # if num_workers = 1, model is not wrapped
                 states = model.state_dict()
-            session.report({}, checkpoint=TorchCheckpoint.from_state_dict(states))
+            with tempfile.TemporaryDirectory() as temp_checkpoint_dir:
+                checkpoint = None
+                # In standard DDP training, where the model is the same across all ranks,
+                # only the global rank 0 worker needs to save and report the checkpoint
+                if train.get_context().get_world_rank() == 0:
+                    torch.save(
+                        states,
+                        os.path.join(temp_checkpoint_dir, "model.pt"),
+                    )
+                    checkpoint = Checkpoint.from_directory(temp_checkpoint_dir)
+
+                session.report({}, checkpoint=checkpoint)

     @staticmethod
     def train_epoch(dataset, model, criterion, optimizer, metrics, scheduler=None):
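Only the global rank 0 worker reports a real checkpoint here; the other ranks report checkpoint=None, which ray.train accepts. A sketch of restoring the saved state dict afterwards, where result and build_model() are illustrative assumptions rather than part of the commit:

# Sketch: load the rank-0 checkpoint written above.
import os
import torch

model = build_model()  # must recreate the trained architecture
with result.checkpoint.as_directory() as checkpoint_dir:
    states = torch.load(os.path.join(checkpoint_dir, "model.pt"), map_location="cpu")
    model.load_state_dict(states)
model.eval()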
14 changes: 11 additions & 3 deletions python/raydp/xgboost/estimator.py
@@ -23,7 +23,7 @@
 from raydp.spark import spark_dataframe_to_ray_dataset, get_raydp_master_owner

 import ray
-from ray.air.config import ScalingConfig, RunConfig, FailureConfig
+from ray.air.config import ScalingConfig, RunConfig, FailureConfig, CheckpointConfig
 from ray.data.dataset import Dataset
 from ray.train.xgboost import XGBoostTrainer, XGBoostCheckpoint

@@ -58,7 +58,15 @@ def fit(self,
             max_retries=3) -> NoReturn:
         scaling_config = ScalingConfig(num_workers=self._num_workers,
                                        resources_per_worker=self._resources_per_worker)
-        run_config = RunConfig(failure_config=FailureConfig(max_failures=max_retries))
+        run_config = RunConfig(
+            checkpoint_config=CheckpointConfig(
+                # Checkpoint every iteration.
+                checkpoint_frequency=1,
+                # Only keep the latest checkpoint and delete the others.
+                num_to_keep=1,
+            ),
+            failure_config=FailureConfig(max_failures=max_retries)
+        )
         if self._shuffle:
             train_ds = train_ds.random_shuffle()
         if evaluate_ds:
@@ -109,4 +117,4 @@ def fit_on_spark(self,
                             train_ds, evaluate_ds, max_retries)

     def get_model(self):
-        return XGBoostCheckpoint(self._results.checkpoint.to_directory()).get_model()
+        return XGBoostTrainer.get_model(self._results.checkpoint)
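The rewritten get_model delegates checkpoint decoding to XGBoostTrainer.get_model, which returns the trained xgboost.Booster directly. A short usage sketch, where estimator and features_df are illustrative names, not part of the commit:

# Sketch: consuming the Booster returned by the updated get_model().
import xgboost as xgb

booster = estimator.get_model()      # xgboost.Booster
dmatrix = xgb.DMatrix(features_df)   # features_df: an assumed pandas DataFrame
preds = booster.predict(dmatrix)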
