diff --git a/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaWriterITCase.java b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaWriterITCase.java index e363d837c..a559e3a54 100644 --- a/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaWriterITCase.java +++ b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaWriterITCase.java @@ -88,31 +88,40 @@ public void testIncreasingRecordBasedCounters() throws Exception { try (final KafkaWriter writer = createWriterWithConfiguration( - getKafkaClientConfiguration(), DeliveryGuarantee.NONE, metricGroup)) { + getKafkaClientConfiguration(), + DeliveryGuarantee.AT_LEAST_ONCE, + metricGroup)) { final Counter numBytesOut = metricGroup.getIOMetricGroup().getNumBytesOutCounter(); final Counter numRecordsOut = metricGroup.getIOMetricGroup().getNumRecordsOutCounter(); final Counter numRecordsOutErrors = metricGroup.getNumRecordsOutErrorsCounter(); final Counter numRecordsSendErrors = metricGroup.getNumRecordsSendErrorsCounter(); - assertThat(numBytesOut.getCount()).isEqualTo(0L); - assertThat(numRecordsOut.getCount()).isEqualTo(0); + + // ApiVersionsRequest etc. is sent on initialization, so establish some baseline + writer.write(0, SINK_WRITER_CONTEXT); + writer.flush(false); // ensure data is actually written + timeService.trigger(); // sync byte count + long baselineCount = numBytesOut.getCount(); + assertThat(numRecordsOut.getCount()).isEqualTo(1); assertThat(numRecordsOutErrors.getCount()).isEqualTo(0); assertThat(numRecordsSendErrors.getCount()).isEqualTo(0); // elements for which the serializer returns null should be silently skipped writer.write(null, SINK_WRITER_CONTEXT); - timeService.trigger(); - assertThat(numBytesOut.getCount()).isEqualTo(0L); - assertThat(numRecordsOut.getCount()).isEqualTo(0); + writer.flush(false); + timeService.trigger(); // sync byte count + assertThat(numBytesOut.getCount()).isEqualTo(baselineCount); + assertThat(numRecordsOut.getCount()).isEqualTo(1); assertThat(numRecordsOutErrors.getCount()).isEqualTo(0); assertThat(numRecordsSendErrors.getCount()).isEqualTo(0); // but elements for which a non-null producer record is returned should count writer.write(1, SINK_WRITER_CONTEXT); - timeService.trigger(); - assertThat(numRecordsOut.getCount()).isEqualTo(1); + writer.flush(false); + timeService.trigger(); // sync byte count + assertThat(numBytesOut.getCount()).isGreaterThan(baselineCount); + assertThat(numRecordsOut.getCount()).isEqualTo(2); assertThat(numRecordsOutErrors.getCount()).isEqualTo(0); assertThat(numRecordsSendErrors.getCount()).isEqualTo(0); - assertThat(numBytesOut.getCount()).isGreaterThan(0L); } }