Skip to content

Commit 6d41810

Browse files
committed
Report additional ClearML progress via user properties
1 parent 531934a commit 6d41810

File tree

4 files changed

+52
-75
lines changed

4 files changed

+52
-75
lines changed

machine/jobs/build_clearml_helper.py

Lines changed: 39 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -4,14 +4,12 @@
44
from datetime import datetime
55
from typing import Callable, Optional, Union, cast
66

7-
import aiohttp
87
from clearml import Task
98
from dynaconf.base import Settings
109

1110
from ..utils.canceled_error import CanceledError
1211
from ..utils.phased_progress_reporter import PhaseProgressStatus
1312
from ..utils.progress_status import ProgressStatus
14-
from .async_scheduler import AsyncScheduler
1513

1614

1715
class ProgressInfo:
@@ -37,7 +35,7 @@ def clearml_check_canceled() -> None:
3735

3836

3937
def get_clearml_progress_caller(
40-
progress_info: ProgressInfo, task: Task, scheduler: AsyncScheduler, logger: logging.Logger
38+
progress_info: ProgressInfo, task: Task, logger: logging.Logger
4139
) -> Callable[[ProgressStatus], None]:
4240
def clearml_progress(progress_status: ProgressStatus) -> None:
4341
percent_completed: Optional[int] = None
@@ -51,11 +49,8 @@ def clearml_progress(progress_status: ProgressStatus) -> None:
5149
progress_info.last_progress_time is None
5250
or (current_time - progress_info.last_progress_time).seconds > 1
5351
):
54-
scheduler.schedule(
55-
update_runtime_properties(
56-
task,
57-
create_runtime_properties(task, percent_completed, message, progress_status),
58-
)
52+
report_clearml_progress(
53+
task=task, percent_completed=percent_completed, message=message, progress_status=progress_status
5954
)
6055
progress_info.last_progress_time = current_time
6156
progress_info.last_percent_completed = percent_completed
@@ -64,6 +59,42 @@ def clearml_progress(progress_status: ProgressStatus) -> None:
6459
return clearml_progress
6560

6661

62+
def report_clearml_progress(
63+
task: Task,
64+
percent_completed: Optional[int] = None,
65+
message: Optional[str] = None,
66+
progress_status: Optional[ProgressStatus] = None,
67+
) -> None:
68+
if percent_completed is not None:
69+
task.set_progress(percent_completed)
70+
props = []
71+
if message is not None:
72+
props.append({"type": str, "name": "message", "description": "Build Message", "value": message})
73+
# Report the step within the phase
74+
if progress_status is not None and isinstance(progress_status, PhaseProgressStatus):
75+
if progress_status.phase_stage is not None:
76+
if progress_status.phase_step is not None:
77+
props.append(
78+
{
79+
"type": int,
80+
"name": f"{progress_status.phase_stage}_step",
81+
"description": "Phase Step",
82+
"value": progress_status.phase_step,
83+
}
84+
)
85+
if progress_status.step_count is not None:
86+
props.append(
87+
{
88+
"type": int,
89+
"name": f"{progress_status.phase_stage}_step_count",
90+
"description": "Maximum Phase Step",
91+
"value": progress_status.step_count,
92+
}
93+
)
94+
if len(props) > 0:
95+
task.set_user_properties(*props)
96+
97+
6798
def get_local_progress_caller(progress_info: ProgressInfo, logger: logging.Logger) -> Callable[[ProgressStatus], None]:
6899

69100
def local_progress(progress_status: ProgressStatus) -> None:
@@ -91,32 +122,3 @@ def update_settings(settings: Settings, args: dict):
91122
raise TypeError(f"Build options could not be parsed: {e}") from e
92123
settings.update({settings.model_type: build_options})
93124
settings.data_dir = os.path.expanduser(cast(str, settings.data_dir))
94-
95-
96-
async def update_runtime_properties(task, runtime_props: dict) -> None:
97-
current_runtime_properties = task.data.runtime or {}
98-
current_runtime_properties.update(runtime_props)
99-
async with aiohttp.ClientSession(
100-
base_url=task.session.host, headers={"Authorization": f"Bearer {task.session.token}"}
101-
) as session:
102-
json = {"task": task.id, "runtime": runtime_props, "force": True}
103-
async with session.post("/tasks.edit", json=json) as response:
104-
response.raise_for_status()
105-
106-
107-
def create_runtime_properties(
108-
task, percent_completed: Optional[int], message: Optional[str], status: Optional[ProgressStatus]
109-
) -> dict:
110-
runtime_props = task.data.runtime.copy() or {}
111-
if percent_completed is not None:
112-
runtime_props["progress"] = str(percent_completed)
113-
if message is not None:
114-
runtime_props["message"] = message
115-
# Report the step within the phase
116-
if status is not None and isinstance(status, PhaseProgressStatus):
117-
if status.phase_stage is not None:
118-
if status.phase_step is not None:
119-
runtime_props[f"{status.phase_stage}_step"] = str(status.phase_step)
120-
if status.step_count is not None:
121-
runtime_props[f"{status.phase_stage}_step_count"] = str(status.step_count)
122-
return runtime_props

machine/jobs/build_nmt_engine.py

Lines changed: 3 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -8,8 +8,7 @@
88

99
from ..utils.canceled_error import CanceledError
1010
from ..utils.progress_status import ProgressStatus
11-
from .async_scheduler import AsyncScheduler
12-
from .build_clearml_helper import create_runtime_properties, update_runtime_properties
11+
from .build_clearml_helper import report_clearml_progress
1312
from .config import SETTINGS
1413
from .nmt_engine_build_job import NmtEngineBuildJob
1514
from .nmt_model_factory import NmtModelFactory
@@ -31,7 +30,6 @@ def run(args: dict) -> None:
3130
task = None
3231
if args["clearml"]:
3332
task = Task.init()
34-
scheduler = AsyncScheduler()
3533

3634
def clearml_check_canceled() -> None:
3735
if task.get_status() == "stopped":
@@ -41,11 +39,8 @@ def clearml_check_canceled() -> None:
4139

4240
def clearml_progress(status: ProgressStatus) -> None:
4341
if status.percent_completed is not None:
44-
scheduler.schedule(
45-
update_runtime_properties(
46-
task,
47-
create_runtime_properties(task, round(status.percent_completed * 100), None, status),
48-
)
42+
report_clearml_progress(
43+
task=task, percent_completed=round(status.percent_completed * 100), progress_status=status
4944
)
5045

5146
progress = clearml_progress

machine/jobs/build_smt_engine.py

Lines changed: 5 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -5,14 +5,11 @@
55
from clearml import Task
66

77
from ..utils.progress_status import ProgressStatus
8-
from .async_scheduler import AsyncScheduler
98
from .build_clearml_helper import (
109
ProgressInfo,
11-
create_runtime_properties,
1210
get_clearml_check_canceled,
1311
get_clearml_progress_caller,
1412
get_local_progress_caller,
15-
update_runtime_properties,
1613
update_settings,
1714
)
1815
from .config import SETTINGS
@@ -34,17 +31,15 @@ def run(args: dict) -> None:
3431
progress: Callable[[ProgressStatus], None]
3532
check_canceled: Optional[Callable[[], None]] = None
3633
task = None
37-
scheduler: Optional[AsyncScheduler] = None
3834
progress_info = ProgressInfo()
3935
if args["clearml"]:
4036
task = Task.init()
41-
scheduler = AsyncScheduler()
4237

4338
check_canceled = get_clearml_check_canceled(progress_info, task)
4439

4540
task.reload()
4641

47-
progress = get_clearml_progress_caller(progress_info, task, scheduler, logger)
42+
progress = get_clearml_progress_caller(progress_info, task, logger)
4843

4944
else:
5045
progress = get_local_progress_caller(ProgressInfo(), logger)
@@ -66,12 +61,10 @@ def run(args: dict) -> None:
6661

6762
smt_engine_build_job = SmtEngineBuildJob(SETTINGS, smt_model_factory, shared_file_service)
6863
train_corpus_size, confidence = smt_engine_build_job.run(progress, check_canceled)
69-
if scheduler is not None and task is not None:
70-
scheduler.schedule(
71-
update_runtime_properties(
72-
task,
73-
create_runtime_properties(task, 100, "Completed", None),
74-
)
64+
if task is not None:
65+
task.set_progress(100)
66+
task.set_user_properties(
67+
{"type": str, "name": "message", "description": "Build Message", "value": "Completed"}
7568
)
7669
task.get_logger().report_single_value(name="train_corpus_size", value=train_corpus_size)
7770
task.get_logger().report_single_value(name="confidence", value=round(confidence, 4))
@@ -83,9 +76,6 @@ def run(args: dict) -> None:
8376
else:
8477
task.mark_failed(status_reason=type(e).__name__, status_message=str(e))
8578
raise e
86-
finally:
87-
if scheduler is not None:
88-
scheduler.stop()
8979

9080

9181
def main() -> None:

machine/jobs/build_word_alignment_model.py

Lines changed: 5 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -5,14 +5,11 @@
55
from clearml import Task
66

77
from ..utils.progress_status import ProgressStatus
8-
from .async_scheduler import AsyncScheduler
98
from .build_clearml_helper import (
109
ProgressInfo,
11-
create_runtime_properties,
1210
get_clearml_check_canceled,
1311
get_clearml_progress_caller,
1412
get_local_progress_caller,
15-
update_runtime_properties,
1613
update_settings,
1714
)
1815
from .config import SETTINGS
@@ -35,17 +32,15 @@ def run(args: dict):
3532
progress: Callable[[ProgressStatus], None]
3633
check_canceled: Optional[Callable[[], None]] = None
3734
task = None
38-
scheduler: Optional[AsyncScheduler] = None
3935
progress_info = ProgressInfo()
4036
if args["clearml"]:
4137
task = Task.init()
42-
scheduler = AsyncScheduler()
4338

4439
check_canceled = get_clearml_check_canceled(progress_info, task)
4540

4641
task.reload()
4742

48-
progress = get_clearml_progress_caller(progress_info, task, scheduler, logger)
43+
progress = get_clearml_progress_caller(progress_info, task, logger)
4944

5045
else:
5146
progress = get_local_progress_caller(ProgressInfo(), logger)
@@ -67,12 +62,10 @@ def run(args: dict):
6762
SETTINGS, word_alignment_model_factory, word_alignment_file_service
6863
)
6964
train_corpus_size = word_alignment_build_job.run(progress, check_canceled)
70-
if scheduler is not None and task is not None:
71-
scheduler.schedule(
72-
update_runtime_properties(
73-
task,
74-
create_runtime_properties(task, 100, "Completed", None),
75-
)
65+
if task is not None:
66+
task.set_progress(100)
67+
task.set_user_properties(
68+
{"type": str, "name": "message", "description": "Build Message", "value": "Completed"}
7669
)
7770
task.get_logger().report_single_value(name="train_corpus_size", value=train_corpus_size)
7871
logger.info("Finished")
@@ -83,9 +76,6 @@ def run(args: dict):
8376
else:
8477
task.mark_failed(status_reason=type(e).__name__, status_message=str(e))
8578
raise e
86-
finally:
87-
if scheduler is not None:
88-
scheduler.stop()
8979
return
9080

9181

0 commit comments

Comments
 (0)