Skip to content

Commit 1c6face

Browse files
committed
Resolve conflicts
1 parent 9310d92 commit 1c6face

File tree

1 file changed

+21
-19
lines changed

1 file changed

+21
-19
lines changed

flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/coordinator/SchemaManager.java

+21-19
Original file line numberDiff line numberDiff line change
@@ -263,7 +263,7 @@ public int getVersion() {
263263
@Override
264264
public byte[] serialize(SchemaManager schemaManager) throws IOException {
265265
try (ByteArrayOutputStream baos = new ByteArrayOutputStream();
266-
DataOutputStream out = new DataOutputStream(baos)) {
266+
DataOutputStream out = new DataOutputStream(baos)) {
267267
serializeSchemaMap(schemaManager.evolvedSchemas, out);
268268
serializeSchemaMap(schemaManager.upstreamSchemas, out);
269269
out.writeUTF(schemaManager.getBehavior().name());
@@ -278,7 +278,8 @@ private static void serializeSchemaMap(
278278
SchemaSerializer schemaSerializer = SchemaSerializer.INSTANCE;
279279
// Number of tables
280280
out.writeInt(schemaMap.size());
281-
for (Map.Entry<TableId, SortedMap<Integer, Schema>> tableSchema : schemaMap.entrySet()) {
281+
for (Map.Entry<TableId, SortedMap<Integer, Schema>> tableSchema :
282+
schemaMap.entrySet()) {
282283
// Table ID
283284
TableId tableId = tableSchema.getKey();
284285
tableIdSerializer.serialize(tableId, new DataOutputViewStreamWrapper(out));
@@ -300,26 +301,26 @@ private static void serializeSchemaMap(
300301
@Override
301302
public SchemaManager deserialize(int version, byte[] serialized) throws IOException {
302303
try (ByteArrayInputStream bais = new ByteArrayInputStream(serialized);
303-
DataInputStream in = new DataInputStream(bais)) {
304+
DataInputStream in = new DataInputStream(bais)) {
304305
switch (version) {
305306
case 0:
306307
case 1:
307-
{
308-
Map<TableId, SortedMap<Integer, Schema>> schemas =
309-
deserializeSchemaMap(version, in);
310-
// In legacy mode, upstream schema and evolved schema never differs
311-
return new SchemaManager(schemas, schemas, SchemaChangeBehavior.EVOLVE);
312-
}
308+
{
309+
Map<TableId, SortedMap<Integer, Schema>> schemas =
310+
deserializeSchemaMap(version, in);
311+
// In legacy mode, upstream schema and evolved schema never differs
312+
return new SchemaManager(schemas, schemas, SchemaChangeBehavior.EVOLVE);
313+
}
313314
case 2:
314-
{
315-
Map<TableId, SortedMap<Integer, Schema>> evolvedSchemas =
316-
deserializeSchemaMap(version, in);
317-
Map<TableId, SortedMap<Integer, Schema>> upstreamSchemas =
318-
deserializeSchemaMap(version, in);
319-
SchemaChangeBehavior behavior =
320-
SchemaChangeBehavior.valueOf(in.readUTF());
321-
return new SchemaManager(upstreamSchemas, evolvedSchemas, behavior);
322-
}
315+
{
316+
Map<TableId, SortedMap<Integer, Schema>> evolvedSchemas =
317+
deserializeSchemaMap(version, in);
318+
Map<TableId, SortedMap<Integer, Schema>> upstreamSchemas =
319+
deserializeSchemaMap(version, in);
320+
SchemaChangeBehavior behavior =
321+
SchemaChangeBehavior.valueOf(in.readUTF());
322+
return new SchemaManager(upstreamSchemas, evolvedSchemas, behavior);
323+
}
323324
default:
324325
throw new RuntimeException("Unknown serialize version: " + version);
325326
}
@@ -343,7 +344,8 @@ private static Map<TableId, SortedMap<Integer, Schema>> deserializeSchemaMap(
343344
// Version
344345
int schemaVersion = in.readInt();
345346
Schema schema =
346-
schemaSerializer.deserialize(version, new DataInputViewStreamWrapper(in));
347+
schemaSerializer.deserialize(
348+
version, new DataInputViewStreamWrapper(in));
347349
versionedSchemas.put(schemaVersion, schema);
348350
}
349351
tableSchemas.put(tableId, versionedSchemas);

0 commit comments

Comments
 (0)