Skip to content

Commit 20a00d4

Browse files
morozov姜卓君
authored and
姜卓君
committed
[FLINK-36891[[source-connector][mysql] Fix corrupted state in case of serialization failure in MySQL CDC Source
This closes #3794.
1 parent a6b236f commit 20a00d4

File tree

4 files changed

+168
-42
lines changed

4 files changed

+168
-42
lines changed

flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/assigner/state/PendingSplitsStateSerializer.java

+25-21
Original file line numberDiff line numberDiff line change
@@ -80,28 +80,32 @@ public byte[] serialize(PendingSplitsState state) throws IOException {
8080
}
8181
final DataOutputSerializer out = SERIALIZER_CACHE.get();
8282

83-
out.writeInt(splitSerializer.getVersion());
84-
if (state instanceof SnapshotPendingSplitsState) {
85-
out.writeInt(SNAPSHOT_PENDING_SPLITS_STATE_FLAG);
86-
serializeSnapshotPendingSplitsState((SnapshotPendingSplitsState) state, out);
87-
} else if (state instanceof StreamPendingSplitsState) {
88-
out.writeInt(STREAM_PENDING_SPLITS_STATE_FLAG);
89-
serializeStreamPendingSplitsState((StreamPendingSplitsState) state, out);
90-
} else if (state instanceof HybridPendingSplitsState) {
91-
out.writeInt(HYBRID_PENDING_SPLITS_STATE_FLAG);
92-
serializeHybridPendingSplitsState((HybridPendingSplitsState) state, out);
93-
} else {
94-
throw new IOException(
95-
"Unsupported to serialize PendingSplitsState class: "
96-
+ state.getClass().getName());
97-
}
83+
try {
84+
out.writeInt(splitSerializer.getVersion());
85+
if (state instanceof SnapshotPendingSplitsState) {
86+
out.writeInt(SNAPSHOT_PENDING_SPLITS_STATE_FLAG);
87+
serializeSnapshotPendingSplitsState((SnapshotPendingSplitsState) state, out);
88+
} else if (state instanceof StreamPendingSplitsState) {
89+
out.writeInt(STREAM_PENDING_SPLITS_STATE_FLAG);
90+
serializeStreamPendingSplitsState((StreamPendingSplitsState) state, out);
91+
} else if (state instanceof HybridPendingSplitsState) {
92+
out.writeInt(HYBRID_PENDING_SPLITS_STATE_FLAG);
93+
serializeHybridPendingSplitsState((HybridPendingSplitsState) state, out);
94+
} else {
95+
throw new IOException(
96+
"Unsupported to serialize PendingSplitsState class: "
97+
+ state.getClass().getName());
98+
}
99+
100+
final byte[] result = out.getCopyOfBuffer();
101+
// optimization: cache the serialized from, so we avoid the byte work during repeated
102+
// serialization
103+
state.serializedFormCache = result;
98104

99-
final byte[] result = out.getCopyOfBuffer();
100-
// optimization: cache the serialized from, so we avoid the byte work during repeated
101-
// serialization
102-
state.serializedFormCache = result;
103-
out.clear();
104-
return result;
105+
return result;
106+
} finally {
107+
out.clear();
108+
}
105109
}
106110

107111
@Override
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,96 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.flink.cdc.connectors.base.source.assigner.state;
19+
20+
import org.apache.flink.cdc.connectors.base.source.meta.offset.Offset;
21+
import org.apache.flink.cdc.connectors.base.source.meta.offset.OffsetFactory;
22+
import org.apache.flink.cdc.connectors.base.source.meta.split.SourceSplitSerializer;
23+
24+
import io.debezium.relational.TableId;
25+
import org.junit.Test;
26+
27+
import java.io.IOException;
28+
import java.util.Map;
29+
30+
import static org.junit.Assert.assertArrayEquals;
31+
import static org.junit.Assert.assertThrows;
32+
33+
/** Tests for {@link PendingSplitsStateSerializer}. */
34+
public class PendingSplitsStateSerializerTest {
35+
36+
private final TableId tableId = TableId.parse("catalog.schema.table1");
37+
38+
@Test
39+
public void testOutputIsFinallyCleared() throws Exception {
40+
PendingSplitsStateSerializer serializer =
41+
new PendingSplitsStateSerializer(constructSourceSplitSerializer());
42+
StreamPendingSplitsState state = new StreamPendingSplitsState(true);
43+
44+
final byte[] ser1 = serializer.serialize(state);
45+
state.serializedFormCache = null;
46+
47+
PendingSplitsState unsupportedState = new UnsupportedPendingSplitsState();
48+
49+
assertThrows(IOException.class, () -> serializer.serialize(unsupportedState));
50+
51+
final byte[] ser2 = serializer.serialize(state);
52+
assertArrayEquals(ser1, ser2);
53+
}
54+
55+
private SourceSplitSerializer constructSourceSplitSerializer() {
56+
return new SourceSplitSerializer() {
57+
@Override
58+
public OffsetFactory getOffsetFactory() {
59+
return new OffsetFactory() {
60+
@Override
61+
public Offset newOffset(Map<String, String> offset) {
62+
return null;
63+
}
64+
65+
@Override
66+
public Offset newOffset(String filename, Long position) {
67+
return null;
68+
}
69+
70+
@Override
71+
public Offset newOffset(Long position) {
72+
return null;
73+
}
74+
75+
@Override
76+
public Offset createTimestampOffset(long timestampMillis) {
77+
return null;
78+
}
79+
80+
@Override
81+
public Offset createInitialOffset() {
82+
return null;
83+
}
84+
85+
@Override
86+
public Offset createNoStoppingOffset() {
87+
return null;
88+
}
89+
};
90+
}
91+
};
92+
}
93+
94+
/** An implementation for {@link PendingSplitsState} which will cause a serialization error. */
95+
static class UnsupportedPendingSplitsState extends PendingSplitsState {}
96+
}

flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/assigners/state/PendingSplitsStateSerializer.java

+25-21
Original file line numberDiff line numberDiff line change
@@ -72,28 +72,32 @@ public byte[] serialize(PendingSplitsState state) throws IOException {
7272
}
7373
final DataOutputSerializer out = SERIALIZER_CACHE.get();
7474

75-
out.writeInt(splitSerializer.getVersion());
76-
if (state instanceof SnapshotPendingSplitsState) {
77-
out.writeInt(SNAPSHOT_PENDING_SPLITS_STATE_FLAG);
78-
serializeSnapshotPendingSplitsState((SnapshotPendingSplitsState) state, out);
79-
} else if (state instanceof BinlogPendingSplitsState) {
80-
out.writeInt(BINLOG_PENDING_SPLITS_STATE_FLAG);
81-
serializeBinlogPendingSplitsState((BinlogPendingSplitsState) state, out);
82-
} else if (state instanceof HybridPendingSplitsState) {
83-
out.writeInt(HYBRID_PENDING_SPLITS_STATE_FLAG);
84-
serializeHybridPendingSplitsState((HybridPendingSplitsState) state, out);
85-
} else {
86-
throw new IOException(
87-
"Unsupported to serialize PendingSplitsState class: "
88-
+ state.getClass().getName());
89-
}
75+
try {
76+
out.writeInt(splitSerializer.getVersion());
77+
if (state instanceof SnapshotPendingSplitsState) {
78+
out.writeInt(SNAPSHOT_PENDING_SPLITS_STATE_FLAG);
79+
serializeSnapshotPendingSplitsState((SnapshotPendingSplitsState) state, out);
80+
} else if (state instanceof BinlogPendingSplitsState) {
81+
out.writeInt(BINLOG_PENDING_SPLITS_STATE_FLAG);
82+
serializeBinlogPendingSplitsState((BinlogPendingSplitsState) state, out);
83+
} else if (state instanceof HybridPendingSplitsState) {
84+
out.writeInt(HYBRID_PENDING_SPLITS_STATE_FLAG);
85+
serializeHybridPendingSplitsState((HybridPendingSplitsState) state, out);
86+
} else {
87+
throw new IOException(
88+
"Unsupported to serialize PendingSplitsState class: "
89+
+ state.getClass().getName());
90+
}
91+
92+
final byte[] result = out.getCopyOfBuffer();
93+
// optimization: cache the serialized from, so we avoid the byte work during repeated
94+
// serialization
95+
state.serializedFormCache = result;
9096

91-
final byte[] result = out.getCopyOfBuffer();
92-
// optimization: cache the serialized from, so we avoid the byte work during repeated
93-
// serialization
94-
state.serializedFormCache = result;
95-
out.clear();
96-
return result;
97+
return result;
98+
} finally {
99+
out.clear();
100+
}
97101
}
98102

