Skip to content

Commit 81d779b

Browse files
committed
[FLINK-32344][connectors/mongodb] Support unbounded streaming read via ChangeStream feature.
1 parent 49b7550 commit 81d779b

File tree

49 files changed

+3308
-334
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

49 files changed

+3308
-334
lines changed

flink-connector-mongodb-e2e-tests/src/test/java/org/apache/flink/tests/util/mongodb/MongoE2ECase.java

+77-21
Original file line numberDiff line numberDiff line change
@@ -22,12 +22,16 @@
2222
import org.apache.flink.connector.testframe.container.FlinkContainers;
2323
import org.apache.flink.connector.testframe.container.FlinkContainersSettings;
2424
import org.apache.flink.connector.testframe.container.TestcontainersSettings;
25+
import org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions;
2526
import org.apache.flink.test.resources.ResourceTestUtils;
2627
import org.apache.flink.test.util.SQLJobSubmission;
2728

2829
import com.mongodb.client.MongoClient;
2930
import com.mongodb.client.MongoClients;
31+
import com.mongodb.client.MongoCollection;
3032
import com.mongodb.client.MongoDatabase;
33+
import com.mongodb.client.model.Filters;
34+
import com.mongodb.client.model.Updates;
3135
import org.bson.Document;
3236
import org.bson.types.ObjectId;
3337
import org.junit.jupiter.api.AfterAll;
@@ -47,11 +51,13 @@
4751
import java.nio.file.Paths;
4852
import java.time.Duration;
4953
import java.util.ArrayList;
54+
import java.util.Arrays;
5055
import java.util.List;
5156
import java.util.stream.Collectors;
5257

5358
import static org.apache.flink.connector.mongodb.testutils.MongoTestUtil.MONGODB_HOSTNAME;
5459
import static org.apache.flink.connector.mongodb.testutils.MongoTestUtil.MONGO_4_0;
60+
import static org.apache.flink.connector.mongodb.testutils.MongoTestUtil.MONGO_IMAGE_PREFIX;
5561
import static org.assertj.core.api.Assertions.assertThat;
5662

