diff --git a/dfsclient/src/main/java/org/hpccsystems/dfs/client/FileUtility.java b/dfsclient/src/main/java/org/hpccsystems/dfs/client/FileUtility.java index 93afd2616..f8e418ac7 100644 --- a/dfsclient/src/main/java/org/hpccsystems/dfs/client/FileUtility.java +++ b/dfsclient/src/main/java/org/hpccsystems/dfs/client/FileUtility.java @@ -84,9 +84,14 @@ private static class TaskContext { private static class TaskOperation { - public String currentOperationDesc = ""; + public String operationDesc = ""; public long operationStartNS = 0; + public long operationEndNS = 0; + public boolean isActive = true; + public boolean success = false; + + private List childOperations = new ArrayList(); public List errorMessages = new ArrayList(); public List warnMessages = new ArrayList(); @@ -99,29 +104,61 @@ private static class TaskOperation public Span operationSpan = null; - public JSONObject end(boolean success) + public void addChildOperation(TaskOperation op) { - if (success) + synchronized(childOperations) { - operationSpan.setStatus(StatusCode.OK); + childOperations.add(op); } - else + } + + public JSONObject end(boolean _success) + { + if (isActive) { - operationSpan.setStatus(StatusCode.ERROR); - } + success = _success; + if (operationSpan != null) + { + if (success) + { + operationSpan.setStatus(StatusCode.OK); + } + else + { + operationSpan.setStatus(StatusCode.ERROR); + } - operationSpan.end(); + operationSpan.end(); + } - long totalOperationTime = System.nanoTime(); - totalOperationTime -= operationStartNS; + operationEndNS = System.nanoTime(); + isActive = false; + } + + long totalOperationTime = operationEndNS - operationStartNS; double timeInSeconds = (double) totalOperationTime / 1_000_000_000.0; JSONObject results = new JSONObject(); - results.put("operation", currentOperationDesc); + results.put("operation", operationDesc); results.put("successful", success); + JSONArray childResults = new JSONArray(); + synchronized(childOperations) + { + for (TaskOperation childOp : childOperations) + { + if (childOp.isActive) + { + warnMessages.add("Child operation: " + childOp.operationDesc + " did not complete."); + } + + childResults.put(childOp.end(success)); + } + } + results.put("childOperations", childResults); + JSONArray errors = new JSONArray(); for (String err : errorMessages) { @@ -289,7 +326,7 @@ private void setCurrentOperation(TaskOperation op) public void startOperation(String operationName) { TaskOperation op = new TaskOperation(); - op.currentOperationDesc = operationName; + op.operationDesc = operationName; op.operationStartNS = System.nanoTime(); Span parentSpan = null; @@ -303,6 +340,23 @@ public void startOperation(String operationName) setCurrentOperation(op); } + public TaskOperation startChildOperation(String operationName) + { + if (!hasCurrentOperation()) + { + return null; + } + + TaskOperation parentOp = getCurrentOperation(); + + TaskOperation childOp = new TaskOperation(); + childOp.operationDesc = operationName; + childOp.operationStartNS = System.nanoTime(); + + parentOp.addChildOperation(childOp); + return childOp; + } + public void endOperation() { endOperation(true); @@ -954,6 +1008,8 @@ public void run() { try { + TaskContext.TaskOperation fileReadOperation = context.startChildOperation("File Part: " + filePart.getThisPart()); + HpccRemoteFileReader.FileReadContext readContext = new HpccRemoteFileReader.FileReadContext(); readContext.parentSpan = context.getCurrentOperation().operationSpan; readContext.originalRD = recordDef; @@ -971,10 +1027,16 @@ public void run() HPCCRecord record = fileReader.next(); recCount++; } + + fileReadOperation.recordsRead.addAndGet(recCount); context.getCurrentOperation().recordsRead.addAndGet(recCount); fileReader.close(); + + fileReadOperation.bytesRead.addAndGet(fileReader.getStreamPosition()); context.getCurrentOperation().bytesRead.addAndGet(fileReader.getStreamPosition()); + + fileReadOperation.end(true); } catch (Exception e) {