Skip to content

Commit c8a097d

Browse files
[logs] Fix pooling issues when wrapping batch export processor (#5255)
Co-authored-by: Cijo Thomas <[email protected]>
1 parent 98f6873 commit c8a097d

9 files changed

+144
-27
lines changed

src/OpenTelemetry/Batch.cs

+16-3
Original file line numberDiff line numberDiff line change
@@ -71,7 +71,11 @@ public void Dispose()
7171
T item = this.circularBuffer.Read();
7272
if (typeof(T) == typeof(LogRecord))
7373
{
74-
LogRecordSharedPool.Current.Return((LogRecord)(object)item);
74+
var logRecord = (LogRecord)(object)item;
75+
if (logRecord.Source == LogRecord.LogRecordSource.FromSharedPool)
76+
{
77+
LogRecordSharedPool.Current.Return(logRecord);
78+
}
7579
}
7680
}
7781
}
@@ -134,7 +138,11 @@ public struct Enumerator : IEnumerator<T>
134138

135139
if (currentItem != null)
136140
{
137-
LogRecordSharedPool.Current.Return((LogRecord)(object)currentItem);
141+
var logRecord = (LogRecord)(object)currentItem;
142+
if (logRecord.Source == LogRecord.LogRecordSource.FromSharedPool)
143+
{
144+
LogRecordSharedPool.Current.Return(logRecord);
145+
}
138146
}
139147

140148
if (circularBuffer!.RemovedCount < enumerator.targetCount)
@@ -215,7 +223,12 @@ public void Dispose()
215223
var currentItem = this.current;
216224
if (currentItem != null)
217225
{
218-
LogRecordSharedPool.Current.Return((LogRecord)(object)currentItem);
226+
var logRecord = (LogRecord)(object)currentItem;
227+
if (logRecord.Source == LogRecord.LogRecordSource.FromSharedPool)
228+
{
229+
LogRecordSharedPool.Current.Return(logRecord);
230+
}
231+
219232
this.current = null;
220233
}
221234
}

src/OpenTelemetry/CHANGELOG.md

+6
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,12 @@
1313
state for cumulative temporality.
1414
[#5230](https://github.com/open-telemetry/opentelemetry-dotnet/pull/5230)
1515

16+
* Fixed an issue causing `LogRecord`s to be incorrectly reused when wrapping an
17+
instance of `BatchLogRecordExportProcessor` inside another
18+
`BaseProcessor<LogRecord>` which leads to missing or incorrect data during
19+
export.
20+
[#5255](https://github.com/open-telemetry/opentelemetry-dotnet/pull/5255)
21+
1622
## 1.7.0
1723

1824
Released 2023-Dec-08

src/OpenTelemetry/Logs/BatchLogRecordExportProcessor.cs

+19-5
Original file line numberDiff line numberDiff line change
@@ -42,13 +42,27 @@ public override void OnEnd(LogRecord data)
4242
// happen here.
4343
Debug.Assert(data != null, "LogRecord was null.");
4444

45-
data!.Buffer();
45+
switch (data!.Source)
46+
{
47+
case LogRecord.LogRecordSource.FromSharedPool:
48+
data.Buffer();
49+
data.AddReference();
50+
if (!this.TryExport(data))
51+
{
52+
LogRecordSharedPool.Current.Return(data);
53+
}
4654

47-
data.AddReference();
55+
break;
56+
case LogRecord.LogRecordSource.CreatedManually:
57+
data.Buffer();
58+
this.TryExport(data);
59+
break;
60+
default:
61+
Debug.Assert(data.Source == LogRecord.LogRecordSource.FromThreadStaticPool, "LogRecord source was something unexpected");
4862

49-
if (!this.TryExport(data))
50-
{
51-
LogRecordSharedPool.Current.Return(data);
63+
// Note: If we are using ThreadStatic pool we make a copy of the record.
64+
this.TryExport(data.Copy());
65+
break;
5266
}
5367
}
5468
}

src/OpenTelemetry/Logs/LogRecord.cs

+19
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ public sealed class LogRecord
2121
internal IReadOnlyList<KeyValuePair<string, object?>>? AttributeData;
2222
internal List<KeyValuePair<string, object?>>? AttributeStorage;
2323
internal List<object?>? ScopeStorage;
24+
internal LogRecordSource Source = LogRecordSource.CreatedManually;
2425
internal int PoolReferenceCount = int.MaxValue;
2526

2627
private static readonly Action<object?, List<object?>> AddScopeToBufferedList = (object? scope, List<object?> state) =>
@@ -80,6 +81,24 @@ internal LogRecord(
8081
}
8182
}
8283

84+
internal enum LogRecordSource
85+
{
86+
/// <summary>
87+
/// A <see cref="LogRecord"/> created manually.
88+
/// </summary>
89+
CreatedManually,
90+
91+
/// <summary>
92+
/// A <see cref="LogRecord"/> rented from the <see cref="LogRecordThreadStaticPool"/>.
93+
/// </summary>
94+
FromThreadStaticPool,
95+
96+
/// <summary>
97+
/// A <see cref="LogRecord"/> rented from the <see cref="LogRecordSharedPool"/>.
98+
/// </summary>
99+
FromSharedPool,
100+
}
101+
83102
/// <summary>
84103
/// Gets or sets the log timestamp.
85104
/// </summary>

