Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Fix] [Mongo-cdc] Fallback to timestamp startup mode when resume token has expired #8754

Open
wants to merge 26 commits into
base: dev
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 24 commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
d272789
[Fix] [sink elasticsearch] Fix the issue of sink-es saveMode conflict…
jw-itq Aug 19, 2024
4ebc66b
[Fix] [sink elasticsearch] Fix the issue of sink-es saveMode and es a…
jw-itq Aug 21, 2024
6810d80
[Bug] [sink elasticsearch] the savemode of sink-es conficts with es a…
jw-itq Aug 19, 2024
ac50c3e
[Fix] [sink elasticsearch] Fix the issue of sink-es saveMode and es a…
jw-itq Aug 21, 2024
6e8ccc9
[Fix] [sink elasticsearch] Fix the issue of sink-es saveMode and es a…
jw-itq Aug 21, 2024
3936038
[Doc] Add IGNORE savemode type into docment #7443
jw-itq Aug 22, 2024
3bc24c2
Merge branch 'apache:dev' into dev
jw-itq Aug 22, 2024
c26e89d
Merge branch 'apache:dev' into dev
jw-itq Aug 26, 2024
b5f5162
Merge branch 'apache:dev' into dev
jw-itq Aug 28, 2024
0347da2
Merge branch 'apache:dev' into dev
jw-itq Sep 17, 2024
4bce27f
Merge branch 'apache:dev' into dev
jw-itq Nov 11, 2024
d4993f3
Merge branch 'apache:dev' into dev
jw-itq Nov 12, 2024
5d61e76
Merge branch 'apache:dev' into dev
jw-itq Nov 19, 2024
9f85118
Merge branch 'apache:dev' into dev
jw-itq Dec 2, 2024
bc41c79
Merge branch 'apache:dev' into dev
jw-itq Dec 4, 2024
4b21c83
Merge branch 'apache:dev' into dev
jw-itq Jan 2, 2025
af53478
Merge branch 'apache:dev' into dev
jw-itq Jan 14, 2025
1771001
Merge branch 'apache:dev' into dev
jw-itq Jan 17, 2025
2ea8f05
Merge branch 'apache:dev' into dev
jw-itq Jan 24, 2025
9e8179d
Merge branch 'apache:dev' into dev
jw-itq Jan 28, 2025
c65e6cb
Merge branch 'apache:dev' into dev
jw-itq Feb 5, 2025
5f2d00b
Merge branch 'apache:dev' into dev
jw-itq Feb 17, 2025
16b96dd
Avoid mongodb source to read data after high_watermark in backfill phase
jw-itq Feb 17, 2025
a33630e
Fallback to timestamp startup mode when resume token has expired
jw-itq Feb 17, 2025
e8346c3
fix the issue of mongo restore offset
jw-itq Feb 19, 2025
b1ed777
add mongo test case
jw-itq Feb 22, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,12 @@
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

import static java.util.Arrays.asList;

