From a545e8668a96cbdf93b529ae98732df610227872 Mon Sep 17 00:00:00 2001 From: Gary Miguel Date: Fri, 14 Feb 2025 16:59:37 -0800 Subject: [PATCH] Support old-style TensorFlow events (tensorboard) (#2467) * Support old-style TensorFlow events (tensorboard) Fixes: https://github.com/kubeflow/katib/issues/2466 Signed-off-by: Gary Miguel * format Signed-off-by: Gary Miguel * test Signed-off-by: Gary Miguel * don't continue loops Signed-off-by: Gary Miguel * format Signed-off-by: Gary Miguel --------- Signed-off-by: Gary Miguel --- .../tfevent_loader.py | 65 ++++++++++++------- .../test_tfevent_metricscollector.py | 52 ++++++++++++--- test/unit/v1beta1/requirements.txt | 1 + 3 files changed, 84 insertions(+), 34 deletions(-) diff --git a/pkg/metricscollector/v1beta1/tfevent-metricscollector/tfevent_loader.py b/pkg/metricscollector/v1beta1/tfevent-metricscollector/tfevent_loader.py index f41597f9237..d50bbe19b7d 100644 --- a/pkg/metricscollector/v1beta1/tfevent-metricscollector/tfevent_loader.py +++ b/pkg/metricscollector/v1beta1/tfevent-metricscollector/tfevent_loader.py @@ -30,11 +30,23 @@ import rfc3339 import tensorflow as tf from tensorboard.backend.event_processing.event_accumulator import EventAccumulator -from tensorboard.backend.event_processing.tag_types import TENSORS +from tensorboard.backend.event_processing.tag_types import SCALARS, TENSORS from pkg.metricscollector.v1beta1.common import const +def _should_consider(tag: str, metric_name: str, tfefile: str) -> bool: + tfefile_parent_dir = ( + os.path.dirname(metric_name) + if len(metric_name.split("/")) >= 2 + else os.path.dirname(tfefile) + ) + basedir_name = os.path.dirname(tfefile) + return tag.startswith(metric_name.split("/")[-1]) and basedir_name.endswith( + tfefile_parent_dir + ) + + class TFEventFileParser: def __init__(self, metric_names): self.metric_names = metric_names @@ -47,31 +59,36 @@ def find_all_files(directory): def parse_summary(self, tfefile): metric_logs = [] - event_accumulator = EventAccumulator(tfefile, size_guidance={TENSORS: 0}) + event_accumulator = EventAccumulator( + tfefile, size_guidance={SCALARS: 0, TENSORS: 0} + ) event_accumulator.Reload() - for tag in event_accumulator.Tags()[TENSORS]: + tags = event_accumulator.Tags() + for tag in tags[TENSORS]: for m in self.metric_names: - tfefile_parent_dir = ( - os.path.dirname(m) - if len(m.split("/")) >= 2 - else os.path.dirname(tfefile) - ) - basedir_name = os.path.dirname(tfefile) - if not tag.startswith(m.split("/")[-1]) or not basedir_name.endswith( - tfefile_parent_dir - ): - continue - - for tensor in event_accumulator.Tensors(tag): - ml = api_pb2.MetricLog( - time_stamp=rfc3339.rfc3339( - datetime.fromtimestamp(tensor.wall_time) - ), - metric=api_pb2.Metric( - name=m, value=str(tf.make_ndarray(tensor.tensor_proto)) - ), - ) - metric_logs.append(ml) + if _should_consider(tag, m, tfefile): + for tensor in event_accumulator.Tensors(tag): + ml = api_pb2.MetricLog( + time_stamp=rfc3339.rfc3339( + datetime.fromtimestamp(tensor.wall_time) + ), + metric=api_pb2.Metric( + name=m, value=str(tf.make_ndarray(tensor.tensor_proto)) + ), + ) + metric_logs.append(ml) + # support old-style tensorboard metrics too + for tag in tags[SCALARS]: + for m in self.metric_names: + if _should_consider(tag, m, tfefile): + for scalar in event_accumulator.Scalars(tag): + ml = api_pb2.MetricLog( + time_stamp=rfc3339.rfc3339( + datetime.fromtimestamp(scalar.wall_time) + ), + metric=api_pb2.Metric(name=m, value=str(scalar.value)), + ) + metric_logs.append(ml) return metric_logs diff --git a/test/unit/v1beta1/metricscollector/test_tfevent_metricscollector.py b/test/unit/v1beta1/metricscollector/test_tfevent_metricscollector.py index 305954e9081..8fd8ae0d1cc 100644 --- a/test/unit/v1beta1/metricscollector/test_tfevent_metricscollector.py +++ b/test/unit/v1beta1/metricscollector/test_tfevent_metricscollector.py @@ -13,10 +13,19 @@ # limitations under the License. import os +import tempfile import unittest +import tensorboardX import utils +METRIC_DIR_NAMES = ("train", "test") +METRIC_NAMES = ("accuracy", "loss") +QUALIFIED_METRIC_NAMES = tuple( + f"{dir}/{metric}" + for dir in METRIC_DIR_NAMES + for metric in METRIC_NAMES +) class TestTFEventMetricsCollector(unittest.TestCase): def test_parse_file(self): @@ -24,24 +33,47 @@ def test_parse_file(self): current_dir = os.path.dirname(os.path.abspath(__file__)) logs_dir = os.path.join(current_dir, "testdata/tfevent-metricscollector/logs") - # Metric format is "{{dirname}}/{{metrics name}}" - metric_names = ["train/accuracy", "train/loss", "test/loss", "test/accuracy"] - metric_logs = utils.get_metric_logs(logs_dir, metric_names) + + metric_logs = utils.get_metric_logs(logs_dir, QUALIFIED_METRIC_NAMES) self.assertEqual(20, len(metric_logs)) for log in metric_logs: actual = log["metric"]["name"] - self.assertIn(actual, metric_names) + self.assertIn(actual, QUALIFIED_METRIC_NAMES) + + train_metric_logs = utils.get_metric_logs( + os.path.join(logs_dir, "train"), METRIC_NAMES) + self.assertEqual(10, len(train_metric_logs)) + + for log in train_metric_logs: + actual = log["metric"]["name"] + self.assertIn(actual, METRIC_NAMES) + + def test_parse_file_with_tensorboardX(self): + logs_dir = tempfile.mkdtemp() + num_iters = 3 - # Metric format is "{{metrics name}}" - metric_names = ["accuracy", "loss"] - metrics_file_dir = os.path.join(logs_dir, "train") - metric_logs = utils.get_metric_logs(metrics_file_dir, metric_names) - self.assertEqual(10, len(metric_logs)) + for dir_name in METRIC_DIR_NAMES: + with tensorboardX.SummaryWriter(os.path.join(logs_dir, dir_name)) as writer: + for metric_name in METRIC_NAMES: + for iter in range(num_iters): + writer.add_scalar(metric_name, 0.1, iter) + + + metric_logs = utils.get_metric_logs(logs_dir, QUALIFIED_METRIC_NAMES) + self.assertEqual(num_iters * len(QUALIFIED_METRIC_NAMES), len(metric_logs)) for log in metric_logs: actual = log["metric"]["name"] - self.assertIn(actual, metric_names) + self.assertIn(actual, QUALIFIED_METRIC_NAMES) + + train_metric_logs = utils.get_metric_logs( + os.path.join(logs_dir, "train"), METRIC_NAMES) + self.assertEqual(num_iters * len(METRIC_NAMES), len(train_metric_logs)) + + for log in train_metric_logs: + actual = log["metric"]["name"] + self.assertIn(actual, METRIC_NAMES) if __name__ == '__main__': diff --git a/test/unit/v1beta1/requirements.txt b/test/unit/v1beta1/requirements.txt index 440f28297e1..94fde5638e8 100644 --- a/test/unit/v1beta1/requirements.txt +++ b/test/unit/v1beta1/requirements.txt @@ -1,3 +1,4 @@ grpcio-testing==1.64.1 pytest==7.2.0 +tensorboardX==2.6.2.2 kubeflow-training[huggingface]==1.9.0