Skip to content

ESQL: Add description to status and profile (#121783) #127942

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
May 19, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -218,6 +218,7 @@ static TransportVersion def(int id) {
public static final TransportVersion RESCORE_VECTOR_ALLOW_ZERO_BACKPORT_8_19 = def(8_841_0_27);
public static final TransportVersion INFERENCE_ADD_TIMEOUT_PUT_ENDPOINT_8_19 = def(8_841_0_28);
public static final TransportVersion ESQL_REPORT_SHARD_PARTITIONING_8_19 = def(8_841_0_29);
public static final TransportVersion ESQL_DRIVER_TASK_DESCRIPTION_8_19 = def(8_841_0_30);

/*
* STOP! READ THIS FIRST! No, really,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,13 @@ public class Driver implements Releasable, Describable {

private final String sessionId;

/**
* Description of the task this driver is running. This description should be
* short and meaningful as a grouping identifier. We use the phase of the
* query right now: "data", "node_reduce", "final".
*/
private final String taskDescription;

/**
* The wall clock time when this driver was created in milliseconds since epoch.
* Compared to {@link #startNanos} this is less accurate and is measured by a
Expand Down Expand Up @@ -96,6 +103,10 @@ public class Driver implements Releasable, Describable {
/**
* Creates a new driver with a chain of operators.
* @param sessionId session Id
* @param taskDescription Description of the task this driver is running. This
* description should be short and meaningful as a grouping
* identifier. We use the phase of the query right now:
* "data", "node_reduce", "final".
* @param driverContext the driver context
* @param source source operator
* @param intermediateOperators the chain of operators to execute
Expand All @@ -105,6 +116,7 @@ public class Driver implements Releasable, Describable {
*/
public Driver(
String sessionId,
String taskDescription,
long startTime,
long startNanos,
DriverContext driverContext,
Expand All @@ -116,6 +128,7 @@ public Driver(
Releasable releasable
) {
this.sessionId = sessionId;
this.taskDescription = taskDescription;
this.startTime = startTime;
this.startNanos = startNanos;
this.driverContext = driverContext;
Expand All @@ -129,6 +142,7 @@ public Driver(
this.status = new AtomicReference<>(
new DriverStatus(
sessionId,
taskDescription,
startTime,
System.currentTimeMillis(),
0,
Expand All @@ -150,6 +164,7 @@ public Driver(
* @param releasable a {@link Releasable} to invoked once the chain of operators has run to completion
*/
public Driver(
String taskDescription,
DriverContext driverContext,
SourceOperator source,
List<Operator> intermediateOperators,
Expand All @@ -158,6 +173,7 @@ public Driver(
) {
this(
"unset",
taskDescription,
System.currentTimeMillis(),
System.nanoTime(),
driverContext,
Expand Down Expand Up @@ -497,6 +513,7 @@ public DriverProfile profile() {
throw new IllegalStateException("can only get profile from finished driver");
}
return new DriverProfile(
status.taskDescription(),
status.started(),
status.lastUpdated(),
finishNanos - startNanos,
Expand Down Expand Up @@ -543,6 +560,7 @@ private void updateStatus(long extraCpuNanos, int extraIterations, DriverStatus.

return new DriverStatus(
sessionId,
taskDescription,
startTime,
now,
prev.cpuNanos() + extraCpuNanos,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,13 @@
* Profile results from a single {@link Driver}.
*/
public class DriverProfile implements Writeable, ChunkedToXContentObject {
/**
* Description of the task this driver is running. This description should be
* short and meaningful as a grouping identifier. We use the phase of the
* query right now: "data", "node_reduce", "final".
*/
private final String taskDescription;

/**
* Millis since epoch when the driver started.
*/
Expand Down Expand Up @@ -61,6 +68,7 @@ public class DriverProfile implements Writeable, ChunkedToXContentObject {
private final DriverSleeps sleeps;

public DriverProfile(
String taskDescription,
long startMillis,
long stopMillis,
long tookNanos,
Expand All @@ -69,6 +77,7 @@ public DriverProfile(
List<DriverStatus.OperatorStatus> operators,
DriverSleeps sleeps
) {
this.taskDescription = taskDescription;
this.startMillis = startMillis;
this.stopMillis = stopMillis;
this.tookNanos = tookNanos;
Expand All @@ -79,6 +88,9 @@ public DriverProfile(
}

public DriverProfile(StreamInput in) throws IOException {
this.taskDescription = in.getTransportVersion().onOrAfter(TransportVersions.ESQL_DRIVER_TASK_DESCRIPTION_8_19)
? in.readString()
: "";
if (in.getTransportVersion().onOrAfter(TransportVersions.V_8_16_0)) {
this.startMillis = in.readVLong();
this.stopMillis = in.readVLong();
Expand All @@ -101,6 +113,9 @@ public DriverProfile(StreamInput in) throws IOException {

@Override
public void writeTo(StreamOutput out) throws IOException {
if (out.getTransportVersion().onOrAfter(TransportVersions.ESQL_DRIVER_TASK_DESCRIPTION_8_19)) {
out.writeString(taskDescription);
}
if (out.getTransportVersion().onOrAfter(TransportVersions.V_8_16_0)) {
out.writeVLong(startMillis);
out.writeVLong(stopMillis);
Expand All @@ -114,6 +129,13 @@ public void writeTo(StreamOutput out) throws IOException {
sleeps.writeTo(out);
}

/**
* Description of the task this driver is running.
*/
public String taskDescription() {
return taskDescription;
}

/**
* Millis since epoch when the driver started.
*/
Expand Down Expand Up @@ -169,6 +191,7 @@ public DriverSleeps sleeps() {
public Iterator<? extends ToXContent> toXContentChunked(ToXContent.Params params) {
return ChunkedToXContent.builder(params).object(ob -> {
ob.append((b, p) -> {
b.field("task_description", taskDescription);
b.timestampFieldsFromUnixEpochMillis("start_millis", "start", startMillis);
b.timestampFieldsFromUnixEpochMillis("stop_millis", "stop", stopMillis);
b.field("took_nanos", tookNanos);
Expand Down Expand Up @@ -196,7 +219,8 @@ public boolean equals(Object o) {
return false;
}
DriverProfile that = (DriverProfile) o;
return startMillis == that.startMillis
return taskDescription.equals(that.taskDescription)
&& startMillis == that.startMillis
&& stopMillis == that.stopMillis
&& tookNanos == that.tookNanos
&& cpuNanos == that.cpuNanos
Expand All @@ -207,7 +231,7 @@ public boolean equals(Object o) {

@Override
public int hashCode() {
return Objects.hash(startMillis, stopMillis, tookNanos, cpuNanos, iterations, operators, sleeps);
return Objects.hash(taskDescription, startMillis, stopMillis, tookNanos, cpuNanos, iterations, operators, sleeps);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,11 @@ public class DriverStatus implements Task.Status {
*/
private final String sessionId;

/**
* Description of the task this driver is running.
*/
private final String taskDescription;

/**
* Milliseconds since epoch when this driver started.
*/
Expand Down Expand Up @@ -83,6 +88,7 @@ public class DriverStatus implements Task.Status {

DriverStatus(
String sessionId,
String taskDescription,
long started,
long lastUpdated,
long cpuTime,
Expand All @@ -93,6 +99,7 @@ public class DriverStatus implements Task.Status {
DriverSleeps sleeps
) {
this.sessionId = sessionId;
this.taskDescription = taskDescription;
this.started = started;
this.lastUpdated = lastUpdated;
this.cpuNanos = cpuTime;
Expand All @@ -105,6 +112,9 @@ public class DriverStatus implements Task.Status {

public DriverStatus(StreamInput in) throws IOException {
this.sessionId = in.readString();
this.taskDescription = in.getTransportVersion().onOrAfter(TransportVersions.ESQL_DRIVER_TASK_DESCRIPTION_8_19)
? in.readString()
: "";
this.started = in.getTransportVersion().onOrAfter(TransportVersions.V_8_14_0) ? in.readLong() : 0;
this.lastUpdated = in.readLong();
this.cpuNanos = in.getTransportVersion().onOrAfter(TransportVersions.V_8_14_0) ? in.readVLong() : 0;
Expand All @@ -122,6 +132,9 @@ public DriverStatus(StreamInput in) throws IOException {
@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeString(sessionId);
if (out.getTransportVersion().onOrAfter(TransportVersions.ESQL_DRIVER_TASK_DESCRIPTION_8_19)) {
out.writeString(taskDescription);
}
if (out.getTransportVersion().onOrAfter(TransportVersions.V_8_14_0)) {
out.writeLong(started);
}
Expand Down Expand Up @@ -150,6 +163,15 @@ public String sessionId() {
return sessionId;
}

/**
* Description of the task this driver is running. This description should be
* short and meaningful as a grouping identifier. We use the phase of the
* query right now: "data", "node_reduce", "final".
*/
public String taskDescription() {
return taskDescription;
}

/**
* When this {@link Driver} was started.
*/
Expand Down Expand Up @@ -211,7 +233,8 @@ public List<OperatorStatus> activeOperators() {
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
builder.field("sessionId", sessionId);
builder.field("session_id", sessionId);
builder.field("task_description", taskDescription);
builder.field("started", DateFieldMapper.DEFAULT_DATE_TIME_FORMATTER.formatMillis(started));
builder.field("last_updated", DateFieldMapper.DEFAULT_DATE_TIME_FORMATTER.formatMillis(lastUpdated));
builder.field("cpu_nanos", cpuNanos);
Expand Down Expand Up @@ -240,6 +263,7 @@ public boolean equals(Object o) {
if (o == null || getClass() != o.getClass()) return false;
DriverStatus that = (DriverStatus) o;
return sessionId.equals(that.sessionId)
&& taskDescription.equals(that.taskDescription)
&& started == that.started
&& lastUpdated == that.lastUpdated
&& cpuNanos == that.cpuNanos
Expand All @@ -252,7 +276,18 @@ public boolean equals(Object o) {

@Override
public int hashCode() {
return Objects.hash(sessionId, started, lastUpdated, cpuNanos, iterations, status, completedOperators, activeOperators, sleeps);
return Objects.hash(
sessionId,
taskDescription,
started,
lastUpdated,
cpuNanos,
iterations,
status,
completedOperators,
activeOperators,
sleeps
);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ public void testQueryOperator() throws IOException {
}
});
DriverContext driverContext = driverContext();
drivers.add(new Driver(driverContext, factory.get(driverContext), List.of(), docCollector, () -> {}));
drivers.add(new Driver("test", driverContext, factory.get(driverContext), List.of(), docCollector, () -> {}));
}
OperatorTestCase.runDriver(drivers);
Set<Integer> expectedDocIds = searchForDocIds(reader, query);
Expand Down Expand Up @@ -215,6 +215,7 @@ public String toString() {
)
);
Driver driver = new Driver(
"test",
driverContext,
luceneOperatorFactory(reader, new MatchAllDocsQuery(), LuceneOperator.NO_LIMIT).get(driverContext),
operators,
Expand Down Expand Up @@ -248,6 +249,7 @@ public void testLimitOperator() {
DriverContext driverContext = driverContext();
try (
var driver = new Driver(
"test",
driverContext,
new SequenceLongBlockSourceOperator(driverContext.blockFactory(), values, 100),
List.of((new LimitOperator.Factory(limit)).get(driverContext)),
Expand Down Expand Up @@ -335,6 +337,7 @@ public void testHashLookup() {
var actualPrimeOrds = new ArrayList<>();
try (
var driver = new Driver(
"test",
driverContext,
new SequenceLongBlockSourceOperator(driverContext.blockFactory(), values, 100),
List.of(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,7 @@ public final void testIgnoresNulls() {

try (
Driver d = new Driver(
"test",
driverContext,
new NullInsertingSourceOperator(new CannedSourceOperator(input.iterator()), blockFactory),
List.of(simple().get(driverContext)),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ public void testRejectsDouble() {
BlockFactory blockFactory = driverContext.blockFactory();
try (
Driver d = new Driver(
"test",
driverContext,
new CannedSourceOperator(Iterators.single(new Page(blockFactory.newDoubleArrayVector(new double[] { 1.0 }, 1).asBlock()))),
List.of(simple().get(driverContext)),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ public void testRejectsDouble() {
BlockFactory blockFactory = driverContext.blockFactory();
try (
Driver d = new Driver(
"test",
driverContext,
new CannedSourceOperator(Iterators.single(new Page(blockFactory.newDoubleArrayVector(new double[] { 1.0 }, 1).asBlock()))),
List.of(simple().get(driverContext)),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ public void testOverflowSucceeds() {
List<Page> results = new ArrayList<>();
try (
Driver d = new Driver(
"test",
driverContext,
new SequenceDoubleBlockSourceOperator(driverContext.blockFactory(), DoubleStream.of(Double.MAX_VALUE - 1, 2)),
List.of(simple().get(driverContext)),
Expand All @@ -71,6 +72,7 @@ public void testSummationAccuracy() {
List<Page> results = new ArrayList<>();
try (
Driver d = new Driver(
"test",
driverContext,
new SequenceDoubleBlockSourceOperator(
driverContext.blockFactory(),
Expand Down Expand Up @@ -100,6 +102,7 @@ public void testSummationAccuracy() {
driverContext = driverContext();
try (
Driver d = new Driver(
"test",
driverContext,
new SequenceDoubleBlockSourceOperator(driverContext.blockFactory(), DoubleStream.of(values)),
List.of(simple().get(driverContext)),
Expand All @@ -122,6 +125,7 @@ public void testSummationAccuracy() {
driverContext = driverContext();
try (
Driver d = new Driver(
"test",
driverContext,
new SequenceDoubleBlockSourceOperator(driverContext.blockFactory(), DoubleStream.of(largeValues)),
List.of(simple().get(driverContext)),
Expand All @@ -141,6 +145,7 @@ public void testSummationAccuracy() {
driverContext = driverContext();
try (
Driver d = new Driver(
"test",
driverContext,
new SequenceDoubleBlockSourceOperator(driverContext.blockFactory(), DoubleStream.of(largeValues)),
List.of(simple().get(driverContext)),
Expand Down
Loading