Skip to content

Commit aa8d93e

Browse files
jiexraymasteryhx
authored andcommitted
[FLINK-32975][state] Enhance equals() for all MapState's iterator (apache#23305)
1 parent 360b97a commit aa8d93e

File tree

3 files changed

+56
-0
lines changed

3 files changed

+56
-0
lines changed

flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java

+35
Original file line numberDiff line numberDiff line change
@@ -97,6 +97,7 @@
9797

9898
import java.io.IOException;
9999
import java.io.Serializable;
100+
import java.util.AbstractMap;
100101
import java.util.ArrayList;
101102
import java.util.Arrays;
102103
import java.util.Collections;
@@ -3591,6 +3592,40 @@ public void testMapStateIteratorArbitraryAccess() throws Exception {
35913592
}
35923593
}
35933594

3595+
/** Verify that iterator of {@link MapState} compares on the content. */
3596+
@Test
3597+
public void testMapStateEntryCompare() throws Exception {
3598+
MapStateDescriptor<Integer, Long> stateDesc1 =
3599+
new MapStateDescriptor<>("map-state-1", Integer.class, Long.class);
3600+
MapStateDescriptor<Integer, Long> stateDesc2 =
3601+
new MapStateDescriptor<>("map-state-2", Integer.class, Long.class);
3602+
3603+
CheckpointableKeyedStateBackend<Integer> backend =
3604+
createKeyedBackend(IntSerializer.INSTANCE);
3605+
3606+
try {
3607+
MapState<Integer, Long> state1 =
3608+
backend.getPartitionedState(
3609+
VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, stateDesc1);
3610+
MapState<Integer, Long> state2 =
3611+
backend.getPartitionedState(
3612+
VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, stateDesc2);
3613+
3614+
Map.Entry<Integer, Long> expectedEntry = new AbstractMap.SimpleEntry<>(0, 10L);
3615+
backend.setCurrentKey(1);
3616+
state1.put(expectedEntry.getKey(), expectedEntry.getValue());
3617+
state2.put(expectedEntry.getKey(), expectedEntry.getValue());
3618+
3619+
assertEquals(state1.entries().iterator().next(), expectedEntry);
3620+
assertEquals(state2.entries().iterator().next(), expectedEntry);
3621+
3622+
assertEquals(state1.entries().iterator().next(), state2.entries().iterator().next());
3623+
} finally {
3624+
IOUtils.closeQuietly(backend);
3625+
backend.dispose();
3626+
}
3627+
}
3628+
35943629
/** Verify that {@link ValueStateDescriptor} allows {@code null} as default. */
35953630
@Test
35963631
public void testValueStateNullAsDefaultValue() throws Exception {

flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogMapState.java

+11
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@
3535
import java.io.IOException;
3636
import java.util.Iterator;
3737
import java.util.Map;
38+
import java.util.Objects;
3839

3940
/**
4041
* Delegated partitioned {@link MapState} that forwards changes to {@link StateChange} upon {@link
@@ -83,6 +84,16 @@ public UV setValue(UV value) {
8384
}
8485
return oldValue;
8586
}
87+
88+
@Override
89+
public boolean equals(Object o) {
90+
if (!(o instanceof Map.Entry)) {
91+
return false;
92+
}
93+
Map.Entry<?, ?> e = (Map.Entry<?, ?>) o;
94+
return Objects.equals(entry.getKey(), e.getKey())
95+
&& Objects.equals(entry.getValue(), e.getValue());
96+
}
8697
};
8798
}
8899

flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMapState.java

+10
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,7 @@
5151
import java.util.Arrays;
5252
import java.util.Iterator;
5353
import java.util.Map;
54+
import java.util.Objects;
5455

5556
import static org.apache.flink.util.Preconditions.checkArgument;
5657

@@ -537,6 +538,15 @@ public UV setValue(UV value) {
537538

538539
return oldValue;
539540
}
541+
542+
@Override
543+
public boolean equals(Object o) {
544+
if (!(o instanceof Map.Entry)) {
545+
return false;
546+
}
547+
Map.Entry<?, ?> e = (Map.Entry<?, ?>) o;
548+
return Objects.equals(getKey(), e.getKey()) && Objects.equals(getValue(), e.getValue());
549+
}
540550
}
541551

542552
/** An auxiliary utility to scan all entries under the given key. */

0 commit comments

Comments
 (0)