1
1
# emacs: -*- mode: python; py-indent-offset: 4; indent-tabs-mode: nil -*-
2
2
# vi: set ft=python sts=4 ts=4 sw=4 et:
3
- """Tests for workflow callbacks
4
- """
3
+ """Tests for workflow callbacks."""
4
+ from pathlib import Path
5
5
from time import sleep
6
+ import json
6
7
import pytest
7
8
import nipype .interfaces .utility as niu
8
9
import nipype .pipeline .engine as pe
@@ -71,22 +72,22 @@ def test_callback_exception(tmpdir, plugin, stop_on_first_crash):
71
72
72
73
@pytest .mark .parametrize ("plugin" , ["Linear" , "MultiProc" , "LegacyMultiProc" ])
73
74
@pytest .mark .skipif (not has_pandas , reason = "Test requires pandas" )
74
- def test_callback_gantt (tmpdir , plugin ) :
75
+ def test_callback_gantt (tmp_path : Path , plugin : str ) -> None :
75
76
import logging
76
77
77
78
from os import path
78
79
79
80
from nipype .utils .profiler import log_nodes_cb
80
81
from nipype .utils .draw_gantt_chart import generate_gantt_chart
81
82
82
- log_filename = path . join ( tmpdir , "callback.log" )
83
+ log_filename = tmp_path / "callback.log"
83
84
logger = logging .getLogger ("callback" )
84
85
logger .setLevel (logging .DEBUG )
85
86
handler = logging .FileHandler (log_filename )
86
87
logger .addHandler (handler )
87
88
88
89
# create workflow
89
- wf = pe .Workflow (name = "test" , base_dir = tmpdir . strpath )
90
+ wf = pe .Workflow (name = "test" , base_dir = str ( tmp_path ) )
90
91
f_node = pe .Node (
91
92
niu .Function (function = func , input_names = [], output_names = []), name = "f_node"
92
93
)
@@ -98,7 +99,21 @@ def test_callback_gantt(tmpdir, plugin):
98
99
plugin_args ["n_procs" ] = 8
99
100
wf .run (plugin = plugin , plugin_args = plugin_args )
100
101
101
- generate_gantt_chart (
102
- path .join (tmpdir , "callback.log" ), 1 if plugin == "Linear" else 8
103
- )
104
- assert path .exists (path .join (tmpdir , "callback.log.html" ))
102
+ with open (log_filename , "r" ) as _f :
103
+ loglines = _f .readlines ()
104
+
105
+ # test missing duration
106
+ first_line = json .loads (loglines [0 ])
107
+ if "duration" in first_line :
108
+ del first_line ["duration" ]
109
+ loglines [0 ] = json .dumps (first_line )
110
+
111
+ # test duplicate timestamp warning
112
+ loglines .append (loglines [- 1 ])
113
+
114
+ with open (log_filename , "w" ) as _f :
115
+ _f .write ("" .join (loglines ))
116
+
117
+ with pytest .warns (Warning ):
118
+ generate_gantt_chart (str (log_filename ), 1 if plugin == "Linear" else 8 )
119
+ assert (tmp_path / "callback.log.html" ).exists ()
0 commit comments