From fe9216ab61ddae30c22ec7df3c70b8ee6a83c724 Mon Sep 17 00:00:00 2001 From: fffy <63960540@qq.com> Date: Mon, 15 Jul 2024 17:16:17 +0800 Subject: [PATCH] set binaryLogClient with server-id --- .../mysql/debezium/DebeziumUtils.java | 20 +++------- .../task/context/StatefulTaskContext.java | 4 +- .../source/offset/BinlogOffsetUtils.java | 6 ++- .../reader/BinlogSplitReaderTest.java | 3 +- .../source/SpecificStartingOffsetITCase.java | 38 +++++++++++++++---- 5 files changed, 46 insertions(+), 25 deletions(-) diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/debezium/DebeziumUtils.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/debezium/DebeziumUtils.java index 29ab3f1beae..ac3b20c4fed 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/debezium/DebeziumUtils.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/debezium/DebeziumUtils.java @@ -240,12 +240,15 @@ private static Map querySystemVariables( return variables; } - public static BinlogOffset findBinlogOffset(long targetMs, MySqlConnection connection) { + public static BinlogOffset findBinlogOffset( + long targetMs, MySqlConnection connection, MySqlSourceConfig mySqlSourceConfig) { MySqlConnection.MySqlConnectionConfiguration config = connection.connectionConfig(); BinaryLogClient client = new BinaryLogClient( config.hostname(), config.port(), config.username(), config.password()); - + if (mySqlSourceConfig.getServerIdRange() != null) { + client.setServerId(mySqlSourceConfig.getServerIdRange().getStartServerId()); + } List binlogFiles = new ArrayList<>(); JdbcConnection.ResultSetConsumer rsc = rs -> { @@ -324,22 +327,11 @@ private static long getBinlogTimestamp(BinaryLogClient client, String binlogFile client.setBinlogFilename(binlogFile); client.setBinlogPosition(0); - LOG.info("Begin parse binlog: {}", binlogFile); + LOG.info("begin parse binlog: {}", binlogFile); client.connect(); } finally { client.unregisterEventListener(eventListener); } - if (binlogTimestamps.isEmpty()) { - try { - if (client.isConnected()) { - client.disconnect(); - } - } catch (IOException e) { - throw new RuntimeException(e); - } - LOG.warn("Failed to register eventListener and try to register it again"); - return getBinlogTimestamp(client, binlogFile); - } return binlogTimestamps.take(); } } diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/debezium/task/context/StatefulTaskContext.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/debezium/task/context/StatefulTaskContext.java index 5fc342a4c2d..fbce9cdd780 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/debezium/task/context/StatefulTaskContext.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/debezium/task/context/StatefulTaskContext.java @@ -193,7 +193,9 @@ protected MySqlOffsetContext loadStartingOffsetState( mySqlSplit.isSnapshotSplit() ? BinlogOffset.ofEarliest() : initializeEffectiveOffset( - mySqlSplit.asBinlogSplit().getStartingOffset(), connection); + mySqlSplit.asBinlogSplit().getStartingOffset(), + connection, + sourceConfig); LOG.info("Starting offset is initialized to {}", offset); diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/offset/BinlogOffsetUtils.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/offset/BinlogOffsetUtils.java index ce82d6a1b83..3c192eaa779 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/offset/BinlogOffsetUtils.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/offset/BinlogOffsetUtils.java @@ -19,6 +19,7 @@ import org.apache.flink.cdc.connectors.mysql.debezium.DebeziumUtils; import org.apache.flink.cdc.connectors.mysql.debezium.task.context.StatefulTaskContext; +import org.apache.flink.cdc.connectors.mysql.source.config.MySqlSourceConfig; import io.debezium.connector.mysql.MySqlConnection; @@ -45,13 +46,14 @@ public class BinlogOffsetUtils { * */ public static BinlogOffset initializeEffectiveOffset( - BinlogOffset offset, MySqlConnection connection) { + BinlogOffset offset, MySqlConnection connection, MySqlSourceConfig mySqlSourceConfig) { BinlogOffsetKind offsetKind = offset.getOffsetKind(); switch (offsetKind) { case EARLIEST: return BinlogOffset.ofBinlogFilePosition("", 0); case TIMESTAMP: - return DebeziumUtils.findBinlogOffset(offset.getTimestampSec() * 1000, connection); + return DebeziumUtils.findBinlogOffset( + offset.getTimestampSec() * 1000, connection, mySqlSourceConfig); case LATEST: return DebeziumUtils.currentBinlogOffset(connection); default: diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/debezium/reader/BinlogSplitReaderTest.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/debezium/reader/BinlogSplitReaderTest.java index 05e603c00ab..f9c070d964b 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/debezium/reader/BinlogSplitReaderTest.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/debezium/reader/BinlogSplitReaderTest.java @@ -1262,7 +1262,8 @@ protected MySqlOffsetContext loadStartingOffsetState( ? BinlogOffset.ofEarliest() : initializeEffectiveOffset( mySqlSplit.asBinlogSplit().getStartingOffset(), - getConnection()); + getConnection(), + getSourceConfig()); LOG.info("Starting offset is initialized to {}", offset); diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/SpecificStartingOffsetITCase.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/SpecificStartingOffsetITCase.java index 9761a6f3837..44d72a64b88 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/SpecificStartingOffsetITCase.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/SpecificStartingOffsetITCase.java @@ -20,6 +20,7 @@ import org.apache.flink.api.common.eventtime.WatermarkStrategy; import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.cdc.connectors.mysql.debezium.DebeziumUtils; +import org.apache.flink.cdc.connectors.mysql.source.config.MySqlSourceConfig; import org.apache.flink.cdc.connectors.mysql.source.offset.BinlogOffset; import org.apache.flink.cdc.connectors.mysql.table.StartupOptions; import org.apache.flink.cdc.connectors.mysql.testutils.MySqlContainer; @@ -263,9 +264,12 @@ void testBinlogSplitFromTimestampOffset() throws Exception { // Purge binary log at first purgeBinaryLogs(); + long t0 = System.currentTimeMillis(); + String servedId0 = "5400"; Assert.equals( BinlogOffset.ofBinlogFilePosition("mysql-bin.000004", 0), - DebeziumUtils.findBinlogOffset(System.currentTimeMillis(), connection)); + DebeziumUtils.findBinlogOffset( + t0, connection, getMySqlSourceConfig(t0, servedId0))); executeStatements( String.format( @@ -273,6 +277,7 @@ void testBinlogSplitFromTimestampOffset() throws Exception { customers.getTableId())); Thread.sleep(1000); long t1 = System.currentTimeMillis(); + String servedId1 = "5401"; flushLogs(); executeStatements( @@ -281,6 +286,7 @@ void testBinlogSplitFromTimestampOffset() throws Exception { customers.getTableId())); Thread.sleep(1000); long t2 = System.currentTimeMillis(); + String servedId2 = "5402"; flushLogs(); executeStatements( @@ -289,6 +295,7 @@ void testBinlogSplitFromTimestampOffset() throws Exception { customers.getTableId())); Thread.sleep(1000); long t3 = System.currentTimeMillis(); + String servedId3 = "5403"; flushLogs(); executeStatements( @@ -297,6 +304,7 @@ void testBinlogSplitFromTimestampOffset() throws Exception { customers.getTableId())); Thread.sleep(1000); long t4 = System.currentTimeMillis(); + String servedId4 = "5404"; flushLogs(); executeStatements( @@ -305,28 +313,35 @@ void testBinlogSplitFromTimestampOffset() throws Exception { customers.getTableId())); Thread.sleep(1000); long t5 = System.currentTimeMillis(); + String servedId5 = "5405"; flushLogs(); Assert.equals( BinlogOffset.ofBinlogFilePosition("mysql-bin.000005", 0), - DebeziumUtils.findBinlogOffset(t1, connection)); + DebeziumUtils.findBinlogOffset( + t1, connection, getMySqlSourceConfig(t1, servedId1))); Assert.equals( BinlogOffset.ofBinlogFilePosition("mysql-bin.000006", 0), - DebeziumUtils.findBinlogOffset(t2, connection)); + DebeziumUtils.findBinlogOffset( + t2, connection, getMySqlSourceConfig(t1, servedId2))); Assert.equals( BinlogOffset.ofBinlogFilePosition("mysql-bin.000007", 0), - DebeziumUtils.findBinlogOffset(t3, connection)); + DebeziumUtils.findBinlogOffset( + t3, connection, getMySqlSourceConfig(t1, servedId3))); Assert.equals( BinlogOffset.ofBinlogFilePosition("mysql-bin.000008", 0), - DebeziumUtils.findBinlogOffset(t4, connection)); + DebeziumUtils.findBinlogOffset( + t4, connection, getMySqlSourceConfig(t1, servedId4))); Assert.equals( BinlogOffset.ofBinlogFilePosition("mysql-bin.000009", 0), - DebeziumUtils.findBinlogOffset(t5, connection)); + DebeziumUtils.findBinlogOffset( + t5, connection, getMySqlSourceConfig(t1, servedId5))); purgeBinaryLogs(); Assert.equals( BinlogOffset.ofBinlogFilePosition("mysql-bin.000009", 0), - DebeziumUtils.findBinlogOffset(t3, connection)); + DebeziumUtils.findBinlogOffset( + t5, connection, getMySqlSourceConfig(t1, servedId5))); } @Test @@ -440,6 +455,15 @@ private MySqlConnection getConnection() { return DebeziumUtils.createMySqlConnection(configuration, new Properties()); } + private MySqlSourceConfig getMySqlSourceConfig(Long timestamp, String serverId) { + return getSourceBuilder() + .startupOptions(StartupOptions.timestamp(timestamp)) + .serverId(serverId) + .build() + .getConfigFactory() + .createConfig(0); + } + private void executeStatements(String... statements) throws Exception { connection.execute(statements); connection.commit();