Skip to content

Commit 65854b3

Browse files
New observer for each binding expression
1 parent 792d500 commit 65854b3

File tree

4 files changed

+71
-16
lines changed

4 files changed

+71
-16
lines changed

engine/runtime-instrument-common/src/main/java/org/enso/interpreter/service/ExecutionCallbacks.java

Lines changed: 24 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
import java.util.Map;
77
import java.util.UUID;
88
import java.util.function.Consumer;
9+
import org.enso.common.CachePreferences;
910
import org.enso.interpreter.instrument.ExpressionExecutionState;
1011
import org.enso.interpreter.instrument.MethodCallsCache;
1112
import org.enso.interpreter.instrument.OneshotExpression;
@@ -39,7 +40,7 @@ final class ExecutionCallbacks implements IdExecutionService.Callbacks {
3940
private final Consumer<ExpressionValue> onComputedCallback;
4041
private final Consumer<ExpressionCall> functionCallCallback;
4142
private final Consumer<ExecutedVisualization> onExecutedVisualizationCallback;
42-
private final ExecutionProgressObserver progressObserver = new ExecutionProgressObserver();
43+
private ExecutionProgressObserver progressObserver;
4344

4445
/**
4546
* Creates callbacks instance.
@@ -94,22 +95,37 @@ public Object findCachedResult(IdExecutionService.Info info) {
9495
callOnCachedCallback(nodeId, result);
9596
return result;
9697
} else {
97-
callOnNotCachedCallback(nodeId, -1.0);
98-
progressObserver.startComputation(
99-
nodeId,
100-
(progress) -> {
101-
callOnNotCachedCallback(nodeId, progress);
102-
});
98+
if (cache.getPreferences().get(nodeId) == CachePreferences.Kind.BINDING_EXPRESSION) {
99+
refreshObserver(
100+
ExecutionProgressObserver.startComputation(
101+
nodeId,
102+
(progress) -> {
103+
callOnNotCachedCallback(nodeId, progress);
104+
}));
105+
}
103106
}
104107

105108
return null;
106109
}
107110

111+
private void refreshObserver(ExecutionProgressObserver newObserverOrNull) {
112+
var o = progressObserver;
113+
if (o != null) {
114+
o.finishComputation();
115+
}
116+
this.progressObserver = newObserverOrNull;
117+
}
118+
108119
@Override
109120
public void updateCachedResult(IdExecutionService.Info info) {
110121
Object result = info.getResult();
111122
TypeInfo resultType = typeOf(result);
112123
UUID nodeId = info.getId();
124+
125+
if (progressObserver instanceof ExecutionProgressObserver o && nodeId.equals(o.nodeId())) {
126+
refreshObserver(null);
127+
}
128+
113129
TypeInfo cachedType = cache.getType(nodeId);
114130
FunctionCallInfo call = functionCallInfoById(nodeId);
115131
FunctionCallInfo cachedCall = cache.getCall(nodeId);
@@ -187,9 +203,7 @@ private void callOnCachedCallback(UUID nodeId, Object result) {
187203

188204
@CompilerDirectives.TruffleBoundary
189205
private void callOnNotCachedCallback(UUID nodeId, double amount) {
190-
ExpressionValue expressionValue =
191-
new ExpressionValue(nodeId, null, null, null, null, null, null, false);
192-
206+
var expressionValue = ExpressionValue.progress(nodeId, amount);
193207
onCachedCallback.accept(expressionValue);
194208
}
195209

engine/runtime-instrument-common/src/main/java/org/enso/interpreter/service/ExecutionProgressObserver.java

Lines changed: 37 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -9,14 +9,47 @@
99
final class ExecutionProgressObserver implements Consumer<ObservedMessage> {
1010
private static final Logger PROGRESS = LoggerFactory.getLogger("Standard.Base.Logging.Progress");
1111

12-
private AutoCloseable handle = ObservedMessage.observe(PROGRESS, this);
12+
private final UUID nodeId;
13+
private final Thread thread;
14+
private final AutoCloseable handle;
15+
private final Consumer<Double> consumer;
1316

14-
ExecutionProgressObserver() {}
17+
ExecutionProgressObserver(UUID nodeId, Consumer<Double> c) {
18+
this.nodeId = nodeId;
19+
this.handle = ObservedMessage.observe(PROGRESS, this);
20+
this.thread = Thread.currentThread();
21+
this.consumer = c;
22+
// indeterminate computation has just started
23+
c.accept(-1.0);
24+
System.err.println("Observing for " + nodeId);
25+
}
26+
27+
UUID nodeId() {
28+
return nodeId;
29+
}
1530

16-
final void startComputation(UUID nodeId, Consumer<Double> c) {}
31+
static ExecutionProgressObserver startComputation(UUID nodeId, Consumer<Double> c) {
32+
return new ExecutionProgressObserver(nodeId, c);
33+
}
1734

1835
@Override
1936
public void accept(ObservedMessage t) {
20-
System.err.println("seeing " + t.getMessage());
37+
if (Thread.currentThread() == thread) {
38+
System.err.println(" seeing " + t.getMessage() + " for " + nodeId);
39+
}
40+
}
41+
42+
final void finishComputation() {
43+
System.err.println("Finished computing " + nodeId);
44+
try {
45+
handle.close();
46+
} catch (Exception ex) {
47+
throw raise(RuntimeException.class, ex);
48+
}
49+
}
50+
51+
@SuppressWarnings("unchecked")
52+
private static <T extends Exception> T raise(Class<T> aClass, Exception ex) throws T {
53+
throw (T) ex;
2154
}
2255
}

engine/runtime-instrument-common/src/main/java/org/enso/interpreter/service/ExecutionService.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -704,6 +704,14 @@ public ExpressionValue(
704704
this.wasCached = wasCached;
705705
}
706706

707+
static ExpressionValue progress(UUID nodeId, double amount) {
708+
return new ExpressionValue(nodeId, amount, null, null, null, null, null, false);
709+
}
710+
711+
public boolean isProgressUpdate() {
712+
return profilingInfo == null;
713+
}
714+
707715
@Override
708716
public String toString() {
709717
String profilingInfo = Arrays.toString(this.profilingInfo);

engine/runtime-instrument-common/src/main/scala/org/enso/interpreter/instrument/job/ProgramExecutionSupport.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -447,10 +447,10 @@ object ProgramExecutionSupport {
447447
value: ExpressionValue
448448
)(implicit ctx: RuntimeContext): Unit = {
449449
val expressionId = value.getExpressionId
450-
if (value.getValue == null && value.getProfilingInfo() == null) {
450+
if (value.isProgressUpdate()) {
451451
val p = Api.ExpressionUpdate.Payload.Pending(
452452
None,
453-
Some(-1.0)
453+
Some(value.getValue().asInstanceOf[Double])
454454
)
455455
ctx.endpoint.sendToClient(
456456
Api.Response(

0 commit comments

Comments
 (0)