Skip to content

Commit 2ae01a7

Browse files
[otlp] Add Log and Mertic Exporter to transmit custom serialized data (#5977)
Co-authored-by: Mikel Blanchard <[email protected]>
1 parent 1e7397e commit 2ae01a7

9 files changed

+286
-10
lines changed

src/OpenTelemetry.Exporter.OpenTelemetryProtocol/Implementation/Serializer/ProtobufOtlpLogFieldNumberConstants.cs

+5-1
Original file line numberDiff line numberDiff line change
@@ -5,8 +5,12 @@ namespace OpenTelemetry.Exporter.OpenTelemetryProtocol.Implementation.Serializer
55

66
internal static class ProtobufOtlpLogFieldNumberConstants
77
{
8-
// Resource Logs
98
#pragma warning disable SA1310 // Field names should not contain underscore
9+
10+
// Logs data
11+
internal const int LogsData_Resource_Logs = 1;
12+
13+
// Resource Logs
1014
internal const int ResourceLogs_Resource = 1;
1115
internal const int ResourceLogs_Scope_Logs = 2;
1216
internal const int ResourceLogs_Schema_Url = 3;

src/OpenTelemetry.Exporter.OpenTelemetryProtocol/Implementation/Serializer/ProtobufOtlpLogSerializer.cs

+5
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,10 @@ internal static class ProtobufOtlpLogSerializer
2121

2222
internal static int WriteLogsData(byte[] buffer, int writePosition, SdkLimitOptions sdkLimitOptions, ExperimentalOptions experimentalOptions, Resources.Resource? resource, in Batch<LogRecord> logRecordBatch)
2323
{
24+
writePosition = ProtobufSerializer.WriteTag(buffer, writePosition, ProtobufOtlpLogFieldNumberConstants.LogsData_Resource_Logs, ProtobufWireType.LEN);
25+
int logsDataLengthPosition = writePosition;
26+
writePosition += ReserveSizeForLength;
27+
2428
foreach (var logRecord in logRecordBatch)
2529
{
2630
var scopeName = logRecord.Logger.Name;
@@ -34,6 +38,7 @@ internal static int WriteLogsData(byte[] buffer, int writePosition, SdkLimitOpti
3438
}
3539

3640
writePosition = WriteResourceLogs(buffer, writePosition, sdkLimitOptions, experimentalOptions, resource, ScopeLogsList);
41+
ProtobufSerializer.WriteReservedLength(buffer, logsDataLengthPosition, writePosition - (logsDataLengthPosition + ReserveSizeForLength));
3742
ReturnLogRecordListToPool();
3843

3944
return writePosition;

src/OpenTelemetry.Exporter.OpenTelemetryProtocol/Implementation/Serializer/ProtobufOtlpMetricSerializer.cs

+5
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,10 @@ internal static class ProtobufOtlpMetricSerializer
1919

2020
internal static int WriteMetricsData(byte[] buffer, int writePosition, Resources.Resource? resource, in Batch<Metric> batch)
2121
{
22+
writePosition = ProtobufSerializer.WriteTag(buffer, writePosition, ProtobufOtlpMetricFieldNumberConstants.MetricsData_Resource_Metrics, ProtobufWireType.LEN);
23+
int mericsDataLengthPosition = writePosition;
24+
writePosition += ReserveSizeForLength;
25+
2226
foreach (var metric in batch)
2327
{
2428
var metricName = metric.MeterName;
@@ -32,6 +36,7 @@ internal static int WriteMetricsData(byte[] buffer, int writePosition, Resources
3236
}
3337

3438
writePosition = WriteResourceMetrics(buffer, writePosition, resource, ScopeMetricsList);
39+
ProtobufSerializer.WriteReservedLength(buffer, mericsDataLengthPosition, writePosition - (mericsDataLengthPosition + ReserveSizeForLength));
3540
ReturnMetricListToPool();
3641

3742
return writePosition;

src/OpenTelemetry.Exporter.OpenTelemetryProtocol/OtlpLogExporterHelperExtensions.cs

+10-4
Original file line numberDiff line numberDiff line change
@@ -305,10 +305,16 @@ internal static BaseProcessor<LogRecord> BuildOtlpLogExporter(
305305
* "OtlpLogExporter");
306306
*/
307307

308-
BaseExporter<LogRecord> otlpExporter = new OtlpLogExporter(
309-
exporterOptions!,
310-
sdkLimitOptions!,
311-
experimentalOptions!);
308+
BaseExporter<LogRecord> otlpExporter;
309+
310+
if (experimentalOptions != null && experimentalOptions.UseCustomProtobufSerializer)
311+
{
312+
otlpExporter = new ProtobufOtlpLogExporter(exporterOptions!, sdkLimitOptions!, experimentalOptions!);
313+
}
314+
else
315+
{
316+
otlpExporter = new OtlpLogExporter(exporterOptions!, sdkLimitOptions!, experimentalOptions!);
317+
}
312318

313319
if (configureExporterInstance != null)
314320
{

src/OpenTelemetry.Exporter.OpenTelemetryProtocol/OtlpMetricExporterExtensions.cs

+10-1
Original file line numberDiff line numberDiff line change
@@ -175,7 +175,16 @@ internal static MetricReader BuildOtlpExporterMetricReader(
175175

176176
exporterOptions!.TryEnableIHttpClientFactoryIntegration(serviceProvider!, "OtlpMetricExporter");
177177

178-
BaseExporter<Metric> metricExporter = new OtlpMetricExporter(exporterOptions!, experimentalOptions!);
178+
BaseExporter<Metric> metricExporter;
179+
180+
if (experimentalOptions != null && experimentalOptions.UseCustomProtobufSerializer)
181+
{
182+
metricExporter = new ProtobufOtlpMetricExporter(exporterOptions!, experimentalOptions!);
183+
}
184+
else
185+
{
186+
metricExporter = new OtlpMetricExporter(exporterOptions!, experimentalOptions!);
187+
}
179188

180189
if (configureExporterInstance != null)
181190
{
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,127 @@
1+
// Copyright The OpenTelemetry Authors
2+
// SPDX-License-Identifier: Apache-2.0
3+
4+
using System.Buffers.Binary;
5+
using System.Diagnostics;
6+
using OpenTelemetry.Exporter.OpenTelemetryProtocol.Implementation;
7+
using OpenTelemetry.Exporter.OpenTelemetryProtocol.Implementation.Serializer;
8+
using OpenTelemetry.Exporter.OpenTelemetryProtocol.Implementation.Transmission;
9+
using OpenTelemetry.Logs;
10+
using OpenTelemetry.Resources;
11+
12+
namespace OpenTelemetry.Exporter;
13+
14+
/// <summary>
15+
/// Exporter consuming <see cref="LogRecord"/> and exporting the data using
16+
/// the OpenTelemetry protocol (OTLP).
17+
/// </summary>
18+
internal sealed class ProtobufOtlpLogExporter : BaseExporter<LogRecord>
19+
{
20+
private readonly SdkLimitOptions sdkLimitOptions;
21+
private readonly ExperimentalOptions experimentalOptions;
22+
private readonly ProtobufOtlpExporterTransmissionHandler transmissionHandler;
23+
private readonly int startWritePosition;
24+
25+
private Resource? resource;
26+
27+
// Initial buffer size set to ~732KB.
28+
// This choice allows us to gradually grow the buffer while targeting a final capacity of around 100 MB,
29+
// by the 7th doubling to maintain efficient allocation without frequent resizing.
30+
private byte[] buffer = new byte[750000];
31+
32+
/// <summary>
33+
/// Initializes a new instance of the <see cref="ProtobufOtlpLogExporter"/> class.
34+
/// </summary>
35+
/// <param name="options">Configuration options for the exporter.</param>
36+
public ProtobufOtlpLogExporter(OtlpExporterOptions options)
37+
: this(options, sdkLimitOptions: new(), experimentalOptions: new(), transmissionHandler: null)
38+
{
39+
}
40+
41+
/// <summary>
42+
/// Initializes a new instance of the <see cref="ProtobufOtlpLogExporter"/> class.
43+
/// </summary>
44+
/// <param name="exporterOptions"><see cref="OtlpExporterOptions"/>.</param>
45+
/// <param name="sdkLimitOptions"><see cref="SdkLimitOptions"/>.</param>
46+
/// <param name="experimentalOptions"><see cref="ExperimentalOptions"/>.</param>
47+
/// <param name="transmissionHandler"><see cref="OtlpExporterTransmissionHandler{T}"/>.</param>
48+
internal ProtobufOtlpLogExporter(
49+
OtlpExporterOptions exporterOptions,
50+
SdkLimitOptions sdkLimitOptions,
51+
ExperimentalOptions experimentalOptions,
52+
ProtobufOtlpExporterTransmissionHandler? transmissionHandler = null)
53+
{
54+
Debug.Assert(exporterOptions != null, "exporterOptions was null");
55+
Debug.Assert(sdkLimitOptions != null, "sdkLimitOptions was null");
56+
Debug.Assert(experimentalOptions != null, "experimentalOptions was null");
57+
58+
this.experimentalOptions = experimentalOptions!;
59+
this.sdkLimitOptions = sdkLimitOptions!;
60+
this.startWritePosition = exporterOptions!.Protocol == OtlpExportProtocol.Grpc ? 5 : 0;
61+
this.transmissionHandler = transmissionHandler ?? exporterOptions!.GetProtobufExportTransmissionHandler(experimentalOptions!);
62+
}
63+
64+
internal Resource Resource => this.resource ??= this.ParentProvider.GetResource();
65+
66+
/// <inheritdoc/>
67+
public override ExportResult Export(in Batch<LogRecord> logRecordBatch)
68+
{
69+
// Prevents the exporter's gRPC and HTTP operations from being instrumented.
70+
using var scope = SuppressInstrumentationScope.Begin();
71+
72+
try
73+
{
74+
int writePosition = ProtobufOtlpLogSerializer.WriteLogsData(this.buffer, this.startWritePosition, this.sdkLimitOptions, this.experimentalOptions, this.Resource, logRecordBatch);
75+
76+
if (this.startWritePosition == 5)
77+
{
78+
// Grpc payload consists of 3 parts
79+
// byte 0 - Specifying if the payload is compressed.
80+
// 1-4 byte - Specifies the length of payload in big endian format.
81+
// 5 and above - Protobuf serialized data.
82+
Span<byte> data = new Span<byte>(this.buffer, 1, 4);
83+
var dataLength = writePosition - 5;
84+
BinaryPrimitives.WriteUInt32BigEndian(data, (uint)dataLength);
85+
}
86+
87+
if (!this.transmissionHandler.TrySubmitRequest(this.buffer, writePosition))
88+
{
89+
return ExportResult.Failure;
90+
}
91+
}
92+
catch (IndexOutOfRangeException)
93+
{
94+
if (!this.IncreaseBufferSize())
95+
{
96+
throw;
97+
}
98+
}
99+
catch (Exception ex)
100+
{
101+
OpenTelemetryProtocolExporterEventSource.Log.ExportMethodException(ex);
102+
return ExportResult.Failure;
103+
}
104+
105+
return ExportResult.Success;
106+
}
107+
108+
/// <inheritdoc />
109+
protected override bool OnShutdown(int timeoutMilliseconds) => this.transmissionHandler?.Shutdown(timeoutMilliseconds) ?? true;
110+
111+
// TODO: Consider moving this to a shared utility class.
112+
private bool IncreaseBufferSize()
113+
{
114+
var newBufferSize = this.buffer.Length * 2;
115+
116+
if (newBufferSize > 100 * 1024 * 1024)
117+
{
118+
return false;
119+
}
120+
121+
var newBuffer = new byte[newBufferSize];
122+
this.buffer.CopyTo(newBuffer, 0);
123+
this.buffer = newBuffer;
124+
125+
return true;
126+
}
127+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,120 @@
1+
// Copyright The OpenTelemetry Authors
2+
// SPDX-License-Identifier: Apache-2.0
3+
4+
using System.Buffers.Binary;
5+
using System.Diagnostics;
6+
using OpenTelemetry.Exporter.OpenTelemetryProtocol.Implementation;
7+
using OpenTelemetry.Exporter.OpenTelemetryProtocol.Implementation.Serializer;
8+
using OpenTelemetry.Exporter.OpenTelemetryProtocol.Implementation.Transmission;
9+
using OpenTelemetry.Metrics;
10+
using OpenTelemetry.Resources;
11+
12+
namespace OpenTelemetry.Exporter;
13+
14+
/// <summary>
15+
/// Exporter consuming <see cref="Metric"/> and exporting the data using
16+
/// the OpenTelemetry protocol (OTLP).
17+
/// </summary>
18+
internal sealed class ProtobufOtlpMetricExporter : BaseExporter<Metric>
19+
{
20+
private readonly ProtobufOtlpExporterTransmissionHandler transmissionHandler;
21+
private readonly int startWritePosition;
22+
23+
private Resource? resource;
24+
25+
// Initial buffer size set to ~732KB.
26+
// This choice allows us to gradually grow the buffer while targeting a final capacity of around 100 MB,
27+
// by the 7th doubling to maintain efficient allocation without frequent resizing.
28+
private byte[] buffer = new byte[750000];
29+
30+
/// <summary>
31+
/// Initializes a new instance of the <see cref="ProtobufOtlpMetricExporter"/> class.
32+
/// </summary>
33+
/// <param name="options">Configuration options for the exporter.</param>
34+
public ProtobufOtlpMetricExporter(OtlpExporterOptions options)
35+
: this(options, experimentalOptions: new(), transmissionHandler: null)
36+
{
37+
}
38+
39+
/// <summary>
40+
/// Initializes a new instance of the <see cref="ProtobufOtlpMetricExporter"/> class.
41+
/// </summary>
42+
/// <param name="exporterOptions"><see cref="OtlpExporterOptions"/>.</param>
43+
/// <param name="experimentalOptions"><see cref="ExperimentalOptions"/>.</param>
44+
/// <param name="transmissionHandler"><see cref="OtlpExporterTransmissionHandler{T}"/>.</param>
45+
internal ProtobufOtlpMetricExporter(
46+
OtlpExporterOptions exporterOptions,
47+
ExperimentalOptions experimentalOptions,
48+
ProtobufOtlpExporterTransmissionHandler? transmissionHandler = null)
49+
{
50+
Debug.Assert(exporterOptions != null, "exporterOptions was null");
51+
Debug.Assert(experimentalOptions != null, "experimentalOptions was null");
52+
53+
this.startWritePosition = exporterOptions!.Protocol == OtlpExportProtocol.Grpc ? 5 : 0;
54+
this.transmissionHandler = transmissionHandler ?? exporterOptions!.GetProtobufExportTransmissionHandler(experimentalOptions!);
55+
}
56+
57+
internal Resource Resource => this.resource ??= this.ParentProvider.GetResource();
58+
59+
/// <inheritdoc />
60+
public override ExportResult Export(in Batch<Metric> metrics)
61+
{
62+
// Prevents the exporter's gRPC and HTTP operations from being instrumented.
63+
using var scope = SuppressInstrumentationScope.Begin();
64+
65+
try
66+
{
67+
int writePosition = ProtobufOtlpMetricSerializer.WriteMetricsData(this.buffer, this.startWritePosition, this.Resource, metrics);
68+
69+
if (this.startWritePosition == 5)
70+
{
71+
// Grpc payload consists of 3 parts
72+
// byte 0 - Specifying if the payload is compressed.
73+
// 1-4 byte - Specifies the length of payload in big endian format.
74+
// 5 and above - Protobuf serialized data.
75+
Span<byte> data = new Span<byte>(this.buffer, 1, 4);
76+
var dataLength = writePosition - 5;
77+
BinaryPrimitives.WriteUInt32BigEndian(data, (uint)dataLength);
78+
}
79+
80+
if (!this.transmissionHandler.TrySubmitRequest(this.buffer, writePosition))
81+
{
82+
return ExportResult.Failure;
83+
}
84+
}
85+
catch (IndexOutOfRangeException)
86+
{
87+
if (!this.IncreaseBufferSize())
88+
{
89+
throw;
90+
}
91+
}
92+
catch (Exception ex)
93+
{
94+
OpenTelemetryProtocolExporterEventSource.Log.ExportMethodException(ex);
95+
return ExportResult.Failure;
96+
}
97+
98+
return ExportResult.Success;
99+
}
100+
101+
/// <inheritdoc />
102+
protected override bool OnShutdown(int timeoutMilliseconds) => this.transmissionHandler.Shutdown(timeoutMilliseconds);
103+
104+
// TODO: Consider moving this to a shared utility class.
105+
private bool IncreaseBufferSize()
106+
{
107+
var newBufferSize = this.buffer.Length * 2;
108+
109+
if (newBufferSize > 100 * 1024 * 1024)
110+
{
111+
return false;
112+
}
113+
114+
var newBuffer = new byte[newBufferSize];
115+
this.buffer.CopyTo(newBuffer, 0);
116+
this.buffer = newBuffer;
117+
118+
return true;
119+
}
120+
}

test/OpenTelemetry.Exporter.OpenTelemetryProtocol.Tests/OtlpLogExporterTests.cs

+2-2
Original file line numberDiff line numberDiff line change
@@ -1888,9 +1888,9 @@ private static OtlpCollector.ExportLogsServiceRequest CreateLogsExportRequest(Sd
18881888
var buffer = new byte[4096];
18891889
var writePosition = ProtobufOtlpLogSerializer.WriteLogsData(buffer, 0, sdkOptions, experimentalOptions, resource, batch);
18901890
using var stream = new MemoryStream(buffer, 0, writePosition);
1891-
var logsData = OtlpLogs.ResourceLogs.Parser.ParseFrom(stream);
1891+
var logsData = OtlpLogs.LogsData.Parser.ParseFrom(stream);
18921892
var request = new OtlpCollector.ExportLogsServiceRequest();
1893-
request.ResourceLogs.Add(logsData);
1893+
request.ResourceLogs.Add(logsData.ResourceLogs);
18941894
return request;
18951895
}
18961896

test/OpenTelemetry.Exporter.OpenTelemetryProtocol.Tests/OtlpMetricsExporterTests.cs

+2-2
Original file line numberDiff line numberDiff line change
@@ -1070,10 +1070,10 @@ private static OtlpCollector.ExportMetricsServiceRequest CreateMetricExportReque
10701070
var writePosition = ProtobufOtlpMetricSerializer.WriteMetricsData(buffer, 0, resource, in batch);
10711071
using var stream = new MemoryStream(buffer, 0, writePosition);
10721072

1073-
var metricsData = OtlpMetrics.ResourceMetrics.Parser.ParseFrom(stream);
1073+
var metricsData = OtlpMetrics.MetricsData.Parser.ParseFrom(stream);
10741074

10751075
var request = new OtlpCollector.ExportMetricsServiceRequest();
1076-
request.ResourceMetrics.Add(metricsData);
1076+
request.ResourceMetrics.Add(metricsData.ResourceMetrics);
10771077
return request;
10781078
}
10791079
}

0 commit comments

Comments
 (0)