src/OpenTelemetry/Logs/Pool/LogRecordSharedPool.cs

+9-2
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
// Copyright The OpenTelemetry Authors
22
// SPDX-License-Identifier: Apache-2.0
33

4+
using System.Diagnostics;
45
using System.Diagnostics.CodeAnalysis;
56
using OpenTelemetry.Internal;
67

@@ -17,7 +18,7 @@ internal sealed class LogRecordSharedPool : ILogRecordPool
1718
private long rentIndex;
1819
private long returnIndex;
1920

20-
public LogRecordSharedPool(int capacity)
21+
private LogRecordSharedPool(int capacity)
2122
{
2223
this.Capacity = capacity;
2324
this.pool = new LogRecord?[capacity];
@@ -54,18 +55,24 @@ public LogRecord Rent()
5455
continue;
5556
}
5657

58+
Debug.Assert(logRecord.Source == LogRecord.LogRecordSource.FromSharedPool, "logRecord.Source was not FromSharedPool");
5759
logRecord.ResetReferenceCount();
5860
return logRecord;
5961
}
6062
}
6163

62-
var newLogRecord = new LogRecord();
64+
var newLogRecord = new LogRecord()
65+
{
66+
Source = LogRecord.LogRecordSource.FromSharedPool,
67+
};
6368
newLogRecord.ResetReferenceCount();
6469
return newLogRecord;
6570
}
6671

6772
public void Return(LogRecord logRecord)
6873
{
74+
Debug.Assert(logRecord.Source == LogRecord.LogRecordSource.FromSharedPool, "logRecord.Source was not FromSharedPool");
75+
6976
if (logRecord.RemoveReference() != 0)
7077
{
7178
return;

src/OpenTelemetry/Logs/Pool/LogRecordThreadStaticPool.cs

+12-2
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
// Copyright The OpenTelemetry Authors
22
// SPDX-License-Identifier: Apache-2.0
33

4+
using System.Diagnostics;
5+
46
namespace OpenTelemetry.Logs;
57

68
internal sealed class LogRecordThreadStaticPool : ILogRecordPool
@@ -19,15 +21,23 @@ public LogRecord Rent()
1921
var logRecord = Storage;
2022
if (logRecord != null)
2123
{
24+
Debug.Assert(logRecord.Source == LogRecord.LogRecordSource.FromThreadStaticPool, "logRecord.Source was not FromThreadStaticPool");
2225
Storage = null;
23-
return logRecord;
26+
}
27+
else
28+
{
29+
logRecord = new()
30+
{
31+
Source = LogRecord.LogRecordSource.FromThreadStaticPool,
32+
};
2433
}
2534

26-
return new();
35+
return logRecord;
2736
}
2837

2938
public void Return(LogRecord logRecord)
3039
{
40+
Debug.Assert(logRecord.Source == LogRecord.LogRecordSource.FromThreadStaticPool, "logRecord.Source was not FromThreadStaticPool");
3141
if (Storage == null)
3242
{
3343
LogRecordPoolHelper.Clear(logRecord);

test/OpenTelemetry.Tests/Logs/BatchLogRecordExportProcessorTests.cs

+46-2
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,9 @@ public void StateValuesAndScopeBufferingTest()
2323

2424
using var scope = scopeProvider.Push(exportedItems);
2525

26-
var logRecord = new LogRecord();
26+
var pool = LogRecordSharedPool.Current;
27+
28+
var logRecord = pool.Rent();
2729

2830
var state = new LogRecordTest.DisposingState("Hello world");
2931

@@ -60,6 +62,7 @@ public void StateValuesAndScopeBufferingTest()
6062
processor.Shutdown();
6163

6264
Assert.Single(exportedItems);
65+
Assert.Same(logRecord, exportedItems[0]);
6366
}
6467

6568
[Fact]
@@ -74,14 +77,19 @@ public void StateBufferingTest()
7477
using var processor = new BatchLogRecordExportProcessor(
7578
new InMemoryExporter<LogRecord>(exportedItems));
7679

77-
var logRecord = new LogRecord();
80+
var pool = LogRecordSharedPool.Current;
81+
82+
var logRecord = pool.Rent();
7883

7984
var state = new LogRecordTest.DisposingState("Hello world");
8085
logRecord.State = state;
8186

8287
processor.OnEnd(logRecord);
8388
processor.Shutdown();
8489

90+
Assert.Single(exportedItems);
91+
Assert.Same(logRecord, exportedItems[0]);
92+
8593
state.Dispose();
8694

8795
Assert.Throws<ObjectDisposedException>(() =>
@@ -93,5 +101,41 @@ public void StateBufferingTest()
93101
}
94102
});
95103
}
104+
105+
[Fact]
106+
public void CopyMadeWhenLogRecordIsFromThreadStaticPoolTest()
107+
{
108+
List<LogRecord> exportedItems = new();
109+
110+
using var processor = new BatchLogRecordExportProcessor(
111+
new InMemoryExporter<LogRecord>(exportedItems));
112+
113+
var pool = LogRecordThreadStaticPool.Instance;
114+
115+
var logRecord = pool.Rent();
116+
117+
processor.OnEnd(logRecord);
118+
processor.Shutdown();
119+
120+
Assert.Single(exportedItems);
121+
Assert.NotSame(logRecord, exportedItems[0]);
122+
}
123+
124+
[Fact]
125+
public void LogRecordAddedToBatchIfNotFromAnyPoolTest()
126+
{
127+
List<LogRecord> exportedItems = new();
128+
129+
using var processor = new BatchLogRecordExportProcessor(
130+
new InMemoryExporter<LogRecord>(exportedItems));
131+
132+
var logRecord = new LogRecord();
133+
134+
processor.OnEnd(logRecord);
135+
processor.Shutdown();
136+
137+
Assert.Single(exportedItems);
138+
Assert.Same(logRecord, exportedItems[0]);
139+
}
96140
}
97141
#endif

