Skip to content

Commit 26786db

Browse files
committed
NoSQL: fix concurrency issue in IndexImpl + add tests + adopt JMH
1 parent 8f9245b commit 26786db

File tree

4 files changed

+130
-21
lines changed

4 files changed

+130
-21
lines changed

components/persistence/index/src/jmh/java/org/apache/polaris/persistence/indexes/RandomUuidKeyIndexImplBench.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@
4444
@Warmup(iterations = 3, time = 2000, timeUnit = MILLISECONDS)
4545
@Measurement(iterations = 5, time = 1000, timeUnit = MILLISECONDS)
4646
@Fork(1)
47-
@Threads(1) // Do NOT use multiple threads StoreIndex is NOT thread safe!
47+
@Threads(4)
4848
@BenchmarkMode(Mode.AverageTime)
4949
@OutputTimeUnit(MICROSECONDS)
5050
public class RandomUuidKeyIndexImplBench {

components/persistence/index/src/jmh/java/org/apache/polaris/persistence/indexes/RealisticKeyIndexImplBench.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,7 @@
4848
@Warmup(iterations = 3, time = 2000, timeUnit = MILLISECONDS)
4949
@Measurement(iterations = 5, time = 1000, timeUnit = MILLISECONDS)
5050
@Fork(1)
51-
@Threads(1) // Do NOT use multiple threads StoreIndex is NOT thread safe!
51+
@Threads(4)
5252
@BenchmarkMode(Mode.AverageTime)
5353
@OutputTimeUnit(MICROSECONDS)
5454
public class RealisticKeyIndexImplBench {

components/persistence/index/src/main/java/org/apache/polaris/persistence/indexes/IndexImpl.java

Lines changed: 37 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -516,7 +516,7 @@ public int estimatedSerializedSize() {
516516

517517
target = target.flip();
518518
} else {
519-
target = serialized.duplicate().position(0).limit(originalSerializedSize);
519+
target = serializedThreadSafe().position(0).limit(originalSerializedSize);
520520
}
521521

522522
return target;
@@ -603,8 +603,8 @@ private IndexImpl(ByteBuffer serialized, IndexValueSerializer<V> ser) {
603603

604604
this.elements = elements;
605605
this.serializer = ser;
606-
this.serialized = serialized.duplicate().clear();
607606
this.originalSerializedSize = serialized.position();
607+
this.serialized = serialized.duplicate().clear();
608608
}
609609

610610
/**
@@ -708,14 +708,13 @@ ByteBuffer serializeKey(ByteBuffer keySerBuffer, ByteBuffer previousKey) {
708708
previousKey.limit(limitSave).position(0);
709709
}
710710

711-
IndexImpl<V> index = IndexImpl.this;
712-
var serialized = requireNonNull(index.serialized);
713-
return keySerBuffer.put(serialized.limit(valueOffset).position(keyOffset)).flip();
711+
return keySerBuffer
712+
.put(serializedNotThreadSafe().limit(valueOffset).position(keyOffset))
713+
.flip();
714714
}
715715

716716
private IndexKey materializeKey() {
717-
IndexImpl<V> index = IndexImpl.this;
718-
var serialized = requireNonNull(index.serialized);
717+
var serialized = serializedThreadSafe();
719718

720719
var suffix = serialized.limit(valueOffset).position(keyOffset);
721720

@@ -761,19 +760,9 @@ private ByteBuffer prefixKey(ByteBuffer serialized, LazyIndexElement me, int rem
761760
return keyBuffer;
762761
}
763762

764-
private ByteBuffer serializedForContent() {
765-
IndexImpl<V> index = IndexImpl.this;
766-
var serialized = requireNonNull(index.serialized);
767-
return serialized.limit(endOffset).position(valueOffset);
768-
}
769-
770-
private V materializeContent() {
771-
return serializer.deserialize(serializedForContent());
772-
}
773-
774763
@Override
775764
public void serializeContent(IndexValueSerializer<V> ser, ByteBuffer target) {
776-
target.put(serializedForContent());
765+
target.put(serializedNotThreadSafe().limit(endOffset).position(valueOffset));
777766
}
778767

779768
@Override
@@ -800,7 +789,10 @@ public V getValue() {
800789
var c = content;
801790
if (c == null) {
802791
if (!hasContent) {
803-
c = content = materializeContent();
792+
c =
793+
content =
794+
serializer.deserialize(
795+
serializedThreadSafe().limit(endOffset).position(valueOffset));
804796
hasContent = true;
805797
}
806798
}
@@ -842,6 +834,32 @@ static ByteBuffer newKeyBuffer() {
842834
return ByteBuffer.allocate(MAX_KEY_BYTES);
843835
}
844836

837+
/**
838+
* Non-thread-safe variant to retrieve {@link #serialized}. Used in these scenarios, which are not
839+
* thread safe by contract:
840+
*
841+
* <ul>
842+
* <li>serializing a <em>modified</em>
843+
* </ul>
844+
*/
845+
private ByteBuffer serializedNotThreadSafe() {
846+
return requireNonNull(serialized);
847+
}
848+
849+
/**
850+
* Thread-safe variant to retrieve {@link #serialized}. Used in these scenarios, which are not
851+
* thread safe by contract:
852+
*
853+
* <ul>
854+
* <li>serializing a non-modified index
855+
* <li>lazy materialization of a key (deserialization)
856+
* <li>lazy materialization of a value (deserialization)
857+
* </ul>
858+
*/
859+
private ByteBuffer serializedThreadSafe() {
860+
return requireNonNull(serialized).duplicate();
861+
}
862+
845863
private static <V> int search(List<IndexElement<V>> e, @Nonnull IndexKey key) {
846864
// Need a StoreIndexElement for the sake of 'binarySearch()' (the content value isn't used)
847865
return search(e, indexElement(key, ""));

components/persistence/index/src/test/java/org/apache/polaris/persistence/indexes/TestIndexImpl.java

Lines changed: 91 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,13 @@
4343
import java.util.HexFormat;
4444
import java.util.List;
4545
import java.util.Map;
46+
import java.util.concurrent.CompletableFuture;
47+
import java.util.concurrent.CountDownLatch;
48+
import java.util.concurrent.Executors;
49+
import java.util.concurrent.Semaphore;
50+
import java.util.concurrent.TimeUnit;
51+
import java.util.concurrent.atomic.AtomicBoolean;
52+
import java.util.function.Consumer;
4653
import java.util.function.Function;
4754
import java.util.stream.IntStream;
4855
import java.util.stream.Stream;
@@ -785,4 +792,88 @@ public void stateRelated() {
785792
soft.assertThat(index.isMutable()).isTrue();
786793
soft.assertThatCode(() -> index.divide(3)).doesNotThrowAnyException();
787794
}
795+
796+
// The following multithreaded "tests" are only there to verify that no ByteBuffer related
797+
// exceptions are thrown.
798+
799+
@Test
800+
public void multithreadedGetKey() throws Exception {
801+
multithreaded(KeyIndexTestSet::randomGetKey, true);
802+
}
803+
804+
@Test
805+
public void multithreadedSerialize() throws Exception {
806+
multithreaded(KeyIndexTestSet::serialize, false);
807+
}
808+
809+
@Test
810+
public void multithreadedFirst() throws Exception {
811+
multithreaded(ts -> ts.keyIndex().first(), false);
812+
}
813+
814+
@Test
815+
public void multithreadedLast() throws Exception {
816+
multithreaded(ts -> ts.keyIndex().last(), false);
817+
}
818+
819+
@Test
820+
public void multithreadedKeys() throws Exception {
821+
multithreaded(ts -> ts.keyIndex().asKeyList(), false);
822+
}
823+
824+
@Test
825+
public void multithreadedElementIterator() throws Exception {
826+
multithreaded(ts -> ts.keyIndex().elementIterator().forEachRemaining(el -> {}), false);
827+
}
828+
829+
@Test
830+
public void multithreadedIterator() throws Exception {
831+
multithreaded(ts -> ts.keyIndex().iterator().forEachRemaining(el -> {}), false);
832+
}
833+
834+
void multithreaded(Consumer<KeyIndexTestSet<ObjRef>> worker, boolean longTest) throws Exception {
835+
var indexTestSet =
836+
KeyIndexTestSet.<ObjRef>newGenerator()
837+
.keySet(ImmutableRandomUuidKeySet.builder().numKeys(100_000).build())
838+
.elementSupplier(key -> indexElement(key, Util.randomObjId()))
839+
.elementSerializer(OBJ_REF_SERIALIZER)
840+
.build()
841+
.generateIndexTestSet();
842+
843+
var threads = Runtime.getRuntime().availableProcessors();
844+
845+
try (var executor = Executors.newFixedThreadPool(threads)) {
846+
var latch = new CountDownLatch(threads);
847+
var start = new Semaphore(0);
848+
var stop = new AtomicBoolean();
849+
850+
var futures =
851+
IntStream.range(0, threads)
852+
.mapToObj(
853+
i ->
854+
CompletableFuture.runAsync(
855+
() -> {
856+
latch.countDown();
857+
try {
858+
start.acquire();
859+
} catch (InterruptedException e) {
860+
throw new RuntimeException(e);
861+
}
862+
while (!stop.get()) {
863+
worker.accept(indexTestSet);
864+
}
865+
},
866+
executor))
867+
.toArray(CompletableFuture[]::new);
868+
869+
latch.await();
870+
start.release(threads);
871+
872+
Thread.sleep(longTest ? TimeUnit.SECONDS.toMillis(3) : 500L);
873+
874+
stop.set(true);
875+
876+
CompletableFuture.allOf(futures).get();
877+
}
878+
}
788879
}

0 commit comments

Comments
 (0)