Skip to content

Commit

Permalink
set binaryLogClient with server-id
Browse files Browse the repository at this point in the history
  • Loading branch information
Thorne-coder committed Jul 15, 2024
1 parent b52cfd4 commit fe9216a
Show file tree
Hide file tree
Showing 5 changed files with 46 additions and 25 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -240,12 +240,15 @@ private static Map<String, String> querySystemVariables(
return variables;
}

public static BinlogOffset findBinlogOffset(long targetMs, MySqlConnection connection) {
public static BinlogOffset findBinlogOffset(
long targetMs, MySqlConnection connection, MySqlSourceConfig mySqlSourceConfig) {
MySqlConnection.MySqlConnectionConfiguration config = connection.connectionConfig();
BinaryLogClient client =
new BinaryLogClient(
config.hostname(), config.port(), config.username(), config.password());

if (mySqlSourceConfig.getServerIdRange() != null) {
client.setServerId(mySqlSourceConfig.getServerIdRange().getStartServerId());
}
List<String> binlogFiles = new ArrayList<>();
JdbcConnection.ResultSetConsumer rsc =
rs -> {
Expand Down Expand Up @@ -324,22 +327,11 @@ private static long getBinlogTimestamp(BinaryLogClient client, String binlogFile
client.setBinlogFilename(binlogFile);
client.setBinlogPosition(0);

LOG.info("Begin parse binlog: {}", binlogFile);
LOG.info("begin parse binlog: {}", binlogFile);
client.connect();
} finally {
client.unregisterEventListener(eventListener);
}
if (binlogTimestamps.isEmpty()) {
try {
if (client.isConnected()) {
client.disconnect();
}
} catch (IOException e) {
throw new RuntimeException(e);
}
LOG.warn("Failed to register eventListener and try to register it again");
return getBinlogTimestamp(client, binlogFile);
}
return binlogTimestamps.take();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,9 @@ protected MySqlOffsetContext loadStartingOffsetState(
mySqlSplit.isSnapshotSplit()
? BinlogOffset.ofEarliest()
: initializeEffectiveOffset(
mySqlSplit.asBinlogSplit().getStartingOffset(), connection);
mySqlSplit.asBinlogSplit().getStartingOffset(),
connection,
sourceConfig);

LOG.info("Starting offset is initialized to {}", offset);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import org.apache.flink.cdc.connectors.mysql.debezium.DebeziumUtils;
import org.apache.flink.cdc.connectors.mysql.debezium.task.context.StatefulTaskContext;
import org.apache.flink.cdc.connectors.mysql.source.config.MySqlSourceConfig;

import io.debezium.connector.mysql.MySqlConnection;

Expand All @@ -45,13 +46,14 @@ public class BinlogOffsetUtils {
* </ul>
*/
public static BinlogOffset initializeEffectiveOffset(
BinlogOffset offset, MySqlConnection connection) {
BinlogOffset offset, MySqlConnection connection, MySqlSourceConfig mySqlSourceConfig) {
BinlogOffsetKind offsetKind = offset.getOffsetKind();
switch (offsetKind) {
case EARLIEST:
return BinlogOffset.ofBinlogFilePosition("", 0);
case TIMESTAMP:
return DebeziumUtils.findBinlogOffset(offset.getTimestampSec() * 1000, connection);
return DebeziumUtils.findBinlogOffset(
offset.getTimestampSec() * 1000, connection, mySqlSourceConfig);
case LATEST:
return DebeziumUtils.currentBinlogOffset(connection);
default:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1262,7 +1262,8 @@ protected MySqlOffsetContext loadStartingOffsetState(
? BinlogOffset.ofEarliest()
: initializeEffectiveOffset(
mySqlSplit.asBinlogSplit().getStartingOffset(),
getConnection());
getConnection(),
getSourceConfig());

LOG.info("Starting offset is initialized to {}", offset);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.cdc.connectors.mysql.debezium.DebeziumUtils;
import org.apache.flink.cdc.connectors.mysql.source.config.MySqlSourceConfig;
import org.apache.flink.cdc.connectors.mysql.source.offset.BinlogOffset;
import org.apache.flink.cdc.connectors.mysql.table.StartupOptions;
import org.apache.flink.cdc.connectors.mysql.testutils.MySqlContainer;
Expand Down Expand Up @@ -263,16 +264,20 @@ void testBinlogSplitFromTimestampOffset() throws Exception {
// Purge binary log at first
purgeBinaryLogs();

long t0 = System.currentTimeMillis();
String servedId0 = "5400";
Assert.equals(
BinlogOffset.ofBinlogFilePosition("mysql-bin.000004", 0),
DebeziumUtils.findBinlogOffset(System.currentTimeMillis(), connection));
DebeziumUtils.findBinlogOffset(
t0, connection, getMySqlSourceConfig(t0, servedId0)));

executeStatements(
String.format(
"INSERT INTO %s VALUES (15213, 'Alice', 'Rome', '123456987');",
customers.getTableId()));
Thread.sleep(1000);
long t1 = System.currentTimeMillis();
String servedId1 = "5401";
flushLogs();

executeStatements(
Expand All @@ -281,6 +286,7 @@ void testBinlogSplitFromTimestampOffset() throws Exception {
customers.getTableId()));
Thread.sleep(1000);
long t2 = System.currentTimeMillis();
String servedId2 = "5402";
flushLogs();

executeStatements(
Expand All @@ -289,6 +295,7 @@ void testBinlogSplitFromTimestampOffset() throws Exception {
customers.getTableId()));
Thread.sleep(1000);
long t3 = System.currentTimeMillis();
String servedId3 = "5403";
flushLogs();

executeStatements(
Expand All @@ -297,6 +304,7 @@ void testBinlogSplitFromTimestampOffset() throws Exception {
customers.getTableId()));
Thread.sleep(1000);
long t4 = System.currentTimeMillis();
String servedId4 = "5404";
flushLogs();

executeStatements(
Expand All @@ -305,28 +313,35 @@ void testBinlogSplitFromTimestampOffset() throws Exception {
customers.getTableId()));
Thread.sleep(1000);
long t5 = System.currentTimeMillis();
String servedId5 = "5405";
flushLogs();

Assert.equals(
BinlogOffset.ofBinlogFilePosition("mysql-bin.000005", 0),
DebeziumUtils.findBinlogOffset(t1, connection));
DebeziumUtils.findBinlogOffset(
t1, connection, getMySqlSourceConfig(t1, servedId1)));
Assert.equals(
BinlogOffset.ofBinlogFilePosition("mysql-bin.000006", 0),
DebeziumUtils.findBinlogOffset(t2, connection));
DebeziumUtils.findBinlogOffset(
t2, connection, getMySqlSourceConfig(t1, servedId2)));
Assert.equals(
BinlogOffset.ofBinlogFilePosition("mysql-bin.000007", 0),
DebeziumUtils.findBinlogOffset(t3, connection));
DebeziumUtils.findBinlogOffset(
t3, connection, getMySqlSourceConfig(t1, servedId3)));
Assert.equals(
BinlogOffset.ofBinlogFilePosition("mysql-bin.000008", 0),
DebeziumUtils.findBinlogOffset(t4, connection));
DebeziumUtils.findBinlogOffset(
t4, connection, getMySqlSourceConfig(t1, servedId4)));
Assert.equals(
BinlogOffset.ofBinlogFilePosition("mysql-bin.000009", 0),
DebeziumUtils.findBinlogOffset(t5, connection));
DebeziumUtils.findBinlogOffset(
t5, connection, getMySqlSourceConfig(t1, servedId5)));

purgeBinaryLogs();
Assert.equals(
BinlogOffset.ofBinlogFilePosition("mysql-bin.000009", 0),
DebeziumUtils.findBinlogOffset(t3, connection));
DebeziumUtils.findBinlogOffset(
t5, connection, getMySqlSourceConfig(t1, servedId5)));
}

@Test
Expand Down Expand Up @@ -440,6 +455,15 @@ private MySqlConnection getConnection() {
return DebeziumUtils.createMySqlConnection(configuration, new Properties());
}

private MySqlSourceConfig getMySqlSourceConfig(Long timestamp, String serverId) {
return getSourceBuilder()
.startupOptions(StartupOptions.timestamp(timestamp))
.serverId(serverId)
.build()
.getConfigFactory()
.createConfig(0);
}

private void executeStatements(String... statements) throws Exception {
connection.execute(statements);
connection.commit();
Expand Down

0 comments on commit fe9216a

Please sign in to comment.