Skip to content

Commit 31f484f

Browse files
chukunxjack-berg
andauthored
#6454 log warning and adjust maxExportBatchSize when exceeds maxQueueSize. (#7045)
Co-authored-by: Jack Berg <[email protected]>
1 parent 2de5a2c commit 31f484f

File tree

4 files changed

+110
-0
lines changed

4 files changed

+110
-0
lines changed

sdk/logs/src/main/java/io/opentelemetry/sdk/logs/export/BatchLogRecordProcessorBuilder.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,13 +11,17 @@
1111
import io.opentelemetry.api.metrics.MeterProvider;
1212
import java.time.Duration;
1313
import java.util.concurrent.TimeUnit;
14+
import java.util.logging.Level;
15+
import java.util.logging.Logger;
1416

1517
/**
1618
* Builder class for {@link BatchLogRecordProcessor}.
1719
*
1820
* @since 1.27.0
1921
*/
2022
public final class BatchLogRecordProcessorBuilder {
23+
private static final Logger logger =
24+
Logger.getLogger(BatchLogRecordProcessorBuilder.class.getName());
2125

2226
// Visible for testing
2327
static final long DEFAULT_SCHEDULE_DELAY_MILLIS = 1000;
@@ -103,6 +107,9 @@ long getExporterTimeoutNanos() {
103107
*/
104108
public BatchLogRecordProcessorBuilder setMaxQueueSize(int maxQueueSize) {
105109
checkArgument(maxQueueSize > 0, "maxQueueSize must be positive.");
110+
if (maxExportBatchSize > maxQueueSize) {
111+
logger.log(Level.WARNING, "maxExportBatchSize should not exceed maxQueueSize.");
112+
}
106113
this.maxQueueSize = maxQueueSize;
107114
return this;
108115
}
@@ -124,6 +131,9 @@ int getMaxQueueSize() {
124131
*/
125132
public BatchLogRecordProcessorBuilder setMaxExportBatchSize(int maxExportBatchSize) {
126133
checkArgument(maxExportBatchSize > 0, "maxExportBatchSize must be positive.");
134+
if (maxExportBatchSize > maxQueueSize) {
135+
logger.log(Level.WARNING, "maxExportBatchSize should not exceed maxQueueSize.");
136+
}
127137
this.maxExportBatchSize = maxExportBatchSize;
128138
return this;
129139
}
@@ -150,6 +160,10 @@ int getMaxExportBatchSize() {
150160
* @return a new {@link BatchLogRecordProcessor}.
151161
*/
152162
public BatchLogRecordProcessor build() {
163+
if (maxExportBatchSize > maxQueueSize) {
164+
maxExportBatchSize = maxQueueSize;
165+
logger.log(Level.FINE, "Using maxExportBatchSize: {0}", maxExportBatchSize);
166+
}
153167
return new BatchLogRecordProcessor(
154168
logRecordExporter,
155169
meterProvider,

sdk/logs/src/test/java/io/opentelemetry/sdk/logs/export/BatchLogRecordProcessorTest.java

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,17 +10,21 @@
1010
import static org.assertj.core.api.Assertions.assertThatThrownBy;
1111
import static org.assertj.core.api.AssertionsForClassTypes.assertThatCode;
1212
import static org.awaitility.Awaitility.await;
13+
import static org.mockito.ArgumentMatchers.any;
1314
import static org.mockito.ArgumentMatchers.anyList;
1415
import static org.mockito.ArgumentMatchers.argThat;
1516
import static org.mockito.Mockito.doThrow;
1617
import static org.mockito.Mockito.reset;
18+
import static org.mockito.Mockito.times;
19+
import static org.mockito.Mockito.verify;
1720
import static org.mockito.Mockito.when;
1821

1922
import io.opentelemetry.api.internal.GuardedBy;
2023
import io.opentelemetry.internal.testing.slf4j.SuppressLogger;
2124
import io.opentelemetry.sdk.common.CompletableResultCode;
2225
import io.opentelemetry.sdk.logs.SdkLoggerProvider;
2326
import io.opentelemetry.sdk.logs.data.LogRecordData;
27+
import java.time.Duration;
2428
import java.util.ArrayList;
2529
import java.util.Arrays;
2630
import java.util.Collection;
@@ -117,6 +121,44 @@ void builderInvalidConfig() {
117121
.hasMessage("maxQueueSize must be positive.");
118122
}
119123

124+
@Test
125+
void builderAdjustMaxBatchSize() {
126+
LogRecordExporter dummyExporter = new CompletableLogRecordExporter();
127+
128+
BatchLogRecordProcessorBuilder builder =
129+
BatchLogRecordProcessor.builder(dummyExporter)
130+
.setMaxQueueSize(513)
131+
.setMaxExportBatchSize(1000);
132+
builder.build();
133+
134+
assertThat(builder.getMaxExportBatchSize()).isEqualTo(513);
135+
assertThat(builder.getMaxQueueSize()).isEqualTo(513);
136+
}
137+
138+
@Test
139+
void maxExportBatchSizeExceedsQueueSize() throws InterruptedException {
140+
// Given a processor configured with a maxExportBatchSize > maxQueueSize, ensure that after n =
141+
// maxQueueSize logs are emitted, export is triggered and that the queue is fully drained and
142+
// exported.
143+
int maxQueueSize = 2048;
144+
when(mockLogRecordExporter.export(any())).thenReturn(CompletableResultCode.ofSuccess());
145+
SdkLoggerProvider sdkLoggerProvider =
146+
SdkLoggerProvider.builder()
147+
.addLogRecordProcessor(
148+
BatchLogRecordProcessor.builder(mockLogRecordExporter)
149+
.setScheduleDelay(Duration.ofSeconds(Integer.MAX_VALUE))
150+
.setMaxExportBatchSize(2049)
151+
.setMaxQueueSize(maxQueueSize)
152+
.build())
153+
.build();
154+
155+
for (int i = 0; i < maxQueueSize; i++) {
156+
emitLog(sdkLoggerProvider, "log " + i);
157+
}
158+
159+
await().untilAsserted(() -> verify(mockLogRecordExporter, times(1)).export(any()));
160+
}
161+
120162
@Test
121163
void emitMultipleLogs() {
122164
WaitingLogRecordExporter waitingLogRecordExporter =

sdk/trace/src/main/java/io/opentelemetry/sdk/trace/export/BatchSpanProcessorBuilder.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,9 +11,12 @@
1111
import io.opentelemetry.api.metrics.MeterProvider;
1212
import java.time.Duration;
1313
import java.util.concurrent.TimeUnit;
14+
import java.util.logging.Level;
15+
import java.util.logging.Logger;
1416

1517
/** Builder class for {@link BatchSpanProcessor}. */
1618
public final class BatchSpanProcessorBuilder {
19+
private static final Logger logger = Logger.getLogger(BatchSpanProcessorBuilder.class.getName());
1720

1821
// Visible for testing
1922
static final long DEFAULT_SCHEDULE_DELAY_MILLIS = 5000;
@@ -111,6 +114,9 @@ long getExporterTimeoutNanos() {
111114
*/
112115
public BatchSpanProcessorBuilder setMaxQueueSize(int maxQueueSize) {
113116
checkArgument(maxQueueSize > 0, "maxQueueSize must be positive.");
117+
if (maxExportBatchSize > maxQueueSize) {
118+
logger.log(Level.WARNING, "maxExportBatchSize should not exceed maxQueueSize.");
119+
}
114120
this.maxQueueSize = maxQueueSize;
115121
return this;
116122
}
@@ -132,6 +138,9 @@ int getMaxQueueSize() {
132138
*/
133139
public BatchSpanProcessorBuilder setMaxExportBatchSize(int maxExportBatchSize) {
134140
checkArgument(maxExportBatchSize > 0, "maxExportBatchSize must be positive.");
141+
if (maxExportBatchSize > maxQueueSize) {
142+
logger.log(Level.WARNING, "maxExportBatchSize should not exceed maxQueueSize.");
143+
}
135144
this.maxExportBatchSize = maxExportBatchSize;
136145
return this;
137146
}
@@ -158,6 +167,10 @@ int getMaxExportBatchSize() {
158167
* @return a new {@link BatchSpanProcessor}.
159168
*/
160169
public BatchSpanProcessor build() {
170+
if (maxExportBatchSize > maxQueueSize) {
171+
maxExportBatchSize = maxQueueSize;
172+
logger.log(Level.FINE, "Using maxExportBatchSize: {0}", maxExportBatchSize);
173+
}
161174
return new BatchSpanProcessor(
162175
spanExporter,
163176
exportUnsampledSpans,

sdk/trace/src/test/java/io/opentelemetry/sdk/trace/export/BatchSpanProcessorTest.java

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,8 @@
1414
import static org.mockito.ArgumentMatchers.argThat;
1515
import static org.mockito.Mockito.doThrow;
1616
import static org.mockito.Mockito.reset;
17+
import static org.mockito.Mockito.times;
18+
import static org.mockito.Mockito.verify;
1719
import static org.mockito.Mockito.when;
1820

1921
import io.opentelemetry.api.internal.GuardedBy;
@@ -26,6 +28,7 @@
2628
import io.opentelemetry.sdk.trace.data.SpanData;
2729
import io.opentelemetry.sdk.trace.samplers.Sampler;
2830
import io.opentelemetry.sdk.trace.samplers.SamplingResult;
31+
import java.time.Duration;
2932
import java.util.ArrayList;
3033
import java.util.Arrays;
3134
import java.util.Collection;
@@ -126,6 +129,44 @@ void builderInvalidConfig() {
126129
.hasMessage("maxQueueSize must be positive.");
127130
}
128131

132+
@Test
133+
void builderAdjustMaxBatchSize() {
134+
SpanExporter dummyExporter = new CompletableSpanExporter();
135+
136+
BatchSpanProcessorBuilder builder =
137+
BatchSpanProcessor.builder(dummyExporter).setMaxQueueSize(513).setMaxExportBatchSize(1000);
138+
builder.build();
139+
140+
assertThat(builder.getMaxExportBatchSize()).isEqualTo(513);
141+
assertThat(builder.getMaxQueueSize()).isEqualTo(513);
142+
}
143+
144+
@Test
145+
void maxExportBatchSizeExceedsQueueSize() throws InterruptedException {
146+
// Given a processor configured with a maxExportBatchSize > maxQueueSize, ensure that after n =
147+
// maxQueueSize spans are ended, export is triggered and that the queue is fully drained and
148+
// exported.
149+
int maxQueueSize = 2048;
150+
when(mockSpanExporter.export(any())).thenReturn(CompletableResultCode.ofSuccess());
151+
sdkTracerProvider =
152+
SdkTracerProvider.builder()
153+
.addSpanProcessor(
154+
BatchSpanProcessor.builder(mockSpanExporter)
155+
.setScheduleDelay(Duration.ofSeconds(Integer.MAX_VALUE))
156+
.setMaxExportBatchSize(2049)
157+
.setMaxQueueSize(maxQueueSize)
158+
.build())
159+
.build();
160+
161+
for (int i = 0; i < maxQueueSize; i++) {
162+
createEndedSpan("span " + i);
163+
}
164+
165+
Thread.sleep(10);
166+
167+
await().untilAsserted(() -> verify(mockSpanExporter, times(1)).export(any()));
168+
}
169+
129170
@Test
130171
void startEndRequirements() {
131172
BatchSpanProcessor spansProcessor =

0 commit comments

Comments
 (0)