Skip to content

Commit

Permalink
cherry-picked c18035e1 (#2517)
Browse files Browse the repository at this point in the history
* Support old-style TensorFlow events (tensorboard)

Fixes: #2466


* format



* test



* don't continue loops



* format



---------

Signed-off-by: Gary Miguel <[email protected]>
Co-authored-by: Gary Miguel <[email protected]>
  • Loading branch information
mahdikhashan and garymm authored Feb 17, 2025
1 parent 2daece4 commit 7dcdde7
Show file tree
Hide file tree
Showing 3 changed files with 84 additions and 34 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,23 @@
import rfc3339
import tensorflow as tf
from tensorboard.backend.event_processing.event_accumulator import EventAccumulator
from tensorboard.backend.event_processing.tag_types import TENSORS
from tensorboard.backend.event_processing.tag_types import SCALARS, TENSORS

from pkg.metricscollector.v1beta1.common import const


def _should_consider(tag: str, metric_name: str, tfefile: str) -> bool:
tfefile_parent_dir = (
os.path.dirname(metric_name)
if len(metric_name.split("/")) >= 2
else os.path.dirname(tfefile)
)
basedir_name = os.path.dirname(tfefile)
return tag.startswith(metric_name.split("/")[-1]) and basedir_name.endswith(
tfefile_parent_dir
)


class TFEventFileParser:
def __init__(self, metric_names):
self.metric_names = metric_names
Expand All @@ -47,31 +59,36 @@ def find_all_files(directory):

def parse_summary(self, tfefile):
metric_logs = []
event_accumulator = EventAccumulator(tfefile, size_guidance={TENSORS: 0})
event_accumulator = EventAccumulator(
tfefile, size_guidance={SCALARS: 0, TENSORS: 0}
)
event_accumulator.Reload()
for tag in event_accumulator.Tags()[TENSORS]:
tags = event_accumulator.Tags()
for tag in tags[TENSORS]:
for m in self.metric_names:
tfefile_parent_dir = (
os.path.dirname(m)
if len(m.split("/")) >= 2
else os.path.dirname(tfefile)
)
basedir_name = os.path.dirname(tfefile)
if not tag.startswith(m.split("/")[-1]) or not basedir_name.endswith(
tfefile_parent_dir
):
continue

for tensor in event_accumulator.Tensors(tag):
ml = api_pb2.MetricLog(
time_stamp=rfc3339.rfc3339(
datetime.fromtimestamp(tensor.wall_time)
),
metric=api_pb2.Metric(
name=m, value=str(tf.make_ndarray(tensor.tensor_proto))
),
)
metric_logs.append(ml)
if _should_consider(tag, m, tfefile):
for tensor in event_accumulator.Tensors(tag):
ml = api_pb2.MetricLog(
time_stamp=rfc3339.rfc3339(
datetime.fromtimestamp(tensor.wall_time)
),
metric=api_pb2.Metric(
name=m, value=str(tf.make_ndarray(tensor.tensor_proto))
),
)
metric_logs.append(ml)
# support old-style tensorboard metrics too
for tag in tags[SCALARS]:
for m in self.metric_names:
if _should_consider(tag, m, tfefile):
for scalar in event_accumulator.Scalars(tag):
ml = api_pb2.MetricLog(
time_stamp=rfc3339.rfc3339(
datetime.fromtimestamp(scalar.wall_time)
),
metric=api_pb2.Metric(name=m, value=str(scalar.value)),
)
metric_logs.append(ml)

return metric_logs

Expand Down
52 changes: 42 additions & 10 deletions test/unit/v1beta1/metricscollector/test_tfevent_metricscollector.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,35 +13,67 @@
# limitations under the License.

import os
import tempfile
import unittest

import tensorboardX
import utils

METRIC_DIR_NAMES = ("train", "test")
METRIC_NAMES = ("accuracy", "loss")
QUALIFIED_METRIC_NAMES = tuple(
f"{dir}/{metric}"
for dir in METRIC_DIR_NAMES
for metric in METRIC_NAMES
)

class TestTFEventMetricsCollector(unittest.TestCase):
def test_parse_file(self):

current_dir = os.path.dirname(os.path.abspath(__file__))
logs_dir = os.path.join(current_dir, "testdata/tfevent-metricscollector/logs")

# Metric format is "{{dirname}}/{{metrics name}}"
metric_names = ["train/accuracy", "train/loss", "test/loss", "test/accuracy"]
metric_logs = utils.get_metric_logs(logs_dir, metric_names)

metric_logs = utils.get_metric_logs(logs_dir, QUALIFIED_METRIC_NAMES)
self.assertEqual(20, len(metric_logs))

for log in metric_logs:
actual = log["metric"]["name"]
self.assertIn(actual, metric_names)
self.assertIn(actual, QUALIFIED_METRIC_NAMES)

train_metric_logs = utils.get_metric_logs(
os.path.join(logs_dir, "train"), METRIC_NAMES)
self.assertEqual(10, len(train_metric_logs))

for log in train_metric_logs:
actual = log["metric"]["name"]
self.assertIn(actual, METRIC_NAMES)

def test_parse_file_with_tensorboardX(self):
logs_dir = tempfile.mkdtemp()
num_iters = 3

# Metric format is "{{metrics name}}"
metric_names = ["accuracy", "loss"]
metrics_file_dir = os.path.join(logs_dir, "train")
metric_logs = utils.get_metric_logs(metrics_file_dir, metric_names)
self.assertEqual(10, len(metric_logs))
for dir_name in METRIC_DIR_NAMES:
with tensorboardX.SummaryWriter(os.path.join(logs_dir, dir_name)) as writer:
for metric_name in METRIC_NAMES:
for iter in range(num_iters):
writer.add_scalar(metric_name, 0.1, iter)


metric_logs = utils.get_metric_logs(logs_dir, QUALIFIED_METRIC_NAMES)
self.assertEqual(num_iters * len(QUALIFIED_METRIC_NAMES), len(metric_logs))

for log in metric_logs:
actual = log["metric"]["name"]
self.assertIn(actual, metric_names)
self.assertIn(actual, QUALIFIED_METRIC_NAMES)

train_metric_logs = utils.get_metric_logs(
os.path.join(logs_dir, "train"), METRIC_NAMES)
self.assertEqual(num_iters * len(METRIC_NAMES), len(train_metric_logs))

for log in train_metric_logs:
actual = log["metric"]["name"]
self.assertIn(actual, METRIC_NAMES)


if __name__ == '__main__':
Expand Down
1 change: 1 addition & 0 deletions test/unit/v1beta1/requirements.txt
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
grpcio-testing==1.64.1
pytest==7.2.0
tensorboardX==2.6.2.2
kubeflow-training[huggingface]==1.9.0

0 comments on commit 7dcdde7

Please sign in to comment.