public class MongodbSourceOptions extends SourceOptions {

Expand Down Expand Up @@ -76,6 +80,25 @@ public class MongodbSourceOptions extends SourceOptions {

public static final int ILLEGAL_OPERATION_ERROR = 20;

public static final int INVALIDATED_RESUME_TOKEN_ERROR = 260;
public static final int CHANGE_STREAM_FATAL_ERROR = 280;
public static final int CHANGE_STREAM_HISTORY_LOST = 286;
public static final int BSON_OBJECT_TOO_LARGE = 10334;

public static final Set<Integer> INVALID_CHANGE_STREAM_ERRORS =
new HashSet<>(
asList(
INVALIDATED_RESUME_TOKEN_ERROR,
CHANGE_STREAM_FATAL_ERROR,
CHANGE_STREAM_HISTORY_LOST,
BSON_OBJECT_TOO_LARGE));

public static final String RESUME_TOKEN = "resume token";
public static final String NOT_FOUND = "not found";
public static final String DOES_NOT_EXIST = "does not exist";
public static final String INVALID_RESUME_TOKEN = "invalid resume token";
public static final String NO_LONGER_IN_THE_OPLOG = "no longer be in the oplog";

public static final int UNKNOWN_FIELD_ERROR = 40415;

public static final String DROPPED_FIELD = "dropped";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.source.offset.ChangeStreamDescriptor;
import org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.source.offset.ChangeStreamOffset;
import org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.utils.MongodbRecordUtils;
import org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.utils.MongodbUtils;

import org.apache.kafka.common.utils.SystemTime;
import org.apache.kafka.common.utils.Time;
Expand Down Expand Up @@ -117,7 +118,23 @@ public void execute(Context context) {
this.taskRunning = true;
try {
while (taskRunning) {
Optional<BsonDocument> next = Optional.ofNullable(changeStreamCursor.tryNext());
Optional<BsonDocument> next;
try {
next = Optional.ofNullable(changeStreamCursor.tryNext());
} catch (MongoCommandException e) {
if (MongodbUtils.checkIfChangeStreamCursorExpires(e)) {
log.warn("Change stream cursor has expired, trying to recreate cursor");
boolean resumeTokenExpires = MongodbUtils.checkIfResumeTokenExpires(e);
if (resumeTokenExpires) {
log.warn(
"Resume token has expired, fallback to timestamp restart mode");
}
changeStreamCursor = openChangeStreamCursor(descriptor, resumeTokenExpires);
next = Optional.ofNullable(changeStreamCursor.tryNext());
} else {
throw e;
}
}
SourceRecord changeRecord = null;
if (!next.isPresent()) {
long untilNext = nextUpdate - time.milliseconds();
Expand Down Expand Up @@ -158,14 +175,18 @@ public void execute(Context context) {
valueDocument);
}

if (changeRecord != null) {
if (changeRecord != null && !isBoundedRead()) {
queue.enqueue(new DataChangeEvent(changeRecord));
}

if (isBoundedRead()) {
ChangeStreamOffset currentOffset;
if (changeRecord != null) {
currentOffset = new ChangeStreamOffset(getResumeToken(changeRecord));
// The log after the high watermark won't emit.
if (currentOffset.isAtOrBefore(streamSplit.getStopOffset())) {
queue.enqueue(new DataChangeEvent(changeRecord));
}
} else {
// Heartbeat is not turned on or there is no update event
currentOffset = new ChangeStreamOffset(getCurrentClusterTime(mongoClient));
Expand Down Expand Up @@ -215,6 +236,11 @@ public IncrementalSplit getSplit() {

private MongoChangeStreamCursor<BsonDocument> openChangeStreamCursor(
ChangeStreamDescriptor changeStreamDescriptor) {
return openChangeStreamCursor(changeStreamDescriptor, false);
}

private MongoChangeStreamCursor<BsonDocument> openChangeStreamCursor(
ChangeStreamDescriptor changeStreamDescriptor, boolean forceTimestampStartup) {
ChangeStreamOffset offset =
new ChangeStreamOffset(streamSplit.getStartupOffset().getOffset());

Expand All @@ -224,7 +250,7 @@ private MongoChangeStreamCursor<BsonDocument> openChangeStreamCursor(
BsonDocument resumeToken = offset.getResumeToken();
BsonTimestamp timestamp = offset.getTimestamp();

if (resumeToken != null) {
if (resumeToken != null && !forceTimestampStartup) {
if (supportsStartAfter) {
log.info("Open the change stream after the previous offset: {}", resumeToken);
changeStreamIterable.startAfter(resumeToken);
Expand All @@ -238,6 +264,11 @@ private MongoChangeStreamCursor<BsonDocument> openChangeStreamCursor(
if (supportsStartAtOperationTime) {
log.info("Open the change stream at the timestamp: {}", timestamp);
changeStreamIterable.startAtOperationTime(timestamp);
} else if (forceTimestampStartup) {
log.error("Open change stream failed. Unable to resume from timestamp");
throw new MongodbConnectorException(
ILLEGAL_ARGUMENT,
"Open change stream failed. Unable to resume from timestamp");
} else {
log.warn("Open the change stream of the latest offset");
}
Expand Down Expand Up @@ -273,6 +304,9 @@ private MongoChangeStreamCursor<BsonDocument> openChangeStreamCursor(
"Unauthorized $changeStream operation: %s %s",
e.getErrorMessage(), e.getErrorCode()));

} else if (!forceTimestampStartup && MongodbUtils.checkIfResumeTokenExpires(e)) {
log.info("Failed to open cursor with resume token, fallback to timestamp startup");
return openChangeStreamCursor(changeStreamDescriptor, true);
} else {
throw new MongodbConnectorException(ILLEGAL_ARGUMENT, "Open change stream failed");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import org.bson.conversions.Bson;

import com.mongodb.ConnectionString;
import com.mongodb.MongoCommandException;
import com.mongodb.client.ChangeStreamIterable;
import com.mongodb.client.MongoChangeStreamCursor;
import com.mongodb.client.MongoClient;
Expand All @@ -49,6 +50,7 @@
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
Expand All @@ -63,13 +65,20 @@
import static com.mongodb.client.model.Sorts.ascending;
import static org.apache.seatunnel.common.exception.CommonErrorCodeDeprecated.ILLEGAL_ARGUMENT;
import static org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.config.MongodbSourceOptions.ADD_NS_FIELD_NAME;
import static org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.config.MongodbSourceOptions.CHANGE_STREAM_FATAL_ERROR;
import static org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.config.MongodbSourceOptions.COMMAND_SUCCEED_FLAG;
import static org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.config.MongodbSourceOptions.DOCUMENT_KEY;
import static org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.config.MongodbSourceOptions.DOES_NOT_EXIST;
import static org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.config.MongodbSourceOptions.DROPPED_FIELD;
import static org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.config.MongodbSourceOptions.ID_FIELD;
import static org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.config.MongodbSourceOptions.INVALID_CHANGE_STREAM_ERRORS;
import static org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.config.MongodbSourceOptions.INVALID_RESUME_TOKEN;
import static org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.config.MongodbSourceOptions.MAX_FIELD;
import static org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.config.MongodbSourceOptions.MIN_FIELD;
import static org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.config.MongodbSourceOptions.NOT_FOUND;
import static org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.config.MongodbSourceOptions.NO_LONGER_IN_THE_OPLOG;
import static org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.config.MongodbSourceOptions.NS_FIELD;
import static org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.config.MongodbSourceOptions.RESUME_TOKEN;
import static org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.config.MongodbSourceOptions.SHARD_FIELD;
import static org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.config.MongodbSourceOptions.UUID_FIELD;
import static org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.utils.CollectionDiscoveryUtils.ADD_NS_FIELD;
Expand Down Expand Up @@ -404,4 +413,25 @@ public static String encodeValue(String value) {
throw new MongodbConnectorException(ILLEGAL_ARGUMENT, e.getMessage());
}
}

// Checks if given exception is caused by change stream cursor issues, including
// network connection failures, sharded cluster changes, or invalidate events.
// See: https://www.mongodb.com/docs/manual/changeStreams/ for more details.
public static boolean checkIfChangeStreamCursorExpires(final MongoCommandException e) {
return INVALID_CHANGE_STREAM_ERRORS.contains(e.getCode());
}

// This check is stricter than checkIfChangeStreamCursorExpires, which specifically
// checks if given exception is caused by an expired resume token.
public static boolean checkIfResumeTokenExpires(final MongoCommandException e) {
if (e.getCode() != CHANGE_STREAM_FATAL_ERROR) {
return false;
}
String errorMessage = e.getErrorMessage().toLowerCase(Locale.ROOT);
return (errorMessage.contains(RESUME_TOKEN))
&& (errorMessage.contains(NOT_FOUND)
|| errorMessage.contains(DOES_NOT_EXIST)
|| errorMessage.contains(INVALID_RESUME_TOKEN)
|| errorMessage.contains(NO_LONGER_IN_THE_OPLOG));
}
}
Loading