Skip to content

Commit f2568de

Browse files
xiangyufZakelly
authored andcommitted
[FLINK-35780][state] Support state migration between disabling and enabling state ttl in RocksDBKeyedStateBackend (#25035)
1 parent 41379fb commit f2568de

File tree

18 files changed

+391
-75
lines changed

18 files changed

+391
-75
lines changed

flink-runtime/src/main/java/org/apache/flink/runtime/state/StateSerializerProvider.java

Lines changed: 17 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,8 @@
2323
import org.apache.flink.api.common.typeutils.TypeSerializerSchemaCompatibility;
2424
import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
2525
import org.apache.flink.api.common.typeutils.UnloadableDummyTypeSerializer;
26+
import org.apache.flink.runtime.state.ttl.TtlAwareSerializer;
27+
import org.apache.flink.runtime.state.ttl.TtlAwareSerializerSnapshotWrapper;
2628
import org.apache.flink.util.Preconditions;
2729

2830
import javax.annotation.Nonnull;
@@ -294,7 +296,7 @@ private static class LazilyRegisteredStateSerializerProvider<T>
294296

295297
@Nonnull
296298
@Override
297-
@SuppressWarnings("ConstantConditions")
299+
@SuppressWarnings({"ConstantConditions", "unchecked", "rawtypes"})
298300
public TypeSerializerSchemaCompatibility<T> registerNewSerializerForRestoredState(
299301
TypeSerializer<T> newSerializer) {
300302
checkNotNull(newSerializer);
@@ -303,10 +305,14 @@ public TypeSerializerSchemaCompatibility<T> registerNewSerializerForRestoredStat
303305
"A serializer has already been registered for the state; re-registration is not allowed.");
304306
}
305307

308+
// Use wrapped ttl serializer for compatibility check
306309
TypeSerializerSchemaCompatibility<T> result =
307-
newSerializer
310+
TtlAwareSerializer.wrapTtlAwareSerializer(newSerializer)
308311
.snapshotConfiguration()
309-
.resolveSchemaCompatibility(previousSerializerSnapshot);
312+
.resolveSchemaCompatibility(
313+
new TtlAwareSerializerSnapshotWrapper(
314+
previousSerializerSnapshot)
315+
.getTtlAwareSerializerSnapshot());
310316
if (result.isIncompatible()) {
311317
invalidateCurrentSchemaSerializerAccess();
312318
}
@@ -349,6 +355,7 @@ public TypeSerializerSchemaCompatibility<T> registerNewSerializerForRestoredStat
349355

350356
@Nonnull
351357
@Override
358+
@SuppressWarnings({"unchecked", "rawtypes"})
352359
public TypeSerializerSchemaCompatibility<T> setPreviousSerializerSnapshotForRestoredState(
353360
TypeSerializerSnapshot<T> previousSerializerSnapshot) {
354361
checkNotNull(previousSerializerSnapshot);
@@ -359,10 +366,15 @@ public TypeSerializerSchemaCompatibility<T> setPreviousSerializerSnapshotForRest
359366

360367
this.previousSerializerSnapshot = previousSerializerSnapshot;
361368

369+
// Use wrapped ttl serializer for compatibility check
362370
TypeSerializerSchemaCompatibility<T> result =
363-
Preconditions.checkNotNull(registeredSerializer)
371+
TtlAwareSerializer.wrapTtlAwareSerializer(
372+
Preconditions.checkNotNull(registeredSerializer))
364373
.snapshotConfiguration()
365-
.resolveSchemaCompatibility(previousSerializerSnapshot);
374+
.resolveSchemaCompatibility(
375+
new TtlAwareSerializerSnapshotWrapper(
376+
previousSerializerSnapshot)
377+
.getTtlAwareSerializerSnapshot());
366378
if (result.isIncompatible()) {
367379
invalidateCurrentSchemaSerializerAccess();
368380
}

flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,7 @@
5151
import org.apache.flink.runtime.state.StateSnapshotTransformers;
5252
import org.apache.flink.runtime.state.StreamCompressionDecorator;
5353
import org.apache.flink.runtime.state.metrics.LatencyTrackingStateConfig;
54+
import org.apache.flink.runtime.state.ttl.TtlAwareSerializer;
5455
import org.apache.flink.runtime.state.ttl.TtlTimeProvider;
5556
import org.apache.flink.util.FlinkRuntimeException;
5657
import org.apache.flink.util.StateMigrationException;
@@ -253,6 +254,17 @@ private <N, V> StateTable<K, N, V> tryRegisterStateTable(
253254
+ ").");
254255
}
255256