test/OpenTelemetry.Tests/Logs/LogRecordSharedPoolTests.cs

+12-9
Original file line numberDiff line numberDiff line change
@@ -44,20 +44,23 @@ public void RentReturnTests()
4444

4545
Assert.Equal(1, pool.Count);
4646

47-
// Note: This is ignored because logRecord manually created has PoolReferenceCount = int.MaxValue.
48-
LogRecord manualRecord = new();
49-
Assert.Equal(int.MaxValue, manualRecord.PoolReferenceCount);
50-
pool.Return(manualRecord);
47+
var logRecordWithReferencesAdded = pool.Rent();
5148

52-
Assert.Equal(1, pool.Count);
49+
// Note: This record won't be returned to the pool because we add a reference to it.
50+
logRecordWithReferencesAdded.AddReference();
51+
52+
Assert.Equal(2, logRecordWithReferencesAdded.PoolReferenceCount);
53+
pool.Return(logRecordWithReferencesAdded);
54+
55+
Assert.Equal(0, pool.Count);
5356

5457
pool.Return(logRecord2);
5558

56-
Assert.Equal(2, pool.Count);
59+
Assert.Equal(1, pool.Count);
5760

5861
logRecord1 = pool.Rent();
5962
Assert.NotNull(logRecord1);
60-
Assert.Equal(1, pool.Count);
63+
Assert.Equal(0, pool.Count);
6164

6265
logRecord2 = pool.Rent();
6366
Assert.NotNull(logRecord2);
@@ -70,7 +73,7 @@ public void RentReturnTests()
7073

7174
pool.Return(logRecord1);
7275
pool.Return(logRecord2);
73-
pool.Return(logRecord3);
76+
pool.Return(logRecord3); // <- Discarded due to pool size of 2
7477
pool.Return(logRecord4); // <- Discarded due to pool size of 2
7578

7679
Assert.Equal(2, pool.Count);
@@ -163,7 +166,7 @@ public async Task ExportTest(bool warmup)
163166
{
164167
for (int i = 0; i < LogRecordSharedPool.DefaultMaxPoolSize; i++)
165168
{
166-
pool.Return(new LogRecord { PoolReferenceCount = 1 });
169+
pool.Return(new LogRecord { Source = LogRecord.LogRecordSource.FromSharedPool, PoolReferenceCount = 1 });
167170
}
168171
}
169172

test/OpenTelemetry.Tests/Logs/LogRecordThreadStaticPoolTests.cs

+5-4
Original file line numberDiff line numberDiff line change
@@ -22,16 +22,17 @@ public void RentReturnTests()
2222
Assert.NotNull(LogRecordThreadStaticPool.Storage);
2323
Assert.Equal(logRecord, LogRecordThreadStaticPool.Storage);
2424

25-
LogRecordThreadStaticPool.Instance.Return(new());
25+
// Note: This record will be ignored because there is already something in the ThreadStatic storage.
26+
LogRecordThreadStaticPool.Instance.Return(new() { Source = LogRecord.LogRecordSource.FromThreadStaticPool });
2627
Assert.NotNull(LogRecordThreadStaticPool.Storage);
2728
Assert.Equal(logRecord, LogRecordThreadStaticPool.Storage);
2829

2930
LogRecordThreadStaticPool.Storage = null;
3031

31-
var manual = new LogRecord();
32-
LogRecordThreadStaticPool.Instance.Return(manual);
32+
var newLogRecord = new LogRecord() { Source = LogRecord.LogRecordSource.FromThreadStaticPool };
33+
LogRecordThreadStaticPool.Instance.Return(newLogRecord);
3334
Assert.NotNull(LogRecordThreadStaticPool.Storage);
34-
Assert.Equal(manual, LogRecordThreadStaticPool.Storage);
35+
Assert.Equal(newLogRecord, LogRecordThreadStaticPool.Storage);
3536
}
3637

3738
[Fact]

0 commit comments

Comments
 (0)