@@ -84,9 +84,14 @@ private static class TaskContext
84
84
{
85
85
private static class TaskOperation
86
86
{
87
- public String currentOperationDesc = "" ;
87
+ public String operationDesc = "" ;
88
88
public long operationStartNS = 0 ;
89
+ public long operationEndNS = 0 ;
89
90
91
+ public boolean isActive = true ;
92
+ public boolean success = false ;
93
+
94
+ private List <TaskOperation > childOperations = new ArrayList <TaskOperation >();
90
95
91
96
public List <String > errorMessages = new ArrayList <String >();
92
97
public List <String > warnMessages = new ArrayList <String >();
@@ -99,29 +104,61 @@ private static class TaskOperation
99
104
100
105
public Span operationSpan = null ;
101
106
102
- public JSONObject end ( boolean success )
107
+ public void addChildOperation ( TaskOperation op )
103
108
{
104
- if ( success )
109
+ synchronized ( childOperations )
105
110
{
106
- operationSpan . setStatus ( StatusCode . OK );
111
+ childOperations . add ( op );
107
112
}
108
- else
113
+ }
114
+
115
+ public JSONObject end (boolean _success )
116
+ {
117
+ if (isActive )
109
118
{
110
- operationSpan .setStatus (StatusCode .ERROR );
111
- }
119
+ success = _success ;
120
+ if (operationSpan != null )
121
+ {
122
+ if (success )
123
+ {
124
+ operationSpan .setStatus (StatusCode .OK );
125
+ }
126
+ else
127
+ {
128
+ operationSpan .setStatus (StatusCode .ERROR );
129
+ }
112
130
113
- operationSpan .end ();
131
+ operationSpan .end ();
132
+ }
114
133
115
- long totalOperationTime = System .nanoTime ();
116
- totalOperationTime -= operationStartNS ;
134
+ operationEndNS = System .nanoTime ();
117
135
136
+ isActive = false ;
137
+ }
138
+
139
+ long totalOperationTime = operationEndNS - operationStartNS ;
118
140
double timeInSeconds = (double ) totalOperationTime / 1_000_000_000.0 ;
119
141
120
142
JSONObject results = new JSONObject ();
121
143
122
- results .put ("operation" , currentOperationDesc );
144
+ results .put ("operation" , operationDesc );
123
145
results .put ("successful" , success );
124
146
147
+ JSONArray childResults = new JSONArray ();
148
+ synchronized (childOperations )
149
+ {
150
+ for (TaskOperation childOp : childOperations )
151
+ {
152
+ if (childOp .isActive )
153
+ {
154
+ warnMessages .add ("Child operation: " + childOp .operationDesc + " did not complete." );
155
+ }
156
+
157
+ childResults .put (childOp .end (success ));
158
+ }
159
+ }
160
+ results .put ("childOperations" , childResults );
161
+
125
162
JSONArray errors = new JSONArray ();
126
163
for (String err : errorMessages )
127
164
{
@@ -289,7 +326,7 @@ private void setCurrentOperation(TaskOperation op)
289
326
public void startOperation (String operationName )
290
327
{
291
328
TaskOperation op = new TaskOperation ();
292
- op .currentOperationDesc = operationName ;
329
+ op .operationDesc = operationName ;
293
330
op .operationStartNS = System .nanoTime ();
294
331
295
332
Span parentSpan = null ;
@@ -303,6 +340,23 @@ public void startOperation(String operationName)
303
340
setCurrentOperation (op );
304
341
}
305
342
343
+ public TaskOperation startChildOperation (String operationName )
344
+ {
345
+ if (!hasCurrentOperation ())
346
+ {
347
+ return null ;
348
+ }
349
+
350
+ TaskOperation parentOp = getCurrentOperation ();
351
+
352
+ TaskOperation childOp = new TaskOperation ();
353
+ childOp .operationDesc = operationName ;
354
+ childOp .operationStartNS = System .nanoTime ();
355
+
356
+ parentOp .addChildOperation (childOp );
357
+ return childOp ;
358
+ }
359
+
306
360
public void endOperation ()
307
361
{
308
362
endOperation (true );
@@ -954,6 +1008,8 @@ public void run()
954
1008
{
955
1009
try
956
1010
{
1011
+ TaskContext .TaskOperation fileReadOperation = context .startChildOperation ("File Part: " + filePart .getThisPart ());
1012
+
957
1013
HpccRemoteFileReader .FileReadContext readContext = new HpccRemoteFileReader .FileReadContext ();
958
1014
readContext .parentSpan = context .getCurrentOperation ().operationSpan ;
959
1015
readContext .originalRD = recordDef ;
@@ -971,10 +1027,16 @@ public void run()
971
1027
HPCCRecord record = fileReader .next ();
972
1028
recCount ++;
973
1029
}
1030
+
1031
+ fileReadOperation .recordsRead .addAndGet (recCount );
974
1032
context .getCurrentOperation ().recordsRead .addAndGet (recCount );
975
1033
976
1034
fileReader .close ();
1035
+
1036
+ fileReadOperation .bytesRead .addAndGet (fileReader .getStreamPosition ());
977
1037
context .getCurrentOperation ().bytesRead .addAndGet (fileReader .getStreamPosition ());
1038
+
1039
+ fileReadOperation .end (true );
978
1040
}
979
1041
catch (Exception e )
980
1042
{
0 commit comments