Skip to content

Commit 9f67d79

Browse files
committed
[FLINK-36366][core] Remove deprecated serializer related config option and method
1 parent da0c758 commit 9f67d79

File tree

33 files changed

+243
-932
lines changed

33 files changed

+243
-932
lines changed

flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java

-354
Large diffs are not rendered by default.
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.flink.api.common;
20+
21+
import org.apache.flink.annotation.Internal;
22+
import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer;
23+
24+
import com.esotericsoftware.kryo.Serializer;
25+
26+
import java.io.Serializable;
27+
28+
/**
29+
* The wrapper to make serializer serializable.
30+
*
31+
* <p>This can be removed after {@link KryoSerializer} only allow serializer class.
32+
*/
33+
@Internal
34+
public class SerializableSerializer<T extends Serializer<?> & Serializable>
35+
implements Serializable {
36+
private static final long serialVersionUID = 4687893502781067189L;
37+
38+
private T serializer;
39+
40+
public SerializableSerializer(T serializer) {
41+
this.serializer = serializer;
42+
}
43+
44+
public T getSerializer() {
45+
return serializer;
46+
}
47+
}

flink-core/src/main/java/org/apache/flink/api/common/serialization/SerializerConfig.java

-21
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@
2020

2121
import org.apache.flink.annotation.Internal;
2222
import org.apache.flink.annotation.PublicEvolving;
23-
import org.apache.flink.api.common.ExecutionConfig;
2423
import org.apache.flink.api.common.typeinfo.TypeInfoFactory;
2524
import org.apache.flink.configuration.PipelineOptions;
2625
import org.apache.flink.configuration.ReadableConfig;
@@ -128,30 +127,10 @@ <T extends Serializer<?> & Serializable> void registerTypeWithKryoSerializer(
128127
@Internal
129128
void registerKryoType(Class<?> type);
130129

131-
/**
132-
* Returns the registered types with Kryo Serializers.
133-
*
134-
* @deprecated The method is deprecated because instance-type Kryo serializer definition based
135-
* on {@link ExecutionConfig.SerializableSerializer} is deprecated. Use class-type Kryo
136-
* serializers instead.
137-
*/
138-
@Deprecated
139-
LinkedHashMap<Class<?>, ExecutionConfig.SerializableSerializer<?>>
140-
getRegisteredTypesWithKryoSerializers();
141-
142130
/** Returns the registered types with their Kryo Serializer classes. */
143131
LinkedHashMap<Class<?>, Class<? extends Serializer<?>>>
144132
getRegisteredTypesWithKryoSerializerClasses();
145133

146-
/**
147-
* Returns the registered default Kryo Serializers.
148-
*
149-
* @deprecated The method is deprecated because {@link ExecutionConfig.SerializableSerializer}
150-
* is deprecated.
151-
*/
152-
@Deprecated
153-
LinkedHashMap<Class<?>, ExecutionConfig.SerializableSerializer<?>> getDefaultKryoSerializers();
154-
155134
/** Returns the registered default Kryo Serializer classes. */
156135
LinkedHashMap<Class<?>, Class<? extends Serializer<?>>> getDefaultKryoSerializerClasses();
157136

flink-core/src/main/java/org/apache/flink/api/common/serialization/SerializerConfigImpl.java

+9-26
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@
1919
package org.apache.flink.api.common.serialization;
2020

2121
import org.apache.flink.annotation.Internal;
22-
import org.apache.flink.api.common.ExecutionConfig;
22+
import org.apache.flink.api.common.SerializableSerializer;
2323
import org.apache.flink.api.common.functions.InvalidTypesException;
2424
import org.apache.flink.api.common.typeinfo.TypeInfoFactory;
2525
import org.apache.flink.api.java.typeutils.TypeExtractor;
@@ -55,14 +55,14 @@ public final class SerializerConfigImpl implements SerializerConfig {
5555
// we store them in linked maps/sets to ensure they are registered in order in all kryo
5656
// instances.
5757

58-
private LinkedHashMap<Class<?>, ExecutionConfig.SerializableSerializer<?>>
59-
registeredTypesWithKryoSerializers = new LinkedHashMap<>();
58+
private LinkedHashMap<Class<?>, SerializableSerializer<?>> registeredTypesWithKryoSerializers =
59+
new LinkedHashMap<>();
6060

6161
private LinkedHashMap<Class<?>, Class<? extends Serializer<?>>>
6262
registeredTypesWithKryoSerializerClasses = new LinkedHashMap<>();
6363

64-
private LinkedHashMap<Class<?>, ExecutionConfig.SerializableSerializer<?>>
65-
defaultKryoSerializers = new LinkedHashMap<>();
64+
private LinkedHashMap<Class<?>, SerializableSerializer<?>> defaultKryoSerializers =
65+
new LinkedHashMap<>();
6666

6767
private LinkedHashMap<Class<?>, Class<? extends Serializer<?>>> defaultKryoSerializerClasses =
6868
new LinkedHashMap<>();
@@ -104,7 +104,7 @@ public <T extends Serializer<?> & Serializable> void addDefaultKryoSerializer(
104104
throw new NullPointerException("Cannot register null class or serializer.");
105105
}
106106

107-
defaultKryoSerializers.put(type, new ExecutionConfig.SerializableSerializer<>(serializer));
107+
defaultKryoSerializers.put(type, new SerializableSerializer<>(serializer));
108108
}
109109

110110
/**
@@ -137,8 +137,7 @@ public <T extends Serializer<?> & Serializable> void registerTypeWithKryoSeriali
137137
throw new NullPointerException("Cannot register null class or serializer.");
138138
}
139139

140-
registeredTypesWithKryoSerializers.put(
141-
type, new ExecutionConfig.SerializableSerializer<>(serializer));
140+
registeredTypesWithKryoSerializers.put(type, new SerializableSerializer<>(serializer));
142141
}
143142

144143
/**
@@ -192,7 +191,7 @@ public void registerKryoType(Class<?> type) {
192191
}
193192

194193
/** Returns the registered types with Kryo Serializers. */
195-
public LinkedHashMap<Class<?>, ExecutionConfig.SerializableSerializer<?>>
194+
public LinkedHashMap<Class<?>, SerializableSerializer<?>>
196195
getRegisteredTypesWithKryoSerializers() {
197196
return registeredTypesWithKryoSerializers;
198197
}
@@ -204,8 +203,7 @@ public void registerKryoType(Class<?> type) {
204203
}
205204

206205
/** Returns the registered default Kryo Serializers. */
207-
public LinkedHashMap<Class<?>, ExecutionConfig.SerializableSerializer<?>>
208-
getDefaultKryoSerializers() {
206+
public LinkedHashMap<Class<?>, SerializableSerializer<?>> getDefaultKryoSerializers() {
209207
return defaultKryoSerializers;
210208
}
211209

@@ -359,21 +357,6 @@ public void configure(ReadableConfig configuration, ClassLoader classLoader) {
359357
.getOptional(PipelineOptions.FORCE_KRYO_AVRO)
360358
.ifPresent(this::setForceKryoAvro);
361359

362-
configuration
363-
.getOptional(PipelineOptions.KRYO_DEFAULT_SERIALIZERS)
364-
.map(s -> parseKryoSerializersWithExceptionHandling(classLoader, s))
365-
.ifPresent(s -> this.defaultKryoSerializerClasses = s);
366-
367-
configuration
368-
.getOptional(PipelineOptions.POJO_REGISTERED_CLASSES)
369-
.map(c -> loadClasses(c, classLoader, "Could not load pojo type to be registered."))
370-
.ifPresent(c -> this.registeredPojoTypes = c);
371-
372-
configuration
373-
.getOptional(PipelineOptions.KRYO_REGISTERED_CLASSES)
374-
.map(c -> loadClasses(c, classLoader, "Could not load kryo type to be registered."))
375-
.ifPresent(c -> this.registeredKryoTypes = c);
376-
377360
try {
378361
configuration
379362
.getOptional(PipelineOptions.SERIALIZATION_CONFIG)

flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/KryoRegistration.java

+4-7
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@
1919
package org.apache.flink.api.java.typeutils.runtime;
2020

2121
import org.apache.flink.annotation.Internal;
22-
import org.apache.flink.api.common.ExecutionConfig;
22+
import org.apache.flink.api.common.SerializableSerializer;
2323
import org.apache.flink.util.Preconditions;
2424

2525
import com.esotericsoftware.kryo.Kryo;
@@ -60,8 +60,7 @@ public enum SerializerDefinitionType {
6060
* serializer definition type is {@link SerializerDefinitionType#INSTANCE}.
6161
*/
6262
@Nullable
63-
private final ExecutionConfig.SerializableSerializer<? extends Serializer<?>>
64-
serializableSerializerInstance;
63+
private final SerializableSerializer<? extends Serializer<?>> serializableSerializerInstance;
6564

6665
private final SerializerDefinitionType serializerDefinitionType;
6766

@@ -86,8 +85,7 @@ public KryoRegistration(
8685

8786
public KryoRegistration(
8887
Class<?> registeredClass,
89-
ExecutionConfig.SerializableSerializer<? extends Serializer<?>>
90-
serializableSerializerInstance) {
88+
SerializableSerializer<? extends Serializer<?>> serializableSerializerInstance) {
9189
this.registeredClass = Preconditions.checkNotNull(registeredClass);
9290

9391
this.serializerClass = null;
@@ -111,8 +109,7 @@ public Class<? extends Serializer<?>> getSerializerClass() {
111109
}
112110

113111
@Nullable
114-
public ExecutionConfig.SerializableSerializer<? extends Serializer<?>>
115-
getSerializableSerializerInstance() {
112+
public SerializableSerializer<? extends Serializer<?>> getSerializableSerializerInstance() {
116113
return serializableSerializerInstance;
117114
}
118115

flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoSerializer.java

+16-18
Original file line numberDiff line numberDiff line change
@@ -19,9 +19,9 @@
1919
package org.apache.flink.api.java.typeutils.runtime.kryo;
2020

2121
import org.apache.flink.annotation.VisibleForTesting;
22-
import org.apache.flink.api.common.ExecutionConfig;
23-
import org.apache.flink.api.common.ExecutionConfig.SerializableSerializer;
22+
import org.apache.flink.api.common.SerializableSerializer;
2423
import org.apache.flink.api.common.serialization.SerializerConfig;
24+
import org.apache.flink.api.common.serialization.SerializerConfigImpl;
2525
import org.apache.flink.api.common.typeutils.TypeSerializer;
2626
import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
2727
import org.apache.flink.api.java.typeutils.AvroUtils;
@@ -138,8 +138,7 @@ private static ChillSerializerRegistrar loadFlinkChillPackageRegistrar() {
138138

139139
// ------------------------------------------------------------------------
140140

141-
private final LinkedHashMap<Class<?>, ExecutionConfig.SerializableSerializer<?>>
142-
defaultSerializers;
141+
private final LinkedHashMap<Class<?>, SerializableSerializer<?>> defaultSerializers;
143142
private final LinkedHashMap<Class<?>, Class<? extends Serializer<?>>> defaultSerializerClasses;
144143

145144
/**
@@ -167,8 +166,7 @@ private static ChillSerializerRegistrar loadFlinkChillPackageRegistrar() {
167166
// ------------------------------------------------------------------------
168167
// legacy fields; these fields cannot yet be removed to retain backwards compatibility
169168

170-
private LinkedHashMap<Class<?>, ExecutionConfig.SerializableSerializer<?>>
171-
registeredTypesWithSerializers;
169+
private LinkedHashMap<Class<?>, SerializableSerializer<?>> registeredTypesWithSerializers;
172170
private LinkedHashMap<Class<?>, Class<? extends Serializer<?>>>
173171
registeredTypesWithSerializerClasses;
174172
private LinkedHashSet<Class<?>> registeredTypes;
@@ -181,15 +179,17 @@ private static ChillSerializerRegistrar loadFlinkChillPackageRegistrar() {
181179
public KryoSerializer(Class<T> type, SerializerConfig serializerConfig) {
182180
this.type = checkNotNull(type);
183181

184-
this.defaultSerializers = serializerConfig.getDefaultKryoSerializers();
182+
this.defaultSerializers =
183+
((SerializerConfigImpl) serializerConfig).getDefaultKryoSerializers();
185184
this.defaultSerializerClasses = serializerConfig.getDefaultKryoSerializerClasses();
186185

187186
this.kryoRegistrations =
188187
buildKryoRegistrations(
189188
this.type,
190189
serializerConfig.getRegisteredKryoTypes(),
191190
serializerConfig.getRegisteredTypesWithKryoSerializerClasses(),
192-
serializerConfig.getRegisteredTypesWithKryoSerializers(),
191+
((SerializerConfigImpl) serializerConfig)
192+
.getRegisteredTypesWithKryoSerializers(),
193193
serializerConfig.isForceKryoAvroEnabled());
194194
}
195195

@@ -206,7 +206,7 @@ protected KryoSerializer(KryoSerializer<T> toCopy) {
206206
CollectionUtil.newLinkedHashMapWithExpectedSize(toCopy.kryoRegistrations.size());
207207

208208
// deep copy the serializer instances in defaultSerializers
209-
for (Map.Entry<Class<?>, ExecutionConfig.SerializableSerializer<?>> entry :
209+
for (Map.Entry<Class<?>, SerializableSerializer<?>> entry :
210210
toCopy.defaultSerializers.entrySet()) {
211211

212212
this.defaultSerializers.put(entry.getKey(), deepCopySerializer(entry.getValue()));
@@ -220,7 +220,7 @@ protected KryoSerializer(KryoSerializer<T> toCopy) {
220220
if (kryoRegistration.getSerializerDefinitionType()
221221
== KryoRegistration.SerializerDefinitionType.INSTANCE) {
222222

223-
ExecutionConfig.SerializableSerializer<? extends Serializer<?>> serializerInstance =
223+
SerializableSerializer<? extends Serializer<?>> serializerInstance =
224224
kryoRegistration.getSerializableSerializerInstance();
225225

226226
if (serializerInstance != null) {
@@ -561,7 +561,7 @@ private void checkKryoInitialized() {
561561

562562
// Add default serializers first, so that the type registrations without a serializer
563563
// are registered with a default serializer
564-
for (Map.Entry<Class<?>, ExecutionConfig.SerializableSerializer<?>> entry :
564+
for (Map.Entry<Class<?>, SerializableSerializer<?>> entry :
565565
defaultSerializers.entrySet()) {
566566
kryo.addDefaultSerializer(entry.getKey(), entry.getValue().getSerializer());
567567
}
@@ -606,8 +606,7 @@ private static LinkedHashMap<String, KryoRegistration> buildKryoRegistrations(
606606
LinkedHashSet<Class<?>> registeredTypes,
607607
LinkedHashMap<Class<?>, Class<? extends Serializer<?>>>
608608
registeredTypesWithSerializerClasses,
609-
LinkedHashMap<Class<?>, ExecutionConfig.SerializableSerializer<?>>
610-
registeredTypesWithSerializers,
609+
LinkedHashMap<Class<?>, SerializableSerializer<?>> registeredTypesWithSerializers,
611610
TernaryBoolean isForceAvroKryoEnabledOpt) {
612611

613612
final LinkedHashMap<String, KryoRegistration> kryoRegistrations = new LinkedHashMap<>();
@@ -629,9 +628,8 @@ private static LinkedHashMap<String, KryoRegistration> buildKryoRegistrations(
629628
registeredTypeWithSerializerClassEntry.getValue()));
630629
}
631630

632-
for (Map.Entry<Class<?>, ExecutionConfig.SerializableSerializer<?>>
633-
registeredTypeWithSerializerEntry :
634-
checkNotNull(registeredTypesWithSerializers).entrySet()) {
631+
for (Map.Entry<Class<?>, SerializableSerializer<?>> registeredTypeWithSerializerEntry :
632+
checkNotNull(registeredTypesWithSerializers).entrySet()) {
635633

636634
kryoRegistrations.put(
637635
registeredTypeWithSerializerEntry.getKey().getName(),
@@ -684,8 +682,8 @@ private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundE
684682
}
685683
}
686684

687-
private ExecutionConfig.SerializableSerializer<? extends Serializer<?>> deepCopySerializer(
688-
ExecutionConfig.SerializableSerializer<? extends Serializer<?>> original) {
685+
private SerializableSerializer<? extends Serializer<?>> deepCopySerializer(
686+
SerializableSerializer<? extends Serializer<?>> original) {
689687
try {
690688
return InstantiationUtil.clone(
691689
original, Thread.currentThread().getContextClassLoader());

flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoSerializerSnapshot.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@
1818

1919
package org.apache.flink.api.java.typeutils.runtime.kryo;
2020

21-
import org.apache.flink.api.common.ExecutionConfig.SerializableSerializer;
21+
import org.apache.flink.api.common.SerializableSerializer;
2222
import org.apache.flink.api.common.typeutils.TypeSerializer;
2323
import org.apache.flink.api.common.typeutils.TypeSerializerSchemaCompatibility;
2424
import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;

flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoSerializerSnapshotData.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@
1818

1919
package org.apache.flink.api.java.typeutils.runtime.kryo;
2020

21-
import org.apache.flink.api.common.ExecutionConfig.SerializableSerializer;
21+
import org.apache.flink.api.common.SerializableSerializer;
2222
import org.apache.flink.api.java.typeutils.runtime.DataInputViewStream;
2323
import org.apache.flink.api.java.typeutils.runtime.DataOutputViewStream;
2424
import org.apache.flink.api.java.typeutils.runtime.KryoRegistration;

flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/kryo/Serializers.java

+2-1
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020

2121
import org.apache.flink.annotation.Internal;
2222
import org.apache.flink.api.common.serialization.SerializerConfig;
23+
import org.apache.flink.api.common.serialization.SerializerConfigImpl;
2324
import org.apache.flink.api.common.typeinfo.TypeInformation;
2425
import org.apache.flink.api.common.typeutils.CompositeType;
2526
import org.apache.flink.api.java.typeutils.AvroUtils;
@@ -89,7 +90,7 @@ public static void recursivelyRegisterType(
8990
if (type.isArray()) {
9091
recursivelyRegisterType(type.getComponentType(), config, alreadySeen);
9192
} else {
92-
config.registerKryoType(type);
93+
((SerializerConfigImpl) config).registerKryoType(type);
9394
// add serializers for Avro type if necessary
9495
AvroUtils.getAvroUtils().addAvroSerializersIfRequired(config, type);
9596

0 commit comments

Comments
 (0)