Skip to content

Commit 61da6da

Browse files
committed
[FLINK-36441] Fix KafkaWriterITCase#testIncreasingRecordBasedCounters
The test tried to assert on byte counts which are written async. Commit adds flushing and establishes a baseline so that metadata request don't interfere with assertions.
1 parent fbd1b7b commit 61da6da

File tree

1 file changed

+18
-9
lines changed

1 file changed

+18
-9
lines changed

flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaWriterITCase.java

Lines changed: 18 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -88,31 +88,40 @@ public void testIncreasingRecordBasedCounters() throws Exception {
8888

8989
try (final KafkaWriter<Integer> writer =
9090
createWriterWithConfiguration(
91-
getKafkaClientConfiguration(), DeliveryGuarantee.NONE, metricGroup)) {
91+
getKafkaClientConfiguration(),
92+
DeliveryGuarantee.AT_LEAST_ONCE,
93+
metricGroup)) {
9294
final Counter numBytesOut = metricGroup.getIOMetricGroup().getNumBytesOutCounter();
9395
final Counter numRecordsOut = metricGroup.getIOMetricGroup().getNumRecordsOutCounter();
9496
final Counter numRecordsOutErrors = metricGroup.getNumRecordsOutErrorsCounter();
9597
final Counter numRecordsSendErrors = metricGroup.getNumRecordsSendErrorsCounter();
96-
assertThat(numBytesOut.getCount()).isEqualTo(0L);
97-
assertThat(numRecordsOut.getCount()).isEqualTo(0);
98+
99+
// ApiVersionsRequest etc. is sent on initialization, so establish some baseline
100+
writer.write(0, SINK_WRITER_CONTEXT);
101+
writer.flush(false); // ensure data is actually written
102+
timeService.trigger(); // sync byte count
103+
long baselineCount = numBytesOut.getCount();
104+
assertThat(numRecordsOut.getCount()).isEqualTo(1);
98105
assertThat(numRecordsOutErrors.getCount()).isEqualTo(0);
99106
assertThat(numRecordsSendErrors.getCount()).isEqualTo(0);
100107

101108
// elements for which the serializer returns null should be silently skipped
102109
writer.write(null, SINK_WRITER_CONTEXT);
103-
timeService.trigger();
104-
assertThat(numBytesOut.getCount()).isEqualTo(0L);
105-
assertThat(numRecordsOut.getCount()).isEqualTo(0);
110+
writer.flush(false);
111+
timeService.trigger(); // sync byte count
112+
assertThat(numBytesOut.getCount()).isEqualTo(baselineCount);
113+
assertThat(numRecordsOut.getCount()).isEqualTo(1);
106114
assertThat(numRecordsOutErrors.getCount()).isEqualTo(0);
107115
assertThat(numRecordsSendErrors.getCount()).isEqualTo(0);
108116

109117
// but elements for which a non-null producer record is returned should count
110118
writer.write(1, SINK_WRITER_CONTEXT);
111-
timeService.trigger();
112-
assertThat(numRecordsOut.getCount()).isEqualTo(1);
119+
writer.flush(false);
120+
timeService.trigger(); // sync byte count
121+
assertThat(numBytesOut.getCount()).isGreaterThan(baselineCount);
122+
assertThat(numRecordsOut.getCount()).isEqualTo(2);
113123
assertThat(numRecordsOutErrors.getCount()).isEqualTo(0);
114124
assertThat(numRecordsSendErrors.getCount()).isEqualTo(0);
115-
assertThat(numBytesOut.getCount()).isGreaterThan(0L);
116125
}
117126
}
118127

0 commit comments

Comments
 (0)