From 15e2d5322dff3d8c6ee34384281436331a8b96f9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Wojciech=20Przytu=C5=82a?= Date: Tue, 26 Jul 2022 11:08:15 +0200 Subject: [PATCH] Tracing: added OpenTelemetry integration test The test verifies that span tree structure and status code are valid. Speculative executions are run parallel to the main thread, so some of them can finish only after query result has been returned. Thus, in order to collect span data from entire request, we decided to wait until all speculative executions end. The main thread uses conditional variable `allEnded` to wait for them and lock is used for concurrent mutation of activeSpans. --- .../OpenTelemetryTracingInfo.java | 2 +- .../opentelemetry/OpenTelemetryTest.java | 871 ++++++++++++++++++ 2 files changed, 872 insertions(+), 1 deletion(-) create mode 100644 driver-opentelemetry/src/test/java/com/datastax/driver/opentelemetry/OpenTelemetryTest.java diff --git a/driver-opentelemetry/src/main/java/com/datastax/driver/opentelemetry/OpenTelemetryTracingInfo.java b/driver-opentelemetry/src/main/java/com/datastax/driver/opentelemetry/OpenTelemetryTracingInfo.java index a7ea44dba39..2c93ab4949e 100644 --- a/driver-opentelemetry/src/main/java/com/datastax/driver/opentelemetry/OpenTelemetryTracingInfo.java +++ b/driver-opentelemetry/src/main/java/com/datastax/driver/opentelemetry/OpenTelemetryTracingInfo.java @@ -162,7 +162,7 @@ public void setHasMorePages(boolean hasMorePages) { @Override public void setRowsCount(int rowsCount) { assertStarted(); - span.setAttribute("db.scylla.rows_count", rowsCount); + span.setAttribute("db.scylla.rows_count", Integer.toString(rowsCount)); } @Override diff --git a/driver-opentelemetry/src/test/java/com/datastax/driver/opentelemetry/OpenTelemetryTest.java b/driver-opentelemetry/src/test/java/com/datastax/driver/opentelemetry/OpenTelemetryTest.java new file mode 100644 index 00000000000..b9fdec8d31c --- /dev/null +++ b/driver-opentelemetry/src/test/java/com/datastax/driver/opentelemetry/OpenTelemetryTest.java @@ -0,0 +1,871 @@ +/* + * Copyright (C) 2021 ScyllaDB + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.datastax.driver.opentelemetry; + +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertNotNull; +import static org.testng.Assert.assertNull; +import static org.testng.Assert.assertTrue; + +import com.datastax.driver.core.BatchStatement; +import com.datastax.driver.core.CCMTestsSupport; +import com.datastax.driver.core.ConsistencyLevel; +import com.datastax.driver.core.PreparedStatement; +import com.datastax.driver.core.Session; +import com.datastax.driver.core.SimpleStatement; +import com.datastax.driver.core.exceptions.InvalidQueryException; +import com.datastax.driver.core.exceptions.SyntaxError; +import com.datastax.driver.core.policies.FallthroughRetryPolicy; +import com.datastax.driver.core.tracing.NoopTracingInfoFactory; +import com.datastax.driver.core.tracing.PrecisionLevel; +import com.datastax.driver.core.tracing.TracingInfoFactory; +import io.opentelemetry.api.common.AttributeKey; +import io.opentelemetry.api.common.Attributes; +import io.opentelemetry.api.trace.Span; +import io.opentelemetry.api.trace.StatusCode; +import io.opentelemetry.api.trace.Tracer; +import io.opentelemetry.context.Context; +import io.opentelemetry.context.Scope; +import io.opentelemetry.sdk.OpenTelemetrySdk; +import io.opentelemetry.sdk.resources.Resource; +import io.opentelemetry.sdk.trace.ReadWriteSpan; +import io.opentelemetry.sdk.trace.ReadableSpan; +import io.opentelemetry.sdk.trace.SdkTracerProvider; +import io.opentelemetry.sdk.trace.SpanProcessor; +import io.opentelemetry.semconv.resource.attributes.ResourceAttributes; +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.concurrent.locks.Condition; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; +import java.util.function.BiConsumer; +import java.util.stream.Collectors; +import org.testng.annotations.Test; + +/** Tests for OpenTelemetry integration. */ +public class OpenTelemetryTest extends CCMTestsSupport { + /** Collects and saves spans. */ + private static final class BookkeepingSpanProcessor implements SpanProcessor { + final Lock lock = new ReentrantLock(); + final Condition allEnded = lock.newCondition(); + + final Collection startedSpans = new ArrayList<>(); + final Collection spans = new ArrayList<>(); + + int activeSpans = 0; + + @Override + public void onStart(Context parentContext, ReadWriteSpan span) { + lock.lock(); + + startedSpans.add(span); + ++activeSpans; + + lock.unlock(); + } + + @Override + public boolean isStartRequired() { + return true; + } + + @Override + public void onEnd(ReadableSpan span) { + lock.lock(); + + spans.add(span); + --activeSpans; + + if (activeSpans == 0) allEnded.signal(); + + lock.unlock(); + } + + @Override + public boolean isEndRequired() { + return true; + } + + public Collection getSpans() { + lock.lock(); + + try { + while (activeSpans > 0) allEnded.await(); + + for (ReadableSpan span : startedSpans) { + assertTrue(span.hasEnded()); + } + } catch (InterruptedException e) { + assert false; + } finally { + lock.unlock(); + } + + return spans; + } + } + + private Session session; + + /** + * Prepare OpenTelemetry configuration and run test with it. + * + * @param precisionLevel precision level of tracing for the tests. + * @param test test to run. + * @return collected spans. + */ + private Collection collectSpans( + PrecisionLevel precisionLevel, BiConsumer test) { + final Resource serviceNameResource = + Resource.create( + Attributes.of(ResourceAttributes.SERVICE_NAME, "Scylla Java driver - test")); + + final BookkeepingSpanProcessor collector = new BookkeepingSpanProcessor(); + + final SdkTracerProvider tracerProvider = + SdkTracerProvider.builder() + .addSpanProcessor(collector) + .setResource(Resource.getDefault().merge(serviceNameResource)) + .build(); + final OpenTelemetrySdk openTelemetry = + OpenTelemetrySdk.builder().setTracerProvider(tracerProvider).build(); + + final Tracer tracer = openTelemetry.getTracerProvider().get("this"); + final OpenTelemetryTracingInfoFactory tracingInfoFactory = + new OpenTelemetryTracingInfoFactory(tracer, precisionLevel); + cluster().setTracingInfoFactory(tracingInfoFactory); + session = cluster().connect(); + + session.execute("USE " + keyspace); + session.execute("DROP TABLE IF EXISTS t"); + session.execute("CREATE TABLE t (k int PRIMARY KEY, v int)"); + session.execute("INSERT INTO t(k, v) VALUES (12, 3)"); + session.execute("INSERT INTO t(k, v) VALUES (1, 7)"); + session.execute("INSERT INTO t(k, v) VALUES (5, 7)"); + session.execute("INSERT INTO t(k, v) VALUES (6, 7)"); + session.execute("INSERT INTO t(k, v) VALUES (7, 7)"); + session.execute("INSERT INTO t(k, v) VALUES (8, 7)"); + session.execute("INSERT INTO t(k, v) VALUES (9, 7)"); + session.execute("INSERT INTO t(k, v) VALUES (10, 7)"); + collector.getSpans().clear(); + + test.accept(tracer, tracingInfoFactory); + + tracerProvider.close(); + cluster().setTracingInfoFactory(new NoopTracingInfoFactory()); + + return collector.getSpans(); + } + + private Collection getChildrenOfSpans( + Collection allSpans, Collection parentSpans) { + return allSpans.stream() + .filter( + span -> + parentSpans.stream() + .filter( + parentSpan -> + parentSpan.getSpanContext().equals(span.getParentSpanContext())) + .findAny() + .isPresent()) + .collect(Collectors.toList()); + } + + /** Basic test for creating spans with INSERT statement. */ + @Test(groups = "short") + public void simpleTracingInsertTest() { + final Collection spans = + collectSpans( + PrecisionLevel.NORMAL, + (tracer, tracingInfoFactory) -> { + Span userSpan = tracer.spanBuilder("user span").startSpan(); + Scope scope = userSpan.makeCurrent(); + + session.execute("INSERT INTO t(k, v) VALUES (4, 2)"); + session.execute("INSERT INTO t(k, v) VALUES (2, 1)"); + + scope.close(); + userSpan.end(); + }); + + // Retrieve the span created directly by tracer. + final List userSpans = + spans.stream() + .filter(span -> !span.getParentSpanContext().isValid()) // The root span has no parent. + .collect(Collectors.toList()); + assertEquals(userSpans.size(), 1); + final ReadableSpan userSpan = userSpans.get(0); + + for (ReadableSpan span : spans) { + assertTrue(span.getSpanContext().isValid()); + assertTrue( // Each span is either the root span or has a valid parent span. + span.getSpanContext().equals(userSpan.getSpanContext()) + || span.getParentSpanContext().isValid()); + } + + // Retrieve spans representing requests. + final Collection requestSpans = + spans.stream() + .filter(span -> span.getParentSpanContext().equals(userSpan.getSpanContext())) + .collect(Collectors.toList()); + assertEquals(requestSpans.size(), 2); + + final Collection speculativeExecutionsSpans = + getChildrenOfSpans(spans, requestSpans); + assertEquals(speculativeExecutionsSpans.size(), 2); + + final Collection attemptSpans = + getChildrenOfSpans(spans, speculativeExecutionsSpans); + assertEquals(attemptSpans.size(), 2); + + requestSpans.stream() + .map(ReadableSpan::toSpanData) + .forEach( + spanData -> { + assertEquals(spanData.getName(), "request"); + assertEquals(spanData.getStatus().getStatusCode(), StatusCode.OK); + Attributes tags = spanData.getAttributes(); + + // tags generic for any (reasonable) statement + assertEquals(tags.get(AttributeKey.stringKey("db.scylla.consistency_level")), "ONE"); + assertEquals(tags.get(AttributeKey.stringKey("db.scylla.statement_type")), "regular"); + assertEquals( + tags.get(AttributeKey.stringKey("db.scylla.load_balancing_policy")), + "PagingOptimizingLoadBalancingPolicy"); + assertEquals( + tags.get(AttributeKey.stringKey("db.scylla.speculative_execution_policy")), + "NoSpeculativeExecutionPolicy"); + + assertNull( + tags.get( + AttributeKey.stringKey("db.scylla.fetch_size"))); // was not set explicitly + // no such information in RegularStatement: + assertNull(tags.get(AttributeKey.stringKey("db.scylla.batch_size"))); + assertNull(tags.get(AttributeKey.stringKey("db.scylla.keyspace"))); + assertNull(tags.get(AttributeKey.stringKey("db.scylla.table"))); + assertNull(tags.get(AttributeKey.stringKey("db.scylla.partition_key"))); + assertNull(tags.get(AttributeKey.stringKey("db.scylla.db.operation"))); + // no such information with PrecisionLevel.NORMAL: + assertNull(tags.get(AttributeKey.stringKey("db.scylla.statement"))); + // no such information with operation INSERT: + assertNull(tags.get(AttributeKey.stringKey("db.scylla.rows_count"))); + assertNull(tags.get(AttributeKey.stringKey("db.scylla.has_more_pages"))); + // no such tags in "request" span: + assertNull(tags.get(AttributeKey.stringKey("net.peer.name"))); + assertNull(tags.get(AttributeKey.stringKey("net.peer.ip"))); + assertNull(tags.get(AttributeKey.stringKey("net.peer.port"))); + assertNull(tags.get(AttributeKey.stringKey("db.scylla.shard_id"))); + }); + + speculativeExecutionsSpans.stream() + .map(ReadableSpan::toSpanData) + .forEach( + spanData -> { + assertEquals(spanData.getName(), "speculative_execution"); + assertEquals(spanData.getStatus().getStatusCode(), StatusCode.OK); + Attributes tags = spanData.getAttributes(); + + assertEquals(tags.get(AttributeKey.stringKey("db.scylla.attempt_count")), "1"); + assertNull(tags.get(AttributeKey.stringKey("net.peer.name"))); + assertNull(tags.get(AttributeKey.stringKey("net.peer.ip"))); + assertNull(tags.get(AttributeKey.stringKey("net.peer.port"))); + assertNull(tags.get(AttributeKey.stringKey("db.scylla.shard_id"))); + }); + + attemptSpans.stream() + .map(ReadableSpan::toSpanData) + .forEach( + spanData -> { + assertEquals(spanData.getName(), "attempt"); + assertEquals(spanData.getStatus().getStatusCode(), StatusCode.OK); + Attributes tags = spanData.getAttributes(); + + assertNotNull(tags.get(AttributeKey.stringKey("net.peer.name"))); + assertNotNull(tags.get(AttributeKey.stringKey("net.peer.ip"))); + assertNotNull(tags.get(AttributeKey.stringKey("net.peer.port"))); + assertNotNull(tags.get(AttributeKey.stringKey("db.scylla.shard_id"))); + }); + } + + /** Basic test for creating spans with UPDATE statement. */ + @Test(groups = "short") + public void simpleTracingUpdateTest() { + final Collection spans = + collectSpans( + PrecisionLevel.NORMAL, + (tracer, tracingInfoFactory) -> { + Span userSpan = tracer.spanBuilder("user span").startSpan(); + Scope scope = userSpan.makeCurrent(); + + final BatchStatement batch = new BatchStatement(); + batch.addAll( + new ArrayList() { + { + add("UPDATE t SET v=0 WHERE k=1"); + add("UPDATE t SET v=0 WHERE k=2"); + add("UPDATE t SET v=0 WHERE k=3"); + add("UPDATE t SET v=0 WHERE k=4"); + } + }.stream().map(SimpleStatement::new).collect(Collectors.toList())); + + session.execute(batch); + + scope.close(); + userSpan.end(); + }); + + // Retrieve the span created directly by tracer. + final List userSpans = + spans.stream() + .filter(span -> !span.getParentSpanContext().isValid()) // The root span has no parent. + .collect(Collectors.toList()); + assertEquals(userSpans.size(), 1); + final ReadableSpan userSpan = userSpans.get(0); + + for (ReadableSpan span : spans) { + assertTrue(span.getSpanContext().isValid()); + assertTrue( // Each span is either the root span or has a valid parent span. + span.getSpanContext().equals(userSpan.getSpanContext()) + || span.getParentSpanContext().isValid()); + } + + // Retrieve spans representing requests. + final Collection requestSpans = + spans.stream() + .filter(span -> span.getParentSpanContext().equals(userSpan.getSpanContext())) + .collect(Collectors.toList()); + assertEquals(requestSpans.size(), 1); + + final Collection speculativeExecutionsSpans = + getChildrenOfSpans(spans, requestSpans); + assertEquals(speculativeExecutionsSpans.size(), 1); + + final Collection attemptSpans = + getChildrenOfSpans(spans, speculativeExecutionsSpans); + assertEquals(attemptSpans.size(), 1); + + requestSpans.stream() + .map(ReadableSpan::toSpanData) + .forEach( + spanData -> { + assertEquals(spanData.getName(), "request"); + assertEquals(spanData.getStatus().getStatusCode(), StatusCode.OK); + Attributes tags = spanData.getAttributes(); + + // tags generic for any (reasonable) statement + assertEquals(tags.get(AttributeKey.stringKey("db.scylla.consistency_level")), "ONE"); + assertEquals(tags.get(AttributeKey.stringKey("db.scylla.statement_type")), "batch"); + assertEquals( + tags.get(AttributeKey.stringKey("db.scylla.load_balancing_policy")), + "PagingOptimizingLoadBalancingPolicy"); + assertEquals( + tags.get(AttributeKey.stringKey("db.scylla.speculative_execution_policy")), + "NoSpeculativeExecutionPolicy"); + + // tags specific to BatchStatement + assertEquals(tags.get(AttributeKey.stringKey("db.scylla.batch_size")), "4"); + + assertNull( + tags.get( + AttributeKey.stringKey("db.scylla.fetch_size"))); // was not set explicitly + // no such information in BatchStatement: + assertNull(tags.get(AttributeKey.stringKey("db.scylla.keyspace"))); + assertNull(tags.get(AttributeKey.stringKey("db.scylla.table"))); + assertNull(tags.get(AttributeKey.stringKey("db.scylla.partition_key"))); + assertNull(tags.get(AttributeKey.stringKey("db.scylla.db.operation"))); + // no such information with PrecisionLevel.NORMAL: + assertNull(tags.get(AttributeKey.stringKey("db.scylla.statement"))); + // no such information with operation UPDATE: + assertNull(tags.get(AttributeKey.stringKey("db.scylla.rows_count"))); + assertNull(tags.get(AttributeKey.stringKey("db.scylla.has_more_pages"))); + // no such tags in "request" span: + assertNull(tags.get(AttributeKey.stringKey("net.peer.name"))); + assertNull(tags.get(AttributeKey.stringKey("net.peer.ip"))); + assertNull(tags.get(AttributeKey.stringKey("net.peer.port"))); + assertNull(tags.get(AttributeKey.stringKey("db.scylla.shard_id"))); + }); + + speculativeExecutionsSpans.stream() + .map(ReadableSpan::toSpanData) + .forEach( + spanData -> { + assertEquals(spanData.getName(), "speculative_execution"); + assertEquals(spanData.getStatus().getStatusCode(), StatusCode.OK); + Attributes tags = spanData.getAttributes(); + + assertEquals(tags.get(AttributeKey.stringKey("db.scylla.attempt_count")), "1"); + assertNull(tags.get(AttributeKey.stringKey("net.peer.name"))); + assertNull(tags.get(AttributeKey.stringKey("net.peer.ip"))); + assertNull(tags.get(AttributeKey.stringKey("net.peer.port"))); + assertNull(tags.get(AttributeKey.stringKey("db.scylla.shard_id"))); + }); + + attemptSpans.stream() + .map(ReadableSpan::toSpanData) + .forEach( + spanData -> { + assertEquals(spanData.getName(), "attempt"); + assertEquals(spanData.getStatus().getStatusCode(), StatusCode.OK); + Attributes tags = spanData.getAttributes(); + + assertNotNull(tags.get(AttributeKey.stringKey("net.peer.name"))); + assertNotNull(tags.get(AttributeKey.stringKey("net.peer.ip"))); + assertNotNull(tags.get(AttributeKey.stringKey("net.peer.port"))); + assertNotNull(tags.get(AttributeKey.stringKey("db.scylla.shard_id"))); + }); + } + + /** Basic test for creating spans with DELETE statement. */ + @Test(groups = "short") + public void simpleTracingDeleteTest() { + final Collection spans = + collectSpans( + PrecisionLevel.NORMAL, + (tracer, tracingInfoFactory) -> { + Span userSpan = tracer.spanBuilder("user span").startSpan(); + Scope scope = userSpan.makeCurrent(); + + final PreparedStatement ps = session.prepare("DELETE FROM t WHERE k=?"); + + session.execute(ps.bind(7)); + session.execute(ps.bind(8)); + + scope.close(); + userSpan.end(); + }); + + // Retrieve the span created directly by tracer. + final List userSpans = + spans.stream() + .filter(span -> !span.getParentSpanContext().isValid()) // The root span has no parent. + .collect(Collectors.toList()); + assertEquals(userSpans.size(), 1); + final ReadableSpan userSpan = userSpans.get(0); + + for (ReadableSpan span : spans) { + assertTrue(span.getSpanContext().isValid()); + assertTrue( // Each span is either the root span or has a valid parent span. + span.getSpanContext().equals(userSpan.getSpanContext()) + || span.getParentSpanContext().isValid()); + } + + // Retrieve spans representing requests. + final Collection requestSpans = + spans.stream() + .filter( + span -> + span.getParentSpanContext().equals(userSpan.getSpanContext()) + && span.toSpanData() + .getAttributes() + .get(AttributeKey.stringKey("db.scylla.statement_type")) + != null) // to exclude preparation spans + .collect(Collectors.toList()); + assertEquals(requestSpans.size(), 2); + + final Collection speculativeExecutionsSpans = + getChildrenOfSpans(spans, requestSpans); + assertEquals(speculativeExecutionsSpans.size(), 2); + + final Collection attemptSpans = + getChildrenOfSpans(spans, speculativeExecutionsSpans); + assertEquals(attemptSpans.size(), 2); + + requestSpans.stream() + .map(ReadableSpan::toSpanData) + .forEach( + spanData -> { + assertEquals(spanData.getName(), "request"); + assertEquals(spanData.getStatus().getStatusCode(), StatusCode.OK); + Attributes tags = spanData.getAttributes(); + + // tags generic for any (reasonable) statement + assertEquals(tags.get(AttributeKey.stringKey("db.scylla.consistency_level")), "ONE"); + assertEquals( + tags.get(AttributeKey.stringKey("db.scylla.statement_type")), "prepared"); + assertEquals( + tags.get(AttributeKey.stringKey("db.scylla.load_balancing_policy")), + "PagingOptimizingLoadBalancingPolicy"); + assertEquals( + tags.get(AttributeKey.stringKey("db.scylla.speculative_execution_policy")), + "NoSpeculativeExecutionPolicy"); + + // tags specific to PreparedStatement + assertEquals(tags.get(AttributeKey.stringKey("db.scylla.keyspace")), keyspace); + assertEquals(tags.get(AttributeKey.stringKey("db.scylla.table")), "t"); + String partitionKey = tags.get(AttributeKey.stringKey("db.scylla.partition_key")); + assertTrue(partitionKey.equals("k=7") || partitionKey.equals("k=8")); + assertNull( + tags.get( + AttributeKey.stringKey("db.scylla.db.operation"))); // not supported so far + + assertNull( + tags.get( + AttributeKey.stringKey("db.scylla.fetch_size"))); // was not set explicitly + // no such information in BatchStatement: + assertNull(tags.get(AttributeKey.stringKey("db.scylla.batch_size"))); + // no such information with PrecisionLevel.NORMAL: + assertNull(tags.get(AttributeKey.stringKey("db.scylla.statement"))); + // no such information with operation DELETE: + assertNull(tags.get(AttributeKey.stringKey("db.scylla.rows_count"))); + assertNull(tags.get(AttributeKey.stringKey("db.scylla.has_more_pages"))); + // no such tags in "request" span: + assertNull(tags.get(AttributeKey.stringKey("net.peer.name"))); + assertNull(tags.get(AttributeKey.stringKey("net.peer.ip"))); + assertNull(tags.get(AttributeKey.stringKey("net.peer.port"))); + assertNull(tags.get(AttributeKey.stringKey("db.scylla.shard_id"))); + }); + + speculativeExecutionsSpans.stream() + .map(ReadableSpan::toSpanData) + .forEach( + spanData -> { + assertEquals(spanData.getName(), "speculative_execution"); + assertEquals(spanData.getStatus().getStatusCode(), StatusCode.OK); + Attributes tags = spanData.getAttributes(); + + assertEquals(tags.get(AttributeKey.stringKey("db.scylla.attempt_count")), "1"); + assertNull(tags.get(AttributeKey.stringKey("net.peer.name"))); + assertNull(tags.get(AttributeKey.stringKey("net.peer.ip"))); + assertNull(tags.get(AttributeKey.stringKey("net.peer.port"))); + assertNull(tags.get(AttributeKey.stringKey("db.scylla.shard_id"))); + }); + + attemptSpans.stream() + .map(ReadableSpan::toSpanData) + .forEach( + spanData -> { + assertEquals(spanData.getName(), "attempt"); + assertEquals(spanData.getStatus().getStatusCode(), StatusCode.OK); + Attributes tags = spanData.getAttributes(); + + assertNotNull(tags.get(AttributeKey.stringKey("net.peer.name"))); + assertNotNull(tags.get(AttributeKey.stringKey("net.peer.ip"))); + assertNotNull(tags.get(AttributeKey.stringKey("net.peer.port"))); + assertNotNull(tags.get(AttributeKey.stringKey("db.scylla.shard_id"))); + }); + } + + /** Basic test for creating spans with TRUNCATE statement. */ + @Test(groups = "short") + public void simpleTracingTruncateTest() { + final Collection spans = + collectSpans( + PrecisionLevel.FULL, + (tracer, tracingInfoFactory) -> { + Span userSpan = tracer.spanBuilder("user span").startSpan(); + Scope scope = userSpan.makeCurrent(); + + session.execute("TRUNCATE t"); + + scope.close(); + userSpan.end(); + }); + + // Retrieve the span created directly by tracer. + final List userSpans = + spans.stream() + .filter(span -> !span.getParentSpanContext().isValid()) // The root span has no parent. + .collect(Collectors.toList()); + assertEquals(userSpans.size(), 1); + final ReadableSpan userSpan = userSpans.get(0); + + for (ReadableSpan span : spans) { + assertTrue(span.getSpanContext().isValid()); + assertTrue( // Each span is either the root span or has a valid parent span. + span.getSpanContext().equals(userSpan.getSpanContext()) + || span.getParentSpanContext().isValid()); + } + + // Retrieve spans representing requests. + final Collection requestSpans = + spans.stream() + .filter(span -> span.getParentSpanContext().equals(userSpan.getSpanContext())) + .collect(Collectors.toList()); + assertEquals(requestSpans.size(), 1); + + final Collection speculativeExecutionsSpans = + getChildrenOfSpans(spans, requestSpans); + assertEquals(speculativeExecutionsSpans.size(), 1); + + final Collection attemptSpans = + getChildrenOfSpans(spans, speculativeExecutionsSpans); + assertEquals(attemptSpans.size(), 1); + + requestSpans.stream() + .map(ReadableSpan::toSpanData) + .forEach( + spanData -> { + assertEquals(spanData.getName(), "request"); + assertEquals(spanData.getStatus().getStatusCode(), StatusCode.OK); + Attributes tags = spanData.getAttributes(); + + // tags generic for any (reasonable) statement + assertEquals(tags.get(AttributeKey.stringKey("db.scylla.consistency_level")), "ONE"); + assertEquals(tags.get(AttributeKey.stringKey("db.scylla.statement_type")), "regular"); + assertEquals( + tags.get(AttributeKey.stringKey("db.scylla.load_balancing_policy")), + "PagingOptimizingLoadBalancingPolicy"); + assertEquals( + tags.get(AttributeKey.stringKey("db.scylla.speculative_execution_policy")), + "NoSpeculativeExecutionPolicy"); + + assertNull( + tags.get( + AttributeKey.stringKey("db.scylla.fetch_size"))); // was not set explicitly + // no such information in RegularStatement: + assertNull(tags.get(AttributeKey.stringKey("db.scylla.batch_size"))); + assertNull(tags.get(AttributeKey.stringKey("db.scylla.keyspace"))); + assertNull(tags.get(AttributeKey.stringKey("db.scylla.table"))); + assertNull(tags.get(AttributeKey.stringKey("db.scylla.partition_key"))); + assertNull(tags.get(AttributeKey.stringKey("db.scylla.db.operation"))); + // present with PrecisionLevel.FULL: + assertEquals(tags.get(AttributeKey.stringKey("db.scylla.statement")), "TRUNCATE t"); + // no such information with operation TRUNCATE: + assertNull(tags.get(AttributeKey.stringKey("db.scylla.rows_count"))); + assertNull(tags.get(AttributeKey.stringKey("db.scylla.has_more_pages"))); + // no such tags in "request" span: + assertNull(tags.get(AttributeKey.stringKey("net.peer.name"))); + assertNull(tags.get(AttributeKey.stringKey("net.peer.ip"))); + assertNull(tags.get(AttributeKey.stringKey("net.peer.port"))); + assertNull(tags.get(AttributeKey.stringKey("db.scylla.shard_id"))); + }); + + speculativeExecutionsSpans.stream() + .map(ReadableSpan::toSpanData) + .forEach( + spanData -> { + assertEquals(spanData.getName(), "speculative_execution"); + assertEquals(spanData.getStatus().getStatusCode(), StatusCode.OK); + Attributes tags = spanData.getAttributes(); + + assertEquals(tags.get(AttributeKey.stringKey("db.scylla.attempt_count")), "1"); + assertNull(tags.get(AttributeKey.stringKey("net.peer.name"))); + assertNull(tags.get(AttributeKey.stringKey("net.peer.ip"))); + assertNull(tags.get(AttributeKey.stringKey("net.peer.port"))); + assertNull(tags.get(AttributeKey.stringKey("db.scylla.shard_id"))); + }); + + attemptSpans.stream() + .map(ReadableSpan::toSpanData) + .forEach( + spanData -> { + assertEquals(spanData.getName(), "attempt"); + assertEquals(spanData.getStatus().getStatusCode(), StatusCode.OK); + Attributes tags = spanData.getAttributes(); + + assertNotNull(tags.get(AttributeKey.stringKey("net.peer.name"))); + assertNotNull(tags.get(AttributeKey.stringKey("net.peer.ip"))); + assertNotNull(tags.get(AttributeKey.stringKey("net.peer.port"))); + assertNotNull(tags.get(AttributeKey.stringKey("db.scylla.shard_id"))); + }); + } + + /** Basic test for creating spans with SELECT statement. */ + @Test(groups = "short") + public void simpleTracingSelectTest() { + final Collection spans = + collectSpans( + PrecisionLevel.NORMAL, + (tracer, tracingInfoFactory) -> { + Span userSpan = tracer.spanBuilder("user span").startSpan(); + Scope scope = userSpan.makeCurrent(); + + SimpleStatement s = + new SimpleStatement("SELECT k FROM t WHERE v = 7 ALLOW FILTERING"); + s.setFetchSize(2); + s.setIdempotent(true); + s.setRetryPolicy(FallthroughRetryPolicy.INSTANCE); + s.setConsistencyLevel(ConsistencyLevel.QUORUM); + + session.execute(s).all(); + + scope.close(); + userSpan.end(); + }); + + // Retrieve the span created directly by tracer. + final List userSpans = + spans.stream() + .filter(span -> !span.getParentSpanContext().isValid()) // The root span has no parent. + .collect(Collectors.toList()); + assertEquals(userSpans.size(), 1); + final ReadableSpan userSpan = userSpans.get(0); + + for (ReadableSpan span : spans) { + assertTrue(span.getSpanContext().isValid()); + assertTrue( // Each span is either the root span or has a valid parent span. + span.getSpanContext().equals(userSpan.getSpanContext()) + || span.getParentSpanContext().isValid()); + } + + // Retrieve spans representing requests. + final Collection requestSpans = + spans.stream() + .filter(span -> span.getParentSpanContext().equals(userSpan.getSpanContext())) + .collect(Collectors.toList()); + assertEquals(requestSpans.size(), 4); + + final Collection speculativeExecutionsSpans = + getChildrenOfSpans(spans, requestSpans); + assertEquals(speculativeExecutionsSpans.size(), 4); + + final Collection attemptSpans = + getChildrenOfSpans(spans, speculativeExecutionsSpans); + assertEquals(attemptSpans.size(), 4); + + requestSpans.stream() + .map(ReadableSpan::toSpanData) + .forEach( + spanData -> { + assertEquals(spanData.getName(), "request"); + assertEquals(spanData.getStatus().getStatusCode(), StatusCode.OK); + Attributes tags = spanData.getAttributes(); + + // tags generic for any (reasonable) statement + assertEquals( + tags.get(AttributeKey.stringKey("db.scylla.consistency_level")), "QUORUM"); + assertEquals(tags.get(AttributeKey.stringKey("db.scylla.fetch_size")), "2"); + assertEquals(tags.get(AttributeKey.stringKey("db.scylla.statement_type")), "regular"); + assertEquals( + tags.get(AttributeKey.stringKey("db.scylla.load_balancing_policy")), + "PagingOptimizingLoadBalancingPolicy"); + assertEquals( + tags.get(AttributeKey.stringKey("db.scylla.speculative_execution_policy")), + "NoSpeculativeExecutionPolicy"); + assertEquals( + tags.get(AttributeKey.stringKey("db.scylla.retry_policy")), + "FallthroughRetryPolicy"); + + // tags specific for SELECT statement + final String hasMorePages = + tags.get(AttributeKey.stringKey("db.scylla.has_more_pages")); + final String rowsCount = tags.get(AttributeKey.stringKey("db.scylla.rows_count")); + assertTrue( + (hasMorePages.equals("false") && rowsCount.equals("1")) + || (hasMorePages.equals("true") && rowsCount.equals("2"))); + + // no such information in RegularStatement: + assertNull(tags.get(AttributeKey.stringKey("db.scylla.batch_size"))); + assertNull(tags.get(AttributeKey.stringKey("db.scylla.keyspace"))); + assertNull(tags.get(AttributeKey.stringKey("db.scylla.table"))); + assertNull(tags.get(AttributeKey.stringKey("db.scylla.partition_key"))); + assertNull(tags.get(AttributeKey.stringKey("db.scylla.db.operation"))); + // no such information with PrecisionLevel.NORMAL: + assertNull(tags.get(AttributeKey.stringKey("db.scylla.statement"))); + // no such tags in "request" span: + assertNull(tags.get(AttributeKey.stringKey("net.peer.name"))); + assertNull(tags.get(AttributeKey.stringKey("net.peer.ip"))); + assertNull(tags.get(AttributeKey.stringKey("net.peer.port"))); + assertNull(tags.get(AttributeKey.stringKey("db.scylla.shard_id"))); + }); + + speculativeExecutionsSpans.stream() + .map(ReadableSpan::toSpanData) + .forEach( + spanData -> { + assertEquals(spanData.getName(), "speculative_execution"); + assertEquals(spanData.getStatus().getStatusCode(), StatusCode.OK); + Attributes tags = spanData.getAttributes(); + + assertEquals(tags.get(AttributeKey.stringKey("db.scylla.attempt_count")), "1"); + assertNull(tags.get(AttributeKey.stringKey("net.peer.name"))); + assertNull(tags.get(AttributeKey.stringKey("net.peer.ip"))); + assertNull(tags.get(AttributeKey.stringKey("net.peer.port"))); + assertNull(tags.get(AttributeKey.stringKey("db.scylla.shard_id"))); + }); + + attemptSpans.stream() + .map(ReadableSpan::toSpanData) + .forEach( + spanData -> { + assertEquals(spanData.getName(), "attempt"); + assertEquals(spanData.getStatus().getStatusCode(), StatusCode.OK); + Attributes tags = spanData.getAttributes(); + + assertNotNull(tags.get(AttributeKey.stringKey("net.peer.name"))); + assertNotNull(tags.get(AttributeKey.stringKey("net.peer.ip"))); + assertNotNull(tags.get(AttributeKey.stringKey("net.peer.port"))); + assertNotNull(tags.get(AttributeKey.stringKey("db.scylla.shard_id"))); + }); + } + + /** Basic test for creating spans with an erroneous statement. */ + @Test(groups = "short") + public void simpleRequestErrorTracingTest() { + final Collection spans = + collectSpans( + PrecisionLevel.FULL, + (tracer, tracingInfoFactory) -> { + Span userSpan = tracer.spanBuilder("user span").startSpan(); + Scope scope = userSpan.makeCurrent(); + + try { + session.execute("INSERT ONTO t(k, v) VALUES (4, 2)"); + // ^ syntax error here + assert false; // exception should be thrown before this line is executed + } catch (SyntaxError error) { + // pass + } + + try { + session.execute("INSERT INTO t(k, v) VALUES (2, 1, 3, 7)"); + // ^ too many values + assert false; // exception should be thrown before this line is executed + } catch (InvalidQueryException error) { + // pass + } + + scope.close(); + userSpan.end(); + }); + + // Retrieve span created directly by tracer. + final List userSpans = + spans.stream() + .filter(span -> !span.getParentSpanContext().isValid()) + .collect(Collectors.toList()); + assertEquals(userSpans.size(), 1); + final ReadableSpan userSpan = userSpans.get(0); + + for (ReadableSpan span : spans) { + assertTrue(span.getSpanContext().isValid()); + assertTrue( + span.getSpanContext().equals(userSpan.getSpanContext()) + || span.getParentSpanContext().isValid()); + } + + // Retrieve spans representing requests. + final Collection requestSpans = + spans.stream() + .filter(span -> span.getParentSpanContext().equals(userSpan.getSpanContext())) + .collect(Collectors.toList()); + assertEquals(requestSpans.size(), 2); + + requestSpans.stream() + .map(ReadableSpan::toSpanData) + .forEach( + spanData -> { + assertEquals(spanData.getName(), "request"); + assertEquals(spanData.getStatus().getStatusCode(), StatusCode.ERROR); + final String collectedStatement = + spanData.getAttributes().get(AttributeKey.stringKey("db.scylla.statement")); + assert collectedStatement.equals("INSERT INTO t(k, v) VALUES (2, 1, 3, 7)") + || collectedStatement.equals("INSERT ONTO t(k, v) VALUES (4, 2)") + : "Bad statement gathered"; + }); + } +}