From b2256246004895821bd5257a9c523c07ae6390fb Mon Sep 17 00:00:00 2001 From: Seung-Min Lee Date: Sun, 11 Aug 2024 03:45:30 +0900 Subject: [PATCH 1/5] ADD: added metrics for data change events --- .../metrics/MySqlSourceReaderMetrics.java | 43 +++++++++++++++++++ .../source/reader/MySqlRecordEmitter.java | 31 +++++++++++-- 2 files changed, 70 insertions(+), 4 deletions(-) diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/metrics/MySqlSourceReaderMetrics.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/metrics/MySqlSourceReaderMetrics.java index 76ef2ed66f4..f789d82d450 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/metrics/MySqlSourceReaderMetrics.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/metrics/MySqlSourceReaderMetrics.java @@ -17,11 +17,19 @@ package org.apache.flink.cdc.connectors.mysql.source.metrics; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.cdc.common.event.OperationType; import org.apache.flink.cdc.connectors.mysql.source.reader.MySqlSourceReader; +import org.apache.flink.metrics.Counter; import org.apache.flink.metrics.Gauge; import org.apache.flink.metrics.MetricGroup; import org.apache.flink.runtime.metrics.MetricNames; +import io.debezium.relational.TableId; + +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + /** A collection class for handling metrics in {@link MySqlSourceReader}. 
*/ public class MySqlSourceReaderMetrics { @@ -35,6 +43,15 @@ public class MySqlSourceReaderMetrics { */ private volatile long fetchDelay = UNDEFINED; + private final Map, Counter> numRecordsOutByDataChangeRecordMap = + new ConcurrentHashMap(); + public static final String IO_NUM_RECORDS_OUT_DATA_CHANGE_RECORD_INSERT = + "numRecordsOutByDataChangeRecordInsert"; + public static final String IO_NUM_RECORDS_OUT_DATA_CHANGE_RECORD_UPDATE = + "numRecordsOutByDataChangeRecordUpdate"; + public static final String IO_NUM_RECORDS_OUT_DATA_CHANGE_RECORD_DELETE = + "numRecordsOutByDataChangeRecordDelete"; + public MySqlSourceReaderMetrics(MetricGroup metricGroup) { this.metricGroup = metricGroup; } @@ -51,4 +68,30 @@ public long getFetchDelay() { public void recordFetchDelay(long fetchDelay) { this.fetchDelay = fetchDelay; } + + public void numRecordsOutByDataChangeRecord(TableId tableId, OperationType op) { + Counter counter = + numRecordsOutByDataChangeRecordMap.computeIfAbsent( + new Tuple2<>(tableId, op), + k -> { + switch (op) { + case INSERT: + return metricGroup.counter( + IO_NUM_RECORDS_OUT_DATA_CHANGE_RECORD_INSERT); + case UPDATE: + return metricGroup.counter( + IO_NUM_RECORDS_OUT_DATA_CHANGE_RECORD_UPDATE); + case DELETE: + return metricGroup.counter( + IO_NUM_RECORDS_OUT_DATA_CHANGE_RECORD_DELETE); + default: + throw new UnsupportedOperationException( + "Unsupported operation type for " + + "numRecordsOutByDataChangeRecord Metrics " + + op); + } + }); + + counter.inc(); + } } diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/reader/MySqlRecordEmitter.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/reader/MySqlRecordEmitter.java index e3c504113ca..5bd421f19aa 100644 --- 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/reader/MySqlRecordEmitter.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/reader/MySqlRecordEmitter.java @@ -18,6 +18,7 @@ package org.apache.flink.cdc.connectors.mysql.source.reader; import org.apache.flink.api.connector.source.SourceOutput; +import org.apache.flink.cdc.common.event.OperationType; import org.apache.flink.cdc.connectors.mysql.source.metrics.MySqlSourceReaderMetrics; import org.apache.flink.cdc.connectors.mysql.source.offset.BinlogOffset; import org.apache.flink.cdc.connectors.mysql.source.split.MySqlSplitState; @@ -28,9 +29,12 @@ import org.apache.flink.connector.base.source.reader.RecordEmitter; import org.apache.flink.util.Collector; +import io.debezium.data.Envelope; import io.debezium.document.Array; +import io.debezium.relational.TableId; import io.debezium.relational.history.HistoryRecord; import io.debezium.relational.history.TableChanges; +import org.apache.kafka.connect.data.Struct; import org.apache.kafka.connect.source.SourceRecord; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -120,13 +124,32 @@ private void emitElement(SourceRecord element, SourceOutput output) throws Ex debeziumDeserializationSchema.deserialize(element, outputCollector); } - private void reportMetrics(SourceRecord element) { - - Long messageTimestamp = RecordUtils.getMessageTimestamp(element); + private void reportMetrics(SourceRecord record) { + Struct value = (Struct) record.value(); + if (value != null) { + TableId tableId = RecordUtils.getTableId(record); + Envelope.Operation op = + Envelope.Operation.forCode(value.getString(Envelope.FieldName.OPERATION)); + switch (op) { + case CREATE: + sourceReaderMetrics.numRecordsOutByDataChangeRecord( + tableId, OperationType.INSERT); + break; + case UPDATE: + 
sourceReaderMetrics.numRecordsOutByDataChangeRecord( + tableId, OperationType.UPDATE); + break; + case DELETE: + sourceReaderMetrics.numRecordsOutByDataChangeRecord( + tableId, OperationType.DELETE); + break; + } + } + Long messageTimestamp = RecordUtils.getMessageTimestamp(record); if (messageTimestamp != null && messageTimestamp > 0L) { // report fetch delay - Long fetchTimestamp = RecordUtils.getFetchTimestamp(element); + Long fetchTimestamp = RecordUtils.getFetchTimestamp(record); if (fetchTimestamp != null && fetchTimestamp >= messageTimestamp) { // report fetch delay sourceReaderMetrics.recordFetchDelay(fetchTimestamp - messageTimestamp); From 8d4faa701e1581d4a4b1cc833c73743217903e68 Mon Sep 17 00:00:00 2001 From: Seung-Min Lee Date: Sun, 11 Aug 2024 04:08:11 +0900 Subject: [PATCH 2/5] ADD: added metrics to mysql-cdc.md --- docs/content/docs/connectors/flink-sources/mysql-cdc.md | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/docs/content/docs/connectors/flink-sources/mysql-cdc.md b/docs/content/docs/connectors/flink-sources/mysql-cdc.md index 8bee137efdf..c05a6b39622 100644 --- a/docs/content/docs/connectors/flink-sources/mysql-cdc.md +++ b/docs/content/docs/connectors/flink-sources/mysql-cdc.md @@ -1138,4 +1138,11 @@ The example for different spatial data types mapping is as follows: +## Metrics +The mysql-cdc connector offers three additional metrics for each type of data change records. +- `numRecordsOutByDataChangeRecordInsert`: The number of data change records of INSERT. +- `numRecordsOutByDataChangeRecordUpdate`: The number of data change records of UPDATE. +- `numRecordsOutByDataChangeRecordDelete`: The number of data change records of DELETE. 
+ + {{< top >}} From 254aebaad8fb05060cb9eb942940b7cb6ceb5b5a Mon Sep 17 00:00:00 2001 From: Seung-Min Lee Date: Sun, 11 Aug 2024 04:35:30 +0900 Subject: [PATCH 3/5] ADD: added numRecordOutRate for each Operation Type --- .../source/metrics/MySqlSourceReaderMetrics.java | 16 ++++++++++++++++ 1 file changed, 16 insertions(+) diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/metrics/MySqlSourceReaderMetrics.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/metrics/MySqlSourceReaderMetrics.java index f789d82d450..8b91e6689cb 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/metrics/MySqlSourceReaderMetrics.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/metrics/MySqlSourceReaderMetrics.java @@ -22,6 +22,7 @@ import org.apache.flink.cdc.connectors.mysql.source.reader.MySqlSourceReader; import org.apache.flink.metrics.Counter; import org.apache.flink.metrics.Gauge; +import org.apache.flink.metrics.MeterView; import org.apache.flink.metrics.MetricGroup; import org.apache.flink.runtime.metrics.MetricNames; @@ -47,10 +48,16 @@ public class MySqlSourceReaderMetrics { new ConcurrentHashMap(); public static final String IO_NUM_RECORDS_OUT_DATA_CHANGE_RECORD_INSERT = "numRecordsOutByDataChangeRecordInsert"; + public static final String IO_NUM_RECORDS_OUT_RATE_DATA_CHANGE_RECORD_INSERT = + "numRecordsOutByPerSecondDataChangeRecordInsert"; public static final String IO_NUM_RECORDS_OUT_DATA_CHANGE_RECORD_UPDATE = "numRecordsOutByDataChangeRecordUpdate"; + public static final String IO_NUM_RECORDS_OUT_RATE_DATA_CHANGE_RECORD_UPDATE = + "numRecordsOutByPerSecondDataChangeRecordUpdate"; public static final String 
IO_NUM_RECORDS_OUT_DATA_CHANGE_RECORD_DELETE = "numRecordsOutByDataChangeRecordDelete"; + public static final String IO_NUM_RECORDS_OUT_RATE_DATA_CHANGE_RECORD_DELETE = + "numRecordsOutByPerSecondDataChangeRecordDelete"; public MySqlSourceReaderMetrics(MetricGroup metricGroup) { this.metricGroup = metricGroup; @@ -59,6 +66,15 @@ public MySqlSourceReaderMetrics(MetricGroup metricGroup) { public void registerMetrics() { metricGroup.gauge( MetricNames.CURRENT_FETCH_EVENT_TIME_LAG, (Gauge) this::getFetchDelay); + this.metricGroup.meter( + IO_NUM_RECORDS_OUT_RATE_DATA_CHANGE_RECORD_INSERT, + new MeterView(metricGroup.counter(IO_NUM_RECORDS_OUT_DATA_CHANGE_RECORD_INSERT))); + this.metricGroup.meter( + IO_NUM_RECORDS_OUT_RATE_DATA_CHANGE_RECORD_UPDATE, + new MeterView(metricGroup.counter(IO_NUM_RECORDS_OUT_DATA_CHANGE_RECORD_UPDATE))); + this.metricGroup.meter( + IO_NUM_RECORDS_OUT_RATE_DATA_CHANGE_RECORD_DELETE, + new MeterView(metricGroup.counter(IO_NUM_RECORDS_OUT_DATA_CHANGE_RECORD_DELETE))); } public long getFetchDelay() { From c362386683ee8302054677b7454a0e34aa90a591 Mon Sep 17 00:00:00 2001 From: Seung-Min Lee Date: Sun, 11 Aug 2024 04:41:09 +0900 Subject: [PATCH 4/5] ADD: added comments for metrics about numRecordsOutByRates --- .../docs/connectors/flink-sources/mysql-cdc.md | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/docs/content/docs/connectors/flink-sources/mysql-cdc.md b/docs/content/docs/connectors/flink-sources/mysql-cdc.md index c05a6b39622..bade4c26efc 100644 --- a/docs/content/docs/connectors/flink-sources/mysql-cdc.md +++ b/docs/content/docs/connectors/flink-sources/mysql-cdc.md @@ -1139,10 +1139,13 @@ The example for different spatial data types mapping is as follows: ## Metrics -The mysql-cdc connector offers three additional metrics for each type of data change records. -- `numRecordsOutByDataChangeRecordInsert`: The number of data change records of INSERT. 
-- `numRecordsOutByDataChangeRecordUpdate`: The number of data change records of UPDATE. -- `numRecordsOutByDataChangeRecordDelete`: The number of data change records of DELETE. +The mysql-cdc connector offers six additional metrics for each type of data change record. +- `numRecordsOutDataChangeRecordInsert`: The number of `INSERT` data change records. +- `numRecordsOutDataChangeRecordUpdate`: The number of `UPDATE` data change records. +- `numRecordsOutDataChangeRecordDelete`: The number of `DELETE` data change records. +- `numRecordsOutPerSecondDataChangeRecordInsert`: The number of `INSERT` data change records per second. +- `numRecordsOutPerSecondDataChangeRecordUpdate`: The number of `UPDATE` data change records per second. +- `numRecordsOutPerSecondDataChangeRecordDelete`: The number of `DELETE` data change records per second. {{< top >}} From a5987b4031fceae4adf462dcb1b71c8f9f4663d5 Mon Sep 17 00:00:00 2001 From: Seung-Min Lee Date: Sun, 11 Aug 2024 05:31:14 +0900 Subject: [PATCH 5/5] REFACTOR: optimized codes --- .../metrics/MySqlSourceReaderMetrics.java | 70 ++++++++----------- 1 file changed, 31 insertions(+), 39 deletions(-) diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/metrics/MySqlSourceReaderMetrics.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/metrics/MySqlSourceReaderMetrics.java index 8b91e6689cb..6a8128ba184 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/metrics/MySqlSourceReaderMetrics.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/metrics/MySqlSourceReaderMetrics.java @@ -22,6 +22,7 @@ import org.apache.flink.cdc.connectors.mysql.source.reader.MySqlSourceReader; import 
org.apache.flink.metrics.Counter; import org.apache.flink.metrics.Gauge; +import org.apache.flink.metrics.Meter; import org.apache.flink.metrics.MeterView; import org.apache.flink.metrics.MetricGroup; import org.apache.flink.runtime.metrics.MetricNames; @@ -35,6 +36,14 @@ public class MySqlSourceReaderMetrics { public static final long UNDEFINED = -1; + private static final Map DATA_CHANGE_RECORD_MAP = + new ConcurrentHashMap() { + { + put(OperationType.INSERT, "DataChangeRecordInsert"); + put(OperationType.UPDATE, "DataChangeRecordUpdate"); + put(OperationType.DELETE, "DataChangeRecordDelete"); + } + }; private final MetricGroup metricGroup; @@ -46,18 +55,8 @@ public class MySqlSourceReaderMetrics { private final Map, Counter> numRecordsOutByDataChangeRecordMap = new ConcurrentHashMap(); - public static final String IO_NUM_RECORDS_OUT_DATA_CHANGE_RECORD_INSERT = - "numRecordsOutByDataChangeRecordInsert"; - public static final String IO_NUM_RECORDS_OUT_RATE_DATA_CHANGE_RECORD_INSERT = - "numRecordsOutByPerSecondDataChangeRecordInsert"; - public static final String IO_NUM_RECORDS_OUT_DATA_CHANGE_RECORD_UPDATE = - "numRecordsOutByDataChangeRecordUpdate"; - public static final String IO_NUM_RECORDS_OUT_RATE_DATA_CHANGE_RECORD_UPDATE = - "numRecordsOutByPerSecondDataChangeRecordUpdate"; - public static final String IO_NUM_RECORDS_OUT_DATA_CHANGE_RECORD_DELETE = - "numRecordsOutByDataChangeRecordDelete"; - public static final String IO_NUM_RECORDS_OUT_RATE_DATA_CHANGE_RECORD_DELETE = - "numRecordsOutByPerSecondDataChangeRecordDelete"; + private final Map, Meter> + numRecordsOutByRateDataChangeRecordMap = new ConcurrentHashMap(); public MySqlSourceReaderMetrics(MetricGroup metricGroup) { this.metricGroup = metricGroup; @@ -66,15 +65,6 @@ public MySqlSourceReaderMetrics(MetricGroup metricGroup) { public void registerMetrics() { metricGroup.gauge( MetricNames.CURRENT_FETCH_EVENT_TIME_LAG, (Gauge) this::getFetchDelay); - this.metricGroup.meter( - 
IO_NUM_RECORDS_OUT_RATE_DATA_CHANGE_RECORD_INSERT, - new MeterView(metricGroup.counter(IO_NUM_RECORDS_OUT_DATA_CHANGE_RECORD_INSERT))); - this.metricGroup.meter( - IO_NUM_RECORDS_OUT_RATE_DATA_CHANGE_RECORD_UPDATE, - new MeterView(metricGroup.counter(IO_NUM_RECORDS_OUT_DATA_CHANGE_RECORD_UPDATE))); - this.metricGroup.meter( - IO_NUM_RECORDS_OUT_RATE_DATA_CHANGE_RECORD_DELETE, - new MeterView(metricGroup.counter(IO_NUM_RECORDS_OUT_DATA_CHANGE_RECORD_DELETE))); } public long getFetchDelay() { @@ -86,25 +76,27 @@ public void recordFetchDelay(long fetchDelay) { } public void numRecordsOutByDataChangeRecord(TableId tableId, OperationType op) { + Tuple2 metricMapKey = new Tuple2<>(tableId, op); + Counter counter = - numRecordsOutByDataChangeRecordMap.computeIfAbsent( - new Tuple2<>(tableId, op), - k -> { - switch (op) { - case INSERT: - return metricGroup.counter( - IO_NUM_RECORDS_OUT_DATA_CHANGE_RECORD_INSERT); - case UPDATE: - return metricGroup.counter( - IO_NUM_RECORDS_OUT_DATA_CHANGE_RECORD_UPDATE); - case DELETE: - return metricGroup.counter( - IO_NUM_RECORDS_OUT_DATA_CHANGE_RECORD_DELETE); - default: - throw new UnsupportedOperationException( - "Unsupported operation type for " - + "numRecordsOutByDataChangeRecord Metrics " - + op); + numRecordsOutByDataChangeRecordMap.compute( + metricMapKey, + (keyForCounter, existingCounter) -> { + if (existingCounter == null) { + Counter newCounter = + metricGroup.counter( + MetricNames.IO_NUM_RECORDS_OUT + + DATA_CHANGE_RECORD_MAP.get(op)); + numRecordsOutByRateDataChangeRecordMap.computeIfAbsent( + metricMapKey, + keyForMeter -> + metricGroup.meter( + MetricNames.IO_NUM_RECORDS_OUT_RATE + + DATA_CHANGE_RECORD_MAP.get(op), + new MeterView(newCounter))); + return newCounter; + } else { + return existingCounter; } });