diff --git a/docs/content/docs/connectors/flink-sources/mysql-cdc.md b/docs/content/docs/connectors/flink-sources/mysql-cdc.md index 8bee137efdf..bade4c26efc 100644 --- a/docs/content/docs/connectors/flink-sources/mysql-cdc.md +++ b/docs/content/docs/connectors/flink-sources/mysql-cdc.md @@ -1138,4 +1138,14 @@ The example for different spatial data types mapping is as follows: +## Metrics +The mysql-cdc connector offers six additional metrics: a count and a per-second rate for each type of data change record (`INSERT`, `UPDATE` and `DELETE`). +- `numRecordsOutByDataChangeRecordInsert`: The number of `INSERT` data change records. +- `numRecordsOutByDataChangeRecordUpdate`: The number of `UPDATE` data change records. +- `numRecordsOutByDataChangeRecordDelete`: The number of `DELETE` data change records. +- `numRecordsOutByRateDataChangeRecordInsert`: The number of `INSERT` data change records per second. +- `numRecordsOutByRateDataChangeRecordUpdate`: The number of `UPDATE` data change records per second. +- `numRecordsOutByRateDataChangeRecordDelete`: The number of `DELETE` data change records per second. 
+ + {{< top >}} diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/metrics/MySqlSourceReaderMetrics.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/metrics/MySqlSourceReaderMetrics.java index 76ef2ed66f4..6a8128ba184 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/metrics/MySqlSourceReaderMetrics.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/metrics/MySqlSourceReaderMetrics.java @@ -17,15 +17,33 @@ package org.apache.flink.cdc.connectors.mysql.source.metrics; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.cdc.common.event.OperationType; import org.apache.flink.cdc.connectors.mysql.source.reader.MySqlSourceReader; +import org.apache.flink.metrics.Counter; import org.apache.flink.metrics.Gauge; +import org.apache.flink.metrics.Meter; +import org.apache.flink.metrics.MeterView; import org.apache.flink.metrics.MetricGroup; import org.apache.flink.runtime.metrics.MetricNames; +import io.debezium.relational.TableId; + +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + /** A collection class for handling metrics in {@link MySqlSourceReader}. 
*/ public class MySqlSourceReaderMetrics { public static final long UNDEFINED = -1; + private static final Map DATA_CHANGE_RECORD_MAP = + new ConcurrentHashMap() { + { + put(OperationType.INSERT, "DataChangeRecordInsert"); + put(OperationType.UPDATE, "DataChangeRecordUpdate"); + put(OperationType.DELETE, "DataChangeRecordDelete"); + } + }; private final MetricGroup metricGroup; @@ -35,6 +53,11 @@ public class MySqlSourceReaderMetrics { */ private volatile long fetchDelay = UNDEFINED; + private final Map, Counter> numRecordsOutByDataChangeRecordMap = + new ConcurrentHashMap(); + private final Map, Meter> + numRecordsOutByRateDataChangeRecordMap = new ConcurrentHashMap(); + public MySqlSourceReaderMetrics(MetricGroup metricGroup) { this.metricGroup = metricGroup; } @@ -51,4 +74,32 @@ public long getFetchDelay() { public void recordFetchDelay(long fetchDelay) { this.fetchDelay = fetchDelay; } + + public void numRecordsOutByDataChangeRecord(TableId tableId, OperationType op) { + Tuple2 metricMapKey = new Tuple2<>(tableId, op); + + Counter counter = + numRecordsOutByDataChangeRecordMap.compute( + metricMapKey, + (keyForCounter, existingCounter) -> { + if (existingCounter == null) { + Counter newCounter = + metricGroup.counter( + MetricNames.IO_NUM_RECORDS_OUT + + DATA_CHANGE_RECORD_MAP.get(op)); + numRecordsOutByRateDataChangeRecordMap.computeIfAbsent( + metricMapKey, + keyForMeter -> + metricGroup.meter( + MetricNames.IO_NUM_RECORDS_OUT_RATE + + DATA_CHANGE_RECORD_MAP.get(op), + new MeterView(newCounter))); + return newCounter; + } else { + return existingCounter; + } + }); + + counter.inc(); + } } diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/reader/MySqlRecordEmitter.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/reader/MySqlRecordEmitter.java index e3c504113ca..5bd421f19aa 100644 
--- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/reader/MySqlRecordEmitter.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/reader/MySqlRecordEmitter.java @@ -18,6 +18,7 @@ package org.apache.flink.cdc.connectors.mysql.source.reader; import org.apache.flink.api.connector.source.SourceOutput; +import org.apache.flink.cdc.common.event.OperationType; import org.apache.flink.cdc.connectors.mysql.source.metrics.MySqlSourceReaderMetrics; import org.apache.flink.cdc.connectors.mysql.source.offset.BinlogOffset; import org.apache.flink.cdc.connectors.mysql.source.split.MySqlSplitState; @@ -28,9 +29,12 @@ import org.apache.flink.connector.base.source.reader.RecordEmitter; import org.apache.flink.util.Collector; +import io.debezium.data.Envelope; import io.debezium.document.Array; +import io.debezium.relational.TableId; import io.debezium.relational.history.HistoryRecord; import io.debezium.relational.history.TableChanges; +import org.apache.kafka.connect.data.Struct; import org.apache.kafka.connect.source.SourceRecord; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -120,13 +124,32 @@ private void emitElement(SourceRecord element, SourceOutput output) throws Ex debeziumDeserializationSchema.deserialize(element, outputCollector); } - private void reportMetrics(SourceRecord element) { - - Long messageTimestamp = RecordUtils.getMessageTimestamp(element); + private void reportMetrics(SourceRecord record) { + Struct value = (Struct) record.value(); + if (value != null) { + TableId tableId = RecordUtils.getTableId(record); + Envelope.Operation op = + Envelope.Operation.forCode(value.getString(Envelope.FieldName.OPERATION)); + switch (op) { + case CREATE: + sourceReaderMetrics.numRecordsOutByDataChangeRecord( + tableId, OperationType.INSERT); + break; + case UPDATE: + 
sourceReaderMetrics.numRecordsOutByDataChangeRecord( + tableId, OperationType.UPDATE); + break; + case DELETE: + sourceReaderMetrics.numRecordsOutByDataChangeRecord( + tableId, OperationType.DELETE); + break; + } + } + Long messageTimestamp = RecordUtils.getMessageTimestamp(record); if (messageTimestamp != null && messageTimestamp > 0L) { // report fetch delay - Long fetchTimestamp = RecordUtils.getFetchTimestamp(element); + Long fetchTimestamp = RecordUtils.getFetchTimestamp(record); if (fetchTimestamp != null && fetchTimestamp >= messageTimestamp) { // report fetch delay sourceReaderMetrics.recordFetchDelay(fetchTimestamp - messageTimestamp);