[FLINK-37096][cdc-connect] Fix the delay metrics so they report -1ms instead of ~20098d during the full (snapshot) phase. #3850

Open · wants to merge 8 commits into master
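Background on the numbers in the title: with the old code, the lag was computed as processTime - messageTimestamp even when messageTimestamp was still 0L (no change event had carried a source timestamp yet), so the gauge degenerated to "now minus the Unix epoch", which renders as roughly 20,098 days in early 2025. A minimal sketch of that arithmetic, using an illustrative wall-clock value:

import java.util.concurrent.TimeUnit;

// Illustration only: why an unset (0L) message timestamp shows up as ~20,098 days of lag.
public class EpochLagSketch {
    public static void main(String[] args) {
        long messageTimestamp = 0L;                       // no source timestamp seen yet
        long processTime = 1_736_500_000_000L;            // ~2025-01-10, chosen for the example
        long lagMillis = processTime - messageTimestamp;  // == processTime, i.e. the full time since the epoch
        System.out.println(TimeUnit.MILLISECONDS.toDays(lagMillis)); // prints 20098
    }
}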
3 changes: 3 additions & 0 deletions .gitignore
@@ -25,3 +25,6 @@
docs/_build
**/.flattened-pom.xml
**/dependency-reduced-pom.xml
.idea/vcs.xml
.gitignore
.idea/vcs.xml
3 changes: 1 addition & 2 deletions .idea/vcs.xml


@@ -94,13 +94,13 @@ public class DebeziumChangeFetcher<T> {
* currentFetchEventTimeLag = FetchTime - messageTimestamp, where FetchTime is the time the
* record is fetched into the source operator.
*/
private volatile long fetchDelay = 0L;
private volatile long fetchDelay = -1L;

/**
* emitDelay = EmitTime - messageTimestamp, where the EmitTime is the time the record leaves the
* source operator.
*/
private volatile long emitDelay = 0L;
private volatile long emitDelay = -1L;

/** The number of records that failed to parse or deserialize. */
private volatile AtomicLong numRecordInErrors = new AtomicLong(0);
@@ -230,8 +230,11 @@ private void handleBatch(List<ChangeEvent<SourceRecord, SourceRecord>> changeEve
for (ChangeEvent<SourceRecord, SourceRecord> event : changeEvents) {
SourceRecord record = event.value();
updateMessageTimestamp(record);
fetchDelay = isInDbSnapshotPhase ? 0L : processTime - messageTimestamp;
if (messageTimestamp == 0L) {
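// No change event has carried a source timestamp yet; keep the previous value (-1L by default).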

} else {
fetchDelay = isInDbSnapshotPhase ? -1L : processTime - messageTimestamp;
}
if (isHeartbeatEvent(record)) {
// keep offset update
synchronized (checkpointLock) {
@@ -248,7 +251,6 @@ private void handleBatch(List<ChangeEvent<SourceRecord, SourceRecord>> changeEve
LOG.error("Failed to deserialize record {}", record, t);
throw t;
}

if (isInDbSnapshotPhase && !isSnapshotRecord(record)) {
LOG.debug("Snapshot phase finishes.");
isInDbSnapshotPhase = false;
@@ -268,8 +270,15 @@ private void emitRecordsUnderCheckpointLock(
synchronized (checkpointLock) {
T record;
while ((record = records.poll()) != null) {
emitDelay =
isInDbSnapshotPhase ? 0L : System.currentTimeMillis() - messageTimestamp;
// Keep reporting -1 while the snapshot phase is still running or no change event has carried a timestamp yet
if (messageTimestamp == 0L) {

} else {
emitDelay =
isInDbSnapshotPhase
? -1L
: System.currentTimeMillis() - messageTimestamp;
}
sourceContext.collect(record);
}
// update offset to state
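Why -1 rather than 0 matters downstream: Flink's source reader metric group uses -1 as its UNDEFINED marker for the event-time lag gauges (see the assertions against InternalSourceReaderMetricGroup.UNDEFINED in the test below), so defaulting these fields to -1L makes the metrics read as "not yet defined" instead of a misleading zero lag. Below is a minimal sketch of how such a volatile field typically backs a gauge; it is an illustration under that assumption, not the connector's actual registration code:

import org.apache.flink.metrics.Gauge;
import org.apache.flink.metrics.MetricGroup;

// Sketch only: the gauge simply reflects the field, so whatever the field defaults to
// is exactly what the metric reports until a timestamped record arrives.
class DelayGaugeSketch {
    private volatile long fetchDelay = -1L; // -1 lines up with InternalSourceReaderMetricGroup.UNDEFINED

    void register(MetricGroup group) {
        group.gauge("currentFetchEventTimeLag", (Gauge<Long>) () -> fetchDelay);
    }
}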
@@ -0,0 +1,201 @@
package org.apache.flink.cdc.connectors.mongodb;

import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.cdc.connectors.mongodb.source.MongoDBSourceTestBase;
import org.apache.flink.cdc.debezium.JsonDebeziumDeserializationSchema;
import org.apache.flink.core.execution.JobClient;
import org.apache.flink.metrics.Gauge;
import org.apache.flink.metrics.Metric;
import org.apache.flink.metrics.groups.OperatorMetricGroup;
import org.apache.flink.runtime.metrics.MetricNames;
import org.apache.flink.runtime.metrics.groups.InternalSourceReaderMetricGroup;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.operators.collect.CollectResultIterator;
import org.apache.flink.streaming.api.operators.collect.CollectSinkOperator;
import org.apache.flink.streaming.api.operators.collect.CollectSinkOperatorFactory;
import org.apache.flink.streaming.api.operators.collect.CollectStreamSink;

import com.mongodb.client.MongoCollection;
import com.mongodb.client.MongoDatabase;
import com.mongodb.client.model.Filters;
import com.mongodb.client.model.Updates;
import org.bson.Document;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

import java.time.Duration;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import static org.apache.flink.cdc.connectors.mongodb.utils.MongoDBContainer.FLINK_USER;
import static org.apache.flink.cdc.connectors.mongodb.utils.MongoDBContainer.FLINK_USER_PASSWORD;
import static org.apache.flink.util.Preconditions.checkState;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;

/** IT tests for {@link MongoDBSource}. */
@RunWith(Parameterized.class)
public class MongoDBMetricCase extends MongoDBSourceTestBase {
public static final Duration TIMEOUT = Duration.ofSeconds(300);

public MongoDBMetricCase(String mongoVersion) {
super(mongoVersion);
}

@Parameterized.Parameters(name = "mongoVersion: {0}")
public static Object[] parameters() {
return Stream.of(getMongoVersions()).map(e -> new Object[] {e}).toArray();
}

@Test
public void testSourceMetrics() throws Exception {
String customerDatabase = mongoContainer.executeCommandFileInSeparateDatabase("customer");
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
env.enableCheckpointing(200L);
SourceFunction<String> sourceFunction =
MongoDBSource.<String>builder()
.hosts(mongoContainer.getHostAndPort())
.username(FLINK_USER)
.password(FLINK_USER_PASSWORD)
.databaseList(customerDatabase)
.collectionList(
getCollectionNameRegex(
customerDatabase, new String[] {"customers"}))
.deserializer(new JsonDebeziumDeserializationSchema())
.build();
DataStreamSource<String> stream = env.addSource(sourceFunction, "MongoDB CDC Source");
CollectResultIterator<String> iterator = addCollector(env, stream);
JobClient jobClient = env.executeAsync();
iterator.setJobClient(jobClient);

// ---------------------------- Snapshot phase ------------------------------
// Wait until we receive all 21 snapshot records
int numSnapshotRecordsExpected = 21;
int numSnapshotRecordsReceived = 0;

while (numSnapshotRecordsReceived < numSnapshotRecordsExpected && iterator.hasNext()) {
iterator.next();
numSnapshotRecordsReceived++;
}

// Check metrics
List<OperatorMetricGroup> metricGroups =
metricReporter.findOperatorMetricGroups(jobClient.getJobID(), "MongoDB CDC Source");

// There should be only 1 parallelism of source, so it's safe to get the only group
OperatorMetricGroup group = metricGroups.get(0);
Map<String, Metric> metrics = metricReporter.getMetricsByGroup(group);

// numRecordsOut
assertEquals(
numSnapshotRecordsExpected,
group.getIOMetricGroup().getNumRecordsOutCounter().getCount());

// currentEmitEventTimeLag should be UNDEFINED during snapshot phase
assertTrue(metrics.containsKey(MetricNames.CURRENT_EMIT_EVENT_TIME_LAG));
Gauge<Long> currentEmitEventTimeLag =
(Gauge<Long>) metrics.get(MetricNames.CURRENT_EMIT_EVENT_TIME_LAG);
assertEquals(
InternalSourceReaderMetricGroup.UNDEFINED,
(long) currentEmitEventTimeLag.getValue());
// currentFetchEventTimeLag should be UNDEFINED during snapshot phase
assertTrue(metrics.containsKey(MetricNames.CURRENT_FETCH_EVENT_TIME_LAG));
Gauge<Long> currentFetchEventTimeLag =
(Gauge<Long>) metrics.get(MetricNames.CURRENT_FETCH_EVENT_TIME_LAG);
assertEquals(
InternalSourceReaderMetricGroup.UNDEFINED,
(long) currentFetchEventTimeLag.getValue());
// sourceIdleTime should be positive (we can't know the exact value)
assertTrue(metrics.containsKey(MetricNames.SOURCE_IDLE_TIME));
Gauge<Long> sourceIdleTime = (Gauge<Long>) metrics.get(MetricNames.SOURCE_IDLE_TIME);
assertTrue(sourceIdleTime.getValue() > 0);
assertTrue(sourceIdleTime.getValue() < TIMEOUT.toMillis());

// --------------------------- Change stream (incremental) phase -----------------------------
makeFirstPartChangeStreamEvents(mongodbClient.getDatabase(customerDatabase), "customers");
// Wait until we receive 4 changes made above
int numBinlogRecordsExpected = 4;
int numBinlogRecordsReceived = 0;
while (numBinlogRecordsReceived < numBinlogRecordsExpected && iterator.hasNext()) {
iterator.next();
numBinlogRecordsReceived++;
}

// Check metrics
// numRecordsOut
assertEquals(
numSnapshotRecordsExpected + numBinlogRecordsExpected,
group.getIOMetricGroup().getNumRecordsOutCounter().getCount());

// currentEmitEventTimeLag should be reasonably positive (we can't know the exact value)
assertTrue(currentEmitEventTimeLag.getValue() > 0);
assertTrue(currentEmitEventTimeLag.getValue() < TIMEOUT.toMillis());

// currentFetchEventTimeLag should be reasonably positive (we can't know the exact value)
assertTrue(currentFetchEventTimeLag.getValue() > 0);
assertTrue(currentFetchEventTimeLag.getValue() < TIMEOUT.toMillis());

// sourceIdleTime should be reasonably positive (we can't know the exact value)
assertTrue(sourceIdleTime.getValue() > 0);
assertTrue(sourceIdleTime.getValue() < TIMEOUT.toMillis());

jobClient.cancel().get();
iterator.close();
}

private <T> CollectResultIterator<T> addCollector(
StreamExecutionEnvironment env, DataStream<T> stream) {
TypeSerializer<T> serializer =
stream.getTransformation().getOutputType().createSerializer(env.getConfig());
String accumulatorName = "dataStreamCollect_" + UUID.randomUUID();
CollectSinkOperatorFactory<T> factory =
new CollectSinkOperatorFactory<>(serializer, accumulatorName);
CollectSinkOperator<T> operator = (CollectSinkOperator<T>) factory.getOperator();
CollectResultIterator<T> iterator =
new CollectResultIterator<>(
operator.getOperatorIdFuture(),
serializer,
accumulatorName,
env.getCheckpointConfig(),
10000L);
CollectStreamSink<T> sink = new CollectStreamSink<>(stream, factory);
sink.name("Data stream collect sink");
env.addOperator(sink.getTransformation());
return iterator;
}

private void makeFirstPartChangeStreamEvents(MongoDatabase mongoDatabase, String collection) {
MongoCollection<Document> mongoCollection = mongoDatabase.getCollection(collection);
mongoCollection.updateOne(Filters.eq("cid", 101L), Updates.set("address", "Hangzhou"));
mongoCollection.deleteOne(Filters.eq("cid", 102L));
mongoCollection.insertOne(customerDocOf(102L, "user_2", "Shanghai", "123567891234"));
mongoCollection.updateOne(Filters.eq("cid", 103L), Updates.set("address", "Hangzhou"));
}

private Document customerDocOf(Long cid, String name, String address, String phoneNumber) {
Document document = new Document();
document.put("cid", cid);
document.put("name", name);
document.put("address", address);
document.put("phone_number", phoneNumber);
return document;
}

private String getCollectionNameRegex(String database, String[] captureCustomerCollections) {
checkState(captureCustomerCollections.length > 0);
if (captureCustomerCollections.length == 1) {
return database + "." + captureCustomerCollections[0];
} else {
// pattern that matches multiple collections
return Arrays.stream(captureCustomerCollections)
.map(coll -> "^(" + database + "." + coll + ")$")
.collect(Collectors.joining("|"));
}
}
}
@@ -18,7 +18,9 @@
package org.apache.flink.cdc.connectors.mongodb.source;

import org.apache.flink.cdc.connectors.mongodb.utils.MongoDBContainer;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.minicluster.RpcServiceSharing;
import org.apache.flink.runtime.testutils.InMemoryReporter;
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
import org.apache.flink.test.util.MiniClusterWithClientResource;

@@ -37,6 +39,7 @@

/** MongoDBSourceTestBase for MongoDB >= 5.0.3. */
public class MongoDBSourceTestBase {
protected InMemoryReporter metricReporter = InMemoryReporter.createWithRetainedMetrics();

public MongoDBSourceTestBase(String mongoVersion) {
this.mongoContainer =
@@ -68,6 +71,8 @@ public static String[] getMongoVersions() {
.setNumberSlotsPerTaskManager(DEFAULT_PARALLELISM)
.setRpcServiceSharing(RpcServiceSharing.DEDICATED)
.withHaLeadershipControl()
.setConfiguration(
metricReporter.addToConfiguration(new Configuration()))
.build());

@Before