Skip to content

Commit fa41484

Browse files
authored
otel_apm_service_map: Added support for deriving remote service and remote operation (opensearch-project#6539)
* otel_apm_service_map: Added support for deriving remote service and remote operation Signed-off-by: Krishna Kondaka <krishkdk@amazon.com> * Addressed review comments Signed-off-by: Krishna Kondaka <krishkdk@amazon.com> * Addressed review comments Signed-off-by: Krishna Kondaka <krishkdk@amazon.com> * Fixed license header check failures Signed-off-by: Krishna Kondaka <krishkdk@amazon.com> --------- Signed-off-by: Krishna Kondaka <krishkdk@amazon.com>
1 parent 8e09a2a commit fa41484

File tree

15 files changed

+957
-28
lines changed

15 files changed

+957
-28
lines changed

data-prepper-plugins/otel-apm-service-map-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/otel_apm_service_map/model/internal/SpanStateData.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@
1919
import java.util.Map;
2020
import java.util.Objects;
2121

22-
// TODO : 1. Add new rules as per Producer/Consumers/LocalRoot
2322
@Getter
2423
public class SpanStateData implements Serializable {
2524
private String serviceName;
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,53 @@
1+
/*
2+
* Copyright OpenSearch Contributors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*
5+
* The OpenSearch Contributors require contributions made to
6+
* this file be licensed under the Apache-2.0 license or a
7+
* compatible open source license.
8+
*
9+
*/
10+
11+
package org.opensearch.dataprepper.plugins.otel.common;
12+
13+
import java.io.BufferedReader;
14+
import java.io.IOException;
15+
import java.io.InputStream;
16+
import java.io.InputStreamReader;
17+
import java.util.Map;
18+
import java.util.stream.Collectors;
19+
20+
import org.slf4j.Logger;
21+
import org.slf4j.LoggerFactory;
22+
23+
class AwsServiceMappingsProvider implements ServiceMappingsProvider {
24+
private static final Logger LOG = LoggerFactory.getLogger(AwsServiceMappingsProvider.class);
25+
private static final String SERVICE_MAPPINGS_FILE = "aws_service_mappings";
26+
private Map<String, String> serviceMappings;
27+
28+
public AwsServiceMappingsProvider() {
29+
try (
30+
final InputStream inputStream = getClass().getClassLoader().getResourceAsStream(SERVICE_MAPPINGS_FILE);
31+
) {
32+
try (BufferedReader reader = new BufferedReader(new InputStreamReader(inputStream))) {
33+
serviceMappings = reader.lines()
34+
.filter(line -> line.contains(","))
35+
.collect(Collectors.toMap(
36+
line -> line.split(",", 2)[0].trim(),
37+
line -> line.split(",", 2)[1].trim(),
38+
(oldValue, newValue) -> newValue
39+
));
40+
} catch (IOException e) {
41+
throw e;
42+
}
43+
} catch (final Exception e) {
44+
LOG.error("An exception occurred while initializing service mappings for Data Prepper", e);
45+
serviceMappings = null;
46+
}
47+
}
48+
49+
public Map<String, String> getServiceMappings() {
50+
return serviceMappings;
51+
}
52+
53+
}

data-prepper-plugins/otel-proto-common/src/main/java/org/opensearch/dataprepper/plugins/otel/common/OTelSpanDerivationUtil.java

Lines changed: 122 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -14,11 +14,14 @@
1414
import org.slf4j.Logger;
1515
import org.slf4j.LoggerFactory;
1616

17+
import java.net.URL;
18+
import java.net.MalformedURLException;
1719
import java.util.HashMap;
1820
import java.util.List;
1921
import java.util.Map;
2022

2123
import static org.opensearch.dataprepper.model.metric.JacksonMetric.ATTRIBUTES_KEY;
24+
import io.opentelemetry.proto.trace.v1.Span.SpanKind;
2225

2326
/**
2427
* Utility class for deriving fault, error, operation, and environment attributes from OpenTelemetry spans.
@@ -33,8 +36,9 @@ public class OTelSpanDerivationUtil {
3336
public static final String DERIVED_ERROR_ATTRIBUTE = "derived.error";
3437
public static final String DERIVED_OPERATION_ATTRIBUTE = "derived.operation";
3538
public static final String DERIVED_ENVIRONMENT_ATTRIBUTE = "derived.environment";
39+
public static final String DERIVED_REMOTE_SERVICE_ATTRIBUTE = "derived.remote_service";
3640
private static final Logger LOG = LoggerFactory.getLogger(OTelSpanDerivationUtil.class);
37-
private static final String SPAN_KIND_SERVER = "SPAN_KIND_SERVER";
41+
private static final String SERVICE_MAPPINGS_FILE = "service_mappings";
3842

3943
/**
4044
* Derives fault, error, operation, and environment attributes for SERVER spans in the provided list.
@@ -48,12 +52,23 @@ public static void deriveServerSpanAttributes(final List<Span> spans) {
4852
}
4953

5054
for (final Span span : spans) {
51-
if (span != null && SPAN_KIND_SERVER.equals(span.getKind())) {
55+
if (span != null && isServerSpan(span)) {
5256
deriveAttributesForSpan(span);
5357
}
5458
}
5559
}
5660

61+
private static boolean isServerSpan(Span span) {
62+
return SpanKind.SPAN_KIND_SERVER.name().equals(span.getKind());
63+
}
64+
65+
private static boolean isClientSpan(Span span) {
66+
return SpanKind.SPAN_KIND_CLIENT.name().equals(span.getKind());
67+
}
68+
69+
private static boolean isProducerSpan(Span span) {
70+
return SpanKind.SPAN_KIND_PRODUCER.name().equals(span.getKind());
71+
}
5772

5873
/**
5974
* Adds an attribute to the span. This method just delegates to span.putAttribute()
@@ -85,14 +100,23 @@ public static void deriveAttributesForSpan(final Span span) {
85100

86101
final ErrorFaultResult errorFault = computeErrorAndFault(span.getStatus(), spanAttributes);
87102

88-
final String operationName = computeOperationName(span.getName(), spanAttributes);
103+
String operationName = null;
104+
if (isServerSpan(span)) {
105+
operationName = computeOperationName(span.getName(), spanAttributes);
106+
} else if (isClientSpan(span) || isProducerSpan(span)) {
107+
final RemoteOperationAndService remoteOperationAndService = computeRemoteOperationAndService(spanAttributes);
108+
operationName = remoteOperationAndService.getOperation();
109+
putAttribute(span, DERIVED_REMOTE_SERVICE_ATTRIBUTE, remoteOperationAndService.getService());
110+
}
89111

90112
final String environment = computeEnvironment(spanAttributes);
91113

92114
// Add derived attributes using our safe attribute setting method
93115
putAttribute(span, DERIVED_FAULT_ATTRIBUTE, String.valueOf(errorFault.fault));
94116
putAttribute(span, DERIVED_ERROR_ATTRIBUTE, String.valueOf(errorFault.error));
95-
putAttribute(span, DERIVED_OPERATION_ATTRIBUTE, operationName);
117+
if (operationName != null) {
118+
putAttribute(span, DERIVED_OPERATION_ATTRIBUTE, operationName);
119+
}
96120
putAttribute(span, DERIVED_ENVIRONMENT_ATTRIBUTE, environment);
97121

98122
LOG.debug("Derived attributes for SERVER span {}: fault={}, error={}, operation={}, environment={}",
@@ -186,6 +210,15 @@ private static boolean isSpanStatusError(final Object spanStatus) {
186210
return "ERROR".equalsIgnoreCase(statusString) ||
187211
"2".equals(statusString) ||
188212
statusString.toLowerCase().contains("error");
213+
214+
}
215+
216+
private static String extractFirstPathFromUrl(final String url) {
217+
int colonDoubleSlash = url.indexOf("://");
218+
int firstSlash = url.indexOf("/", colonDoubleSlash+3);
219+
int secondSlash = url.indexOf("/", firstSlash+1);
220+
String result= (secondSlash > 0) ? url.substring(firstSlash, secondSlash) : url.substring(firstSlash);
221+
return result;
189222
}
190223

191224
/**
@@ -231,6 +264,87 @@ public static String computeOperationName(final String spanName, final Map<Strin
231264
return spanName != null ? spanName : "UnknownOperation";
232265
}
233266

267+
public static RemoteOperationAndService computeRemoteOperationAndService(final Map<String, Object> spanAttributes) {
268+
OTelSpanDerivationUtil oTelSpanDerivationUtil = new OTelSpanDerivationUtil();
269+
Map<String, String> awsServiceMappings = (new AwsServiceMappingsProvider()).getServiceMappings();
270+
RemoteOperationAndServiceProviders remoteOperationAndServiceProviders = new RemoteOperationAndServiceProviders();
271+
ServiceAddressPortAttributesProvider serviceAddressPortAttributesProvider = new ServiceAddressPortAttributesProvider();
272+
List<ServiceAddressPortAttributesProvider.AddressPortAttributeKeys> addressPortAttributeKeysList = serviceAddressPortAttributesProvider.getAddressPortAttributeKeysList();
273+
274+
RemoteOperationAndService remoteOperationAndService = new RemoteOperationAndService(null, null);
275+
if (remoteOperationAndServiceProviders.AwsRpcRemoteOperationServiceExtractor.appliesToSpan(spanAttributes)) {
276+
remoteOperationAndService = remoteOperationAndServiceProviders.AwsRpcRemoteOperationServiceExtractor.getRemoteOperationAndService(spanAttributes, awsServiceMappings);
277+
}
278+
if (remoteOperationAndServiceProviders.DbRemoteOperationServiceExtractor.appliesToSpan(spanAttributes)) {
279+
remoteOperationAndService = remoteOperationAndServiceProviders.DbRemoteOperationServiceExtractor.getRemoteOperationAndService(spanAttributes, null);
280+
}
281+
if (remoteOperationAndServiceProviders.DbQueryRemoteOperationServiceExtractor.appliesToSpan(spanAttributes)) {
282+
remoteOperationAndService = remoteOperationAndServiceProviders.DbQueryRemoteOperationServiceExtractor.getRemoteOperationAndService(spanAttributes, null);
283+
}
284+
if (remoteOperationAndServiceProviders.FaasRemoteOperationServiceExtractor.appliesToSpan(spanAttributes)) {
285+
remoteOperationAndService = remoteOperationAndServiceProviders.FaasRemoteOperationServiceExtractor.getRemoteOperationAndService(spanAttributes, null);
286+
}
287+
if (remoteOperationAndServiceProviders.MessagingSystemRemoteOperationServiceExtractor.appliesToSpan(spanAttributes)) {
288+
remoteOperationAndService = remoteOperationAndServiceProviders.MessagingSystemRemoteOperationServiceExtractor.getRemoteOperationAndService(spanAttributes, null);
289+
}
290+
if (remoteOperationAndServiceProviders.GraphQlRemoteOperationServiceExtractor.appliesToSpan(spanAttributes)) {
291+
remoteOperationAndService = remoteOperationAndServiceProviders.GraphQlRemoteOperationServiceExtractor.getRemoteOperationAndService(spanAttributes, null);
292+
}
293+
if (remoteOperationAndServiceProviders.AwsRpcRemoteOperationServiceExtractor.appliesToSpan(spanAttributes)) {
294+
remoteOperationAndService = remoteOperationAndServiceProviders.AwsRpcRemoteOperationServiceExtractor.getRemoteOperationAndService(spanAttributes, awsServiceMappings);
295+
}
296+
if (remoteOperationAndServiceProviders.PeerServiceRemoteOperationServiceExtractor.appliesToSpan(spanAttributes)) {
297+
remoteOperationAndService = remoteOperationAndServiceProviders.PeerServiceRemoteOperationServiceExtractor.getRemoteOperationAndService(spanAttributes, null);
298+
}
299+
300+
if (!remoteOperationAndService.hasNullValues()) {
301+
return remoteOperationAndService;
302+
}
303+
304+
// Fallback: derive from URL or network attributes
305+
final String urlString = getStringAttribute(spanAttributes, "url.full") != null
306+
? getStringAttribute(spanAttributes, "url.full")
307+
: getStringAttribute(spanAttributes, "http.url");
308+
309+
String remoteOperation = remoteOperationAndService.getOperation();
310+
String remoteService = remoteOperationAndService.getService();
311+
if (remoteService == null) {
312+
remoteService = deriveServiceFromNetwork(spanAttributes, urlString, addressPortAttributeKeysList);
313+
}
314+
315+
if (remoteOperation == null && urlString != null) {
316+
final String httpMethod = getStringAttribute(spanAttributes, "http.request.method") != null
317+
? getStringAttribute(spanAttributes, "http.request.method")
318+
: getStringAttribute(spanAttributes, "http.method");
319+
remoteOperation = httpMethod != null ? httpMethod + " " + extractFirstPathFromUrl(urlString) : urlString;
320+
}
321+
322+
return new RemoteOperationAndService(
323+
remoteOperation != null ? remoteOperation : "UnknownRemoteOperation",
324+
remoteService != null ? remoteService : "UnknownRemoteService");
325+
}
326+
327+
private static String deriveServiceFromNetwork(final Map<String, Object> spanAttributes, final String urlString, final List<ServiceAddressPortAttributesProvider.AddressPortAttributeKeys> addressPortAttributeKeysList) {
328+
329+
for (ServiceAddressPortAttributesProvider.AddressPortAttributeKeys addressPortAttributeKeys : addressPortAttributeKeysList) {
330+
final String address = getStringAttribute(spanAttributes, addressPortAttributeKeys.getAddress());
331+
if (address != null) {
332+
final String port = getStringAttribute(spanAttributes, addressPortAttributeKeys.getPort());
333+
return port != null ? address + ":" + port : address;
334+
}
335+
}
336+
337+
if (urlString != null) {
338+
try {
339+
final URL url = new URL(urlString);
340+
final int port = url.getPort() == -1 ? url.getDefaultPort() : url.getPort();
341+
return url.getHost() + ":" + port;
342+
} catch (MalformedURLException ignored) {}
343+
}
344+
345+
return null;
346+
}
347+
234348
/**
235349
* Compute environment from resource attributes.
236350
* Package-private for testing purposes only.
@@ -239,29 +353,11 @@ public static String computeOperationName(final String spanName, final Map<Strin
239353
* @return Computed environment string
240354
*/
241355
public static String computeEnvironment(final Map<String, Object> spanAttributes) {
242-
try {
243-
// Navigate: spanAttributes -> "resource" -> "attributes" -> deployment keys
244-
@SuppressWarnings("unchecked")
245-
Map<String, Object> resourceAttrs = (Map<String, Object>)
246-
((Map<String, Object>) spanAttributes.get("resource")).get("attributes");
247-
248-
// Extract from resource.attributes.deployment.environment.name
249-
String env = getStringAttribute(resourceAttrs, "deployment.environment.name");
250-
if (env != null && !env.trim().isEmpty()) {
251-
return env;
252-
}
253-
254-
// Fall back to resource.attributes.deployment.environment
255-
env = getStringAttribute(resourceAttrs, "deployment.environment");
256-
if (env != null && !env.trim().isEmpty()) {
257-
return env;
258-
}
259-
} catch (Exception ignored) {
260-
// Any navigation failure falls through to default
356+
String env = ServiceEnvironmentProviders.getAwsServiceEnvironment(spanAttributes);
357+
if (env == null) {
358+
env = ServiceEnvironmentProviders.getDeploymentEnvironment(spanAttributes);
261359
}
262-
263-
// Default: 'generic:default'
264-
return "generic:default";
360+
return env;
265361
}
266362

267363
/**
@@ -290,7 +386,6 @@ static String getStringAttributeFromMap(final Map<String, Object> map, final Str
290386
return getStringAttribute(map, key);
291387
}
292388

293-
294389
/**
295390
* Simple data class to hold error and fault computation results.
296391
*/
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
/*
2+
* Copyright OpenSearch Contributors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*
5+
* The OpenSearch Contributors require contributions made to
6+
* this file be licensed under the Apache-2.0 license or a
7+
* compatible open source license.
8+
*
9+
*/
10+
11+
package org.opensearch.dataprepper.plugins.otel.common;
12+
13+
class RemoteOperationAndService {
14+
final String remoteService;
15+
final String remoteOperation;
16+
public RemoteOperationAndService(final String remoteOperation, final String remoteService) {
17+
this.remoteOperation = remoteOperation;
18+
this.remoteService = remoteService;
19+
}
20+
21+
public String getOperation() {
22+
return remoteOperation;
23+
}
24+
25+
public String getService() {
26+
return remoteService;
27+
}
28+
29+
public boolean hasNullValues() {
30+
return remoteOperation == null || remoteService == null;
31+
}
32+
}
33+

0 commit comments

Comments
 (0)