257+
// HeapKeyedStateBackend doesn't support ttl state migration currently.
258+
if (TtlAwareSerializer.needTtlStateMigration(
259+
previousStateSerializer, newStateSerializer)) {
260+
throw new StateMigrationException(
261+
"For heap backends, the new state serializer ("
262+
+ newStateSerializer
263+
+ ") must not need ttl state migration with the old state serializer ("
264+
+ previousStateSerializer
265+
+ ").");
266+
}
267+
256268
restoredKvMetaInfo =
257269
allowFutureMetadataUpdates
258270
? restoredKvMetaInfo.withSerializerUpgradesAllowed()

flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendMigrationTestBase.java

Lines changed: 7 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -154,7 +154,7 @@ void testKeyedValueStateRegistrationFailsIfNewStateSerializerIsIncompatible() {
154154
e -> assertThat(e).hasCauseInstanceOf(StateMigrationException.class));
155155
}
156156

157-
private void testKeyedValueStateUpgrade(
157+
protected void testKeyedValueStateUpgrade(
158158
ValueStateDescriptor<TestType> initialAccessDescriptor,
159159
ValueStateDescriptor<TestType> newAccessDescriptorAfterRestore)
160160
throws Exception {
@@ -271,7 +271,7 @@ void testKeyedListStateRegistrationFailsIfNewStateSerializerIsIncompatible() {
271271
e -> assertThat(e).hasCauseInstanceOf(StateMigrationException.class));
272272
}
273273

274-
private void testKeyedListStateUpgrade(
274+
protected void testKeyedListStateUpgrade(
275275
ListStateDescriptor<TestType> initialAccessDescriptor,
276276
ListStateDescriptor<TestType> newAccessDescriptorAfterRestore)
277277
throws Exception {
@@ -431,7 +431,7 @@ private Iterator<Map.Entry<Integer, TestType>> sortedIterator(
431431
return set.iterator();
432432
}
433433

434-
private void testKeyedMapStateUpgrade(
434+
protected void testKeyedMapStateUpgrade(
435435
MapStateDescriptor<Integer, TestType> initialAccessDescriptor,
436436
MapStateDescriptor<Integer, TestType> newAccessDescriptorAfterRestore)
437437
throws Exception {
@@ -1203,7 +1203,7 @@ void testStateMigrationAfterChangingTTL() throws Exception {
12031203
}
12041204

12051205
@TestTemplate
1206-
void testStateMigrationAfterChangingTTLFromEnablingToDisabling() {
1206+
protected void testStateMigrationAfterChangingTTLFromEnablingToDisabling() throws Exception {
12071207
final String stateName = "test-ttl";
12081208

12091209
ValueStateDescriptor<TestType> initialAccessDescriptor =
@@ -1219,17 +1219,16 @@ void testStateMigrationAfterChangingTTLFromEnablingToDisabling() {
12191219
testKeyedValueStateUpgrade(
12201220
initialAccessDescriptor, newAccessDescriptorAfterRestore))
12211221
.satisfiesAnyOf(
1222-
e -> assertThat(e).isInstanceOf(IllegalStateException.class),
1223-
e -> assertThat(e).hasCauseInstanceOf(IllegalStateException.class));
1222+
e -> assertThat(e).isInstanceOf(StateMigrationException.class),
1223+
e -> assertThat(e).hasCauseInstanceOf(StateMigrationException.class));
12241224
}
12251225

12261226
@TestTemplate
1227-
void testStateMigrationAfterChangingTTLFromDisablingToEnabling() {
1227+
protected void testStateMigrationAfterChangingTTLFromDisablingToEnabling() throws Exception {
12281228
final String stateName = "test-ttl";
12291229

12301230
ValueStateDescriptor<TestType> initialAccessDescriptor =
12311231
new ValueStateDescriptor<>(stateName, new TestType.V1TestTypeSerializer());
1232-
12331232
ValueStateDescriptor<TestType> newAccessDescriptorAfterRestore =
12341233
new ValueStateDescriptor<>(stateName, new TestType.V2TestTypeSerializer());
12351234
newAccessDescriptorAfterRestore.enableTimeToLive(

flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/StateBackendTestContext.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -89,7 +89,7 @@ private CheckpointStreamFactory createCheckpointStreamFactory() {
8989
}
9090
}
9191

92-
void createAndRestoreKeyedStateBackend(KeyedStateHandle snapshot) {
92+
public void createAndRestoreKeyedStateBackend(KeyedStateHandle snapshot) {
9393
createAndRestoreKeyedStateBackend(NUMBER_OF_KEY_GROUPS, snapshot);
9494
}
9595

@@ -144,7 +144,7 @@ private void disposeKeyedStateBackend() {
144144
}
145145
}
146146

147-
KeyedStateHandle takeSnapshot() throws Exception {
147+
public KeyedStateHandle takeSnapshot() throws Exception {
148148
SnapshotResult<KeyedStateHandle> snapshotResult = triggerSnapshot().get();
149149
KeyedStateHandle jobManagerOwnedSnapshot = snapshotResult.getJobManagerOwnedSnapshot();
150150
if (jobManagerOwnedSnapshot != null) {
@@ -171,7 +171,7 @@ public void setCurrentKey(String key) {
171171
}
172172

173173
@SuppressWarnings("unchecked")
174-
<N, S extends State, V> S createState(
174+
public <N, S extends State, V> S createState(
175175
StateDescriptor<S, V> stateDescriptor,
176176
@SuppressWarnings("SameParameterValue") N defaultNamespace)
177177
throws Exception {

flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlStateTestBase.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -109,7 +109,7 @@ public boolean isSavepoint() {
109109
return (TtlMergingStateTestContext<?, UV, ?>) ctx;
110110
}
111111

112-
private void initTest() throws Exception {
112+
protected void initTest() throws Exception {
113113
initTest(
114114
StateTtlConfig.UpdateType.OnCreateAndWrite,
115115
StateTtlConfig.StateVisibility.NeverReturnExpired);
@@ -496,7 +496,7 @@ void testSnapshotChangeRestore() throws Exception {
496496
}
497497

498498
@TestTemplate
499-
void testRestoreTtlAndRegisterNonTtlStateCompatFailure() throws Exception {
499+
protected void testRestoreTtlAndRegisterNonTtlStateCompatFailure() throws Exception {
500500
assumeThat(this).isNotInstanceOf(MockTtlStateTest.class);
501501

502502
initTest();

flink-state-backends/flink-statebackend-changelog/src/test/java/org/apache/flink/state/changelog/ChangelogStateBackendMigrationTest.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -64,4 +64,18 @@ protected boolean supportsKeySerializerCheck() {
6464
// TODO support checking key serializer
6565
return false;
6666
}
67+
68+
@Override
69+
protected void testStateMigrationAfterChangingTTLFromDisablingToEnabling() throws Exception {
70+
if (!(this.delegatedStateBackendSupplier.get() instanceof EmbeddedRocksDBStateBackend)) {
71+
super.testStateMigrationAfterChangingTTLFromDisablingToEnabling();
72+
}
73+
}
74+
75+
@Override
76+
protected void testStateMigrationAfterChangingTTLFromEnablingToDisabling() throws Exception {
77+
if (!(this.delegatedStateBackendSupplier.get() instanceof EmbeddedRocksDBStateBackend)) {
78+
super.testStateMigrationAfterChangingTTLFromEnablingToDisabling();
79+
}
80+
}
6781
}

flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/state/rocksdb/AbstractRocksDBState.java

Lines changed: 21 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,8 @@
2626
import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
2727
import org.apache.flink.runtime.state.SerializedCompositeKeyBuilder;
2828
import org.apache.flink.runtime.state.internal.InternalKvState;
29+
import org.apache.flink.runtime.state.ttl.TtlAwareSerializer;
30+
import org.apache.flink.runtime.state.ttl.TtlTimeProvider;
2931
import org.apache.flink.util.FlinkRuntimeException;
3032
import org.apache.flink.util.Preconditions;
3133
import org.apache.flink.util.StateMigrationException;
@@ -36,6 +38,8 @@
3638

3739
import java.io.IOException;
3840

41+
import static org.apache.flink.util.Preconditions.checkArgument;
42+
3943
/**
4044
* Base class for {@link State} implementations that store state in a RocksDB database.
4145
*
@@ -184,12 +188,21 @@ public void migrateSerializedValue(
184188
DataInputDeserializer serializedOldValueInput,
185189
DataOutputSerializer serializedMigratedValueOutput,
186190
TypeSerializer<V> priorSerializer,
187-
TypeSerializer<V> newSerializer)
191+
TypeSerializer<V> newSerializer,
192+
TtlTimeProvider ttlTimeProvider)
188193
throws StateMigrationException {
194+
checkArgument(priorSerializer instanceof TtlAwareSerializer);
195+
checkArgument(newSerializer instanceof TtlAwareSerializer);
196+
TtlAwareSerializer<V, ?> ttlAwarePriorSerializer =
197+
(TtlAwareSerializer<V, ?>) priorSerializer;
198+
TtlAwareSerializer<V, ?> ttlAwareNewSerializer = (TtlAwareSerializer<V, ?>) newSerializer;
189199

190200
try {
191-
V value = priorSerializer.deserialize(serializedOldValueInput);
192-
newSerializer.serialize(value, serializedMigratedValueOutput);
201+
ttlAwareNewSerializer.migrateValueFromPriorSerializer(
202+
ttlAwarePriorSerializer,
203+
() -> ttlAwarePriorSerializer.deserialize(serializedOldValueInput),
204+
serializedMigratedValueOutput,
205+
ttlTimeProvider);
193206
} catch (Exception e) {
194207
throw new StateMigrationException("Error while trying to migrate RocksDB state.", e);
195208
}
@@ -233,6 +246,11 @@ protected AbstractRocksDBState<K, N, V> setDefaultValue(V defaultValue) {
233246
return this;
234247
}
235248

249+
protected AbstractRocksDBState<K, N, V> setColumnFamily(ColumnFamilyHandle columnFamily) {
250+
this.columnFamily = columnFamily;
251+
return this;
252+
}
253+
236254
@Override
237255
public StateIncrementalVisitor<K, N, V> getStateIncrementalVisitor(
238256
int recommendedMaxNumberOfReturnedRecords) {

flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/state/rocksdb/RocksDBAggregatingState.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -192,6 +192,7 @@ static <K, N, SV, S extends State, IS extends S> IS update(
192192
((AggregatingStateDescriptor) stateDesc).getAggregateFunction())
193193
.setNamespaceSerializer(registerResult.f1.getNamespaceSerializer())
194194
.setValueSerializer(registerResult.f1.getStateSerializer())
195-
.setDefaultValue(stateDesc.getDefaultValue());
195+
.setDefaultValue(stateDesc.getDefaultValue())
196+
.setColumnFamily(registerResult.f0);
196197
}
197198
}

flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/state/rocksdb/RocksDBKeyedStateBackend.java

Lines changed: 53 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,7 @@
5656
import org.apache.flink.runtime.state.heap.HeapPriorityQueueSetFactory;
5757
import org.apache.flink.runtime.state.heap.HeapPriorityQueueSnapshotRestoreWrapper;
5858
import org.apache.flink.runtime.state.metrics.LatencyTrackingStateConfig;
59+
import org.apache.flink.runtime.state.ttl.TtlAwareSerializer;
5960
import org.apache.flink.runtime.state.ttl.TtlTimeProvider;
6061
import org.apache.flink.state.rocksdb.iterator.RocksStateKeysAndNamespaceIterator;
6162
import org.apache.flink.state.rocksdb.iterator.RocksStateKeysIterator;
@@ -88,6 +89,7 @@
8889
import java.io.File;
8990
import java.io.IOException;
9091
import java.util.ArrayList;
92+
import java.util.Collections;
9193
import java.util.HashMap;
9294
import java.util.LinkedHashMap;
9395
import java.util.List;
@@ -687,20 +689,20 @@ public void notifyCheckpointAborted(long checkpointId) throws Exception {
687689
RegisteredKeyValueStateBackendMetaInfo<N, SV> castedMetaInfo =
688690
(RegisteredKeyValueStateBackendMetaInfo<N, SV>) oldStateInfo.metaInfo;
689691

690-
newMetaInfo =
691-
updateRestoredStateMetaInfo(
692-
Tuple2.of(oldStateInfo.columnFamilyHandle, castedMetaInfo),
693-
stateDesc,
694-
namespaceSerializer,
695-
stateSerializer);
696-
692+
Tuple2<ColumnFamilyHandle, RegisteredKeyValueStateBackendMetaInfo<N, SV>>
693+
newRocksDBState =
694+
updateRestoredStateMetaInfo(
695+
Tuple2.of(oldStateInfo.columnFamilyHandle, castedMetaInfo),
696+
stateDesc,
697+
namespaceSerializer,
698+
stateSerializer);
699+
newMetaInfo = newRocksDBState.f1;
697700
newMetaInfo =
698701
allowFutureMetadataUpdates
699702
? newMetaInfo.withSerializerUpgradesAllowed()
700703
: newMetaInfo;
701704

702-
newRocksStateInfo =
703-
new RocksDbKvStateInfo(oldStateInfo.columnFamilyHandle, newMetaInfo);
705+
newRocksStateInfo = new RocksDbKvStateInfo(newRocksDBState.f0, newMetaInfo);
704706
kvStateInformation.put(stateDesc.getName(), newRocksStateInfo);
705707
sstMergeManager.register(newRocksStateInfo);
706708
} else {
@@ -744,13 +746,16 @@ public void notifyCheckpointAborted(long checkpointId) throws Exception {
744746
}
745747

746748
private <N, S extends State, SV>
747-
RegisteredKeyValueStateBackendMetaInfo<N, SV> updateRestoredStateMetaInfo(
748-
Tuple2<ColumnFamilyHandle, RegisteredKeyValueStateBackendMetaInfo<N, SV>>
749-
oldStateInfo,
750-
StateDescriptor<S, SV> stateDesc,
751-
TypeSerializer<N> namespaceSerializer,
752-
TypeSerializer<SV> stateSerializer)
753-
throws Exception {
749+
Tuple2<ColumnFamilyHandle, RegisteredKeyValueStateBackendMetaInfo<N, SV>>
750+
updateRestoredStateMetaInfo(
751+
Tuple2<
752+
ColumnFamilyHandle,
753+
RegisteredKeyValueStateBackendMetaInfo<N, SV>>
754+
oldStateInfo,
755+
StateDescriptor<S, SV> stateDesc,
756+
TypeSerializer<N> namespaceSerializer,
757+
TypeSerializer<SV> stateSerializer)
758+
throws Exception {
754759

755760
RegisteredKeyValueStateBackendMetaInfo<N, SV> restoredKvStateMetaInfo = oldStateInfo.f1;
756761

@@ -789,7 +794,7 @@ RegisteredKeyValueStateBackendMetaInfo<N, SV> updateRestoredStateMetaInfo(
789794
+ ").");
790795
}
791796

792-
return restoredKvStateMetaInfo;
797+
return oldStateInfo;
793798
}
794799

795800
/**
@@ -855,14 +860,43 @@ private <N, S extends State, SV> void migrateStateValues(
855860

856861
DataInputDeserializer serializedValueInput = new DataInputDeserializer();
857862
DataOutputSerializer migratedSerializedValueOutput = new DataOutputSerializer(512);
863+
864+
// Check if this is ttl state migration
865+
TtlAwareSerializer<SV, ?> previousTtlAwareSerializer =
866+
(TtlAwareSerializer<SV, ?>)
867+
TtlAwareSerializer.wrapTtlAwareSerializer(
868+
stateMetaInfo.f1.getPreviousStateSerializer());
869+
TtlAwareSerializer<SV, ?> currentTtlAwareSerializer =
870+
(TtlAwareSerializer<SV, ?>)
871+
TtlAwareSerializer.wrapTtlAwareSerializer(
872+
stateMetaInfo.f1.getStateSerializer());
873+
874+
if (TtlAwareSerializer.needTtlStateMigration(
875+
previousTtlAwareSerializer, currentTtlAwareSerializer)) {
876+
// By performing ttl state migration, we need to recreate column family to
877+
// enable/disable ttl compaction filter factory.
878+
db.dropColumnFamily(stateMetaInfo.f0);
879+
stateMetaInfo.f0 =
880+
RocksDBOperationUtils.createColumnFamily(
881+
RocksDBOperationUtils.createColumnFamilyDescriptor(
882+
stateMetaInfo.f1,
883+
columnFamilyOptionsFactory,
884+
ttlCompactFiltersManager,
885+
optionsContainer.getWriteBufferManagerCapacity()),
886+
db,
887+
Collections.emptyList(),
888+
ICloseableRegistry.NO_OP);
889+
}
890+
858891
while (iterator.isValid()) {
859892
serializedValueInput.setBuffer(iterator.value());
860893

861894
rocksDBState.migrateSerializedValue(
862895
serializedValueInput,
863896
migratedSerializedValueOutput,
864-
stateMetaInfo.f1.getPreviousStateSerializer(),
865-
stateMetaInfo.f1.getStateSerializer());
897+
previousTtlAwareSerializer,
898+
currentTtlAwareSerializer,
899+
this.ttlTimeProvider);
866900

867901
batchWriter.put(
868902
stateMetaInfo.f0,

0 commit comments

Comments
 (0)