Commit 6d937c3

[FLINK-35989][Connectors/AWS] Log errors on partially failed requests for AWS Kinesis Stream sink
1 parent ca96d84 commit 6d937c3

File tree

4 files changed, +442 -28 lines changed

flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/sink/KinesisClientProvider.java

Lines changed: 39 additions & 0 deletions
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.kinesis.sink;
+
+import org.apache.flink.annotation.Internal;
+
+import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
+
+/**
+ * Provider interface for KinesisAsyncClient instances. This is primarily used for testing to inject
+ * mock clients.
+ */
+@Internal
+interface KinesisClientProvider {
+    /**
+     * Returns a KinesisAsyncClient instance.
+     *
+     * @return The KinesisAsyncClient instance
+     */
+    KinesisAsyncClient get();
+
+    /** Closes any resources held by this provider. */
+    void close();
+}
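
The commit does not ship a concrete provider, so as orientation only, a test-side implementation might look like the sketch below; the class name TestKinesisClientProvider and the idea of wrapping a pre-built (typically mocked) client are assumptions, not part of this change. Because the interface is package-private, such a helper has to live in the same org.apache.flink.connector.kinesis.sink package.

package org.apache.flink.connector.kinesis.sink;

import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;

/** Hypothetical test helper (not in this commit): serves a pre-built client and closes it. */
class TestKinesisClientProvider implements KinesisClientProvider {

    private final KinesisAsyncClient client;

    TestKinesisClientProvider(KinesisAsyncClient client) {
        this.client = client;
    }

    @Override
    public KinesisAsyncClient get() {
        // Always hand back the same injected client instance.
        return client;
    }

    @Override
    public void close() {
        // KinesisAsyncClient is auto-closeable; release it when the writer shuts down.
        client.close();
    }
}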

flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/sink/KinesisStreamsSink.java

Lines changed: 20 additions & 2 deletions
@@ -19,6 +19,7 @@

 import org.apache.flink.annotation.Internal;
 import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.connector.base.sink.AsyncSinkBase;
 import org.apache.flink.connector.base.sink.writer.BufferedRequestState;
 import org.apache.flink.connector.base.sink.writer.ElementConverter;
@@ -72,6 +73,8 @@ public class KinesisStreamsSink<InputT> extends AsyncSinkBase<InputT, PutRecords
     private final String streamArn;
     private final Properties kinesisClientProperties;

+    @VisibleForTesting private KinesisClientProvider kinesisClientProvider;
+
     KinesisStreamsSink(
             ElementConverter<InputT, PutRecordsRequestEntry> elementConverter,
             Integer maxBatchSize,
@@ -124,6 +127,19 @@ public static <InputT> KinesisStreamsSinkBuilder<InputT> builder() {
         return new KinesisStreamsSinkBuilder<>();
     }

+    /**
+     * Set a custom KinesisAsyncClient provider for testing purposes. This method is only intended
+     * to be used in tests.
+     *
+     * @param kinesisClientProvider The provider that supplies the KinesisAsyncClient
+     */
+    @VisibleForTesting
+    void setKinesisClientProvider(KinesisClientProvider kinesisClientProvider) {
+        this.kinesisClientProvider =
+                Preconditions.checkNotNull(
+                        kinesisClientProvider, "The kinesisClientProvider must not be null.");
+    }
+
     @Internal
     @Override
     public StatefulSinkWriter<InputT, BufferedRequestState<PutRecordsRequestEntry>> createWriter(
@@ -141,7 +157,8 @@ public StatefulSinkWriter<InputT, BufferedRequestState<PutRecordsRequestEntry>>
                 streamName,
                 streamArn,
                 kinesisClientProperties,
-                Collections.emptyList());
+                Collections.emptyList(),
+                kinesisClientProvider);
     }

     @Internal
@@ -170,6 +187,7 @@ public StatefulSinkWriter<InputT, BufferedRequestState<PutRecordsRequestEntry>>
                 streamName,
                 streamArn,
                 kinesisClientProperties,
-                recoveredState);
+                recoveredState,
+                kinesisClientProvider);
     }
 }
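
To see how the new hook is meant to be exercised, here is a sketch of test wiring; it assumes Mockito for the mock, reuses the hypothetical TestKinesisClientProvider sketched above, and uses placeholder stream and partition-key settings. The builder methods are the existing public KinesisStreamsSink builder API; setKinesisClientProvider is the package-private method added by this commit, so the test class must sit in the same package.

package org.apache.flink.connector.kinesis.sink;

import org.apache.flink.api.common.serialization.SimpleStringSchema;

import org.mockito.Mockito;
import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;

/** Hypothetical test wiring (not in this commit). */
class KinesisClientInjectionSketch {

    KinesisStreamsSink<String> sinkWithMockedClient() {
        // A mocked low-level client; a real test would stub its putRecords(...) responses.
        KinesisAsyncClient mockClient = Mockito.mock(KinesisAsyncClient.class);

        KinesisStreamsSink<String> sink =
                KinesisStreamsSink.<String>builder()
                        .setStreamName("test-stream")
                        .setSerializationSchema(new SimpleStringSchema())
                        .setPartitionKeyGenerator(element -> element)
                        .build();

        // With a provider set, createWriter(...) hands it to KinesisStreamsSinkWriter,
        // which then uses the mock instead of building its own HTTP client.
        sink.setKinesisClientProvider(new TestKinesisClientProvider(mockClient));
        return sink;
    }
}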

flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/sink/KinesisStreamsSinkWriter.java

Lines changed: 145 additions & 11 deletions
@@ -45,7 +45,9 @@
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.Properties;
 import java.util.concurrent.CompletableFuture;
 import java.util.function.Consumer;
@@ -102,6 +104,9 @@ class KinesisStreamsSinkWriter<InputT> extends AsyncSinkWriter<InputT, PutRecord
     /* The asynchronous Kinesis client - construction is by kinesisClientProperties */
     private final KinesisAsyncClient kinesisClient;

+    /* The client provider used for testing */
+    private final KinesisClientProvider kinesisClientProvider;
+
     /* Flag to whether fatally fail any time we encounter an exception when persisting records */
     private final boolean failOnError;

@@ -148,6 +153,38 @@ class KinesisStreamsSinkWriter<InputT> extends AsyncSinkWriter<InputT, PutRecord
             String streamArn,
             Properties kinesisClientProperties,
             Collection<BufferedRequestState<PutRecordsRequestEntry>> states) {
+        this(
+                elementConverter,
+                context,
+                maxBatchSize,
+                maxInFlightRequests,
+                maxBufferedRequests,
+                maxBatchSizeInBytes,
+                maxTimeInBufferMS,
+                maxRecordSizeInBytes,
+                failOnError,
+                streamName,
+                streamArn,
+                kinesisClientProperties,
+                states,
+                null);
+    }
+
+    KinesisStreamsSinkWriter(
+            ElementConverter<InputT, PutRecordsRequestEntry> elementConverter,
+            Sink.InitContext context,
+            int maxBatchSize,
+            int maxInFlightRequests,
+            int maxBufferedRequests,
+            long maxBatchSizeInBytes,
+            long maxTimeInBufferMS,
+            long maxRecordSizeInBytes,
+            boolean failOnError,
+            String streamName,
+            String streamArn,
+            Properties kinesisClientProperties,
+            Collection<BufferedRequestState<PutRecordsRequestEntry>> states,
+            KinesisClientProvider kinesisClientProvider) {
         super(
                 elementConverter,
                 context,
@@ -167,8 +204,18 @@ class KinesisStreamsSinkWriter<InputT> extends AsyncSinkWriter<InputT, PutRecord
         this.streamArn = streamArn;
         this.metrics = context.metricGroup();
         this.numRecordsOutErrorsCounter = metrics.getNumRecordsOutErrorsCounter();
-        this.httpClient = AWSGeneralUtil.createAsyncHttpClient(kinesisClientProperties);
-        this.kinesisClient = buildClient(kinesisClientProperties, this.httpClient);
+
+        this.kinesisClientProvider = kinesisClientProvider;
+
+        if (kinesisClientProvider != null) {
+            // Use the provided client for testing
+            this.httpClient = null;
+            this.kinesisClient = kinesisClientProvider.get();
+        } else {
+            // Create a new client as before
+            this.httpClient = AWSGeneralUtil.createAsyncHttpClient(kinesisClientProperties);
+            this.kinesisClient = buildClient(kinesisClientProperties, this.httpClient);
+        }
     }

     private KinesisAsyncClient buildClient(
@@ -244,34 +291,121 @@ private void handleFullyFailedRequest(

     @Override
     public void close() {
-        AWSGeneralUtil.closeResources(httpClient, kinesisClient);
+        if (kinesisClientProvider != null) {
+            kinesisClientProvider.close();
+        } else {
+            AWSGeneralUtil.closeResources(httpClient, kinesisClient);
+        }
     }

     private void handlePartiallyFailedRequest(
             PutRecordsResponse response,
             List<PutRecordsRequestEntry> requestEntries,
             Consumer<List<PutRecordsRequestEntry>> requestResult) {
-        LOG.warn(
-                "KDS Sink failed to write and will retry {} entries to KDS",
-                response.failedRecordCount());
-        numRecordsOutErrorsCounter.inc(response.failedRecordCount());
+        int failedRecordCount = response.failedRecordCount();
+        LOG.warn("KDS Sink failed to write and will retry {} entries to KDS", failedRecordCount);
+        numRecordsOutErrorsCounter.inc(failedRecordCount);

         if (failOnError) {
             getFatalExceptionCons()
                     .accept(new KinesisStreamsException.KinesisStreamsFailFastException());
             return;
         }
-        List<PutRecordsRequestEntry> failedRequestEntries =
-                new ArrayList<>(response.failedRecordCount());
+
+        List<PutRecordsRequestEntry> failedRequestEntries = new ArrayList<>(failedRecordCount);
         List<PutRecordsResultEntry> records = response.records();

+        // Collect error information and build the list of failed entries
+        Map<String, ErrorSummary> errorSummaries =
+                collectErrorSummaries(records, requestEntries, failedRequestEntries);
+
+        // Log aggregated error information
+        logErrorSummaries(errorSummaries);
+
+        requestResult.accept(failedRequestEntries);
+    }
+
+    /**
+     * Collect error summaries from failed records and build a list of failed request entries.
+     *
+     * @param records The result entries from the Kinesis response
+     * @param requestEntries The original request entries
+     * @param failedRequestEntries List to populate with failed entries (modified as a side effect)
+     * @return A map of error codes to their summaries
+     */
+    private Map<String, ErrorSummary> collectErrorSummaries(
+            List<PutRecordsResultEntry> records,
+            List<PutRecordsRequestEntry> requestEntries,
+            List<PutRecordsRequestEntry> failedRequestEntries) {
+
+        // We capture error info while minimizing logging overhead in the data path,
+        // which is critical for maintaining throughput performance
+        Map<String, ErrorSummary> errorSummaries = new HashMap<>();
+
         for (int i = 0; i < records.size(); i++) {
-            if (records.get(i).errorCode() != null) {
+            PutRecordsResultEntry resultEntry = records.get(i);
+            String errorCode = resultEntry.errorCode();
+
+            if (errorCode != null) {
+                // Track the frequency of each error code to identify patterns
+                ErrorSummary summary =
+                        errorSummaries.computeIfAbsent(
+                                errorCode, code -> new ErrorSummary(resultEntry.errorMessage()));
+                summary.incrementCount();
+
                 failedRequestEntries.add(requestEntries.get(i));
             }
         }

-        requestResult.accept(failedRequestEntries);
+        return errorSummaries;
+    }
+
+    /**
+     * Log aggregated error information at WARN level.
+     *
+     * @param errorSummaries Map of error codes to their summaries
+     */
+    private void logErrorSummaries(Map<String, ErrorSummary> errorSummaries) {
+        // We log aggregated error information at WARN level to ensure visibility in production
+        // while avoiding the performance impact of logging each individual failure
+        if (!errorSummaries.isEmpty()) {
+            StringBuilder errorSummary = new StringBuilder("Kinesis errors summary: ");
+            errorSummaries.forEach(
+                    (code, summary) ->
+                            errorSummary.append(
+                                    String.format(
+                                            "[%s: %d records, example: %s] ",
+                                            code,
+                                            summary.getCount(),
+                                            summary.getExampleMessage())));
+
+            // Using a single WARN log with aggregated information provides operational
+            // visibility into errors without flooding logs in high-throughput scenarios
+            LOG.warn("KDS Sink failed to write, " + errorSummary.toString());
+        }
+    }
+
+    /** Helper class to store error summary information. */
+    private static class ErrorSummary {
+        private final String exampleMessage;
+        private int count;
+
+        ErrorSummary(String exampleMessage) {
+            this.exampleMessage = exampleMessage;
+            this.count = 0;
+        }
+
+        void incrementCount() {
+            count++;
+        }
+
+        int getCount() {
+            return count;
+        }
+
+        String getExampleMessage() {
+            return exampleMessage;
+        }
     }

     private boolean isRetryable(Throwable err) {
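
Operationally, a partially failed batch still logs the existing retry-count warning, and now additionally emits one aggregated WARN line per batch built from the format string above. Its shape would be roughly as follows; the error codes are standard Kinesis PutRecords error codes, while the counts and example messages are invented for illustration:

KDS Sink failed to write, Kinesis errors summary: [ProvisionedThroughputExceededException: 42 records, example: Rate exceeded for shard shardId-000000000001] [InternalFailure: 3 records, example: Internal service failure.]

Aggregating by error code keeps log volume bounded by the number of distinct error codes per batch rather than the number of failed records.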
