
Commit 59e947a

fix

1 parent 0d53e01 · commit 59e947a

File tree: 3 files changed (+76 -81 lines)

flink-cdc-connect/flink-cdc-source-connectors/flink-connector-debezium/src/main/java/org/apache/flink/cdc/debezium/internal/DebeziumChangeFetcher.java (+6 -3)

@@ -230,8 +230,11 @@ private void handleBatch(List<ChangeEvent<SourceRecord, SourceRecord>> changeEvents)
         for (ChangeEvent<SourceRecord, SourceRecord> event : changeEvents) {
             SourceRecord record = event.value();
             updateMessageTimestamp(record);
-            fetchDelay = isInDbSnapshotPhase ? -1L : processTime - messageTimestamp;
+            if (messageTimestamp == 0L) {

+            } else {
+                fetchDelay = isInDbSnapshotPhase ? -1L : processTime - messageTimestamp;
+            }
             if (isHeartbeatEvent(record)) {
                 // keep offset update
                 synchronized (checkpointLock) {

@@ -268,15 +271,15 @@ private void emitRecordsUnderCheckpointLock(
         T record;
         while ((record = records.poll()) != null) {
             // If the snapshot has not ended or no new data has arrived yet, -1 is reported
-            if (messageTimestamp == 0) {
+            if (messageTimestamp == 0L) {

             } else {
                 emitDelay =
                         isInDbSnapshotPhase
                                 ? -1L
                                 : System.currentTimeMillis() - messageTimestamp;
-                sourceContext.collect(record);
             }
+            sourceContext.collect(record);
         }
         // update offset to state
         debeziumOffset.setSourcePartition(sourcePartition);
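Both hunks apply the same guard: when messageTimestamp is still 0L (no change event has carried an event time yet), the delay gauges keep their previous value instead of being computed against epoch 0, which would report the full wall-clock time as lag; and collect(record) now runs outside the else branch, so records are emitted even when no delay is recorded. A minimal sketch of the shared pattern, with a hypothetical computeDelay helper (only the field names are taken from the diff; everything else is illustrative):

    // Hypothetical helper distilling the guard above; previousDelay stands in
    // for the current value of fetchDelay or emitDelay.
    static long computeDelay(
            long messageTimestamp, boolean isInDbSnapshotPhase, long now, long previousDelay) {
        if (messageTimestamp == 0L) {
            // No event time observed yet (e.g. only heartbeats so far):
            // keep the old reading rather than reporting now - 0 as the delay.
            return previousDelay;
        }
        // During the snapshot phase the lag is undefined and reported as -1.
        return isInDbSnapshotPhase ? -1L : now - messageTimestamp;
    }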

flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/pom.xml (-8)

@@ -107,14 +107,6 @@ limitations under the License.
             <type>test-jar</type>
             <scope>test</scope>
         </dependency>
-        <!-- https://mvnrepository.com/artifact/com.ververica/flink-connector-mongodb-cdc -->
-        <dependency>
-            <groupId>com.ververica</groupId>
-            <artifactId>flink-connector-mongodb-cdc</artifactId>
-            <version>3.0.1</version>
-            <scope>test</scope>
-        </dependency>
-

         <dependency>
             <groupId>org.apache.flink</groupId>
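The removed block pinned com.ververica:flink-connector-mongodb-cdc 3.0.1, apparently the pre-donation coordinates of this same connector, as a test dependency. The test below no longer needs it: as its import hunks show, it now compiles against the repository's own packages. A one-line illustration of the switch (both imports appear verbatim in the MongoDBMetricCase diff below):

    // Before: resolved from the removed com.ververica 3.0.1 test dependency
    // import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
    // After: provided by the sibling flink-connector-debezium module in this repository
    import org.apache.flink.cdc.debezium.JsonDebeziumDeserializationSchema;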

flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/test/java/org/apache/flink/cdc/connectors/mongodb/MongoDBMetricCase.java (+70 -70)

@@ -1,10 +1,8 @@
-package org.apache.flink.cdc.connectors.mongodb.source;
+package org.apache.flink.cdc.connectors.mongodb;

-import com.mongodb.client.MongoCollection;
-import com.mongodb.client.MongoDatabase;
-import com.mongodb.client.model.Filters;
-import com.mongodb.client.model.Updates;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.cdc.connectors.mongodb.source.MongoDBSourceTestBase;
+import org.apache.flink.cdc.debezium.JsonDebeziumDeserializationSchema;
 import org.apache.flink.core.execution.JobClient;
 import org.apache.flink.metrics.Gauge;
 import org.apache.flink.metrics.Metric;

@@ -14,23 +12,23 @@
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.datastream.DataStreamSource;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
 import org.apache.flink.streaming.api.operators.collect.CollectResultIterator;
 import org.apache.flink.streaming.api.operators.collect.CollectSinkOperator;
 import org.apache.flink.streaming.api.operators.collect.CollectSinkOperatorFactory;
 import org.apache.flink.streaming.api.operators.collect.CollectStreamSink;
+
+import com.mongodb.client.MongoCollection;
+import com.mongodb.client.MongoDatabase;
+import com.mongodb.client.model.Filters;
+import com.mongodb.client.model.Updates;
 import org.bson.Document;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
-import org.apache.flink.streaming.api.functions.source.SourceFunction;
-import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
-import com.ververica.cdc.connectors.mongodb.MongoDBSource;

 import java.time.Duration;
-import java.util.Arrays;
-import java.util.List;
-import java.util.Map;
-import java.util.UUID;
+import java.util.*;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;

@@ -40,7 +38,6 @@
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;

-
 /** IT tests for {@link MongoDBSource}. */
 @RunWith(Parameterized.class)
 public class MongoDBMetricCase extends MongoDBSourceTestBase {

@@ -60,32 +57,38 @@ public void testSourceMetrics() throws Exception {
         String customerDatabase = mongoContainer.executeCommandFileInSeparateDatabase("customer");
         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
         env.setParallelism(1);
-        SourceFunction<String> sourceFunction = MongoDBSource.<String>builder()
-                .hosts(mongoContainer.getHostAndPort())
-                .username(FLINK_USER)
-                .password(FLINK_USER_PASSWORD)
-                .databaseList(customerDatabase) // set the databases to capture; regex is supported
-                .collectionList(getCollectionNameRegex(customerDatabase, new String[] {"customers"})) // set the collections to capture; regex is supported
-                .deserializer(new JsonDebeziumDeserializationSchema())
-                .build();
-        DataStreamSource<String> stream =
-                env.addSource(sourceFunction, "MongoDB CDC Source");
+        env.enableCheckpointing(200L);
+        SourceFunction<String> sourceFunction =
+                MongoDBSource.<String>builder()
+                        .hosts(mongoContainer.getHostAndPort())
+                        .username(FLINK_USER)
+                        .password(FLINK_USER_PASSWORD)
+                        .databaseList(customerDatabase) // set the databases to capture; regex is supported
+                        .collectionList(
+                                getCollectionNameRegex(
+                                        customerDatabase,
+                                        new String[] {"customers"})) // set the collections to capture; regex is supported
+                        .deserializer(new JsonDebeziumDeserializationSchema())
+                        .build();
+        DataStreamSource<String> stream = env.addSource(sourceFunction, "MongoDB");
         CollectResultIterator<String> iterator = addCollector(env, stream);
         JobClient jobClient = env.executeAsync();
         iterator.setJobClient(jobClient);

-        // ---------------------------- Snapshot phase ------------------------------
-        // Wait until we receive all 21 snapshot records
+        // // ---------------------------- Snapshot phase ------------------------------
+        // // Wait until we receive all 21 snapshot records
         int numSnapshotRecordsExpected = 21;
         int numSnapshotRecordsReceived = 0;
+
         while (numSnapshotRecordsReceived < numSnapshotRecordsExpected && iterator.hasNext()) {
             iterator.next();
             numSnapshotRecordsReceived++;
         }

         // Check metrics
         List<OperatorMetricGroup> metricGroups =
-                metricReporter.findOperatorMetricGroups(jobClient.getJobID(), "MongoDB CDC Source");
+                metricReporter.findOperatorMetricGroups(jobClient.getJobID(), "MongoDB");
+
         // There should be only 1 parallelism of source, so it's safe to get the only group
         OperatorMetricGroup group = metricGroups.get(0);
         Map<String, Metric> metrics = metricReporter.getMetricsByGroup(group);

@@ -100,51 +103,48 @@ public void testSourceMetrics() throws Exception {
         Gauge<Long> currentEmitEventTimeLag =
                 (Gauge<Long>) metrics.get(MetricNames.CURRENT_EMIT_EVENT_TIME_LAG);
         assertEquals(
-                // InternalSourceReaderMetricGroup.UNDEFINED,
-                -1L,
+                InternalSourceReaderMetricGroup.UNDEFINED,
                 (long) currentEmitEventTimeLag.getValue());
-
         // currentFetchEventTimeLag should be UNDEFINED during snapshot phase
-        // assertTrue(metrics.containsKey(MetricNames.CURRENT_FETCH_EVENT_TIME_LAG));
-        // Gauge<Long> currentFetchEventTimeLag =
-        //         (Gauge<Long>) metrics.get(MetricNames.CURRENT_FETCH_EVENT_TIME_LAG);
-        // assertEquals(
-        //         -1L, (long) currentFetchEventTimeLag.getValue());
-
-        // // sourceIdleTime should be positive (we can't know the exact value)
-        // assertTrue(metrics.containsKey(MetricNames.SOURCE_IDLE_TIME));
-        // Gauge<Long> sourceIdleTime = (Gauge<Long>) metrics.get(MetricNames.SOURCE_IDLE_TIME);
-        // assertTrue(sourceIdleTime.getValue() > 0);
-        // assertTrue(sourceIdleTime.getValue() < TIMEOUT.toMillis());
-        //
-        // // --------------------------------- Binlog phase -----------------------------
-        // makeFirstPartChangeStreamEvents(
-        //         mongodbClient.getDatabase(customerDatabase), "customers");
-        // // Wait until we receive 4 changes made above
-        // int numBinlogRecordsExpected = 4;
-        // int numBinlogRecordsReceived = 0;
-        // while (numBinlogRecordsReceived < numBinlogRecordsExpected && iterator.hasNext()) {
-        //     iterator.next();
-        //     numBinlogRecordsReceived++;
-        // }
-        //
-        // // Check metrics
-        // // numRecordsOut
-        // assertEquals(
-        //         numSnapshotRecordsExpected + numBinlogRecordsExpected,
-        //         group.getIOMetricGroup().getNumRecordsOutCounter().getCount());
-        //
-        // // currentEmitEventTimeLag should be reasonably positive (we can't know the exact value)
-        // assertTrue(currentEmitEventTimeLag.getValue() > 0);
-        // assertTrue(currentEmitEventTimeLag.getValue() < TIMEOUT.toMillis());
-        //
-        // // currentEmitEventTimeLag should be reasonably positive (we can't know the exact value)
-        // assertTrue(currentFetchEventTimeLag.getValue() > 0);
-        // assertTrue(currentFetchEventTimeLag.getValue() < TIMEOUT.toMillis());
-        //
-        // // currentEmitEventTimeLag should be reasonably positive (we can't know the exact value)
-        // assertTrue(sourceIdleTime.getValue() > 0);
-        // assertTrue(sourceIdleTime.getValue() < TIMEOUT.toMillis());
+        assertTrue(metrics.containsKey(MetricNames.CURRENT_FETCH_EVENT_TIME_LAG));
+        Gauge<Long> currentFetchEventTimeLag =
+                (Gauge<Long>) metrics.get(MetricNames.CURRENT_FETCH_EVENT_TIME_LAG);
+        assertEquals(
+                InternalSourceReaderMetricGroup.UNDEFINED,
+                (long) currentFetchEventTimeLag.getValue());
+        // sourceIdleTime should be positive (we can't know the exact value)
+        assertTrue(metrics.containsKey(MetricNames.SOURCE_IDLE_TIME));
+        Gauge<Long> sourceIdleTime = (Gauge<Long>) metrics.get(MetricNames.SOURCE_IDLE_TIME);
+        assertTrue(sourceIdleTime.getValue() > 0);
+        assertTrue(sourceIdleTime.getValue() < TIMEOUT.toMillis());
+
+        // --------------------------------- Binlog phase -----------------------------
+        makeFirstPartChangeStreamEvents(mongodbClient.getDatabase(customerDatabase), "customers");
+        // Wait until we receive 4 changes made above
+        int numBinlogRecordsExpected = 4;
+        int numBinlogRecordsReceived = 0;
+        while (numBinlogRecordsReceived < numBinlogRecordsExpected && iterator.hasNext()) {
+            iterator.next();
+            numBinlogRecordsReceived++;
+        }
+
+        // Check metrics
+        // numRecordsOut
+        assertEquals(
+                numSnapshotRecordsExpected + numBinlogRecordsExpected,
+                group.getIOMetricGroup().getNumRecordsOutCounter().getCount());
+
+        // currentEmitEventTimeLag should be reasonably positive (we can't know the exact value)
+        assertTrue(currentEmitEventTimeLag.getValue() > 0);
+        assertTrue(currentEmitEventTimeLag.getValue() < TIMEOUT.toMillis());
+
+        // currentFetchEventTimeLag should be reasonably positive (we can't know the exact value)
+        assertTrue(currentFetchEventTimeLag.getValue() > 0);
+        assertTrue(currentFetchEventTimeLag.getValue() < TIMEOUT.toMillis());
+
+        // sourceIdleTime should be reasonably positive (we can't know the exact value)
+        assertTrue(sourceIdleTime.getValue() > 0);
+        assertTrue(sourceIdleTime.getValue() < TIMEOUT.toMillis());

         jobClient.cancel().get();
         iterator.close();

@@ -153,7 +153,7 @@ public void testSourceMetrics() throws Exception {
     private <T> CollectResultIterator<T> addCollector(
            StreamExecutionEnvironment env, DataStream<T> stream) {
         TypeSerializer<T> serializer =
-                stream.getTransformation().getOutputType().createSerializer(env.getConfig());
+                stream.getTransformation().getOutputType().createSerializer(env.getConfig()); //
         String accumulatorName = "dataStreamCollect_" + UUID.randomUUID();
         CollectSinkOperatorFactory<T> factory =
                 new CollectSinkOperatorFactory<>(serializer, accumulatorName);
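One non-obvious piece of the test's plumbing: records are pulled back into the JUnit process through Flink's collect sink rather than an external sink, and the wiring order matters, since the sink must be part of the pipeline before the job graph is built. A condensed view of that flow, assembled from the hunks above (the comments are editorial, not from the commit):

    DataStreamSource<String> stream = env.addSource(sourceFunction, "MongoDB");
    CollectResultIterator<String> iterator = addCollector(env, stream); // attaches a CollectStreamSink before submission
    JobClient jobClient = env.executeAsync(); // submit the job without blocking the test thread
    iterator.setJobClient(jobClient); // the iterator fetches results through the job client
    // iterator.hasNext()/next() then drains the 21 snapshot records and 4 change events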
