diff --git a/server/src/main/java/org/elasticsearch/TransportVersions.java b/server/src/main/java/org/elasticsearch/TransportVersions.java index 38ba4b422813a..c32f69d627874 100644 --- a/server/src/main/java/org/elasticsearch/TransportVersions.java +++ b/server/src/main/java/org/elasticsearch/TransportVersions.java @@ -218,6 +218,7 @@ static TransportVersion def(int id) { public static final TransportVersion RESCORE_VECTOR_ALLOW_ZERO_BACKPORT_8_19 = def(8_841_0_27); public static final TransportVersion INFERENCE_ADD_TIMEOUT_PUT_ENDPOINT_8_19 = def(8_841_0_28); public static final TransportVersion ESQL_REPORT_SHARD_PARTITIONING_8_19 = def(8_841_0_29); + public static final TransportVersion ESQL_DRIVER_TASK_DESCRIPTION_8_19 = def(8_841_0_30); /* * STOP! READ THIS FIRST! No, really, diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/Driver.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/Driver.java index 220f340fa0728..142d6d4386478 100644 --- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/Driver.java +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/Driver.java @@ -52,6 +52,13 @@ public class Driver implements Releasable, Describable { private final String sessionId; + /** + * Description of the task this driver is running. This description should be + * short and meaningful as a grouping identifier. We use the phase of the + * query right now: "data", "node_reduce", "final". + */ + private final String taskDescription; + /** * The wall clock time when this driver was created in milliseconds since epoch. * Compared to {@link #startNanos} this is less accurate and is measured by a @@ -96,6 +103,10 @@ public class Driver implements Releasable, Describable { /** * Creates a new driver with a chain of operators. * @param sessionId session Id + * @param taskDescription Description of the task this driver is running. This + * description should be short and meaningful as a grouping + * identifier. We use the phase of the query right now: + * "data", "node_reduce", "final". * @param driverContext the driver context * @param source source operator * @param intermediateOperators the chain of operators to execute @@ -105,6 +116,7 @@ public class Driver implements Releasable, Describable { */ public Driver( String sessionId, + String taskDescription, long startTime, long startNanos, DriverContext driverContext, @@ -116,6 +128,7 @@ public Driver( Releasable releasable ) { this.sessionId = sessionId; + this.taskDescription = taskDescription; this.startTime = startTime; this.startNanos = startNanos; this.driverContext = driverContext; @@ -129,6 +142,7 @@ public Driver( this.status = new AtomicReference<>( new DriverStatus( sessionId, + taskDescription, startTime, System.currentTimeMillis(), 0, @@ -150,6 +164,7 @@ public Driver( * @param releasable a {@link Releasable} to invoked once the chain of operators has run to completion */ public Driver( + String taskDescription, DriverContext driverContext, SourceOperator source, List intermediateOperators, @@ -158,6 +173,7 @@ public Driver( ) { this( "unset", + taskDescription, System.currentTimeMillis(), System.nanoTime(), driverContext, @@ -497,6 +513,7 @@ public DriverProfile profile() { throw new IllegalStateException("can only get profile from finished driver"); } return new DriverProfile( + status.taskDescription(), status.started(), status.lastUpdated(), finishNanos - startNanos, @@ -543,6 +560,7 @@ private void updateStatus(long extraCpuNanos, int extraIterations, DriverStatus. return new DriverStatus( sessionId, + taskDescription, startTime, now, prev.cpuNanos() + extraCpuNanos, diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/DriverProfile.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/DriverProfile.java index c071b5055df76..6ce691aa1369d 100644 --- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/DriverProfile.java +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/DriverProfile.java @@ -26,6 +26,13 @@ * Profile results from a single {@link Driver}. */ public class DriverProfile implements Writeable, ChunkedToXContentObject { + /** + * Description of the task this driver is running. This description should be + * short and meaningful as a grouping identifier. We use the phase of the + * query right now: "data", "node_reduce", "final". + */ + private final String taskDescription; + /** * Millis since epoch when the driver started. */ @@ -61,6 +68,7 @@ public class DriverProfile implements Writeable, ChunkedToXContentObject { private final DriverSleeps sleeps; public DriverProfile( + String taskDescription, long startMillis, long stopMillis, long tookNanos, @@ -69,6 +77,7 @@ public DriverProfile( List operators, DriverSleeps sleeps ) { + this.taskDescription = taskDescription; this.startMillis = startMillis; this.stopMillis = stopMillis; this.tookNanos = tookNanos; @@ -79,6 +88,9 @@ public DriverProfile( } public DriverProfile(StreamInput in) throws IOException { + this.taskDescription = in.getTransportVersion().onOrAfter(TransportVersions.ESQL_DRIVER_TASK_DESCRIPTION_8_19) + ? in.readString() + : ""; if (in.getTransportVersion().onOrAfter(TransportVersions.V_8_16_0)) { this.startMillis = in.readVLong(); this.stopMillis = in.readVLong(); @@ -101,6 +113,9 @@ public DriverProfile(StreamInput in) throws IOException { @Override public void writeTo(StreamOutput out) throws IOException { + if (out.getTransportVersion().onOrAfter(TransportVersions.ESQL_DRIVER_TASK_DESCRIPTION_8_19)) { + out.writeString(taskDescription); + } if (out.getTransportVersion().onOrAfter(TransportVersions.V_8_16_0)) { out.writeVLong(startMillis); out.writeVLong(stopMillis); @@ -114,6 +129,13 @@ public void writeTo(StreamOutput out) throws IOException { sleeps.writeTo(out); } + /** + * Description of the task this driver is running. + */ + public String taskDescription() { + return taskDescription; + } + /** * Millis since epoch when the driver started. */ @@ -169,6 +191,7 @@ public DriverSleeps sleeps() { public Iterator toXContentChunked(ToXContent.Params params) { return ChunkedToXContent.builder(params).object(ob -> { ob.append((b, p) -> { + b.field("task_description", taskDescription); b.timestampFieldsFromUnixEpochMillis("start_millis", "start", startMillis); b.timestampFieldsFromUnixEpochMillis("stop_millis", "stop", stopMillis); b.field("took_nanos", tookNanos); @@ -196,7 +219,8 @@ public boolean equals(Object o) { return false; } DriverProfile that = (DriverProfile) o; - return startMillis == that.startMillis + return taskDescription.equals(that.taskDescription) + && startMillis == that.startMillis && stopMillis == that.stopMillis && tookNanos == that.tookNanos && cpuNanos == that.cpuNanos @@ -207,7 +231,7 @@ public boolean equals(Object o) { @Override public int hashCode() { - return Objects.hash(startMillis, stopMillis, tookNanos, cpuNanos, iterations, operators, sleeps); + return Objects.hash(taskDescription, startMillis, stopMillis, tookNanos, cpuNanos, iterations, operators, sleeps); } @Override diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/DriverStatus.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/DriverStatus.java index 42e3908231206..22f32a1bef403 100644 --- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/DriverStatus.java +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/DriverStatus.java @@ -42,6 +42,11 @@ public class DriverStatus implements Task.Status { */ private final String sessionId; + /** + * Description of the task this driver is running. + */ + private final String taskDescription; + /** * Milliseconds since epoch when this driver started. */ @@ -83,6 +88,7 @@ public class DriverStatus implements Task.Status { DriverStatus( String sessionId, + String taskDescription, long started, long lastUpdated, long cpuTime, @@ -93,6 +99,7 @@ public class DriverStatus implements Task.Status { DriverSleeps sleeps ) { this.sessionId = sessionId; + this.taskDescription = taskDescription; this.started = started; this.lastUpdated = lastUpdated; this.cpuNanos = cpuTime; @@ -105,6 +112,9 @@ public class DriverStatus implements Task.Status { public DriverStatus(StreamInput in) throws IOException { this.sessionId = in.readString(); + this.taskDescription = in.getTransportVersion().onOrAfter(TransportVersions.ESQL_DRIVER_TASK_DESCRIPTION_8_19) + ? in.readString() + : ""; this.started = in.getTransportVersion().onOrAfter(TransportVersions.V_8_14_0) ? in.readLong() : 0; this.lastUpdated = in.readLong(); this.cpuNanos = in.getTransportVersion().onOrAfter(TransportVersions.V_8_14_0) ? in.readVLong() : 0; @@ -122,6 +132,9 @@ public DriverStatus(StreamInput in) throws IOException { @Override public void writeTo(StreamOutput out) throws IOException { out.writeString(sessionId); + if (out.getTransportVersion().onOrAfter(TransportVersions.ESQL_DRIVER_TASK_DESCRIPTION_8_19)) { + out.writeString(taskDescription); + } if (out.getTransportVersion().onOrAfter(TransportVersions.V_8_14_0)) { out.writeLong(started); } @@ -150,6 +163,15 @@ public String sessionId() { return sessionId; } + /** + * Description of the task this driver is running. This description should be + * short and meaningful as a grouping identifier. We use the phase of the + * query right now: "data", "node_reduce", "final". + */ + public String taskDescription() { + return taskDescription; + } + /** * When this {@link Driver} was started. */ @@ -211,7 +233,8 @@ public List activeOperators() { @Override public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { builder.startObject(); - builder.field("sessionId", sessionId); + builder.field("session_id", sessionId); + builder.field("task_description", taskDescription); builder.field("started", DateFieldMapper.DEFAULT_DATE_TIME_FORMATTER.formatMillis(started)); builder.field("last_updated", DateFieldMapper.DEFAULT_DATE_TIME_FORMATTER.formatMillis(lastUpdated)); builder.field("cpu_nanos", cpuNanos); @@ -240,6 +263,7 @@ public boolean equals(Object o) { if (o == null || getClass() != o.getClass()) return false; DriverStatus that = (DriverStatus) o; return sessionId.equals(that.sessionId) + && taskDescription.equals(that.taskDescription) && started == that.started && lastUpdated == that.lastUpdated && cpuNanos == that.cpuNanos @@ -252,7 +276,18 @@ public boolean equals(Object o) { @Override public int hashCode() { - return Objects.hash(sessionId, started, lastUpdated, cpuNanos, iterations, status, completedOperators, activeOperators, sleeps); + return Objects.hash( + sessionId, + taskDescription, + started, + lastUpdated, + cpuNanos, + iterations, + status, + completedOperators, + activeOperators, + sleeps + ); } @Override diff --git a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/OperatorTests.java b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/OperatorTests.java index 9bd5d184eb477..b304c59c28108 100644 --- a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/OperatorTests.java +++ b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/OperatorTests.java @@ -123,7 +123,7 @@ public void testQueryOperator() throws IOException { } }); DriverContext driverContext = driverContext(); - drivers.add(new Driver(driverContext, factory.get(driverContext), List.of(), docCollector, () -> {})); + drivers.add(new Driver("test", driverContext, factory.get(driverContext), List.of(), docCollector, () -> {})); } OperatorTestCase.runDriver(drivers); Set expectedDocIds = searchForDocIds(reader, query); @@ -215,6 +215,7 @@ public String toString() { ) ); Driver driver = new Driver( + "test", driverContext, luceneOperatorFactory(reader, new MatchAllDocsQuery(), LuceneOperator.NO_LIMIT).get(driverContext), operators, @@ -248,6 +249,7 @@ public void testLimitOperator() { DriverContext driverContext = driverContext(); try ( var driver = new Driver( + "test", driverContext, new SequenceLongBlockSourceOperator(driverContext.blockFactory(), values, 100), List.of((new LimitOperator.Factory(limit)).get(driverContext)), @@ -335,6 +337,7 @@ public void testHashLookup() { var actualPrimeOrds = new ArrayList<>(); try ( var driver = new Driver( + "test", driverContext, new SequenceLongBlockSourceOperator(driverContext.blockFactory(), values, 100), List.of( diff --git a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/aggregation/AggregatorFunctionTestCase.java b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/aggregation/AggregatorFunctionTestCase.java index d9243abe4794f..abac7a4cd47e3 100644 --- a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/aggregation/AggregatorFunctionTestCase.java +++ b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/aggregation/AggregatorFunctionTestCase.java @@ -111,6 +111,7 @@ public final void testIgnoresNulls() { try ( Driver d = new Driver( + "test", driverContext, new NullInsertingSourceOperator(new CannedSourceOperator(input.iterator()), blockFactory), List.of(simple().get(driverContext)), diff --git a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/aggregation/CountDistinctIntAggregatorFunctionTests.java b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/aggregation/CountDistinctIntAggregatorFunctionTests.java index 126d75f3aa46b..8657caafef409 100644 --- a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/aggregation/CountDistinctIntAggregatorFunctionTests.java +++ b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/aggregation/CountDistinctIntAggregatorFunctionTests.java @@ -65,6 +65,7 @@ public void testRejectsDouble() { BlockFactory blockFactory = driverContext.blockFactory(); try ( Driver d = new Driver( + "test", driverContext, new CannedSourceOperator(Iterators.single(new Page(blockFactory.newDoubleArrayVector(new double[] { 1.0 }, 1).asBlock()))), List.of(simple().get(driverContext)), diff --git a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/aggregation/CountDistinctLongAggregatorFunctionTests.java b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/aggregation/CountDistinctLongAggregatorFunctionTests.java index 8d32f4413d340..55f522f31b28a 100644 --- a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/aggregation/CountDistinctLongAggregatorFunctionTests.java +++ b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/aggregation/CountDistinctLongAggregatorFunctionTests.java @@ -66,6 +66,7 @@ public void testRejectsDouble() { BlockFactory blockFactory = driverContext.blockFactory(); try ( Driver d = new Driver( + "test", driverContext, new CannedSourceOperator(Iterators.single(new Page(blockFactory.newDoubleArrayVector(new double[] { 1.0 }, 1).asBlock()))), List.of(simple().get(driverContext)), diff --git a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/aggregation/SumDoubleAggregatorFunctionTests.java b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/aggregation/SumDoubleAggregatorFunctionTests.java index 83bf1a4657e61..a64ec4e155ad0 100644 --- a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/aggregation/SumDoubleAggregatorFunctionTests.java +++ b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/aggregation/SumDoubleAggregatorFunctionTests.java @@ -53,6 +53,7 @@ public void testOverflowSucceeds() { List results = new ArrayList<>(); try ( Driver d = new Driver( + "test", driverContext, new SequenceDoubleBlockSourceOperator(driverContext.blockFactory(), DoubleStream.of(Double.MAX_VALUE - 1, 2)), List.of(simple().get(driverContext)), @@ -71,6 +72,7 @@ public void testSummationAccuracy() { List results = new ArrayList<>(); try ( Driver d = new Driver( + "test", driverContext, new SequenceDoubleBlockSourceOperator( driverContext.blockFactory(), @@ -100,6 +102,7 @@ public void testSummationAccuracy() { driverContext = driverContext(); try ( Driver d = new Driver( + "test", driverContext, new SequenceDoubleBlockSourceOperator(driverContext.blockFactory(), DoubleStream.of(values)), List.of(simple().get(driverContext)), @@ -122,6 +125,7 @@ public void testSummationAccuracy() { driverContext = driverContext(); try ( Driver d = new Driver( + "test", driverContext, new SequenceDoubleBlockSourceOperator(driverContext.blockFactory(), DoubleStream.of(largeValues)), List.of(simple().get(driverContext)), @@ -141,6 +145,7 @@ public void testSummationAccuracy() { driverContext = driverContext(); try ( Driver d = new Driver( + "test", driverContext, new SequenceDoubleBlockSourceOperator(driverContext.blockFactory(), DoubleStream.of(largeValues)), List.of(simple().get(driverContext)), diff --git a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/aggregation/SumFloatAggregatorFunctionTests.java b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/aggregation/SumFloatAggregatorFunctionTests.java index a140750c736c5..11205907acb2d 100644 --- a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/aggregation/SumFloatAggregatorFunctionTests.java +++ b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/aggregation/SumFloatAggregatorFunctionTests.java @@ -53,6 +53,7 @@ public void testOverflowSucceeds() { List results = new ArrayList<>(); try ( Driver d = new Driver( + "test", driverContext, new SequenceFloatBlockSourceOperator(driverContext.blockFactory(), Stream.of(Float.MAX_VALUE - 1, 2f)), List.of(simple().get(driverContext)), @@ -71,6 +72,7 @@ public void testSummationAccuracy() { List results = new ArrayList<>(); try ( Driver d = new Driver( + "test", driverContext, new SequenceFloatBlockSourceOperator( driverContext.blockFactory(), @@ -100,6 +102,7 @@ public void testSummationAccuracy() { driverContext = driverContext(); try ( Driver d = new Driver( + "test", driverContext, new SequenceFloatBlockSourceOperator(driverContext.blockFactory(), Stream.of(values)), List.of(simple().get(driverContext)), @@ -122,6 +125,7 @@ public void testSummationAccuracy() { driverContext = driverContext(); try ( Driver d = new Driver( + "test", driverContext, new SequenceFloatBlockSourceOperator(driverContext.blockFactory(), Stream.of(largeValues)), List.of(simple().get(driverContext)), @@ -141,6 +145,7 @@ public void testSummationAccuracy() { driverContext = driverContext(); try ( Driver d = new Driver( + "test", driverContext, new SequenceFloatBlockSourceOperator(driverContext.blockFactory(), Stream.of(largeValues)), List.of(simple().get(driverContext)), diff --git a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/aggregation/SumIntAggregatorFunctionTests.java b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/aggregation/SumIntAggregatorFunctionTests.java index 4f8a3a693f5ea..6484382d5ff50 100644 --- a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/aggregation/SumIntAggregatorFunctionTests.java +++ b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/aggregation/SumIntAggregatorFunctionTests.java @@ -52,6 +52,7 @@ public void testRejectsDouble() { BlockFactory blockFactory = driverContext.blockFactory(); try ( Driver d = new Driver( + "test", driverContext, new CannedSourceOperator(Iterators.single(new Page(blockFactory.newDoubleArrayVector(new double[] { 1.0 }, 1).asBlock()))), List.of(simple().get(driverContext)), diff --git a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/aggregation/SumLongAggregatorFunctionTests.java b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/aggregation/SumLongAggregatorFunctionTests.java index 3422de4b15dfb..c2b805291f4f6 100644 --- a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/aggregation/SumLongAggregatorFunctionTests.java +++ b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/aggregation/SumLongAggregatorFunctionTests.java @@ -51,6 +51,7 @@ public void testOverflowFails() { DriverContext driverContext = driverContext(); try ( Driver d = new Driver( + "test", driverContext, new SequenceLongBlockSourceOperator(driverContext.blockFactory(), LongStream.of(Long.MAX_VALUE - 1, 2)), List.of(simple().get(driverContext)), @@ -68,6 +69,7 @@ public void testRejectsDouble() { BlockFactory blockFactory = driverContext.blockFactory(); try ( Driver d = new Driver( + "test", driverContext, new CannedSourceOperator(Iterators.single(new Page(blockFactory.newDoubleArrayVector(new double[] { 1.0 }, 1).asBlock()))), List.of(simple().get(driverContext)), diff --git a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/aggregation/blockhash/CategorizeBlockHashTests.java b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/aggregation/blockhash/CategorizeBlockHashTests.java index 832945cdb549c..42e9fc8deafc1 100644 --- a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/aggregation/blockhash/CategorizeBlockHashTests.java +++ b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/aggregation/blockhash/CategorizeBlockHashTests.java @@ -416,6 +416,7 @@ public void testCategorize_withDriver() { List intermediateOutput = new ArrayList<>(); Driver driver = new Driver( + "test", driverContext, new LocalSourceOperator(input1), List.of( @@ -436,6 +437,7 @@ public void testCategorize_withDriver() { runDriver(driver); driver = new Driver( + "test", driverContext, new LocalSourceOperator(input2), List.of( @@ -458,6 +460,7 @@ public void testCategorize_withDriver() { List finalOutput = new ArrayList<>(); driver = new Driver( + "test", driverContext, new CannedSourceOperator(intermediateOutput.iterator()), List.of( diff --git a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/aggregation/blockhash/CategorizePackedValuesBlockHashTests.java b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/aggregation/blockhash/CategorizePackedValuesBlockHashTests.java index a1028a76f7b2b..9c89317e4c359 100644 --- a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/aggregation/blockhash/CategorizePackedValuesBlockHashTests.java +++ b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/aggregation/blockhash/CategorizePackedValuesBlockHashTests.java @@ -137,6 +137,7 @@ public void testCategorize_withDriver() { List intermediateOutput = new ArrayList<>(); Driver driver = new Driver( + "test", driverContext, new LocalSourceOperator(input1), List.of( @@ -154,6 +155,7 @@ public void testCategorize_withDriver() { runDriver(driver); driver = new Driver( + "test", driverContext, new LocalSourceOperator(input2), List.of( @@ -173,6 +175,7 @@ public void testCategorize_withDriver() { List finalOutput = new ArrayList<>(); driver = new Driver( + "test", driverContext, new CannedSourceOperator(intermediateOutput.iterator()), List.of( diff --git a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/LuceneCountOperatorTests.java b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/LuceneCountOperatorTests.java index c67fac2383805..a1a94acfdf66d 100644 --- a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/LuceneCountOperatorTests.java +++ b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/LuceneCountOperatorTests.java @@ -151,7 +151,7 @@ private void testCount(Supplier contexts, int size, int limit) { int taskConcurrency = between(1, 8); for (int i = 0; i < taskConcurrency; i++) { DriverContext ctx = contexts.get(); - drivers.add(new Driver(ctx, factory.get(ctx), List.of(), new TestResultPageSinkOperator(results::add), () -> {})); + drivers.add(new Driver("test", ctx, factory.get(ctx), List.of(), new TestResultPageSinkOperator(results::add), () -> {})); } OperatorTestCase.runDriver(drivers); assertThat(results.size(), lessThanOrEqualTo(taskConcurrency)); diff --git a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/LuceneMaxOperatorTestCase.java b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/LuceneMaxOperatorTestCase.java index 7534af6748c73..db95a23de40cd 100644 --- a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/LuceneMaxOperatorTestCase.java +++ b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/LuceneMaxOperatorTestCase.java @@ -166,7 +166,7 @@ private void testMax(Supplier contexts, int size, int limit) { int taskConcurrency = between(1, 8); for (int i = 0; i < taskConcurrency; i++) { DriverContext ctx = contexts.get(); - drivers.add(new Driver(ctx, factory.get(ctx), List.of(), new TestResultPageSinkOperator(results::add), () -> {})); + drivers.add(new Driver("test", ctx, factory.get(ctx), List.of(), new TestResultPageSinkOperator(results::add), () -> {})); } OperatorTestCase.runDriver(drivers); assertThat(results.size(), lessThanOrEqualTo(taskConcurrency)); diff --git a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/LuceneMinOperatorTestCase.java b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/LuceneMinOperatorTestCase.java index 5ec1c8f8f734c..f7755e59cdd8a 100644 --- a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/LuceneMinOperatorTestCase.java +++ b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/LuceneMinOperatorTestCase.java @@ -166,7 +166,7 @@ private void testMin(Supplier contexts, int size, int limit) { int taskConcurrency = between(1, 8); for (int i = 0; i < taskConcurrency; i++) { DriverContext ctx = contexts.get(); - drivers.add(new Driver(ctx, factory.get(ctx), List.of(), new TestResultPageSinkOperator(results::add), () -> {})); + drivers.add(new Driver("test", ctx, factory.get(ctx), List.of(), new TestResultPageSinkOperator(results::add), () -> {})); } OperatorTestCase.runDriver(drivers); assertThat(results.size(), lessThanOrEqualTo(taskConcurrency)); diff --git a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/LuceneQueryEvaluatorTests.java b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/LuceneQueryEvaluatorTests.java index 8161ef5ff9561..f3f380e3be3bb 100644 --- a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/LuceneQueryEvaluatorTests.java +++ b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/LuceneQueryEvaluatorTests.java @@ -216,6 +216,7 @@ private List runQuery(Set values, Query query, boolean shuffleDocs operators.add(createOperator(blockFactory, shards)); List results = new ArrayList<>(); Driver driver = new Driver( + "test", driverContext, luceneOperatorFactory(reader, new MatchAllDocsQuery(), usesScoring()).get(driverContext), operators, diff --git a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/LuceneSourceOperatorTests.java b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/LuceneSourceOperatorTests.java index a0dff14776b69..5b7ec4ceb5052 100644 --- a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/LuceneSourceOperatorTests.java +++ b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/LuceneSourceOperatorTests.java @@ -164,6 +164,7 @@ public void testEarlyTermination() { }); Driver driver = new Driver( "driver" + i, + "test", 0, 0, driverContext, @@ -229,7 +230,7 @@ private void testSimple(DriverContext ctx, DataPartitioning partitioning, int si List results = new ArrayList<>(); OperatorTestCase.runDriver( - new Driver(ctx, factory.get(ctx), List.of(readS.get(ctx)), new TestResultPageSinkOperator(results::add), () -> {}) + new Driver("test", ctx, factory.get(ctx), List.of(readS.get(ctx)), new TestResultPageSinkOperator(results::add), () -> {}) ); OperatorTestCase.assertDriverContext(ctx); diff --git a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/LuceneTopNSourceOperatorScoringTests.java b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/LuceneTopNSourceOperatorScoringTests.java index 82d578e2a2d49..3e20bee9ef3d2 100644 --- a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/LuceneTopNSourceOperatorScoringTests.java +++ b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/LuceneTopNSourceOperatorScoringTests.java @@ -127,7 +127,7 @@ protected void testSimple(DriverContext ctx, int size, int limit) { List results = new ArrayList<>(); OperatorTestCase.runDriver( - new Driver(ctx, factory.get(ctx), List.of(readS.get(ctx)), new TestResultPageSinkOperator(results::add), () -> {}) + new Driver("test", ctx, factory.get(ctx), List.of(readS.get(ctx)), new TestResultPageSinkOperator(results::add), () -> {}) ); OperatorTestCase.assertDriverContext(ctx); diff --git a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/LuceneTopNSourceOperatorTests.java b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/LuceneTopNSourceOperatorTests.java index c235bf5ae9883..146dc91588bc4 100644 --- a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/LuceneTopNSourceOperatorTests.java +++ b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/LuceneTopNSourceOperatorTests.java @@ -187,7 +187,7 @@ protected void testSimple(DriverContext ctx, int size, int limit) { List results = new ArrayList<>(); OperatorTestCase.runDriver( - new Driver(ctx, factory.get(ctx), List.of(readS.get(ctx)), new TestResultPageSinkOperator(results::add), () -> {}) + new Driver("test", ctx, factory.get(ctx), List.of(readS.get(ctx)), new TestResultPageSinkOperator(results::add), () -> {}) ); OperatorTestCase.assertDriverContext(ctx); diff --git a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/TimeSeriesSortedSourceOperatorTests.java b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/TimeSeriesSortedSourceOperatorTests.java index feba401d445e7..934fbcc0b897e 100644 --- a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/TimeSeriesSortedSourceOperatorTests.java +++ b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/TimeSeriesSortedSourceOperatorTests.java @@ -174,6 +174,7 @@ record Doc(int host, long timestamp, long metric) {} var metricField = new NumberFieldMapper.NumberFieldType("metric", NumberFieldMapper.NumberType.LONG); OperatorTestCase.runDriver( new Driver( + "test", driverContext, timeSeriesFactory.get(driverContext), List.of(ValuesSourceReaderOperatorTests.factory(reader, metricField, ElementType.LONG).get(driverContext)), @@ -248,6 +249,7 @@ public void testMatchNone() throws Exception { List results = new ArrayList<>(); OperatorTestCase.runDriver( new Driver( + "test", driverContext, timeSeriesFactory.get(driverContext), List.of(), @@ -306,6 +308,7 @@ List runDriver(int limit, int maxPageSize, boolean forceMerge, int numTime var hostnameField = new KeywordFieldMapper.KeywordFieldType("hostname"); OperatorTestCase.runDriver( new Driver( + "test", ctx, timeSeriesFactory.get(ctx), List.of( diff --git a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/ValueSourceReaderTypeConversionTests.java b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/ValueSourceReaderTypeConversionTests.java index 8d12262389bf0..f75208b1ab880 100644 --- a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/ValueSourceReaderTypeConversionTests.java +++ b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/ValueSourceReaderTypeConversionTests.java @@ -1299,6 +1299,7 @@ public void testWithNulls() throws IOException { var vsShardContext = new ValuesSourceReaderOperator.ShardContext(reader(indexKey), () -> SourceLoader.FROM_STORED_SOURCE, 0.2); try ( Driver driver = new Driver( + "test", driverContext, luceneFactory.get(driverContext), List.of( @@ -1376,6 +1377,7 @@ public void testNullsShared() { int[] pages = new int[] { 0 }; try ( Driver d = new Driver( + "test", driverContext, simpleInput(driverContext, 10), List.of( @@ -1499,6 +1501,7 @@ protected final List drive(List operators, Iterator input, boolean success = false; try ( Driver d = new Driver( + "test", driverContext, new CannedSourceOperator(input), operators, @@ -1526,6 +1529,7 @@ public static void runDriver(List drivers) { for (int i = 0; i < dummyDrivers; i++) { drivers.add( new Driver( + "test", "dummy-session", 0, 0, diff --git a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/ValuesSourceReaderOperatorTests.java b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/ValuesSourceReaderOperatorTests.java index 051ede0a7e993..50695ec2f62bd 100644 --- a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/ValuesSourceReaderOperatorTests.java +++ b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/ValuesSourceReaderOperatorTests.java @@ -1342,6 +1342,7 @@ public void testWithNulls() throws IOException { ); try ( Driver driver = new Driver( + "test", driverContext, luceneFactory.get(driverContext), List.of( @@ -1444,6 +1445,7 @@ public void testNullsShared() { int[] pages = new int[] { 0 }; try ( Driver d = new Driver( + "test", driverContext, simpleInput(driverContext.blockFactory(), 10), List.of( diff --git a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/AsyncOperatorTests.java b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/AsyncOperatorTests.java index a5a6333bd846c..f4d489e8206ef 100644 --- a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/AsyncOperatorTests.java +++ b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/AsyncOperatorTests.java @@ -165,7 +165,14 @@ public void doClose() { } }); PlainActionFuture future = new PlainActionFuture<>(); - Driver driver = new Driver(driverContext, sourceOperator, intermediateOperators, outputOperator, () -> assertFalse(it.hasNext())); + Driver driver = new Driver( + "test", + driverContext, + sourceOperator, + intermediateOperators, + outputOperator, + () -> assertFalse(it.hasNext()) + ); Driver.start(threadPool.getThreadContext(), threadPool.executor(ESQL_TEST_EXECUTOR), driver, between(1, 10000), future); future.actionGet(); Releasables.close(localBreaker); @@ -295,7 +302,7 @@ protected void doClose() { }; SinkOperator outputOperator = new PageConsumerOperator(Page::releaseBlocks); PlainActionFuture future = new PlainActionFuture<>(); - Driver driver = new Driver(driverContext, sourceOperator, List.of(asyncOperator), outputOperator, localBreaker); + Driver driver = new Driver("test", driverContext, sourceOperator, List.of(asyncOperator), outputOperator, localBreaker); Driver.start(threadPool.getThreadContext(), threadPool.executor(ESQL_TEST_EXECUTOR), driver, between(1, 1000), future); assertBusy(() -> assertTrue(future.isDone())); if (failed.get()) { diff --git a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/DriverProfileTests.java b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/DriverProfileTests.java index 27083ea0fcd13..a39aa10af5f31 100644 --- a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/DriverProfileTests.java +++ b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/DriverProfileTests.java @@ -27,6 +27,7 @@ public class DriverProfileTests extends AbstractWireSerializingTestCase { public void testToXContent() { DriverProfile status = new DriverProfile( + "test", 123413220000L, 123413243214L, 10012, @@ -44,6 +45,7 @@ public void testToXContent() { ); assertThat(Strings.toString(status, true, true), equalTo(""" { + "task_description" : "test", "start" : "1973-11-29T09:27:00.000Z", "start_millis" : 123413220000, "stop" : "1973-11-29T09:27:23.214Z", @@ -101,6 +103,7 @@ protected Writeable.Reader instanceReader() { @Override protected DriverProfile createTestInstance() { return new DriverProfile( + DriverStatusTests.randomTaskDescription(), randomNonNegativeLong(), randomNonNegativeLong(), randomNonNegativeLong(), @@ -113,6 +116,7 @@ protected DriverProfile createTestInstance() { @Override protected DriverProfile mutateInstance(DriverProfile instance) throws IOException { + String taskDescription = instance.taskDescription(); long startMillis = instance.startMillis(); long stopMillis = instance.stopMillis(); long tookNanos = instance.tookNanos(); @@ -120,17 +124,18 @@ protected DriverProfile mutateInstance(DriverProfile instance) throws IOExceptio long iterations = instance.iterations(); var operators = instance.operators(); var sleeps = instance.sleeps(); - switch (between(0, 6)) { - case 0 -> startMillis = randomValueOtherThan(startMillis, ESTestCase::randomNonNegativeLong); - case 1 -> stopMillis = randomValueOtherThan(startMillis, ESTestCase::randomNonNegativeLong); - case 2 -> tookNanos = randomValueOtherThan(tookNanos, ESTestCase::randomNonNegativeLong); - case 3 -> cpuNanos = randomValueOtherThan(cpuNanos, ESTestCase::randomNonNegativeLong); - case 4 -> iterations = randomValueOtherThan(iterations, ESTestCase::randomNonNegativeLong); - case 5 -> operators = randomValueOtherThan(operators, DriverStatusTests::randomOperatorStatuses); - case 6 -> sleeps = randomValueOtherThan(sleeps, DriverSleepsTests::randomDriverSleeps); + switch (between(0, 7)) { + case 0 -> taskDescription = randomValueOtherThan(taskDescription, DriverStatusTests::randomTaskDescription); + case 1 -> startMillis = randomValueOtherThan(startMillis, ESTestCase::randomNonNegativeLong); + case 2 -> stopMillis = randomValueOtherThan(startMillis, ESTestCase::randomNonNegativeLong); + case 3 -> tookNanos = randomValueOtherThan(tookNanos, ESTestCase::randomNonNegativeLong); + case 4 -> cpuNanos = randomValueOtherThan(cpuNanos, ESTestCase::randomNonNegativeLong); + case 5 -> iterations = randomValueOtherThan(iterations, ESTestCase::randomNonNegativeLong); + case 6 -> operators = randomValueOtherThan(operators, DriverStatusTests::randomOperatorStatuses); + case 7 -> sleeps = randomValueOtherThan(sleeps, DriverSleepsTests::randomDriverSleeps); default -> throw new UnsupportedOperationException(); } - return new DriverProfile(startMillis, stopMillis, tookNanos, cpuNanos, iterations, operators, sleeps); + return new DriverProfile(taskDescription, startMillis, stopMillis, tookNanos, cpuNanos, iterations, operators, sleeps); } @Override diff --git a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/DriverStatusTests.java b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/DriverStatusTests.java index b46d9f3f4add7..83deb57a3ba7c 100644 --- a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/DriverStatusTests.java +++ b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/DriverStatusTests.java @@ -32,6 +32,7 @@ public class DriverStatusTests extends AbstractWireSerializingTestCase instanceReader() { protected DriverStatus createTestInstance() { return new DriverStatus( randomSessionId(), + randomTaskDescription(), randomNonNegativeLong(), randomNonNegativeLong(), randomNonNegativeLong(), @@ -129,6 +132,10 @@ private String randomSessionId() { return RandomStrings.randomAsciiLettersOfLengthBetween(random(), 1, 15); } + public static String randomTaskDescription() { + return RandomStrings.randomAsciiLettersOfLength(random(), 5); + } + private DriverStatus.Status randomStatus() { return randomFrom(DriverStatus.Status.values()); } @@ -150,6 +157,7 @@ private static DriverStatus.OperatorStatus randomOperatorStatus() { @Override protected DriverStatus mutateInstance(DriverStatus instance) throws IOException { var sessionId = instance.sessionId(); + var taskDescription = instance.taskDescription(); long started = instance.started(); long lastUpdated = instance.lastUpdated(); long cpuNanos = instance.cpuNanos(); @@ -158,19 +166,31 @@ protected DriverStatus mutateInstance(DriverStatus instance) throws IOException var completedOperators = instance.completedOperators(); var activeOperators = instance.activeOperators(); var sleeps = instance.sleeps(); - switch (between(0, 8)) { + switch (between(0, 9)) { case 0 -> sessionId = randomValueOtherThan(sessionId, this::randomSessionId); - case 1 -> started = randomValueOtherThan(started, ESTestCase::randomNonNegativeLong); - case 2 -> lastUpdated = randomValueOtherThan(lastUpdated, ESTestCase::randomNonNegativeLong); - case 3 -> cpuNanos = randomValueOtherThan(cpuNanos, ESTestCase::randomNonNegativeLong); - case 4 -> iterations = randomValueOtherThan(iterations, ESTestCase::randomNonNegativeLong); - case 5 -> status = randomValueOtherThan(status, this::randomStatus); - case 6 -> completedOperators = randomValueOtherThan(completedOperators, DriverStatusTests::randomOperatorStatuses); - case 7 -> activeOperators = randomValueOtherThan(activeOperators, DriverStatusTests::randomOperatorStatuses); - case 8 -> sleeps = randomValueOtherThan(sleeps, DriverSleepsTests::randomDriverSleeps); + case 1 -> taskDescription = randomValueOtherThan(taskDescription, DriverStatusTests::randomTaskDescription); + case 2 -> started = randomValueOtherThan(started, ESTestCase::randomNonNegativeLong); + case 3 -> lastUpdated = randomValueOtherThan(lastUpdated, ESTestCase::randomNonNegativeLong); + case 4 -> cpuNanos = randomValueOtherThan(cpuNanos, ESTestCase::randomNonNegativeLong); + case 5 -> iterations = randomValueOtherThan(iterations, ESTestCase::randomNonNegativeLong); + case 6 -> status = randomValueOtherThan(status, this::randomStatus); + case 7 -> completedOperators = randomValueOtherThan(completedOperators, DriverStatusTests::randomOperatorStatuses); + case 8 -> activeOperators = randomValueOtherThan(activeOperators, DriverStatusTests::randomOperatorStatuses); + case 9 -> sleeps = randomValueOtherThan(sleeps, DriverSleepsTests::randomDriverSleeps); default -> throw new UnsupportedOperationException(); } - return new DriverStatus(sessionId, started, lastUpdated, cpuNanos, iterations, status, completedOperators, activeOperators, sleeps); + return new DriverStatus( + sessionId, + taskDescription, + started, + lastUpdated, + cpuNanos, + iterations, + status, + completedOperators, + activeOperators, + sleeps + ); } @Override diff --git a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/DriverTests.java b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/DriverTests.java index 663a286727445..b0b47ec56aeaa 100644 --- a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/DriverTests.java +++ b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/DriverTests.java @@ -67,6 +67,7 @@ public void testProfileAndStatusFinishInOneRound() { Driver driver = new Driver( "unset", + "test", startEpoch, startNanos, driverContext, @@ -116,6 +117,7 @@ public void testProfileAndStatusOneIterationAtATime() { Driver driver = new Driver( "unset", + "test", startEpoch, startNanos, driverContext, @@ -166,6 +168,7 @@ public void testProfileAndStatusTimeout() { Driver driver = new Driver( "unset", + "test", startEpoch, startNanos, driverContext, @@ -212,6 +215,7 @@ public void testProfileAndStatusInterval() { Driver driver = new Driver( "unset", + "test", startEpoch, startNanos, driverContext, @@ -280,7 +284,7 @@ public void testThreadContext() throws Exception { WarningsOperator warning1 = new WarningsOperator(threadPool); WarningsOperator warning2 = new WarningsOperator(threadPool); CyclicBarrier allPagesProcessed = new CyclicBarrier(2); - Driver driver = new Driver(driverContext, new CannedSourceOperator(inPages.iterator()) { + Driver driver = new Driver("test", driverContext, new CannedSourceOperator(inPages.iterator()) { @Override public Page getOutput() { assertRunningWithRegularUser(threadPool); @@ -364,7 +368,7 @@ public void close() { } }); - Driver driver = new Driver(driverContext, sourceOperator, List.of(delayOperator), sinkOperator, () -> {}); + Driver driver = new Driver("test", driverContext, sourceOperator, List.of(delayOperator), sinkOperator, () -> {}); ThreadContext threadContext = threadPool.getThreadContext(); PlainActionFuture future = new PlainActionFuture<>(); @@ -384,7 +388,7 @@ public void testResumeOnEarlyFinish() throws Exception { var sinkHandler = new ExchangeSinkHandler(driverContext.blockFactory(), between(1, 5), System::currentTimeMillis); var sourceOperator = new ExchangeSourceOperator(sourceHandler.createExchangeSource()); var sinkOperator = new ExchangeSinkOperator(sinkHandler.createExchangeSink(() -> {}), Function.identity()); - Driver driver = new Driver(driverContext, sourceOperator, List.of(), sinkOperator, () -> {}); + Driver driver = new Driver("test", driverContext, sourceOperator, List.of(), sinkOperator, () -> {}); PlainActionFuture future = new PlainActionFuture<>(); Driver.start(threadPool.getThreadContext(), threadPool.executor("esql"), driver, between(1, 1000), future); assertBusy( diff --git a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/ForkingOperatorTestCase.java b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/ForkingOperatorTestCase.java index e1ca26da035e7..f08552913963d 100644 --- a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/ForkingOperatorTestCase.java +++ b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/ForkingOperatorTestCase.java @@ -68,6 +68,7 @@ public final void testInitialFinal() { List results = new ArrayList<>(); try ( Driver d = new Driver( + "test", driverContext, new CannedSourceOperator(input.iterator()), List.of(simpleWithMode(AggregatorMode.INITIAL).get(driverContext), simpleWithMode(AggregatorMode.FINAL).get(driverContext)), @@ -89,6 +90,7 @@ public final void testManyInitialFinal() { List results = new ArrayList<>(); try ( Driver d = new Driver( + "test", driverContext, new CannedSourceOperator(partials.iterator()), List.of(simpleWithMode(AggregatorMode.FINAL).get(driverContext)), @@ -110,6 +112,7 @@ public final void testInitialIntermediateFinal() { try ( Driver d = new Driver( + "test", driverContext, new CannedSourceOperator(input.iterator()), List.of( @@ -142,6 +145,7 @@ public final void testManyInitialManyPartialFinal() { List results = new ArrayList<>(); try ( Driver d = new Driver( + "test", driverContext, new CannedSourceOperator(intermediates.iterator()), List.of(simpleWithMode(AggregatorMode.FINAL).get(driverContext)), @@ -236,6 +240,7 @@ List createDriversForInput(List input, List results, boolean DriverContext driver1Context = driverContext(); drivers.add( new Driver( + "test", driver1Context, new CannedSourceOperator(pages.iterator()), List.of( @@ -253,6 +258,7 @@ List createDriversForInput(List input, List results, boolean DriverContext driver2Context = driverContext(); drivers.add( new Driver( + "test", driver2Context, new ExchangeSourceOperator(sourceExchanger.createExchangeSource()), List.of( diff --git a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/LimitOperatorTests.java b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/LimitOperatorTests.java index a0f6711051213..0e4d68af91f8e 100644 --- a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/LimitOperatorTests.java +++ b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/LimitOperatorTests.java @@ -167,6 +167,7 @@ public void close() { drivers.add( new Driver( "unset", + "test", 0, 0, driverContext, diff --git a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/TimeSeriesAggregationOperatorTests.java b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/TimeSeriesAggregationOperatorTests.java index eb862e20682f4..103a6a35651c7 100644 --- a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/TimeSeriesAggregationOperatorTests.java +++ b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/TimeSeriesAggregationOperatorTests.java @@ -304,6 +304,7 @@ public void close() { List results = new ArrayList<>(); OperatorTestCase.runDriver( new Driver( + "test", ctx, sourceOperatorFactory.get(ctx), CollectionUtils.concatLists(intermediateOperators, List.of(intialAgg, intermediateAgg, finalAgg)), diff --git a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/exchange/ExchangeServiceTests.java b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/exchange/ExchangeServiceTests.java index 3e86b9c44b5c1..92e9da03eb8aa 100644 --- a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/exchange/ExchangeServiceTests.java +++ b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/exchange/ExchangeServiceTests.java @@ -304,6 +304,7 @@ Set runConcurrentTest( DriverContext dc = driverContext(); Driver d = new Driver( "test-session:1", + "test", 0, 0, dc, @@ -322,6 +323,7 @@ Set runConcurrentTest( DriverContext dc = driverContext(); Driver d = new Driver( "test-session:2", + "test", 0, 0, dc, diff --git a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/topn/TopNOperatorTests.java b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/topn/TopNOperatorTests.java index a55ae5d426b27..176a8c998b103 100644 --- a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/topn/TopNOperatorTests.java +++ b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/topn/TopNOperatorTests.java @@ -544,6 +544,7 @@ public void testCollectAllValues() { List> actualTop = new ArrayList<>(); try ( Driver driver = new Driver( + "test", driverContext, new CannedSourceOperator(List.of(new Page(blocks.toArray(Block[]::new))).iterator()), List.of( @@ -634,6 +635,7 @@ public void testCollectAllValues_RandomMultiValues() { List> actualTop = new ArrayList<>(); try ( Driver driver = new Driver( + "test", driverContext, new CannedSourceOperator(List.of(new Page(blocks.toArray(Block[]::new))).iterator()), List.of( @@ -669,6 +671,7 @@ private List> topNTwoColumns( List> outputValues = new ArrayList<>(); try ( Driver driver = new Driver( + "test", driverContext, new TupleBlockSourceOperator(driverContext.blockFactory(), inputValues, randomIntBetween(1, 1000)), List.of( @@ -939,6 +942,7 @@ private void assertSortingOnMV( int topCount = randomIntBetween(1, values.size()); try ( Driver driver = new Driver( + "test", driverContext, new CannedSourceOperator(List.of(page).iterator()), List.of( @@ -1113,6 +1117,7 @@ public void testIPSortingSingleValue() throws UnknownHostException { List> actual = new ArrayList<>(); try ( Driver driver = new Driver( + "test", driverContext, new CannedSourceOperator(List.of(new Page(builder.build())).iterator()), List.of( @@ -1240,6 +1245,7 @@ private void assertIPSortingOnMultiValues( DriverContext driverContext = driverContext(); try ( Driver driver = new Driver( + "test", driverContext, new CannedSourceOperator(List.of(new Page(builder.build())).iterator()), List.of( @@ -1328,6 +1334,7 @@ public void testZeroByte() { DriverContext driverContext = driverContext(); try ( Driver driver = new Driver( + "test", driverContext, new CannedSourceOperator(List.of(new Page(blocks.toArray(Block[]::new))).iterator()), List.of( @@ -1368,6 +1375,7 @@ public void testErrorBeforeFullyDraining() { DriverContext driverContext = driverContext(); try ( Driver driver = new Driver( + "test", driverContext, new SequenceLongBlockSourceOperator(driverContext.blockFactory(), LongStream.range(0, docCount)), List.of( diff --git a/x-pack/plugin/esql/compute/test/src/main/java/org/elasticsearch/compute/test/OperatorTestCase.java b/x-pack/plugin/esql/compute/test/src/main/java/org/elasticsearch/compute/test/OperatorTestCase.java index 5fac2bae8ca0d..6ac34def4c3a1 100644 --- a/x-pack/plugin/esql/compute/test/src/main/java/org/elasticsearch/compute/test/OperatorTestCase.java +++ b/x-pack/plugin/esql/compute/test/src/main/java/org/elasticsearch/compute/test/OperatorTestCase.java @@ -189,6 +189,7 @@ protected final List oneDriverPerPageList(Iterator> source, Sup List in = source.next(); try ( Driver d = new Driver( + "test", driverContext(), new CannedSourceOperator(in.iterator()), operators.get(), @@ -263,6 +264,7 @@ protected final List drive(List operators, Iterator input, boolean success = false; try ( Driver d = new Driver( + "test", driverContext, new CannedSourceOperator(input), operators, @@ -290,6 +292,7 @@ public static void runDriver(List drivers) { for (int i = 0; i < dummyDrivers; i++) { drivers.add( new Driver( + "test", "dummy-session", 0, 0, diff --git a/x-pack/plugin/esql/qa/server/single-node/src/javaRestTest/java/org/elasticsearch/xpack/esql/qa/single_node/RestEsqlIT.java b/x-pack/plugin/esql/qa/server/single-node/src/javaRestTest/java/org/elasticsearch/xpack/esql/qa/single_node/RestEsqlIT.java index f8f980de8ed4a..27e40f7d1545d 100644 --- a/x-pack/plugin/esql/qa/server/single-node/src/javaRestTest/java/org/elasticsearch/xpack/esql/qa/single_node/RestEsqlIT.java +++ b/x-pack/plugin/esql/qa/server/single-node/src/javaRestTest/java/org/elasticsearch/xpack/esql/qa/single_node/RestEsqlIT.java @@ -52,6 +52,7 @@ import static org.elasticsearch.xpack.esql.qa.rest.RestEsqlTestCase.Mode.SYNC; import static org.elasticsearch.xpack.esql.tools.ProfileParser.parseProfile; import static org.elasticsearch.xpack.esql.tools.ProfileParser.readProfileFromResponse; +import static org.hamcrest.Matchers.any; import static org.hamcrest.Matchers.containsInAnyOrder; import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.either; @@ -303,7 +304,6 @@ public void testProfile() throws IOException { equalTo(List.of(List.of(499.5d))) ); - List> signatures = new ArrayList<>(); @SuppressWarnings("unchecked") List> profiles = (List>) ((Map) result.get("profile")).get("drivers"); for (Map p : profiles) { @@ -315,25 +315,34 @@ public void testProfile() throws IOException { for (Map o : operators) { sig.add(checkOperatorProfile(o)); } - signatures.add(sig); + String taskDescription = p.get("task_description").toString(); + switch (taskDescription) { + case "data" -> assertMap( + sig, + matchesList().item("LuceneSourceOperator") + .item("ValuesSourceReaderOperator") + .item("AggregationOperator") + .item("ExchangeSinkOperator") + ); + case "node_reduce" -> assertThat( + sig, + either(matchesList().item("ExchangeSourceOperator").item("ExchangeSinkOperator")).or( + matchesList().item("ExchangeSourceOperator").item("AggregationOperator").item("ExchangeSinkOperator") + ) + ); + case "final" -> assertMap( + sig, + matchesList().item("ExchangeSourceOperator") + .item("AggregationOperator") + .item("ProjectOperator") + .item("LimitOperator") + .item("EvalOperator") + .item("ProjectOperator") + .item("OutputOperator") + ); + default -> throw new IllegalArgumentException("can't match " + taskDescription); + } } - assertThat( - signatures, - containsInAnyOrder( - matchesList().item("LuceneSourceOperator") - .item("ValuesSourceReaderOperator") - .item("AggregationOperator") - .item("ExchangeSinkOperator"), - matchesList().item("ExchangeSourceOperator").item("ExchangeSinkOperator"), - matchesList().item("ExchangeSourceOperator") - .item("AggregationOperator") - .item("ProjectOperator") - .item("LimitOperator") - .item("EvalOperator") - .item("ProjectOperator") - .item("OutputOperator") - ) - ); } private final String PROCESS_NAME = "process_name"; @@ -520,6 +529,7 @@ public void testInlineStatsProfile() throws IOException { } signatures.add(sig); } + // TODO adapt this to use task_description once this is reenabled assertThat( signatures, containsInAnyOrder( @@ -620,28 +630,38 @@ public void testForceSleepsProfile() throws IOException { MapMatcher sleepMatcher = matchesMap().entry("reason", "exchange empty") .entry("sleep_millis", greaterThan(0L)) .entry("wake_millis", greaterThan(0L)); - if (operators.contains("LuceneSourceOperator")) { - assertMap(sleeps, matchesMap().entry("counts", Map.of()).entry("first", List.of()).entry("last", List.of())); - } else if (operators.contains("ExchangeSourceOperator")) { - assertMap(sleeps, matchesMap().entry("counts", matchesMap().entry("exchange empty", greaterThan(0))).extraOk()); - @SuppressWarnings("unchecked") - List> first = (List>) sleeps.get("first"); - for (Map s : first) { - assertMap(s, sleepMatcher); + String taskDescription = p.get("task_description").toString(); + switch (taskDescription) { + case "data" -> assertMap(sleeps, matchesMap().entry("counts", Map.of()).entry("first", List.of()).entry("last", List.of())); + case "node_reduce" -> { + assertMap(sleeps, matchesMap().entry("counts", matchesMap().entry("exchange empty", greaterThan(0))).extraOk()); + @SuppressWarnings("unchecked") + List> first = (List>) sleeps.get("first"); + for (Map s : first) { + assertMap(s, sleepMatcher); + } + @SuppressWarnings("unchecked") + List> last = (List>) sleeps.get("last"); + for (Map s : last) { + assertMap(s, sleepMatcher); + } } - @SuppressWarnings("unchecked") - List> last = (List>) sleeps.get("last"); - for (Map s : last) { - assertMap(s, sleepMatcher); + case "final" -> { + assertMap( + sleeps, + matchesMap().entry("counts", matchesMap().entry("exchange empty", 1)) + .entry("first", List.of(sleepMatcher)) + .entry("last", List.of(sleepMatcher)) + ); } - } else { - fail("unknown signature: " + operators); + default -> throw new IllegalArgumentException("unknown task: " + taskDescription); } } } public static MapMatcher commonProfile() { - return matchesMap().entry("start_millis", greaterThan(0L)) + return matchesMap().entry("task_description", any(String.class)) + .entry("start_millis", greaterThan(0L)) .entry("stop_millis", greaterThan(0L)) .entry("iterations", greaterThan(0L)) .entry("cpu_nanos", greaterThan(0L)) diff --git a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/EsqlActionTaskIT.java b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/EsqlActionTaskIT.java index f973c3fe2347a..595c251231e5b 100644 --- a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/EsqlActionTaskIT.java +++ b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/EsqlActionTaskIT.java @@ -38,6 +38,7 @@ import org.elasticsearch.transport.TransportService; import org.elasticsearch.xpack.esql.EsqlTestUtils; import org.elasticsearch.xpack.esql.plugin.QueryPragmas; +import org.hamcrest.Matcher; import org.junit.Before; import java.io.IOException; @@ -75,9 +76,6 @@ public class EsqlActionTaskIT extends AbstractPausableIntegTestCase { private static final Logger LOGGER = LogManager.getLogger(EsqlActionTaskIT.class); - private String READ_DESCRIPTION; - private String MERGE_DESCRIPTION; - private String REDUCE_DESCRIPTION; private boolean nodeLevelReduction; /** @@ -89,21 +87,6 @@ public class EsqlActionTaskIT extends AbstractPausableIntegTestCase { public void setup() { assumeTrue("requires query pragmas", canUseQueryPragmas()); nodeLevelReduction = randomBoolean(); - READ_DESCRIPTION = """ - \\_LuceneSourceOperator[dataPartitioning = SHARD, maxPageSize = pageSize(), limit = 2147483647, needsScore = false] - \\_ValuesSourceReaderOperator[fields = [pause_me]] - \\_AggregationOperator[mode = INITIAL, aggs = sum of longs] - \\_ExchangeSinkOperator""".replace("pageSize()", Integer.toString(pageSize())); - MERGE_DESCRIPTION = """ - \\_ExchangeSourceOperator[] - \\_AggregationOperator[mode = FINAL, aggs = sum of longs] - \\_ProjectOperator[projection = [0]] - \\_LimitOperator[limit = 1000] - \\_OutputOperator[columns = [sum(pause_me)]]"""; - REDUCE_DESCRIPTION = "\\_ExchangeSourceOperator[]\n" - + (nodeLevelReduction ? "\\_AggregationOperator[mode = INTERMEDIATE, aggs = sum of longs]\n" : "") - + "\\_ExchangeSinkOperator"; - } public void testTaskContents() throws Exception { @@ -120,9 +103,11 @@ public void testTaskContents() throws Exception { for (TaskInfo task : foundTasks) { DriverStatus status = (DriverStatus) task.status(); assertThat(status.sessionId(), not(emptyOrNullString())); + String taskDescription = status.taskDescription(); for (DriverStatus.OperatorStatus o : status.activeOperators()) { logger.info("status {}", o); if (o.operator().startsWith("LuceneSourceOperator[maxPageSize = " + pageSize())) { + assertThat(taskDescription, equalTo("data")); LuceneSourceOperator.Status oStatus = (LuceneSourceOperator.Status) o.status(); assertThat(oStatus.processedSlices(), lessThanOrEqualTo(oStatus.totalSlices())); assertThat(oStatus.processedQueries(), equalTo(Set.of("*:*"))); @@ -142,6 +127,7 @@ public void testTaskContents() throws Exception { continue; } if (o.operator().equals("ValuesSourceReaderOperator[fields = [pause_me]]")) { + assertThat(taskDescription, equalTo("data")); ValuesSourceReaderOperator.Status oStatus = (ValuesSourceReaderOperator.Status) o.status(); assertMap( oStatus.readersBuilt(), @@ -152,6 +138,7 @@ public void testTaskContents() throws Exception { continue; } if (o.operator().equals("ExchangeSourceOperator")) { + assertThat(taskDescription, either(equalTo("node_reduce")).or(equalTo("final"))); ExchangeSourceOperator.Status oStatus = (ExchangeSourceOperator.Status) o.status(); assertThat(oStatus.pagesWaiting(), greaterThanOrEqualTo(0)); assertThat(oStatus.pagesEmitted(), greaterThanOrEqualTo(0)); @@ -159,6 +146,7 @@ public void testTaskContents() throws Exception { continue; } if (o.operator().equals("ExchangeSinkOperator")) { + assertThat(taskDescription, either(equalTo("data")).or(equalTo("node_reduce"))); ExchangeSinkOperator.Status oStatus = (ExchangeSinkOperator.Status) o.status(); assertThat(oStatus.pagesReceived(), greaterThanOrEqualTo(0)); exchangeSinks++; @@ -169,6 +157,29 @@ public void testTaskContents() throws Exception { assertThat(valuesSourceReaders, equalTo(1)); assertThat(exchangeSinks, greaterThanOrEqualTo(1)); assertThat(exchangeSources, equalTo(2)); + assertThat( + dataTasks(foundTasks).get(0).description(), + equalTo( + """ + \\_LuceneSourceOperator[sourceStatus] + \\_ValuesSourceReaderOperator[fields = [pause_me]] + \\_AggregationOperator[mode = INITIAL, aggs = sum of longs] + \\_ExchangeSinkOperator""".replace( + "sourceStatus", + "dataPartitioning = SHARD, maxPageSize = " + pageSize() + ", limit = 2147483647, needsScore = false" + ) + ) + ); + assertThat( + nodeReduceTasks(foundTasks).get(0).description(), + nodeLevelReduceDescriptionMatcher(foundTasks, "\\_AggregationOperator[mode = INTERMEDIATE, aggs = sum of longs]\n") + ); + assertThat(coordinatorTasks(foundTasks).get(0).description(), equalTo(""" + \\_ExchangeSourceOperator[] + \\_AggregationOperator[mode = FINAL, aggs = sum of longs] + \\_ProjectOperator[projection = [0]] + \\_LimitOperator[limit = 1000] + \\_OutputOperator[columns = [sum(pause_me)]]""")); } finally { scriptPermits.release(numberOfDocs()); try (EsqlQueryResponse esqlResponse = response.get()) { @@ -181,7 +192,7 @@ public void testCancelRead() throws Exception { ActionFuture response = startEsql(); try { List infos = getTasksStarting(); - TaskInfo running = infos.stream().filter(t -> t.description().equals(READ_DESCRIPTION)).findFirst().get(); + TaskInfo running = infos.stream().filter(t -> ((DriverStatus) t.status()).taskDescription().equals("data")).findFirst().get(); cancelTask(running.taskId()); assertCancelled(response); } finally { @@ -193,7 +204,7 @@ public void testCancelMerge() throws Exception { ActionFuture response = startEsql(); try { List infos = getTasksStarting(); - TaskInfo running = infos.stream().filter(t -> t.description().equals(MERGE_DESCRIPTION)).findFirst().get(); + TaskInfo running = infos.stream().filter(t -> ((DriverStatus) t.status()).taskDescription().equals("final")).findFirst().get(); cancelTask(running.taskId()); assertCancelled(response); } finally { @@ -277,8 +288,8 @@ private List getTasksStarting() throws Exception { for (TaskInfo task : tasks) { assertThat(task.action(), equalTo(DriverTaskRunner.ACTION_NAME)); DriverStatus status = (DriverStatus) task.status(); - logger.info("task {} {}", task.description(), status); - assertThat(task.description(), anyOf(equalTo(READ_DESCRIPTION), equalTo(MERGE_DESCRIPTION), equalTo(REDUCE_DESCRIPTION))); + logger.info("task {} {} {}", status.taskDescription(), task.description(), status); + assertThat(status.taskDescription(), anyOf(equalTo("data"), equalTo("node_reduce"), equalTo("final"))); /* * Accept tasks that are either starting or have gone * immediately async. The coordinating task is likely @@ -302,8 +313,8 @@ private List getTasksRunning() throws Exception { for (TaskInfo task : tasks) { assertThat(task.action(), equalTo(DriverTaskRunner.ACTION_NAME)); DriverStatus status = (DriverStatus) task.status(); - assertThat(task.description(), anyOf(equalTo(READ_DESCRIPTION), equalTo(MERGE_DESCRIPTION), equalTo(REDUCE_DESCRIPTION))); - if (task.description().equals(READ_DESCRIPTION)) { + assertThat(status.taskDescription(), anyOf(equalTo("data"), equalTo("node_reduce"), equalTo("final"))); + if (status.taskDescription().equals("data")) { assertThat(status.status(), equalTo(DriverStatus.Status.RUNNING)); } else { assertThat(status.status(), equalTo(DriverStatus.Status.ASYNC)); @@ -328,23 +339,26 @@ private List getDriverTasks() throws Exception { .get() .getTasks(); assertThat(tasks, hasSize(equalTo(3))); - List readTasks = tasks.stream().filter(t -> t.description().equals(READ_DESCRIPTION)).toList(); - List mergeTasks = tasks.stream().filter(t -> t.description().equals(MERGE_DESCRIPTION)).toList(); - assertThat(readTasks, hasSize(1)); - assertThat(mergeTasks, hasSize(1)); - // node-level reduction is disabled when the target data node is also the coordinator - if (readTasks.get(0).node().equals(mergeTasks.get(0).node())) { - REDUCE_DESCRIPTION = """ - \\_ExchangeSourceOperator[] - \\_ExchangeSinkOperator"""; - } - List reduceTasks = tasks.stream().filter(t -> t.description().equals(REDUCE_DESCRIPTION)).toList(); - assertThat(reduceTasks, hasSize(1)); + assertThat(dataTasks(tasks), hasSize(1)); + assertThat(nodeReduceTasks(tasks), hasSize(1)); + assertThat(coordinatorTasks(tasks), hasSize(1)); foundTasks.addAll(tasks); }); return foundTasks; } + private List dataTasks(List tasks) { + return tasks.stream().filter(t -> ((DriverStatus) t.status()).taskDescription().equals("data")).toList(); + } + + private List nodeReduceTasks(List tasks) { + return tasks.stream().filter(t -> ((DriverStatus) t.status()).taskDescription().equals("node_reduce")).toList(); + } + + private List coordinatorTasks(List tasks) { + return tasks.stream().filter(t -> ((DriverStatus) t.status()).taskDescription().equals("final")).toList(); + } + private void assertCancelled(ActionFuture response) throws Exception { Exception e = expectThrows(Exception.class, response); Throwable cancelException = ExceptionsHelper.unwrap(e, TaskCancelledException.class); @@ -477,30 +491,41 @@ protected void doRun() throws Exception { } public void testTaskContentsForTopNQuery() throws Exception { - READ_DESCRIPTION = ("\\_LuceneTopNSourceOperator[dataPartitioning = SHARD, maxPageSize = pageSize(), limit = 1000, " - + "needsScore = false, " - + "sorts = [{\"pause_me\":{\"order\":\"asc\",\"missing\":\"_last\",\"unmapped_type\":\"long\"}}]]\n" - + "\\_ValuesSourceReaderOperator[fields = [pause_me]]\n" - + "\\_ProjectOperator[projection = [1]]\n" - + "\\_ExchangeSinkOperator").replace("pageSize()", Integer.toString(pageSize())); - MERGE_DESCRIPTION = "\\_ExchangeSourceOperator[]\n" - + "\\_TopNOperator[count=1000, elementTypes=[LONG], encoders=[DefaultSortable], " - + "sortOrders=[SortOrder[channel=0, asc=true, nullsFirst=false]]]\n" - + "\\_ProjectOperator[projection = [0]]\n" - + "\\_OutputOperator[columns = [pause_me]]"; - REDUCE_DESCRIPTION = "\\_ExchangeSourceOperator[]\n" - + (nodeLevelReduction - ? "\\_TopNOperator[count=1000, elementTypes=[LONG], encoders=[DefaultSortable], " - + "sortOrders=[SortOrder[channel=0, asc=true, nullsFirst=false]]]\n" - : "") - + "\\_ExchangeSinkOperator"; - ActionFuture response = startEsql("from test | sort pause_me | keep pause_me"); try { getTasksStarting(); logger.info("unblocking script"); scriptPermits.release(pageSize()); - getTasksRunning(); + List tasks = getTasksRunning(); + String sortStatus = """ + [{"pause_me":{"order":"asc","missing":"_last","unmapped_type":"long"}}]"""; + String sourceStatus = "dataPartitioning = SHARD, maxPageSize = " + + pageSize() + + ", limit = 1000, needsScore = false, sorts = " + + sortStatus; + assertThat(dataTasks(tasks).get(0).description(), equalTo(""" + \\_LuceneTopNSourceOperator[sourceStatus] + \\_ValuesSourceReaderOperator[fields = [pause_me]] + \\_ProjectOperator[projection = [1]] + \\_ExchangeSinkOperator""".replace("sourceStatus", sourceStatus))); + assertThat( + nodeReduceTasks(tasks).get(0).description(), + nodeLevelReduceDescriptionMatcher( + tasks, + "\\_TopNOperator[count=1000, elementTypes=[LONG], encoders=[DefaultSortable], " + + "sortOrders=[SortOrder[channel=0, asc=true, nullsFirst=false]]]\n" + ) + ); + assertThat( + coordinatorTasks(tasks).get(0).description(), + equalTo( + "\\_ExchangeSourceOperator[]\n" + + "\\_TopNOperator[count=1000, elementTypes=[LONG], encoders=[DefaultSortable], " + + "sortOrders=[SortOrder[channel=0, asc=true, nullsFirst=false]]]\n" + + "\\_ProjectOperator[projection = [0]]\n" + + "\\_OutputOperator[columns = [pause_me]]" + ) + ); } finally { // each scripted field "emit" is called by LuceneTopNSourceOperator and by ValuesSourceReaderOperator scriptPermits.release(2 * numberOfDocs()); @@ -512,26 +537,26 @@ public void testTaskContentsForTopNQuery() throws Exception { public void testTaskContentsForLimitQuery() throws Exception { String limit = Integer.toString(randomIntBetween(pageSize() + 1, 2 * numberOfDocs())); - READ_DESCRIPTION = """ - \\_LuceneSourceOperator[dataPartitioning = SHARD, maxPageSize = pageSize(), limit = limit(), needsScore = false] - \\_ValuesSourceReaderOperator[fields = [pause_me]] - \\_ProjectOperator[projection = [1]] - \\_ExchangeSinkOperator""".replace("pageSize()", Integer.toString(pageSize())).replace("limit()", limit); - MERGE_DESCRIPTION = """ - \\_ExchangeSourceOperator[] - \\_LimitOperator[limit = limit()] - \\_ProjectOperator[projection = [0]] - \\_OutputOperator[columns = [pause_me]]""".replace("limit()", limit); - REDUCE_DESCRIPTION = ("\\_ExchangeSourceOperator[]\n" - + (nodeLevelReduction ? "\\_LimitOperator[limit = limit()]\n" : "") - + "\\_ExchangeSinkOperator").replace("limit()", limit); - ActionFuture response = startEsql("from test | keep pause_me | limit " + limit); try { getTasksStarting(); logger.info("unblocking script"); scriptPermits.release(pageSize() - prereleasedDocs); - getTasksRunning(); + List tasks = getTasksRunning(); + assertThat(dataTasks(tasks).get(0).description(), equalTo(""" + \\_LuceneSourceOperator[dataPartitioning = SHARD, maxPageSize = pageSize(), limit = limit(), needsScore = false] + \\_ValuesSourceReaderOperator[fields = [pause_me]] + \\_ProjectOperator[projection = [1]] + \\_ExchangeSinkOperator""".replace("pageSize()", Integer.toString(pageSize())).replace("limit()", limit))); + assertThat( + nodeReduceTasks(tasks).get(0).description(), + nodeLevelReduceDescriptionMatcher(tasks, "\\_LimitOperator[limit = " + limit + "]\n") + ); + assertThat(coordinatorTasks(tasks).get(0).description(), equalTo(""" + \\_ExchangeSourceOperator[] + \\_LimitOperator[limit = limit()] + \\_ProjectOperator[projection = [0]] + \\_OutputOperator[columns = [pause_me]]""".replace("limit()", limit))); } finally { scriptPermits.release(numberOfDocs()); try (EsqlQueryResponse esqlResponse = response.get()) { @@ -541,27 +566,37 @@ public void testTaskContentsForLimitQuery() throws Exception { } public void testTaskContentsForGroupingStatsQuery() throws Exception { - READ_DESCRIPTION = """ - \\_LuceneSourceOperator[dataPartitioning = SHARD, maxPageSize = pageSize(), limit = 2147483647, needsScore = false] - \\_ValuesSourceReaderOperator[fields = [foo]] - \\_OrdinalsGroupingOperator(aggs = max of longs) - \\_ExchangeSinkOperator""".replace("pageSize()", Integer.toString(pageSize())); - MERGE_DESCRIPTION = """ - \\_ExchangeSourceOperator[] - \\_HashAggregationOperator[mode = , aggs = max of longs] - \\_ProjectOperator[projection = [1, 0]] - \\_LimitOperator[limit = 1000] - \\_OutputOperator[columns = [max(foo), pause_me]]"""; - REDUCE_DESCRIPTION = "\\_ExchangeSourceOperator[]\n" - + (nodeLevelReduction ? "\\_HashAggregationOperator[mode = , aggs = max of longs]\n" : "") - + "\\_ExchangeSinkOperator"; - ActionFuture response = startEsql("from test | stats max(foo) by pause_me"); try { getTasksStarting(); logger.info("unblocking script"); scriptPermits.release(pageSize()); - getTasksRunning(); + List tasks = getTasksRunning(); + String sourceStatus = "dataPartitioning = SHARD, maxPageSize = pageSize(), limit = 2147483647, needsScore = false".replace( + "pageSize()", + Integer.toString(pageSize()) + ); + assertThat( + dataTasks(tasks).get(0).description(), + equalTo( + """ + \\_LuceneSourceOperator[sourceStatus] + \\_ValuesSourceReaderOperator[fields = [foo]] + \\_OrdinalsGroupingOperator(aggs = max of longs) + \\_ExchangeSinkOperator""".replace("sourceStatus", sourceStatus) + + ) + ); + assertThat( + nodeReduceTasks(tasks).get(0).description(), + nodeLevelReduceDescriptionMatcher(tasks, "\\_HashAggregationOperator[mode = , aggs = max of longs]\n") + ); + assertThat(coordinatorTasks(tasks).get(0).description(), equalTo(""" + \\_ExchangeSourceOperator[] + \\_HashAggregationOperator[mode = , aggs = max of longs] + \\_ProjectOperator[projection = [1, 0]] + \\_LimitOperator[limit = 1000] + \\_OutputOperator[columns = [max(foo), pause_me]]""")); } finally { scriptPermits.release(numberOfDocs()); try (EsqlQueryResponse esqlResponse = response.get()) { @@ -572,6 +607,13 @@ public void testTaskContentsForGroupingStatsQuery() throws Exception { } } + private Matcher nodeLevelReduceDescriptionMatcher(List tasks, String nodeReduce) { + boolean matchNodeReduction = nodeLevelReduction + // If the data node and the coordinator are the same node then we don't reduce aggs in it. + && false == dataTasks(tasks).get(0).node().equals(coordinatorTasks(tasks).get(0).node()); + return equalTo("\\_ExchangeSourceOperator[]\n" + (matchNodeReduction ? nodeReduce : "") + "\\_ExchangeSinkOperator"); + } + @Override protected Collection> nodePlugins() { return CollectionUtils.appendToCopy(super.nodePlugins(), MockTransportService.TestPlugin.class); diff --git a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/LookupFromIndexIT.java b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/LookupFromIndexIT.java index ea6bb4de9a69f..bad5fe8f9831f 100644 --- a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/LookupFromIndexIT.java +++ b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/LookupFromIndexIT.java @@ -227,6 +227,7 @@ private void runLookup(DataType keyType, PopulateIndices populateIndices) throws DriverContext driverContext = driverContext(); try ( var driver = new Driver( + "test", driverContext, source.get(driverContext), List.of(reader.get(driverContext), lookup.get(driverContext)), diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/AbstractLookupService.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/AbstractLookupService.java index a8e900c32acf9..afd0022dd2c13 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/AbstractLookupService.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/AbstractLookupService.java @@ -330,6 +330,7 @@ private void doLookup(T request, CancellableTask task, ActionListener releasables.add(outputOperator); Driver driver = new Driver( "enrich-lookup:" + request.sessionId, + "enrich", System.currentTimeMillis(), System.nanoTime(), driverContext, diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/LocalExecutionPlanner.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/LocalExecutionPlanner.java index 68797bf08cc1f..0e0b7907d83f7 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/LocalExecutionPlanner.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/LocalExecutionPlanner.java @@ -174,7 +174,7 @@ public LocalExecutionPlanner( /** * turn the given plan into a list of drivers to execute */ - public LocalExecutionPlan plan(FoldContext foldCtx, PhysicalPlan localPhysicalPlan) { + public LocalExecutionPlan plan(String taskDescription, FoldContext foldCtx, PhysicalPlan localPhysicalPlan) { var context = new LocalExecutionPlannerContext( new ArrayList<>(), new Holder<>(DriverParallelism.SINGLE), @@ -195,7 +195,7 @@ public LocalExecutionPlan plan(FoldContext foldCtx, PhysicalPlan localPhysicalPl final TimeValue statusInterval = configuration.pragmas().statusInterval(); context.addDriverFactory( new DriverFactory( - new DriverSupplier(context.bigArrays, context.blockFactory, physicalOperation, statusInterval, settings), + new DriverSupplier(taskDescription, context.bigArrays, context.blockFactory, physicalOperation, statusInterval, settings), context.driverParallelism().get() ) ); @@ -871,6 +871,7 @@ int pageSize(Integer estimatedRowSize) { } record DriverSupplier( + String taskDescription, BigArrays bigArrays, BlockFactory blockFactory, PhysicalOperation physicalOperation, @@ -897,6 +898,7 @@ public Driver apply(String sessionId) { success = true; return new Driver( sessionId, + taskDescription, System.currentTimeMillis(), System.nanoTime(), driverContext, diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ClusterComputeHandler.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ClusterComputeHandler.java index 51c1aab09e9a6..1e464f32b7fc9 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ClusterComputeHandler.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ClusterComputeHandler.java @@ -264,6 +264,7 @@ void runComputeOnRemoteCluster( parentTask, new ComputeContext( localSessionId, + "remote_reduce", clusterAlias, List.of(), configuration, diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeContext.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeContext.java index 82943d23581fd..86af106ea7e42 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeContext.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeContext.java @@ -19,6 +19,7 @@ record ComputeContext( String sessionId, + String taskDescription, String clusterAlias, List searchContexts, Configuration configuration, diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeService.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeService.java index 9895538f480a9..fda15770b0472 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeService.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeService.java @@ -168,6 +168,7 @@ public void execute( } var computeContext = new ComputeContext( newChildSession(sessionId), + "single", LOCAL_CLUSTER, List.of(), configuration, @@ -243,6 +244,7 @@ public void execute( rootTask, new ComputeContext( sessionId, + "final", LOCAL_CLUSTER, List.of(), configuration, @@ -413,7 +415,7 @@ public SourceProvider createSourceProvider() { // the planner will also set the driver parallelism in LocalExecutionPlanner.LocalExecutionPlan (used down below) // it's doing this in the planning of EsQueryExec (the source of the data) // see also EsPhysicalOperationProviders.sourcePhysicalOperation - LocalExecutionPlanner.LocalExecutionPlan localExecutionPlan = planner.plan(context.foldCtx(), plan); + LocalExecutionPlanner.LocalExecutionPlan localExecutionPlan = planner.plan(context.taskDescription(), context.foldCtx(), plan); if (LOGGER.isDebugEnabled()) { LOGGER.debug("Local execution plan:\n{}", localExecutionPlan.describe()); } diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/DataNodeComputeHandler.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/DataNodeComputeHandler.java index 78bca3e81ef9b..4ea3588ec216e 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/DataNodeComputeHandler.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/DataNodeComputeHandler.java @@ -288,6 +288,7 @@ public void onFailure(Exception e) { } var computeContext = new ComputeContext( sessionId, + "data", clusterAlias, searchContexts, configuration, @@ -431,6 +432,7 @@ private void runComputeOnDataNode( task, new ComputeContext( request.sessionId(), + "node_reduce", request.clusterAlias(), List.of(), request.configuration(), diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/CsvTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/CsvTests.java index 77449f7edc920..30756957416f3 100644 --- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/CsvTests.java +++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/CsvTests.java @@ -648,6 +648,7 @@ void executeSubPlan( // replace fragment inside the coordinator plan List drivers = new ArrayList<>(); LocalExecutionPlan coordinatorNodeExecutionPlan = executionPlanner.plan( + "final", foldCtx, new OutputExec(coordinatorPlan, collectedPages::add) ); @@ -669,7 +670,7 @@ void executeSubPlan( throw new AssertionError("expected no failure", e); }) ); - LocalExecutionPlan dataNodeExecutionPlan = executionPlanner.plan(foldCtx, csvDataNodePhysicalPlan); + LocalExecutionPlan dataNodeExecutionPlan = executionPlanner.plan("data", foldCtx, csvDataNodePhysicalPlan); drivers.addAll(dataNodeExecutionPlan.createDrivers(getTestName())); Randomness.shuffle(drivers); diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/action/EsqlQueryResponseProfileTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/action/EsqlQueryResponseProfileTests.java index 257ef2cfb2d55..286aa8fad4bbe 100644 --- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/action/EsqlQueryResponseProfileTests.java +++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/action/EsqlQueryResponseProfileTests.java @@ -7,6 +7,8 @@ package org.elasticsearch.xpack.esql.action; +import com.carrotsearch.randomizedtesting.generators.RandomStrings; + import org.elasticsearch.common.io.stream.NamedWriteableRegistry; import org.elasticsearch.common.io.stream.Writeable; import org.elasticsearch.compute.operator.AbstractPageMappingOperator; @@ -45,6 +47,7 @@ private List randomDriverProfiles() { private DriverProfile randomDriverProfile() { return new DriverProfile( + RandomStrings.randomAsciiLettersOfLength(random(), 5), randomNonNegativeLong(), randomNonNegativeLong(), randomNonNegativeLong(), diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/action/EsqlQueryResponseTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/action/EsqlQueryResponseTests.java index d257d8e71fb44..fc5eaee8cda8b 100644 --- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/action/EsqlQueryResponseTests.java +++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/action/EsqlQueryResponseTests.java @@ -746,6 +746,7 @@ public void testProfileXContent() { new EsqlQueryResponse.Profile( List.of( new DriverProfile( + "test", 1723489812649L, 1723489819929L, 20021, @@ -780,6 +781,7 @@ public void testProfileXContent() { "profile" : { "drivers" : [ { + "task_description" : "test", "start_millis" : 1723489812649, "stop_millis" : 1723489819929, "took_nanos" : 20021, diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/PhysicalPlanOptimizerTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/PhysicalPlanOptimizerTests.java index b9e8c68d9a3c8..e7ef52d9baa98 100644 --- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/PhysicalPlanOptimizerTests.java +++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/PhysicalPlanOptimizerTests.java @@ -7581,7 +7581,7 @@ private LocalExecutionPlanner.LocalExecutionPlan physicalOperationsFromPhysicalP List.of() ); - return planner.plan(FoldContext.small(), plan); + return planner.plan("test", FoldContext.small(), plan); } private List> findFieldNamesInLookupJoinDescription(LocalExecutionPlanner.LocalExecutionPlan physicalOperations) { diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/planner/LocalExecutionPlannerTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/planner/LocalExecutionPlannerTests.java index 50833fcd4586f..02c0f639f8e3c 100644 --- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/planner/LocalExecutionPlannerTests.java +++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/planner/LocalExecutionPlannerTests.java @@ -104,6 +104,7 @@ public void closeIndex() throws IOException { public void testLuceneSourceOperatorHugeRowSize() throws IOException { int estimatedRowSize = randomEstimatedRowSize(estimatedRowSizeIsHuge); LocalExecutionPlanner.LocalExecutionPlan plan = planner().plan( + "test", FoldContext.small(), new EsQueryExec( Source.EMPTY, @@ -130,6 +131,7 @@ public void testLuceneTopNSourceOperator() throws IOException { EsQueryExec.FieldSort sort = new EsQueryExec.FieldSort(sortField, Order.OrderDirection.ASC, Order.NullsPosition.LAST); Literal limit = new Literal(Source.EMPTY, 10, DataType.INTEGER); LocalExecutionPlanner.LocalExecutionPlan plan = planner().plan( + "test", FoldContext.small(), new EsQueryExec( Source.EMPTY, @@ -156,6 +158,7 @@ public void testLuceneTopNSourceOperatorDistanceSort() throws IOException { EsQueryExec.GeoDistanceSort sort = new EsQueryExec.GeoDistanceSort(sortField, Order.OrderDirection.ASC, 1, -1); Literal limit = new Literal(Source.EMPTY, 10, DataType.INTEGER); LocalExecutionPlanner.LocalExecutionPlan plan = planner().plan( + "test", FoldContext.small(), new EsQueryExec( Source.EMPTY, diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/plugin/ComputeListenerTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/plugin/ComputeListenerTests.java index 7db3216d1736d..f4deaa45f1f87 100644 --- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/plugin/ComputeListenerTests.java +++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/plugin/ComputeListenerTests.java @@ -7,6 +7,8 @@ package org.elasticsearch.xpack.esql.plugin; +import com.carrotsearch.randomizedtesting.generators.RandomStrings; + import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionRunnable; import org.elasticsearch.action.support.PlainActionFuture; @@ -62,6 +64,7 @@ private List randomProfiles() { for (int i = 0; i < numProfiles; i++) { profiles.add( new DriverProfile( + RandomStrings.randomAsciiLettersOfLength(random(), 5), randomNonNegativeLong(), randomNonNegativeLong(), randomNonNegativeLong(),