5763
/** End-to-end test for the MongoDB connectors. */
@@ -65,11 +71,11 @@ class MongoE2ECase {
6571
private static final Path SQL_CONNECTOR_MONGODB_JAR =
6672
ResourceTestUtils.getResource(".*mongodb.jar");
6773

68-
private static final int TEST_ORDERS_COUNT = 5;
74+
private static final int TEST_ORDERS_INITIAL_COUNT = 5;
6975

7076
@Container
7177
static final MongoDBContainer MONGO_CONTAINER =
72-
new MongoDBContainer(MONGO_4_0)
78+
new MongoDBContainer(MONGO_IMAGE_PREFIX + MONGO_4_0)
7379
.withLogConsumer(new Slf4jLogConsumer(LOG))
7480
.withNetwork(NETWORK)
7581
.withNetworkAliases(MONGODB_HOSTNAME);
@@ -85,7 +91,12 @@ class MongoE2ECase {
8591
public static final FlinkContainers FLINK =
8692
FlinkContainers.builder()
8793
.withFlinkContainersSettings(
88-
FlinkContainersSettings.builder().numTaskManagers(2).build())
94+
FlinkContainersSettings.builder()
95+
.setConfigOption(
96+
ExecutionCheckpointingOptions.CHECKPOINTING_INTERVAL,
97+
Duration.ofSeconds(1))
98+
.numTaskManagers(2)
99+
.build())
89100
.withTestcontainersSettings(TESTCONTAINERS_SETTINGS)
90101
.build();
91102

@@ -107,12 +118,12 @@ static void teardown() {
107118
public void testUpsertSink() throws Exception {
108119
MongoDatabase db = mongoClient.getDatabase("test_upsert");
109120

110-
List<Document> orders = mockOrders();
121+
List<Document> orders = generateOrders();
111122
db.getCollection("orders").insertMany(orders);
112123

113124
executeSqlStatements(readSqlFile("e2e_upsert.sql"));
114125

115-
List<Document> ordersBackup = readAllBackupOrders(db);
126+
List<Document> ordersBackup = readAllBackupOrders(db, orders.size());
116127

117128
assertThat(ordersBackup).containsExactlyInAnyOrderElementsOf(orders);
118129
}
@@ -121,43 +132,88 @@ public void testUpsertSink() throws Exception {
121132
public void testAppendOnlySink() throws Exception {
122133
MongoDatabase db = mongoClient.getDatabase("test_append_only");
123134

124-
List<Document> orders = mockOrders();
135+
List<Document> orders = generateOrders();
125136
db.getCollection("orders").insertMany(orders);
126137

127138
executeSqlStatements(readSqlFile("e2e_append_only.sql"));
128139

129-
List<Document> ordersBackup = readAllBackupOrders(db);
140+
List<Document> ordersBackup = readAllBackupOrders(db, orders.size());
130141

131142
List<Document> expected = removeIdField(orders);
132143
assertThat(removeIdField(ordersBackup)).containsExactlyInAnyOrderElementsOf(expected);
133144
}
134145

135-
private static List<Document> readAllBackupOrders(MongoDatabase db) throws Exception {
136-
Deadline deadline = Deadline.fromNow(Duration.ofSeconds(20));
137-
List<Document> backupOrders;
138-
do {
139-
Thread.sleep(1000);
140-
backupOrders = db.getCollection("orders_bak").find().into(new ArrayList<>());
141-
} while (deadline.hasTimeLeft() && backupOrders.size() < TEST_ORDERS_COUNT);
146+
@Test
147+
public void testUnboundedSink() throws Exception {
148+
MongoDatabase db = mongoClient.getDatabase("test_unbounded");
149+
MongoCollection<Document> coll = db.getCollection("orders");
150+
151+
List<Document> orders = generateOrders();
152+
coll.insertMany(orders);
153+
154+
executeSqlStatements(readSqlFile("e2e_unbounded.sql"));
155+
156+
// -- scan records --
157+
List<Document> ordersBackup = readAllBackupOrders(db, orders.size());
158+
assertThat(ordersBackup).containsExactlyInAnyOrderElementsOf(orders);
159+
160+
// -- stream records --
161+
// insert 3 records
162+
List<Document> newOrders =
163+
Arrays.asList(generateOrder(6), generateOrder(7), generateOrder(8));
164+
coll.insertMany(newOrders);
165+
orders.addAll(newOrders);
166+
167+
// assert inserted
168+
ordersBackup = readAllBackupOrders(db, orders.size());
169+
assertThat(ordersBackup).containsExactlyInAnyOrderElementsOf(orders);
142170

143-
return backupOrders;
171+
// update 1 record
172+
Document updateOrder = orders.get(0);
173+
coll.updateOne(Filters.eq("_id", updateOrder.get("_id")), Updates.set("quantity", 1000L));
174+
175+
// replace 1 record
176+
Document replacement = Document.parse(orders.get(1).toJson());
177+
replacement.put("quantity", 1001L);
178+
coll.replaceOne(Filters.eq("_id", replacement.remove("_id")), replacement);
179+
180+
// asert updated
181+
ordersBackup = readAllBackupOrders(db, orders.size());
182+
assertThat(ordersBackup).containsExactlyInAnyOrderElementsOf(orders);
183+
}
184+
185+
private static List<Document> readAllBackupOrders(MongoDatabase db, int expectSize)
186+
throws Exception {
187+
Deadline deadline = Deadline.fromNow(Duration.ofSeconds(30));
188+
MongoCollection<Document> coll = db.getCollection("orders_bak");
189+
while (deadline.hasTimeLeft()) {
190+
if (coll.countDocuments() < expectSize) {
191+
Thread.sleep(1000L);
192+
} else {
193+
break;
194+
}
195+
}
196+
return coll.find().into(new ArrayList<>());
144197
}
145198

146199
private static List<Document> removeIdField(List<Document> documents) {
147200
return documents.stream().peek(doc -> doc.remove("_id")).collect(Collectors.toList());
148201
}
149202

150-
private static List<Document> mockOrders() {
203+
private static List<Document> generateOrders() {
151204
List<Document> orders = new ArrayList<>();
152-
for (int i = 1; i <= TEST_ORDERS_COUNT; i++) {
153-
orders.add(
154-
new Document("_id", new ObjectId())
155-
.append("code", "ORDER_" + i)
156-
.append("quantity", i * 10L));
205+
for (int i = 1; i <= TEST_ORDERS_INITIAL_COUNT; i++) {
206+
orders.add(generateOrder(i));
157207
}
158208
return orders;
159209
}
160210

211+
private static Document generateOrder(int index) {
212+
return new Document("_id", new ObjectId())
213+
.append("code", "ORDER_" + index)
214+
.append("quantity", index * 10L);
215+
}
216+
161217
private static List<String> readSqlFile(final String resourceName) throws Exception {
162218
return Files.readAllLines(
163219
Paths.get(MongoE2ECase.class.getResource("/" + resourceName).toURI()));
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
--/*
2+
-- * Licensed to the Apache Software Foundation (ASF) under one
3+
-- * or more contributor license agreements. See the NOTICE file
4+
-- * distributed with this work for additional information
5+
-- * regarding copyright ownership. The ASF licenses this file
6+
-- * to you under the Apache License, Version 2.0 (the
7+
-- * "License"); you may not use this file except in compliance
8+
-- * with the License. You may obtain a copy of the License at
9+
-- *
10+
-- * http://www.apache.org/licenses/LICENSE-2.0
11+
-- *
12+
-- * Unless required by applicable law or agreed to in writing, software
13+
-- * distributed under the License is distributed on an "AS IS" BASIS,
14+
-- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
-- * See the License for the specific language governing permissions and
16+
-- * limitations under the License.
17+
-- */
18+
19+
DROP TABLE IF EXISTS orders;
20+
DROP TABLE IF EXISTS orders_bak;
21+
22+
CREATE TABLE orders (
23+
`_id` STRING,
24+
`code` STRING,
25+
`quantity` BIGINT,
26+
PRIMARY KEY (_id) NOT ENFORCED
27+
) WITH (
28+
'connector' = 'mongodb',
29+
'uri' = 'mongodb://mongodb:27017',
30+
'database' = 'test_unbounded',
31+
'collection' = 'orders',
32+
'scan.startup.mode' = 'initial'
33+
);
34+
35+
CREATE TABLE orders_bak (
36+
`_id` STRING,
37+
`code` STRING,
38+
`quantity` BIGINT,
39+
PRIMARY KEY (_id) NOT ENFORCED
40+
) WITH (
41+
'connector' = 'mongodb',
42+
'uri' = 'mongodb://mongodb:27017',
43+
'database' = 'test_unbounded',
44+
'collection' = 'orders_bak'
45+
);
46+
47+
INSERT INTO orders_bak SELECT * FROM orders;

flink-connector-mongodb/src/main/java/org/apache/flink/connector/mongodb/common/utils/MongoConstants.java

+20
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,18 @@ public class MongoConstants {
5959

6060
public static final String DROPPED_FIELD = "dropped";
6161

62+
public static final String CLUSTER_TIME_FIELD = "clusterTime";
63+
64+
public static final String RESUME_TOKEN_FIELD = "resumeToken";
65+
66+
public static final String OPERATION_TYPE_FIELD = "operationType";
67+
68+
public static final String DOCUMENT_KEY_FIELD = "documentKey";
69+
70+
public static final String FULL_DOCUMENT_FIELD = "fullDocument";
71+
72+
public static final String FULL_DOCUMENT_BEFORE_CHANGE_FIELD = "fullDocumentBeforeChange";
73+
6274
public static final BsonValue BSON_MIN_KEY = new BsonMinKey();
6375

6476
public static final BsonValue BSON_MAX_KEY = new BsonMaxKey();
@@ -68,5 +80,13 @@ public class MongoConstants {
6880
public static final JsonWriterSettings DEFAULT_JSON_WRITER_SETTINGS =
6981
JsonWriterSettings.builder().outputMode(JsonMode.EXTENDED).build();
7082

83+
public static final int FAILED_TO_PARSE_ERROR = 9;
84+
85+
public static final int UNAUTHORIZED_ERROR = 13;
86+
87+
public static final int ILLEGAL_OPERATION_ERROR = 20;
88+
89+
public static final int UNKNOWN_FIELD_ERROR = 40415;
90+
7191
private MongoConstants() {}
7292
}

0 commit comments

Comments
 (0)