99103
@Override

flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/assigners/state/PendingSplitsStateSerializerTest.java

+22
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333
import org.junit.runner.RunWith;
3434
import org.junit.runners.Parameterized;
3535

36+
import java.io.IOException;
3637
import java.util.ArrayList;
3738
import java.util.Arrays;
3839
import java.util.Collection;
@@ -42,8 +43,10 @@
4243
import java.util.Map;
4344

4445
import static org.apache.flink.cdc.connectors.mysql.source.split.MySqlSnapshotSplit.generateSplitId;
46+
import static org.junit.Assert.assertArrayEquals;
4547
import static org.junit.Assert.assertEquals;
4648
import static org.junit.Assert.assertSame;
49+
import static org.junit.Assert.assertThrows;
4750

4851
/**
4952
* Tests for {@link
@@ -102,6 +105,22 @@ public void testRepeatedSerializationCache() throws Exception {
102105
assertSame(ser1, ser3);
103106
}
104107

108+
@Test
109+
public void testOutputIsFinallyCleared() throws Exception {
110+
final PendingSplitsStateSerializer serializer =
111+
new PendingSplitsStateSerializer(MySqlSplitSerializer.INSTANCE);
112+
113+
final byte[] ser1 = serializer.serialize(state);
114+
state.serializedFormCache = null;
115+
116+
PendingSplitsState unsupportedState = new UnsupportedPendingSplitsState();
117+
118+
assertThrows(IOException.class, () -> serializer.serialize(unsupportedState));
119+
120+
final byte[] ser2 = serializer.serialize(state);
121+
assertArrayEquals(ser1, ser2);
122+
}
123+
105124
static PendingSplitsState serializeAndDeserializeSourceEnumState(PendingSplitsState state)
106125
throws Exception {
107126
final PendingSplitsStateSerializer serializer =
@@ -270,4 +289,7 @@ public TableEditor edit() {
270289
throw new UnsupportedOperationException("Not implemented.");
271290
}
272291
}
292+
293+
/** An implementation for {@link PendingSplitsState} which will cause a serialization error. */
294+
static class UnsupportedPendingSplitsState extends PendingSplitsState {}
273295
}

0 commit comments

Comments
 (0)