diff --git a/build.gradle b/build.gradle
index 8a633fd78c4..f7d72789599 100644
--- a/build.gradle
+++ b/build.gradle
@@ -118,7 +118,7 @@ ext.libraries = [
     pulsarIoCommon: "${pulsarGroup}:pulsar-io-common:${pulsarVersion}",
     r2: "com.linkedin.pegasus:r2:${pegasusVersion}",
     restliCommon: "com.linkedin.pegasus:restli-common:${pegasusVersion}",
-    rocksdbjni: 'org.rocksdb:rocksdbjni:8.11.4',
+    rocksdbjni: 'org.rocksdb:rocksdbjni:8.8.1',
    samzaApi: 'org.apache.samza:samza-api:1.5.1',
     slf4j: 'org.slf4j:slf4j:1.7.36',
     slf4jApi: 'org.slf4j:slf4j-api:1.7.36',
diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/storage/chunking/ChunkingUtils.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/storage/chunking/ChunkingUtils.java
index 3481ddc68aa..ad7c865a7a6 100644
--- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/storage/chunking/ChunkingUtils.java
+++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/storage/chunking/ChunkingUtils.java
@@ -19,8 +19,6 @@
 import com.linkedin.venice.utils.LatencyUtils;
 import com.linkedin.venice.writer.VeniceWriter;
 import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.List;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.avro.io.BinaryDecoder;
@@ -341,20 +339,15 @@ private static <VALUE, CHUNKS_CONTAINER> VALUE getFromStorage(
     CHUNKS_CONTAINER assembledValueContainer = adapter.constructChunksContainer(chunkedValueManifest);
     int actualSize = 0;
-    List<byte[]> keys = new ArrayList<>(chunkedValueManifest.keysWithChunkIdSuffix.size());
-    for (int chunkIndex = 0; chunkIndex < chunkedValueManifest.keysWithChunkIdSuffix.size(); chunkIndex++) {
-      keys.add(chunkedValueManifest.keysWithChunkIdSuffix.get(chunkIndex).array());
-    }
-    List<byte[]> values =
-        isRmdValue ? store.multiGetReplicationMetadata(partition, keys) : store.multiGet(partition, keys);
-
     for (int chunkIndex = 0; chunkIndex < chunkedValueManifest.keysWithChunkIdSuffix.size(); chunkIndex++) {
       // N.B.: This is done sequentially. Originally, each chunk was fetched concurrently in the same executor
       // as the main queries, but this might cause deadlocks, so we are now doing it sequentially. If we want to
       // optimize large value retrieval in the future, it's unclear whether the concurrent retrieval approach
       // is optimal (as opposed to streaming the response out incrementally, for example). Since this is a
       // premature optimization, we are not addressing it right now.
-      byte[] valueChunk = values.get(chunkIndex);
+      byte[] chunkKey = chunkedValueManifest.keysWithChunkIdSuffix.get(chunkIndex).array();
+      byte[] valueChunk =
+          isRmdValue ? store.getReplicationMetadata(partition, chunkKey) : store.get(partition, chunkKey);
       if (valueChunk == null) {
         throw new VeniceException("Chunk not found in " + getExceptionMessageDetails(store, partition, chunkIndex));
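
Note: the ChunkingUtils hunk above swaps the batched multiGet path for one point lookup per chunk key, keeping the sequential behavior described in the N.B. comment. A minimal sketch of the resulting access pattern, assuming a simplified single-method store interface (all names below are illustrative, not the Venice API):

    import java.io.ByteArrayOutputStream;
    import java.io.IOException;
    import java.util.List;

    // Hypothetical stand-in for the storage engine's single-key read path.
    interface SimpleStore {
      byte[] get(int partition, byte[] key);
    }

    final class SequentialChunkFetch {
      // Fetch each chunk with an individual get() and concatenate, mirroring
      // the sequential loop in getFromStorage(): one lookup per manifest key.
      static byte[] assemble(SimpleStore store, int partition, List<byte[]> chunkKeys) throws IOException {
        ByteArrayOutputStream assembled = new ByteArrayOutputStream();
        for (byte[] chunkKey: chunkKeys) {
          byte[] chunk = store.get(partition, chunkKey);
          if (chunk == null) {
            throw new IllegalStateException("Chunk not found in partition " + partition);
          }
          assembled.write(chunk);
        }
        return assembled.toByteArray();
      }
    }
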
diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/store/AbstractStorageEngine.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/store/AbstractStorageEngine.java
index 328754111d3..58db4d064dc 100644
--- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/store/AbstractStorageEngine.java
+++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/store/AbstractStorageEngine.java
@@ -502,13 +502,6 @@ public byte[] get(int partitionId, byte[] key) throws VeniceException {
     });
   }
 
-  public List<byte[]> multiGet(int partitionId, List<byte[]> keys) throws VeniceException {
-    return executeWithSafeGuard(partitionId, () -> {
-      AbstractStoragePartition partition = getPartitionOrThrow(partitionId);
-      return partition.multiGet(keys);
-    });
-  }
-
   public ByteBuffer get(int partitionId, byte[] key, ByteBuffer valueToBePopulated) throws VeniceException {
     return executeWithSafeGuard(partitionId, () -> {
       AbstractStoragePartition partition = getPartitionOrThrow(partitionId);
@@ -552,13 +545,6 @@ public byte[] getReplicationMetadata(int partitionId, byte[] key) {
     });
   }
 
-  public List<byte[]> multiGetReplicationMetadata(int partitionId, List<byte[]> keys) {
-    return executeWithSafeGuard(partitionId, () -> {
-      AbstractStoragePartition partition = getPartitionOrThrow(partitionId);
-      return partition.multiGetReplicationMetadata(keys);
-    });
-  }
-
   /**
    * Put the offset associated with the partitionId into the metadata partition.
    */
diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/store/AbstractStoragePartition.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/store/AbstractStoragePartition.java
index 3e3c95ef001..3d6c410ea57 100644
--- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/store/AbstractStoragePartition.java
+++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/store/AbstractStoragePartition.java
@@ -4,8 +4,6 @@
 import com.linkedin.davinci.store.rocksdb.ReplicationMetadataRocksDBStoragePartition;
 import com.linkedin.venice.exceptions.VeniceUnsupportedOperationException;
 import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.function.Supplier;
@@ -51,12 +49,6 @@ public ByteBuffer get(byte[] key, ByteBuffer valueToBePopulated) {
     return ByteBuffer.wrap(get(key));
   }
 
-  public List<byte[]> multiGet(List<byte[]> keys) {
-    List<byte[]> values = new ArrayList<>(keys.size());
-    keys.forEach(key -> values.add(get(key)));
-    return values;
-  }
-
   /**
    * Get a Value from the partition database
    * @param <K> the type for Key
@@ -171,10 +163,6 @@ public byte[] getReplicationMetadata(byte[] key) {
     throw new VeniceUnsupportedOperationException("getReplicationMetadata");
   }
 
-  public List<byte[]> multiGetReplicationMetadata(List<byte[]> keys) {
-    throw new VeniceUnsupportedOperationException("multiGetReplicationMetadata");
-  }
-
   /**
    * This API deletes a record from RocksDB but updates the metadata in ByteBuffer format and puts it into RocksDB.
    * Only {@link ReplicationMetadataRocksDBStoragePartition} will execute this method,
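
Note: with multiGet and multiGetReplicationMetadata removed from both the engine and the partition, a caller that used to batch keys falls back to the same per-key loop the deleted default implementation performed. A hedged sketch of that equivalence ("reader" is a hypothetical stand-in for the partition's single-key get):

    import java.util.ArrayList;
    import java.util.List;
    import java.util.function.Function;

    final class PerKeyLookup {
      // Same contract as the removed default multiGet(): one get() per key,
      // results in key order, null entries for missing keys.
      static List<byte[]> getAll(Function<byte[], byte[]> reader, List<byte[]> keys) {
        List<byte[]> values = new ArrayList<>(keys.size());
        for (byte[] key: keys) {
          values.add(reader.apply(key));
        }
        return values;
      }
    }
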
diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/store/rocksdb/ReplicationMetadataRocksDBStoragePartition.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/store/rocksdb/ReplicationMetadataRocksDBStoragePartition.java
index 0c8451153b7..a9023300be8 100644
--- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/store/rocksdb/ReplicationMetadataRocksDBStoragePartition.java
+++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/store/rocksdb/ReplicationMetadataRocksDBStoragePartition.java
@@ -10,15 +10,12 @@
 import com.linkedin.venice.store.rocksdb.RocksDBUtils;
 import com.linkedin.venice.utils.ByteUtils;
 import java.nio.ByteBuffer;
-import java.util.ArrayList;
 import java.util.Arrays;
-import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.function.Supplier;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
-import org.rocksdb.ColumnFamilyHandle;
 import org.rocksdb.RocksDB;
 import org.rocksdb.RocksDBException;
 import org.rocksdb.WriteBatch;
@@ -147,24 +144,6 @@ public byte[] getReplicationMetadata(byte[] key) {
     }
   }
 
-  @Override
-  public List<byte[]> multiGetReplicationMetadata(List<byte[]> keys) {
-    readCloseRWLock.readLock().lock();
-    try {
-      makeSureRocksDBIsStillOpen();
-      ColumnFamilyHandle rmdHandle = columnFamilyHandleList.get(REPLICATION_METADATA_COLUMN_FAMILY_INDEX);
-      List<ColumnFamilyHandle> cfHandleList = new ArrayList<>(keys.size());
-      for (int i = 0; i < keys.size(); ++i) {
-        cfHandleList.add(rmdHandle);
-      }
-      return rocksDB.multiGetAsList(getReadOptionsForMultiGet(), cfHandleList, keys);
-    } catch (RocksDBException e) {
-      throw new VeniceException("Failed to get value from RocksDB: " + replicaId, e);
-    } finally {
-      readCloseRWLock.readLock().unlock();
-    }
-  }
-
   /**
    * This API deletes a record from RocksDB but updates the metadata in ByteBuffer format and puts it into RocksDB.
    */
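
Note: the deleted multiGetReplicationMetadata illustrates a RocksDB Java API detail: the column-family overload of multiGetAsList is key-aligned, so the same handle must be repeated once per key. A self-contained sketch of that pattern against a scratch database (the path and key names are illustrative only):

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.List;
    import org.rocksdb.ColumnFamilyDescriptor;
    import org.rocksdb.ColumnFamilyHandle;
    import org.rocksdb.DBOptions;
    import org.rocksdb.RocksDB;
    import org.rocksdb.RocksDBException;

    public class ColumnFamilyMultiGetDemo {
      public static void main(String[] args) throws RocksDBException {
        RocksDB.loadLibrary();
        List<ColumnFamilyDescriptor> descriptors = Arrays.asList(
            new ColumnFamilyDescriptor(RocksDB.DEFAULT_COLUMN_FAMILY),
            new ColumnFamilyDescriptor("timestamp_metadata".getBytes()));
        List<ColumnFamilyHandle> handles = new ArrayList<>();
        try (DBOptions options =
            new DBOptions().setCreateIfMissing(true).setCreateMissingColumnFamilies(true)) {
          RocksDB db = RocksDB.open(options, "/tmp/cf-multiget-demo", descriptors, handles);
          try {
            ColumnFamilyHandle rmdHandle = handles.get(1);
            List<byte[]> keys = Arrays.asList("k1".getBytes(), "k2".getBytes());
            // One ColumnFamilyHandle entry per key, even when all keys target the same CF.
            List<ColumnFamilyHandle> perKeyHandles = new ArrayList<>(keys.size());
            for (int i = 0; i < keys.size(); i++) {
              perKeyHandles.add(rmdHandle);
            }
            List<byte[]> values = db.multiGetAsList(perKeyHandles, keys); // null = not found
            System.out.println("non-null values: " + values.stream().filter(v -> v != null).count());
          } finally {
            handles.forEach(ColumnFamilyHandle::close);
            db.close();
          }
        }
      }
    }
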
diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/store/rocksdb/RocksDBServerConfig.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/store/rocksdb/RocksDBServerConfig.java
index 1a8533a4c6d..109d95dc45b 100644
--- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/store/rocksdb/RocksDBServerConfig.java
+++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/store/rocksdb/RocksDBServerConfig.java
@@ -214,11 +214,6 @@ public class RocksDBServerConfig {
   public static final String ROCKSDB_ATOMIC_FLUSH_ENABLED = "rocksdb.atomic.flush.enabled";
   public static final String ROCKSDB_SEPARATE_RMD_CACHE_ENABLED = "rocksdb.separate.rmd.cache.enabled";
   public static final String ROCKSDB_BLOCK_BASE_FORMAT_VERSION = "rocksdb.block.base.format.version";
-  /**
-   * Whether to enable async io in the read path or not.
-   * https://rocksdb.org/blog/2022/10/07/asynchronous-io-in-rocksdb.html
-   */
-  public static final String ROCKSDB_READ_ASYNC_IO_ENABLED = "rocksdb.read.async.io.enabled";
 
   public static final String ROCKSDB_MAX_LOG_FILE_NUM = "rocksdb.max.log.file.num";
   public static final String ROCKSDB_MAX_LOG_FILE_SIZE = "rocksdb.max.log.file.size";
@@ -289,7 +284,6 @@ public class RocksDBServerConfig {
   private int blockBaseFormatVersion;
   private final int maxLogFileNum;
   private final long maxLogFileSize;
-  private final boolean readAsyncIOEanbled;
   private final String transformerValueSchema;
 
   public RocksDBServerConfig(VeniceProperties props) {
@@ -412,7 +406,6 @@ public RocksDBServerConfig(VeniceProperties props) {
      */
     this.maxLogFileNum = props.getInt(ROCKSDB_MAX_LOG_FILE_NUM, 3);
     this.maxLogFileSize = props.getSizeInBytes(ROCKSDB_MAX_LOG_FILE_SIZE, 10 * 1024 * 1024); // 10MB;
-    this.readAsyncIOEanbled = props.getBoolean(ROCKSDB_READ_ASYNC_IO_ENABLED, true);
     this.transformerValueSchema =
         props.containsKey(RECORD_TRANSFORMER_VALUE_SCHEMA) ? props.getString(RECORD_TRANSFORMER_VALUE_SCHEMA) : "null";
   }
@@ -620,10 +613,6 @@ public long getMaxLogFileSize() {
     return maxLogFileSize;
   }
 
-  public boolean isReadAsyncIOEanbled() {
-    return readAsyncIOEanbled;
-  }
-
   public String getTransformerValueSchema() {
     return transformerValueSchema;
   }
diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/store/rocksdb/RocksDBStoragePartition.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/store/rocksdb/RocksDBStoragePartition.java
index 1e137435c62..0d1728b3bce 100644
--- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/store/rocksdb/RocksDBStoragePartition.java
+++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/store/rocksdb/RocksDBStoragePartition.java
@@ -67,11 +67,6 @@ public class RocksDBStoragePartition extends AbstractStoragePartition {
   private static final Logger LOGGER = LogManager.getLogger(RocksDBStoragePartition.class);
   private static final String ROCKSDB_ERROR_MESSAGE_FOR_RUNNING_OUT_OF_SPACE_QUOTA = "Max allowed space was reached";
   protected static final ReadOptions READ_OPTIONS_DEFAULT = new ReadOptions();
-  /**
-   * Async IO will speed up the lookup for multi-get with posix file system.
-   * https://rocksdb.org/blog/2022/10/07/asynchronous-io-in-rocksdb.html
-   */
-  protected static final ReadOptions READ_OPTIONS_WITH_ASYNC_IO = new ReadOptions().setAsyncIo(true);
   static final byte[] REPLICATION_METADATA_COLUMN_FAMILY = "timestamp_metadata".getBytes();
 
   private static final FlushOptions WAIT_FOR_FLUSH_OPTIONS = new FlushOptions().setWaitForFlush(true);
@@ -604,19 +599,11 @@ public byte[] get(ByteBuffer keyBuffer) {
     }
   }
 
-  protected ReadOptions getReadOptionsForMultiGet() {
-    if (rocksDBServerConfig.isRocksDBPlainTableFormatEnabled() || !rocksDBServerConfig.isReadAsyncIOEanbled()) {
-      return READ_OPTIONS_DEFAULT;
-    }
-    return READ_OPTIONS_WITH_ASYNC_IO;
-  }
-
-  @Override
   public List<byte[]> multiGet(List<byte[]> keys) {
     readCloseRWLock.readLock().lock();
     try {
       makeSureRocksDBIsStillOpen();
-      return rocksDB.multiGetAsList(getReadOptionsForMultiGet(), keys);
+      return rocksDB.multiGetAsList(keys);
     } catch (RocksDBException e) {
       throw new VeniceException("Failed to get value from RocksDB: " + replicaId, e);
     } finally {
@@ -629,8 +616,7 @@ public List<ByteBuffer> multiGet(List<ByteBuffer> keys, List<ByteBuffer> values) {
     try {
       makeSureRocksDBIsStillOpen();
-      List<ByteBufferGetStatus> statusList = rocksDB.multiGetByteBuffers(getReadOptionsForMultiGet(), keys, values);
-
+      List<ByteBufferGetStatus> statusList = rocksDB.multiGetByteBuffers(keys, values);
       int keyCnt = keys.size();
       int statusCnt = statusList.size();
       int valueCnt = values.size();
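
Note: the two file diffs above remove the async-IO knob end to end: the config key and getter in RocksDBServerConfig, and the ReadOptions selection in RocksDBStoragePartition. For reference, the deleted selection logic boils down to the following standalone sketch (simplified; the real code read both flags from rocksDBServerConfig):

    import org.rocksdb.ReadOptions;

    final class MultiGetReadOptions {
      private static final ReadOptions DEFAULT = new ReadOptions();
      // setAsyncIo(true) enables the read path described in the RocksDB blog
      // post cited in the deleted javadoc; it only helps block-based tables.
      private static final ReadOptions ASYNC_IO = new ReadOptions().setAsyncIo(true);

      static ReadOptions pick(boolean plainTableFormatEnabled, boolean readAsyncIoEnabled) {
        if (plainTableFormatEnabled || !readAsyncIoEnabled) {
          return DEFAULT;
        }
        return ASYNC_IO;
      }
    }
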
diff --git a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/ActiveActiveStoreIngestionTaskTest.java b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/ActiveActiveStoreIngestionTaskTest.java
index 7d0bd8e760b..7a6015a61bb 100644
--- a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/ActiveActiveStoreIngestionTaskTest.java
+++ b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/ActiveActiveStoreIngestionTaskTest.java
@@ -92,8 +92,6 @@
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
 import java.util.Objects;
 import java.util.Optional;
 import java.util.Properties;
@@ -486,12 +484,8 @@ public void testReadingChunkedRmdFromStorage() {
     chunkedManifestBytes = ChunkingTestUtils.prependSchemaId(chunkedManifestBytes, manifestSchemaId).array();
     byte[] topLevelKey2 = keyWithChunkingSuffixSerializer.serializeNonChunkedKey(key2);
     byte[] chunkedKey1InKey2 = chunkedKeyWithSuffix1.array();
-
     when(storageEngine.getReplicationMetadata(partition, topLevelKey2)).thenReturn(chunkedManifestBytes);
     when(storageEngine.getReplicationMetadata(partition, chunkedKey1InKey2)).thenReturn(chunkedValue1);
-    List<byte[]> chunkedValues = new ArrayList<>(1);
-    chunkedValues.add(chunkedValue1);
-    when(storageEngine.multiGetReplicationMetadata(eq(partition), any())).thenReturn(chunkedValues);
     byte[] result2 = ingestionTask.getRmdWithValueSchemaByteBufferFromStorage(partition, key2, container, 0L);
     Assert.assertNotNull(result2);
     Assert.assertNotNull(container.getManifest());
@@ -524,12 +518,9 @@
     byte[] topLevelKey3 = keyWithChunkingSuffixSerializer.serializeNonChunkedKey(key3);
     byte[] chunkedKey1InKey3 = chunkedKeyWithSuffix1.array();
     byte[] chunkedKey2InKey3 = chunkedKeyWithSuffix2.array();
-
     when(storageEngine.getReplicationMetadata(partition, topLevelKey3)).thenReturn(chunkedManifestBytes);
     when(storageEngine.getReplicationMetadata(partition, chunkedKey1InKey3)).thenReturn(chunkedValue1);
     when(storageEngine.getReplicationMetadata(partition, chunkedKey2InKey3)).thenReturn(chunkedValue2);
-    when(storageEngine.multiGetReplicationMetadata(eq(partition), any()))
-        .thenReturn(Arrays.asList(chunkedValue1, chunkedValue2));
     byte[] result3 = ingestionTask.getRmdWithValueSchemaByteBufferFromStorage(partition, key3, container, 0L);
     Assert.assertNotNull(result3);
     Assert.assertNotNull(container.getManifest());
diff --git a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/storage/chunking/ChunkingTest.java b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/storage/chunking/ChunkingTest.java
index c89945b3508..020e3e29589 100644
--- a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/storage/chunking/ChunkingTest.java
+++ b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/storage/chunking/ChunkingTest.java
@@ -1,6 +1,5 @@
 package com.linkedin.davinci.storage.chunking;
 
-import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.eq;
 import static org.mockito.Mockito.mock;
@@ -23,7 +22,6 @@
 import com.linkedin.venice.utils.ByteUtils;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.List;
 import java.util.Random;
 import java.util.concurrent.ThreadLocalRandom;
@@ -216,8 +214,6 @@ private void runTest(
     doReturn(chunk1Bytes).when(storageEngine).get(eq(partition), eq(firstKey));
     doReturn(chunk2Bytes).when(storageEngine).get(eq(partition), eq(secondKey));
 
-    doReturn(Arrays.asList(chunk1Bytes, chunk2Bytes)).when(storageEngine).multiGet(eq(partition), any());
-
     StoreDeserializerCache storeDeserializerCache = rawBytesStoreDeserializerCache
         ? RawBytesStoreDeserializerCache.getInstance()
         : new AvroStoreDeserializerCache<>(schemaRepository, storeName, true);
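
Note: both test diffs above move from stubbing one batched call to stubbing each chunk key individually. A condensed Mockito sketch of the pattern (the interface and identifiers are hypothetical, standing in for the mocked storage engine):

    import static org.mockito.ArgumentMatchers.eq;
    import static org.mockito.Mockito.mock;
    import static org.mockito.Mockito.when;

    final class PerKeyStubbing {
      // Hypothetical slice of the storage-engine surface exercised by the tests.
      interface Engine {
        byte[] getReplicationMetadata(int partition, byte[] key);
      }

      static Engine stub(int partition, byte[] manifestKey, byte[] manifestBytes, byte[] chunkKey, byte[] chunkBytes) {
        Engine engine = mock(Engine.class);
        // One stub per key; no multiGetReplicationMetadata stub is needed anymore.
        // Mockito's argument equality compares arrays element-wise, so byte[] keys
        // match by content.
        when(engine.getReplicationMetadata(eq(partition), eq(manifestKey))).thenReturn(manifestBytes);
        when(engine.getReplicationMetadata(eq(partition), eq(chunkKey))).thenReturn(chunkBytes);
        return engine;
      }
    }
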