Commit

Revert "[server][da-vinci] Bumped RocksDB dep and adopt multiget async io by default (#950)" (#1063)

Revert "[server][da-vinci] Bumped RocksDB dep and adopt multiget async io by default (#950)"

This reverts commit 02591a1.
gaojieliu authored Jul 15, 2024
1 parent 129dbfa commit f9af2bf
Showing 9 changed files with 6 additions and 98 deletions.
build.gradle: 2 changes (1 addition & 1 deletion)
@@ -118,7 +118,7 @@ ext.libraries = [
pulsarIoCommon: "${pulsarGroup}:pulsar-io-common:${pulsarVersion}",
r2: "com.linkedin.pegasus:r2:${pegasusVersion}",
restliCommon: "com.linkedin.pegasus:restli-common:${pegasusVersion}",
rocksdbjni: 'org.rocksdb:rocksdbjni:8.11.4',
rocksdbjni: 'org.rocksdb:rocksdbjni:8.8.1',
samzaApi: 'org.apache.samza:samza-api:1.5.1',
slf4j: 'org.slf4j:slf4j:1.7.36',
slf4jApi: 'org.slf4j:slf4j-api:1.7.36',
@@ -19,8 +19,6 @@
import com.linkedin.venice.utils.LatencyUtils;
import com.linkedin.venice.writer.VeniceWriter;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.BinaryDecoder;

@@ -341,20 +339,15 @@ private static <VALUE, CHUNKS_CONTAINER> VALUE getFromStorage(
CHUNKS_CONTAINER assembledValueContainer = adapter.constructChunksContainer(chunkedValueManifest);
int actualSize = 0;

List<byte[]> keys = new ArrayList<>(chunkedValueManifest.keysWithChunkIdSuffix.size());
for (int chunkIndex = 0; chunkIndex < chunkedValueManifest.keysWithChunkIdSuffix.size(); chunkIndex++) {
keys.add(chunkedValueManifest.keysWithChunkIdSuffix.get(chunkIndex).array());
}
List<byte[]> values =
isRmdValue ? store.multiGetReplicationMetadata(partition, keys) : store.multiGet(partition, keys);

for (int chunkIndex = 0; chunkIndex < chunkedValueManifest.keysWithChunkIdSuffix.size(); chunkIndex++) {
// N.B.: This is done sequentially. Originally, each chunk was fetched concurrently in the same executor
// as the main queries, but this might cause deadlocks, so we are now doing it sequentially. If we want to
// optimize large value retrieval in the future, it's unclear whether the concurrent retrieval approach
// is optimal (as opposed to streaming the response out incrementally, for example). Since this is a
// premature optimization, we are not addressing it right now.
byte[] valueChunk = values.get(chunkIndex);
byte[] chunkKey = chunkedValueManifest.keysWithChunkIdSuffix.get(chunkIndex).array();
byte[] valueChunk =
isRmdValue ? store.getReplicationMetadata(partition, chunkKey) : store.get(partition, chunkKey);

if (valueChunk == null) {
throw new VeniceException("Chunk not found in " + getExceptionMessageDetails(store, partition, chunkIndex));
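The hunk above swaps the batched chunk fetch added by #950 back to one storage-engine lookup per chunk key. For context, a minimal standalone sketch against the plain RocksDB JNI API (not Venice code; the class name, database path, and keys are made up for illustration) of the two strategies:

import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import org.rocksdb.Options;
import org.rocksdb.RocksDB;
import org.rocksdb.RocksDBException;

public class ChunkFetchSketch {
  public static void main(String[] args) throws RocksDBException {
    RocksDB.loadLibrary();
    try (Options options = new Options().setCreateIfMissing(true);
        RocksDB db = RocksDB.open(options, "/tmp/chunk-fetch-sketch")) {
      // Stand-ins for the chunk keys listed in a ChunkedValueManifest.
      List<byte[]> chunkKeys = new ArrayList<>();
      for (int i = 0; i < 3; i++) {
        byte[] key = ("value-chunk-" + i).getBytes(StandardCharsets.UTF_8);
        db.put(key, ("payload-" + i).getBytes(StandardCharsets.UTF_8));
        chunkKeys.add(key);
      }

      // Reverted approach (#950): fetch every chunk in one batched call.
      List<byte[]> batched = db.multiGetAsList(chunkKeys);

      // Restored approach: fetch chunks sequentially, failing fast on a missing chunk.
      List<byte[]> sequential = new ArrayList<>(chunkKeys.size());
      for (byte[] key: chunkKeys) {
        byte[] chunk = db.get(key);
        if (chunk == null) {
          throw new IllegalStateException("Chunk not found: " + new String(key, StandardCharsets.UTF_8));
        }
        sequential.add(chunk);
      }
      System.out.println("batched=" + batched.size() + ", sequential=" + sequential.size());
    }
  }
}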
@@ -502,13 +502,6 @@ public byte[] get(int partitionId, byte[] key) throws VeniceException {
});
}

public List<byte[]> multiGet(int partitionId, List<byte[]> keys) throws VeniceException {
return executeWithSafeGuard(partitionId, () -> {
AbstractStoragePartition partition = getPartitionOrThrow(partitionId);
return partition.multiGet(keys);
});
}

public ByteBuffer get(int partitionId, byte[] key, ByteBuffer valueToBePopulated) throws VeniceException {
return executeWithSafeGuard(partitionId, () -> {
AbstractStoragePartition partition = getPartitionOrThrow(partitionId);
@@ -552,13 +545,6 @@ public byte[] getReplicationMetadata(int partitionId, byte[] key) {
});
}

public List<byte[]> multiGetReplicationMetadata(int partitionId, List<byte[]> keys) {
return executeWithSafeGuard(partitionId, () -> {
AbstractStoragePartition partition = getPartitionOrThrow(partitionId);
return partition.multiGetReplicationMetadata(keys);
});
}

/**
* Put the offset associated with the partitionId into the metadata partition.
*/
@@ -4,8 +4,6 @@
import com.linkedin.davinci.store.rocksdb.ReplicationMetadataRocksDBStoragePartition;
import com.linkedin.venice.exceptions.VeniceUnsupportedOperationException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.function.Supplier;
@@ -51,12 +49,6 @@ public ByteBuffer get(byte[] key, ByteBuffer valueToBePopulated) {
return ByteBuffer.wrap(get(key));
}

public List<byte[]> multiGet(List<byte[]> keys) {
List<byte[]> values = new ArrayList<>(keys.size());
keys.forEach(key -> values.add(get(key)));
return values;
}

/**
* Get a Value from the partition database
* @param <K> the type for Key
@@ -171,10 +163,6 @@ public byte[] getReplicationMetadata(byte[] key) {
throw new VeniceUnsupportedOperationException("getReplicationMetadata");
}

public List<byte[]> multiGetReplicationMetadata(List<byte[]> keys) {
throw new VeniceUnsupportedOperationException("multiGetReplicationMetadata");
}

/**
* This API deletes a record from RocksDB but updates the metadata in ByteBuffer format and puts it into RocksDB.
* Only {@link ReplicationMetadataRocksDBStoragePartition} will execute this method,
@@ -10,15 +10,12 @@
import com.linkedin.venice.store.rocksdb.RocksDBUtils;
import com.linkedin.venice.utils.ByteUtils;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.function.Supplier;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.rocksdb.ColumnFamilyHandle;
import org.rocksdb.RocksDB;
import org.rocksdb.RocksDBException;
import org.rocksdb.WriteBatch;
@@ -147,24 +144,6 @@ public byte[] getReplicationMetadata(byte[] key) {
}
}

@Override
public List<byte[]> multiGetReplicationMetadata(List<byte[]> keys) {
readCloseRWLock.readLock().lock();
try {
makeSureRocksDBIsStillOpen();
ColumnFamilyHandle rmdHandle = columnFamilyHandleList.get(REPLICATION_METADATA_COLUMN_FAMILY_INDEX);
List cfHandleList = new ArrayList<>(keys.size());
for (int i = 0; i < keys.size(); ++i) {
cfHandleList.add(rmdHandle);
}
return rocksDB.multiGetAsList(getReadOptionsForMultiGet(), cfHandleList, keys);
} catch (RocksDBException e) {
throw new VeniceException("Failed to get value from RocksDB: " + replicaId, e);
} finally {
readCloseRWLock.readLock().unlock();
}
}

/**
* This API deletes a record from RocksDB but updates the metadata in ByteBuffer format and puts it into RocksDB.
*/
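The removed multiGetReplicationMetadata above batched lookups against the replication-metadata column family by passing one handle per key. A hedged standalone sketch of that RocksDB JNI call pattern (the class name, database path, key bytes, and two-column-family layout here are assumptions for illustration, not Venice code):

import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import org.rocksdb.ColumnFamilyDescriptor;
import org.rocksdb.ColumnFamilyHandle;
import org.rocksdb.DBOptions;
import org.rocksdb.ReadOptions;
import org.rocksdb.RocksDB;
import org.rocksdb.RocksDBException;

public class ColumnFamilyMultiGetSketch {
  public static void main(String[] args) throws RocksDBException {
    RocksDB.loadLibrary();
    List<ColumnFamilyDescriptor> descriptors = Arrays.asList(
        new ColumnFamilyDescriptor(RocksDB.DEFAULT_COLUMN_FAMILY),
        new ColumnFamilyDescriptor("timestamp_metadata".getBytes(StandardCharsets.UTF_8)));
    List<ColumnFamilyHandle> handles = new ArrayList<>();
    try (DBOptions dbOptions = new DBOptions().setCreateIfMissing(true).setCreateMissingColumnFamilies(true);
        RocksDB db = RocksDB.open(dbOptions, "/tmp/cf-multiget-sketch", descriptors, handles)) {
      ColumnFamilyHandle rmdHandle = handles.get(1);
      List<byte[]> keys = Arrays.asList(
          "key-1".getBytes(StandardCharsets.UTF_8),
          "key-2".getBytes(StandardCharsets.UTF_8));
      db.put(rmdHandle, keys.get(0), "rmd-1".getBytes(StandardCharsets.UTF_8));
      db.put(rmdHandle, keys.get(1), "rmd-2".getBytes(StandardCharsets.UTF_8));

      // multiGetAsList takes parallel lists, so repeat the same handle once per key.
      List<ColumnFamilyHandle> handlePerKey = new ArrayList<>(keys.size());
      for (int i = 0; i < keys.size(); i++) {
        handlePerKey.add(rmdHandle);
      }
      try (ReadOptions readOptions = new ReadOptions()) {
        List<byte[]> values = db.multiGetAsList(readOptions, handlePerKey, keys);
        System.out.println("fetched " + values.size() + " replication-metadata values");
      }
      // Column family handles should be released before the DB itself is closed.
      for (ColumnFamilyHandle handle: handles) {
        handle.close();
      }
    }
  }
}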
@@ -214,11 +214,6 @@ public class RocksDBServerConfig {
public static final String ROCKSDB_ATOMIC_FLUSH_ENABLED = "rocksdb.atomic.flush.enabled";
public static final String ROCKSDB_SEPARATE_RMD_CACHE_ENABLED = "rocksdb.separate.rmd.cache.enabled";
public static final String ROCKSDB_BLOCK_BASE_FORMAT_VERSION = "rocksdb.block.base.format.version";
/**
* Whether to enable async io in the read path or not.
* https://rocksdb.org/blog/2022/10/07/asynchronous-io-in-rocksdb.html
*/
public static final String ROCKSDB_READ_ASYNC_IO_ENABLED = "rocksdb.read.async.io.enabled";

public static final String ROCKSDB_MAX_LOG_FILE_NUM = "rocksdb.max.log.file.num";
public static final String ROCKSDB_MAX_LOG_FILE_SIZE = "rocksdb.max.log.file.size";
@@ -289,7 +284,6 @@ public class RocksDBServerConfig {
private int blockBaseFormatVersion;
private final int maxLogFileNum;
private final long maxLogFileSize;
private final boolean readAsyncIOEanbled;
private final String transformerValueSchema;

public RocksDBServerConfig(VeniceProperties props) {
@@ -412,7 +406,6 @@ public RocksDBServerConfig(VeniceProperties props) {
*/
this.maxLogFileNum = props.getInt(ROCKSDB_MAX_LOG_FILE_NUM, 3);
this.maxLogFileSize = props.getSizeInBytes(ROCKSDB_MAX_LOG_FILE_SIZE, 10 * 1024 * 1024); // 10MB;
this.readAsyncIOEanbled = props.getBoolean(ROCKSDB_READ_ASYNC_IO_ENABLED, true);
this.transformerValueSchema =
props.containsKey(RECORD_TRANSFORMER_VALUE_SCHEMA) ? props.getString(RECORD_TRANSFORMER_VALUE_SCHEMA) : "null";
}
@@ -620,10 +613,6 @@ public long getMaxLogFileSize() {
return maxLogFileSize;
}

public boolean isReadAsyncIOEanbled() {
return readAsyncIOEanbled;
}

public String getTransformerValueSchema() {
return transformerValueSchema;
}
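The hunks above delete the rocksdb.read.async.io.enabled knob (default true) from RocksDBServerConfig. As a small illustration of the read-with-default pattern the deleted line used, assuming plain java.util.Properties as a stand-in for VeniceProperties:

import java.util.Properties;

public class AsyncIoFlagSketch {
  // Config key deleted by this revert; reproduced here only for illustration.
  static final String ROCKSDB_READ_ASYNC_IO_ENABLED = "rocksdb.read.async.io.enabled";

  public static void main(String[] args) {
    Properties props = new Properties();
    props.setProperty(ROCKSDB_READ_ASYNC_IO_ENABLED, "false");

    // Mirrors props.getBoolean(ROCKSDB_READ_ASYNC_IO_ENABLED, true): enabled unless overridden.
    boolean readAsyncIoEnabled =
        Boolean.parseBoolean(props.getProperty(ROCKSDB_READ_ASYNC_IO_ENABLED, "true"));
    System.out.println("async io enabled: " + readAsyncIoEnabled);
  }
}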
@@ -67,11 +67,6 @@ public class RocksDBStoragePartition extends AbstractStoragePartition {
private static final Logger LOGGER = LogManager.getLogger(RocksDBStoragePartition.class);
private static final String ROCKSDB_ERROR_MESSAGE_FOR_RUNNING_OUT_OF_SPACE_QUOTA = "Max allowed space was reached";
protected static final ReadOptions READ_OPTIONS_DEFAULT = new ReadOptions();
/**
* Async IO will speed up the lookup for multi-get with posix file system.
* https://rocksdb.org/blog/2022/10/07/asynchronous-io-in-rocksdb.html
*/
protected static final ReadOptions READ_OPTIONS_WITH_ASYNC_IO = new ReadOptions().setAsyncIo(true);
static final byte[] REPLICATION_METADATA_COLUMN_FAMILY = "timestamp_metadata".getBytes();

private static final FlushOptions WAIT_FOR_FLUSH_OPTIONS = new FlushOptions().setWaitForFlush(true);
@@ -604,19 +599,11 @@ public byte[] get(ByteBuffer keyBuffer) {
}
}

protected ReadOptions getReadOptionsForMultiGet() {
if (rocksDBServerConfig.isRocksDBPlainTableFormatEnabled() || !rocksDBServerConfig.isReadAsyncIOEanbled()) {
return READ_OPTIONS_DEFAULT;
}
return READ_OPTIONS_WITH_ASYNC_IO;
}

@Override
public List<byte[]> multiGet(List<byte[]> keys) {
readCloseRWLock.readLock().lock();
try {
makeSureRocksDBIsStillOpen();
return rocksDB.multiGetAsList(getReadOptionsForMultiGet(), keys);
return rocksDB.multiGetAsList(keys);
} catch (RocksDBException e) {
throw new VeniceException("Failed to get value from RocksDB: " + replicaId, e);
} finally {
@@ -629,8 +616,7 @@ public List<ByteBuffer> multiGet(List<ByteBuffer> keys, List<ByteBuffer> values)

try {
makeSureRocksDBIsStillOpen();
List<ByteBufferGetStatus> statusList = rocksDB.multiGetByteBuffers(getReadOptionsForMultiGet(), keys, values);

List<ByteBufferGetStatus> statusList = rocksDB.multiGetByteBuffers(keys, values);
int keyCnt = keys.size();
int statusCnt = statusList.size();
int valueCnt = values.size();
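The deleted getReadOptionsForMultiGet helper above picked async-IO read options only for block-based-table stores with the server flag on. A minimal sketch of that selection logic (the class and method names are illustrative; the plain-table exclusion follows the removed code):

import org.rocksdb.ReadOptions;
import org.rocksdb.RocksDB;

public class MultiGetReadOptionsSketch {
  static {
    RocksDB.loadLibrary(); // ReadOptions wraps a native handle, so load the JNI library first
  }

  private static final ReadOptions READ_OPTIONS_DEFAULT = new ReadOptions();
  // Async IO lets RocksDB prefetch data blocks in parallel during multi-get on POSIX file systems.
  private static final ReadOptions READ_OPTIONS_WITH_ASYNC_IO = new ReadOptions().setAsyncIo(true);

  /** Mirrors the removed helper: fall back to default options for plain-table stores or when disabled. */
  static ReadOptions readOptionsForMultiGet(boolean plainTableFormatEnabled, boolean readAsyncIoEnabled) {
    if (plainTableFormatEnabled || !readAsyncIoEnabled) {
      return READ_OPTIONS_DEFAULT;
    }
    return READ_OPTIONS_WITH_ASYNC_IO;
  }

  public static void main(String[] args) {
    System.out.println("block-based, flag on -> async: "
        + (readOptionsForMultiGet(false, true) == READ_OPTIONS_WITH_ASYNC_IO));
    System.out.println("plain table, flag on -> async: "
        + (readOptionsForMultiGet(true, true) == READ_OPTIONS_WITH_ASYNC_IO));
  }
}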
@@ -92,8 +92,6 @@
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.Properties;
@@ -486,12 +484,8 @@ public void testReadingChunkedRmdFromStorage() {
chunkedManifestBytes = ChunkingTestUtils.prependSchemaId(chunkedManifestBytes, manifestSchemaId).array();
byte[] topLevelKey2 = keyWithChunkingSuffixSerializer.serializeNonChunkedKey(key2);
byte[] chunkedKey1InKey2 = chunkedKeyWithSuffix1.array();

when(storageEngine.getReplicationMetadata(partition, topLevelKey2)).thenReturn(chunkedManifestBytes);
when(storageEngine.getReplicationMetadata(partition, chunkedKey1InKey2)).thenReturn(chunkedValue1);
List<byte[]> chunkedValues = new ArrayList<>(1);
chunkedValues.add(chunkedValue1);
when(storageEngine.multiGetReplicationMetadata(eq(partition), any())).thenReturn(chunkedValues);
byte[] result2 = ingestionTask.getRmdWithValueSchemaByteBufferFromStorage(partition, key2, container, 0L);
Assert.assertNotNull(result2);
Assert.assertNotNull(container.getManifest());
@@ -524,12 +518,9 @@ public void testReadingChunkedRmdFromStorage() {
byte[] topLevelKey3 = keyWithChunkingSuffixSerializer.serializeNonChunkedKey(key3);
byte[] chunkedKey1InKey3 = chunkedKeyWithSuffix1.array();
byte[] chunkedKey2InKey3 = chunkedKeyWithSuffix2.array();

when(storageEngine.getReplicationMetadata(partition, topLevelKey3)).thenReturn(chunkedManifestBytes);
when(storageEngine.getReplicationMetadata(partition, chunkedKey1InKey3)).thenReturn(chunkedValue1);
when(storageEngine.getReplicationMetadata(partition, chunkedKey2InKey3)).thenReturn(chunkedValue2);
when(storageEngine.multiGetReplicationMetadata(eq(partition), any()))
.thenReturn(Arrays.asList(chunkedValue1, chunkedValue2));
byte[] result3 = ingestionTask.getRmdWithValueSchemaByteBufferFromStorage(partition, key3, container, 0L);
Assert.assertNotNull(result3);
Assert.assertNotNull(container.getManifest());
@@ -1,6 +1,5 @@
package com.linkedin.davinci.storage.chunking;

import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.eq;
import static org.mockito.Mockito.mock;
@@ -23,7 +22,6 @@
import com.linkedin.venice.utils.ByteUtils;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Random;
import java.util.concurrent.ThreadLocalRandom;
@@ -216,8 +214,6 @@ private void runTest(
doReturn(chunk1Bytes).when(storageEngine).get(eq(partition), eq(firstKey));
doReturn(chunk2Bytes).when(storageEngine).get(eq(partition), eq(secondKey));

doReturn(Arrays.asList(chunk1Bytes, chunk2Bytes)).when(storageEngine).multiGet(eq(partition), any());

StoreDeserializerCache storeDeserializerCache = rawBytesStoreDeserializerCache
? RawBytesStoreDeserializerCache.getInstance()
: new AvroStoreDeserializerCache(schemaRepository, storeName, true);
