Skip to content

Commit

Permalink
[FLINK-36441] Fix KafkaWriterITCase#testIncreasingRecordBasedCounters
Browse files Browse the repository at this point in the history
The test tried to assert on byte counts which are written async. Commit adds flushing and establishes a baseline so that metadata request don't interfere with assertions.
  • Loading branch information
AHeise committed Oct 9, 2024
1 parent 28836c6 commit 5eeafd6
Showing 1 changed file with 18 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -88,31 +88,40 @@ public void testIncreasingRecordBasedCounters() throws Exception {

try (final KafkaWriter<Integer> writer =
createWriterWithConfiguration(
getKafkaClientConfiguration(), DeliveryGuarantee.NONE, metricGroup)) {
getKafkaClientConfiguration(),
DeliveryGuarantee.AT_LEAST_ONCE,
metricGroup)) {
final Counter numBytesOut = metricGroup.getIOMetricGroup().getNumBytesOutCounter();
final Counter numRecordsOut = metricGroup.getIOMetricGroup().getNumRecordsOutCounter();
final Counter numRecordsOutErrors = metricGroup.getNumRecordsOutErrorsCounter();
final Counter numRecordsSendErrors = metricGroup.getNumRecordsSendErrorsCounter();
assertThat(numBytesOut.getCount()).isEqualTo(0L);
assertThat(numRecordsOut.getCount()).isEqualTo(0);

// ApiVersionsRequest etc. is sent on initialization, so establish some baseline
writer.write(0, SINK_WRITER_CONTEXT);
writer.flush(false); // ensure data is actually written
timeService.trigger(); // sync byte count
long baselineCount = numBytesOut.getCount();
assertThat(numRecordsOut.getCount()).isEqualTo(1);
assertThat(numRecordsOutErrors.getCount()).isEqualTo(0);
assertThat(numRecordsSendErrors.getCount()).isEqualTo(0);

// elements for which the serializer returns null should be silently skipped
writer.write(null, SINK_WRITER_CONTEXT);
timeService.trigger();
assertThat(numBytesOut.getCount()).isEqualTo(0L);
assertThat(numRecordsOut.getCount()).isEqualTo(0);
writer.flush(false);
timeService.trigger(); // sync byte count
assertThat(numBytesOut.getCount()).isEqualTo(baselineCount);
assertThat(numRecordsOut.getCount()).isEqualTo(1);
assertThat(numRecordsOutErrors.getCount()).isEqualTo(0);
assertThat(numRecordsSendErrors.getCount()).isEqualTo(0);

// but elements for which a non-null producer record is returned should count
writer.write(1, SINK_WRITER_CONTEXT);
timeService.trigger();
assertThat(numRecordsOut.getCount()).isEqualTo(1);
writer.flush(false);
timeService.trigger(); // sync byte count
assertThat(numBytesOut.getCount()).isGreaterThan(baselineCount);
assertThat(numRecordsOut.getCount()).isEqualTo(2);
assertThat(numRecordsOutErrors.getCount()).isEqualTo(0);
assertThat(numRecordsSendErrors.getCount()).isEqualTo(0);
assertThat(numBytesOut.getCount()).isGreaterThan(0L);
}
}

Expand Down

0 comments on commit 5eeafd6

Please sign in to comment.