Skip to content

Commit 3e88802

Browse files
authored
ESQL: Add description to status and profile (#121783) (#127942)
This adds a `task_description` field to `profile` output and task `status`. This looks like:

```
...
  "profile" : {
    "drivers" : [
      {
        "task_description" : "final",
        "start_millis" : 1738768795349,
        "stop_millis" : 1738768795405,
...
        "task_description" : "node_reduce",
        "start_millis" : 1738768795392,
        "stop_millis" : 1738768795406,
...
        "task_description" : "data",
        "start_millis" : 1738768795391,
        "stop_millis" : 1738768795404,
...
```

Previously you had to look at the signature of the operators in the driver to figure out what the driver is *doing*. You had to know enough about how ESQL works to guess. Now you can look at this description to see what the server *thinks* it is doing. No more manual classification.

This will be useful when debugging failures and performance regressions because it is much easier to use `jq` to group on it:

```
| jq '.profile[] | group_by(.task_description)[]'
```
1 parent 768434d commit 3e88802

File tree

49 files changed

+416
-161
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

49 files changed

+416
-161
lines changed

server/src/main/java/org/elasticsearch/TransportVersions.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -218,6 +218,7 @@ static TransportVersion def(int id) {
218218
public static final TransportVersion RESCORE_VECTOR_ALLOW_ZERO_BACKPORT_8_19 = def(8_841_0_27);
219219
public static final TransportVersion INFERENCE_ADD_TIMEOUT_PUT_ENDPOINT_8_19 = def(8_841_0_28);
220220
public static final TransportVersion ESQL_REPORT_SHARD_PARTITIONING_8_19 = def(8_841_0_29);
221+
public static final TransportVersion ESQL_DRIVER_TASK_DESCRIPTION_8_19 = def(8_841_0_30);
221222

222223
/*
223224
* STOP! READ THIS FIRST! No, really,

x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/Driver.java

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,13 @@ public class Driver implements Releasable, Describable {
5252

5353
private final String sessionId;
5454

55+
/**
56+
* Description of the task this driver is running. This description should be
57+
* short and meaningful as a grouping identifier. We use the phase of the
58+
* query right now: "data", "node_reduce", "final".
59+
*/
60+
private final String taskDescription;
61+
5562
/**
5663
* The wall clock time when this driver was created in milliseconds since epoch.
5764
* Compared to {@link #startNanos} this is less accurate and is measured by a
@@ -96,6 +103,10 @@ public class Driver implements Releasable, Describable {
96103
/**
97104
* Creates a new driver with a chain of operators.
98105
* @param sessionId session Id
106+
* @param taskDescription Description of the task this driver is running. This
107+
* description should be short and meaningful as a grouping
108+
* identifier. We use the phase of the query right now:
109+
* "data", "node_reduce", "final".
99110
* @param driverContext the driver context
100111
* @param source source operator
101112
* @param intermediateOperators the chain of operators to execute
@@ -105,6 +116,7 @@ public class Driver implements Releasable, Describable {
105116
*/
106117
public Driver(
107118
String sessionId,
119+
String taskDescription,
108120
long startTime,
109121
long startNanos,
110122
DriverContext driverContext,
@@ -116,6 +128,7 @@ public Driver(
116128
Releasable releasable
117129
) {
118130
this.sessionId = sessionId;
131+
this.taskDescription = taskDescription;
119132
this.startTime = startTime;
120133
this.startNanos = startNanos;
121134
this.driverContext = driverContext;
@@ -129,6 +142,7 @@ public Driver(
129142
this.status = new AtomicReference<>(
130143
new DriverStatus(
131144
sessionId,
145+
taskDescription,
132146
startTime,
133147
System.currentTimeMillis(),
134148
0,
@@ -150,6 +164,7 @@ public Driver(
150164
* @param releasable a {@link Releasable} to be invoked once the chain of operators has run to completion
151165
*/
152166
public Driver(
167+
String taskDescription,
153168
DriverContext driverContext,
154169
SourceOperator source,
155170
List<Operator> intermediateOperators,
@@ -158,6 +173,7 @@ public Driver(
158173
) {
159174
this(
160175
"unset",
176+
taskDescription,
161177
System.currentTimeMillis(),
162178
System.nanoTime(),
163179
driverContext,
@@ -497,6 +513,7 @@ public DriverProfile profile() {
497513
throw new IllegalStateException("can only get profile from finished driver");
498514
}
499515
return new DriverProfile(
516+
status.taskDescription(),
500517
status.started(),
501518
status.lastUpdated(),
502519
finishNanos - startNanos,
@@ -543,6 +560,7 @@ private void updateStatus(long extraCpuNanos, int extraIterations, DriverStatus.
543560

544561
return new DriverStatus(
545562
sessionId,
563+
taskDescription,
546564
startTime,
547565
now,
548566
prev.cpuNanos() + extraCpuNanos,

x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/DriverProfile.java

Lines changed: 26 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,13 @@
2626
* Profile results from a single {@link Driver}.
2727
*/
2828
public class DriverProfile implements Writeable, ChunkedToXContentObject {
29+
/**
30+
* Description of the task this driver is running. This description should be
31+
* short and meaningful as a grouping identifier. We use the phase of the
32+
* query right now: "data", "node_reduce", "final".
33+
*/
34+
private final String taskDescription;
35+
2936
/**
3037
* Millis since epoch when the driver started.
3138
*/
@@ -61,6 +68,7 @@ public class DriverProfile implements Writeable, ChunkedToXContentObject {
6168
private final DriverSleeps sleeps;
6269

6370
public DriverProfile(
71+
String taskDescription,
6472
long startMillis,
6573
long stopMillis,
6674
long tookNanos,
@@ -69,6 +77,7 @@ public DriverProfile(
6977
List<DriverStatus.OperatorStatus> operators,
7078
DriverSleeps sleeps
7179
) {
80+
this.taskDescription = taskDescription;
7281
this.startMillis = startMillis;
7382
this.stopMillis = stopMillis;
7483
this.tookNanos = tookNanos;
@@ -79,6 +88,9 @@ public DriverProfile(
7988
}
8089

8190
public DriverProfile(StreamInput in) throws IOException {
91+
this.taskDescription = in.getTransportVersion().onOrAfter(TransportVersions.ESQL_DRIVER_TASK_DESCRIPTION_8_19)
92+
? in.readString()
93+
: "";
8294
if (in.getTransportVersion().onOrAfter(TransportVersions.V_8_16_0)) {
8395
this.startMillis = in.readVLong();
8496
this.stopMillis = in.readVLong();
@@ -101,6 +113,9 @@ public DriverProfile(StreamInput in) throws IOException {
101113

102114
@Override
103115
public void writeTo(StreamOutput out) throws IOException {
116+
if (out.getTransportVersion().onOrAfter(TransportVersions.ESQL_DRIVER_TASK_DESCRIPTION_8_19)) {
117+
out.writeString(taskDescription);
118+
}
104119
if (out.getTransportVersion().onOrAfter(TransportVersions.V_8_16_0)) {
105120
out.writeVLong(startMillis);
106121
out.writeVLong(stopMillis);
@@ -114,6 +129,13 @@ public void writeTo(StreamOutput out) throws IOException {
114129
sleeps.writeTo(out);
115130
}
116131

132+
/**
133+
* Description of the task this driver is running.
134+
*/
135+
public String taskDescription() {
136+
return taskDescription;
137+
}
138+
117139
/**
118140
* Millis since epoch when the driver started.
119141
*/
@@ -169,6 +191,7 @@ public DriverSleeps sleeps() {
169191
public Iterator<? extends ToXContent> toXContentChunked(ToXContent.Params params) {
170192
return ChunkedToXContent.builder(params).object(ob -> {
171193
ob.append((b, p) -> {
194+
b.field("task_description", taskDescription);
172195
b.timestampFieldsFromUnixEpochMillis("start_millis", "start", startMillis);
173196
b.timestampFieldsFromUnixEpochMillis("stop_millis", "stop", stopMillis);
174197
b.field("took_nanos", tookNanos);
@@ -196,7 +219,8 @@ public boolean equals(Object o) {
196219
return false;
197220
}
198221
DriverProfile that = (DriverProfile) o;
199-
return startMillis == that.startMillis
222+
return taskDescription.equals(that.taskDescription)
223+
&& startMillis == that.startMillis
200224
&& stopMillis == that.stopMillis
201225
&& tookNanos == that.tookNanos
202226
&& cpuNanos == that.cpuNanos
@@ -207,7 +231,7 @@ public boolean equals(Object o) {
207231

208232
@Override
209233
public int hashCode() {
210-
return Objects.hash(startMillis, stopMillis, tookNanos, cpuNanos, iterations, operators, sleeps);
234+
return Objects.hash(taskDescription, startMillis, stopMillis, tookNanos, cpuNanos, iterations, operators, sleeps);
211235
}
212236

213237
@Override

x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/DriverStatus.java

Lines changed: 37 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,11 @@ public class DriverStatus implements Task.Status {
4242
*/
4343
private final String sessionId;
4444

45+
/**
46+
* Description of the task this driver is running.
47+
*/
48+
private final String taskDescription;
49+
4550
/**
4651
* Milliseconds since epoch when this driver started.
4752
*/
@@ -83,6 +88,7 @@ public class DriverStatus implements Task.Status {
8388

8489
DriverStatus(
8590
String sessionId,
91+
String taskDescription,
8692
long started,
8793
long lastUpdated,
8894
long cpuTime,
@@ -93,6 +99,7 @@ public class DriverStatus implements Task.Status {
9399
DriverSleeps sleeps
94100
) {
95101
this.sessionId = sessionId;
102+
this.taskDescription = taskDescription;
96103
this.started = started;
97104
this.lastUpdated = lastUpdated;
98105
this.cpuNanos = cpuTime;
@@ -105,6 +112,9 @@ public class DriverStatus implements Task.Status {
105112

106113
public DriverStatus(StreamInput in) throws IOException {
107114
this.sessionId = in.readString();
115+
this.taskDescription = in.getTransportVersion().onOrAfter(TransportVersions.ESQL_DRIVER_TASK_DESCRIPTION_8_19)
116+
? in.readString()
117+
: "";
108118
this.started = in.getTransportVersion().onOrAfter(TransportVersions.V_8_14_0) ? in.readLong() : 0;
109119
this.lastUpdated = in.readLong();
110120
this.cpuNanos = in.getTransportVersion().onOrAfter(TransportVersions.V_8_14_0) ? in.readVLong() : 0;
@@ -122,6 +132,9 @@ public DriverStatus(StreamInput in) throws IOException {
122132
@Override
123133
public void writeTo(StreamOutput out) throws IOException {
124134
out.writeString(sessionId);
135+
if (out.getTransportVersion().onOrAfter(TransportVersions.ESQL_DRIVER_TASK_DESCRIPTION_8_19)) {
136+
out.writeString(taskDescription);
137+
}
125138
if (out.getTransportVersion().onOrAfter(TransportVersions.V_8_14_0)) {
126139
out.writeLong(started);
127140
}
@@ -150,6 +163,15 @@ public String sessionId() {
150163
return sessionId;
151164
}
152165

166+
/**
167+
* Description of the task this driver is running. This description should be
168+
* short and meaningful as a grouping identifier. We use the phase of the
169+
* query right now: "data", "node_reduce", "final".
170+
*/
171+
public String taskDescription() {
172+
return taskDescription;
173+
}
174+
153175
/**
154176
* When this {@link Driver} was started.
155177
*/
@@ -211,7 +233,8 @@ public List<OperatorStatus> activeOperators() {
211233
@Override
212234
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
213235
builder.startObject();
214-
builder.field("sessionId", sessionId);
236+
builder.field("session_id", sessionId);
237+
builder.field("task_description", taskDescription);
215238
builder.field("started", DateFieldMapper.DEFAULT_DATE_TIME_FORMATTER.formatMillis(started));
216239
builder.field("last_updated", DateFieldMapper.DEFAULT_DATE_TIME_FORMATTER.formatMillis(lastUpdated));
217240
builder.field("cpu_nanos", cpuNanos);
@@ -240,6 +263,7 @@ public boolean equals(Object o) {
240263
if (o == null || getClass() != o.getClass()) return false;
241264
DriverStatus that = (DriverStatus) o;
242265
return sessionId.equals(that.sessionId)
266+
&& taskDescription.equals(that.taskDescription)
243267
&& started == that.started
244268
&& lastUpdated == that.lastUpdated
245269
&& cpuNanos == that.cpuNanos
@@ -252,7 +276,18 @@ public boolean equals(Object o) {
252276

253277
@Override
254278
public int hashCode() {
255-
return Objects.hash(sessionId, started, lastUpdated, cpuNanos, iterations, status, completedOperators, activeOperators, sleeps);
279+
return Objects.hash(
280+
sessionId,
281+
taskDescription,
282+
started,
283+
lastUpdated,
284+
cpuNanos,
285+
iterations,
286+
status,
287+
completedOperators,
288+
activeOperators,
289+
sleeps
290+
);
256291
}
257292

258293
@Override

x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/OperatorTests.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -123,7 +123,7 @@ public void testQueryOperator() throws IOException {
123123
}
124124
});
125125
DriverContext driverContext = driverContext();
126-
drivers.add(new Driver(driverContext, factory.get(driverContext), List.of(), docCollector, () -> {}));
126+
drivers.add(new Driver("test", driverContext, factory.get(driverContext), List.of(), docCollector, () -> {}));
127127
}
128128
OperatorTestCase.runDriver(drivers);
129129
Set<Integer> expectedDocIds = searchForDocIds(reader, query);
@@ -215,6 +215,7 @@ public String toString() {
215215
)
216216
);
217217
Driver driver = new Driver(
218+
"test",
218219
driverContext,
219220
luceneOperatorFactory(reader, new MatchAllDocsQuery(), LuceneOperator.NO_LIMIT).get(driverContext),
220221
operators,
@@ -248,6 +249,7 @@ public void testLimitOperator() {
248249
DriverContext driverContext = driverContext();
249250
try (
250251
var driver = new Driver(
252+
"test",
251253
driverContext,
252254
new SequenceLongBlockSourceOperator(driverContext.blockFactory(), values, 100),
253255
List.of((new LimitOperator.Factory(limit)).get(driverContext)),
@@ -335,6 +337,7 @@ public void testHashLookup() {
335337
var actualPrimeOrds = new ArrayList<>();
336338
try (
337339
var driver = new Driver(
340+
"test",
338341
driverContext,
339342
new SequenceLongBlockSourceOperator(driverContext.blockFactory(), values, 100),
340343
List.of(

x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/aggregation/AggregatorFunctionTestCase.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -111,6 +111,7 @@ public final void testIgnoresNulls() {
111111

112112
try (
113113
Driver d = new Driver(
114+
"test",
114115
driverContext,
115116
new NullInsertingSourceOperator(new CannedSourceOperator(input.iterator()), blockFactory),
116117
List.of(simple().get(driverContext)),

x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/aggregation/CountDistinctIntAggregatorFunctionTests.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,7 @@ public void testRejectsDouble() {
6565
BlockFactory blockFactory = driverContext.blockFactory();
6666
try (
6767
Driver d = new Driver(
68+
"test",
6869
driverContext,
6970
new CannedSourceOperator(Iterators.single(new Page(blockFactory.newDoubleArrayVector(new double[] { 1.0 }, 1).asBlock()))),
7071
List.of(simple().get(driverContext)),

x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/aggregation/CountDistinctLongAggregatorFunctionTests.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,7 @@ public void testRejectsDouble() {
6666
BlockFactory blockFactory = driverContext.blockFactory();
6767
try (
6868
Driver d = new Driver(
69+
"test",
6970
driverContext,
7071
new CannedSourceOperator(Iterators.single(new Page(blockFactory.newDoubleArrayVector(new double[] { 1.0 }, 1).asBlock()))),
7172
List.of(simple().get(driverContext)),

x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/aggregation/SumDoubleAggregatorFunctionTests.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,7 @@ public void testOverflowSucceeds() {
5353
List<Page> results = new ArrayList<>();
5454
try (
5555
Driver d = new Driver(
56+
"test",
5657
driverContext,
5758
new SequenceDoubleBlockSourceOperator(driverContext.blockFactory(), DoubleStream.of(Double.MAX_VALUE - 1, 2)),
5859
List.of(simple().get(driverContext)),
@@ -71,6 +72,7 @@ public void testSummationAccuracy() {
7172
List<Page> results = new ArrayList<>();
7273
try (
7374
Driver d = new Driver(
75+
"test",
7476
driverContext,
7577
new SequenceDoubleBlockSourceOperator(
7678
driverContext.blockFactory(),
@@ -100,6 +102,7 @@ public void testSummationAccuracy() {
100102
driverContext = driverContext();
101103
try (
102104
Driver d = new Driver(
105+
"test",
103106
driverContext,
104107
new SequenceDoubleBlockSourceOperator(driverContext.blockFactory(), DoubleStream.of(values)),
105108
List.of(simple().get(driverContext)),
@@ -122,6 +125,7 @@ public void testSummationAccuracy() {
122125
driverContext = driverContext();
123126
try (
124127
Driver d = new Driver(
128+
"test",
125129
driverContext,
126130
new SequenceDoubleBlockSourceOperator(driverContext.blockFactory(), DoubleStream.of(largeValues)),
127131
List.of(simple().get(driverContext)),
@@ -141,6 +145,7 @@ public void testSummationAccuracy() {
141145
driverContext = driverContext();
142146
try (
143147
Driver d = new Driver(
148+
"test",
144149
driverContext,
145150
new SequenceDoubleBlockSourceOperator(driverContext.blockFactory(), DoubleStream.of(largeValues)),
146151
List.of(simple().get(driverContext)),

0 commit comments

Comments
 (0)