Commit 1b40264

[da-vinci][server] Added flag to enable RocksDB blob files for block-based format (#1359)

* [da-vinci][server] Added flag to enable RocksDB blob files for block-based format

  This PR adds options to enable the BlobDB feature in RocksDB, to reduce write amplification for large-value stores. Currently, these options are only valid for the block-based format.

  New configs:
  - rocksdb.blob.files.enabled: default false
  - rocksdb.min.blob.size.in.bytes: default 4KB
  - rocksdb.blob.file.size.in.bytes: default 256MB
  - rocksdb.blob.garbage.collection.age.cutoff: default 0.25
  - rocksdb.blob.garbage.collection.force.threshold: default 0.8

  This feature is not perfect: the main goal is to reduce write amplification, but space amplification will increase, and the params above can be tuned to find the right balance.

* Added some tests
1 parent 9f852b0 commit 1b40264
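
For reference, a minimal sketch of supplying the new knobs as server properties; the keys and default values come from this PR, while how the Properties object reaches the server is deployment-specific:

import java.util.Properties;

// Sketch only: keys and default values are taken from this PR; the wiring
// that delivers these Properties to a Venice server is assumed.
Properties serverProps = new Properties();
serverProps.setProperty("rocksdb.blob.files.enabled", "true"); // default: false
serverProps.setProperty("rocksdb.min.blob.size.in.bytes", "4096"); // default: 4KB
serverProps.setProperty("rocksdb.blob.file.size.in.bytes", "268435456"); // default: 256MB
serverProps.setProperty("rocksdb.blob.garbage.collection.age.cutoff", "0.25");
serverProps.setProperty("rocksdb.blob.garbage.collection.force.threshold", "0.8");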

File tree

5 files changed: +144 −14 lines changed

clients/da-vinci-client/src/main/java/com/linkedin/davinci/stats/RocksDBMemoryStats.java

Lines changed: 5 additions & 1 deletion

@@ -55,7 +55,11 @@ public class RocksDBMemoryStats extends AbstractVeniceStats {
       "rocksdb.actual-delayed-write-rate",
       "rocksdb.block-cache-capacity",
       "rocksdb.block-cache-pinned-usage",
-      "rocksdb.block-cache-usage");
+      "rocksdb.block-cache-usage",
+      "rocksdb.num-blob-files",
+      "rocksdb.total-blob-file-size",
+      "rocksdb.live-blob-file-size",
+      "rocksdb.live-blob-file-garbage-size");

   // metrics emitted on a per instance basis need only be collected once, not aggregated
   private static final Set<String> INSTANCE_METRIC_DOMAINS = Collections.unmodifiableSet(
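
The four new entries are standard RocksDB DB properties. As a minimal sketch of what the stats layer ultimately reads, bypassing Venice's RocksDBMemoryStats plumbing and querying RocksJava directly (the helper name is hypothetical):

import org.rocksdb.RocksDB;
import org.rocksdb.RocksDBException;

// Hypothetical helper: reads the new blob-file properties straight from a
// RocksJava handle, using the same property names RocksDBMemoryStats tracks.
static void logBlobFileStats(RocksDB db) throws RocksDBException {
  System.out.println("num-blob-files = " + db.getLongProperty("rocksdb.num-blob-files"));
  System.out.println("total-blob-file-size = " + db.getLongProperty("rocksdb.total-blob-file-size"));
  System.out.println("live-blob-file-size = " + db.getLongProperty("rocksdb.live-blob-file-size"));
  System.out.println("live-blob-file-garbage-size = " + db.getLongProperty("rocksdb.live-blob-file-garbage-size"));
}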

clients/da-vinci-client/src/main/java/com/linkedin/davinci/store/rocksdb/RocksDBServerConfig.java

Lines changed: 60 additions & 0 deletions

@@ -3,11 +3,14 @@
 import com.linkedin.venice.exceptions.VeniceException;
 import com.linkedin.venice.utils.VeniceProperties;
 import java.util.Arrays;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
 import org.rocksdb.CompactionStyle;
 import org.rocksdb.CompressionType;


 public class RocksDBServerConfig {
+  private static final Logger LOGGER = LogManager.getLogger(RocksDBServerConfig.class);
   /**
    * Ability to use direct IO for disk reads, might yield better performance on Azure disks.
    * Also makes caching behavior more consistent, by limiting the caching to only RocksDB.
@@ -219,6 +222,18 @@ public class RocksDBServerConfig {
   public static final String ROCKSDB_MAX_LOG_FILE_SIZE = "rocksdb.max.log.file.size";
   public static final String RECORD_TRANSFORMER_VALUE_SCHEMA = "record.transformer.value.schema";

+  /**
+   * Check this page to find more details:
+   * https://github.com/facebook/rocksdb/wiki/BlobDB
+   */
+  public static final String ROCKSDB_BLOB_FILES_ENABLED = "rocksdb.blob.files.enabled";
+  public static final String ROCKSDB_MIN_BLOB_SIZE_IN_BYTES = "rocksdb.min.blob.size.in.bytes";
+  public static final String ROCKSDB_BLOB_FILE_SIZE_IN_BYTES = "rocksdb.blob.file.size.in.bytes";
+  public static final String ROCKSDB_BLOB_GARBAGE_COLLECTION_AGE_CUTOFF = "rocksdb.blob.garbage.collection.age.cutoff";
+  public static final String ROCKSDB_BLOB_GARBAGE_COLLECTION_FORCE_THRESHOLD =
+      "rocksdb.blob.garbage.collection.force.threshold";
+  public static final String ROCKSDB_BLOB_FILE_STARTING_LEVEL = "rocksdb.blob.file.starting.level";
+
   private final boolean rocksDBUseDirectReads;

   private final int rocksDBEnvFlushPoolSize;
@@ -286,6 +301,13 @@ public class RocksDBServerConfig {
   private final long maxLogFileSize;
   private final String transformerValueSchema;

+  private final boolean blobFilesEnabled;
+  private final long minBlobSizeInBytes;
+  private final long blobFileSizeInBytes;
+  private final double blobGarbageCollectionAgeCutOff;
+  private final double blobGarbageCollectionForceThreshold;
+  private final int blobFileStartingLevel;
+
   public RocksDBServerConfig(VeniceProperties props) {
     // Do not use Direct IO for reads by default
     this.rocksDBUseDirectReads = props.getBoolean(ROCKSDB_OPTIONS_USE_DIRECT_READS, false);
@@ -407,6 +429,21 @@ public RocksDBServerConfig(VeniceProperties props) {
     this.maxLogFileNum = props.getInt(ROCKSDB_MAX_LOG_FILE_NUM, 3);
     this.maxLogFileSize = props.getSizeInBytes(ROCKSDB_MAX_LOG_FILE_SIZE, 10 * 1024 * 1024); // 10MB
     this.transformerValueSchema = props.getString(RECORD_TRANSFORMER_VALUE_SCHEMA, "null");
+
+    /**
+     * Check this page to find more details:
+     * https://github.com/facebook/rocksdb/wiki/BlobDB
+     */
+    this.blobFilesEnabled = props.getBoolean(ROCKSDB_BLOB_FILES_ENABLED, false);
+    if (this.blobFilesEnabled) {
+      LOGGER.info("RocksDB Blob files feature is enabled");
+    }
+    this.minBlobSizeInBytes = props.getSizeInBytes(ROCKSDB_MIN_BLOB_SIZE_IN_BYTES, 4 * 1024); // default: 4KB
+    this.blobFileSizeInBytes = props.getSizeInBytes(ROCKSDB_BLOB_FILE_SIZE_IN_BYTES, 256 * 1024 * 1024); // default: 256MB
+    this.blobGarbageCollectionAgeCutOff = props.getDouble(ROCKSDB_BLOB_GARBAGE_COLLECTION_AGE_CUTOFF, 0.25);
+    this.blobGarbageCollectionForceThreshold = props.getDouble(ROCKSDB_BLOB_GARBAGE_COLLECTION_FORCE_THRESHOLD, 0.8);
+    this.blobFileStartingLevel = props.getInt(ROCKSDB_BLOB_FILE_STARTING_LEVEL, 0);
   }

   public int getLevel0FileNumCompactionTriggerWriteOnlyVersion() {
@@ -616,4 +653,27 @@ public String getTransformerValueSchema() {
     return transformerValueSchema;
   }

+  public boolean isBlobFilesEnabled() {
+    return blobFilesEnabled;
+  }
+
+  public long getMinBlobSizeInBytes() {
+    return minBlobSizeInBytes;
+  }
+
+  public long getBlobFileSizeInBytes() {
+    return blobFileSizeInBytes;
+  }
+
+  public double getBlobGarbageCollectionAgeCutOff() {
+    return blobGarbageCollectionAgeCutOff;
+  }
+
+  public double getBlobGarbageCollectionForceThreshold() {
+    return blobGarbageCollectionForceThreshold;
+  }
+
+  public int getBlobFileStartingLevel() {
+    return blobFileStartingLevel;
+  }
 }
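
A minimal usage sketch of the new getters and their defaults, assuming VeniceProperties can be constructed from a plain java.util.Properties (the tests in this PR obtain one via AbstractStorageEngineTest.getServerProperties):

import java.util.Properties;
import com.linkedin.venice.utils.VeniceProperties;

Properties props = new Properties();
props.setProperty(RocksDBServerConfig.ROCKSDB_BLOB_FILES_ENABLED, "true");
// Assumption: a VeniceProperties constructor that wraps java.util.Properties.
RocksDBServerConfig config = new RocksDBServerConfig(new VeniceProperties(props));
// Every key left unset falls back to the defaults defined above.
assert config.isBlobFilesEnabled();
assert config.getMinBlobSizeInBytes() == 4 * 1024; // 4KB
assert config.getBlobFileSizeInBytes() == 256 * 1024 * 1024; // 256MB
assert config.getBlobGarbageCollectionAgeCutOff() == 0.25;
assert config.getBlobGarbageCollectionForceThreshold() == 0.8;
assert config.getBlobFileStartingLevel() == 0;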

clients/da-vinci-client/src/main/java/com/linkedin/davinci/store/rocksdb/RocksDBStoragePartition.java

Lines changed: 13 additions & 0 deletions

@@ -401,6 +401,19 @@ protected Options getStoreOptions(StoragePartitionConfig storagePartitionConfig,
       tableConfig.setCacheIndexAndFilterBlocks(rocksDBServerConfig.isRocksDBSetCacheIndexAndFilterBlocks());
       tableConfig.setFormatVersion(rocksDBServerConfig.getBlockBaseFormatVersion());
       options.setTableFormatConfig(tableConfig);
+
+      /**
+       * Only enable blob files for block-based format.
+       */
+      if (rocksDBServerConfig.isBlobFilesEnabled()) {
+        options.setEnableBlobFiles(true);
+        options.setEnableBlobGarbageCollection(true);
+        options.setMinBlobSize(rocksDBServerConfig.getMinBlobSizeInBytes());
+        options.setBlobFileSize(rocksDBServerConfig.getBlobFileSizeInBytes());
+        options.setBlobGarbageCollectionAgeCutoff(rocksDBServerConfig.getBlobGarbageCollectionAgeCutOff());
+        options.setBlobGarbageCollectionForceThreshold(rocksDBServerConfig.getBlobGarbageCollectionForceThreshold());
+        options.setBlobFileStartingLevel(rocksDBServerConfig.getBlobFileStartingLevel());
+      }
     }

     if (storagePartitionConfig.isWriteOnlyConfig()) {
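
For context, the same wiring expressed as a self-contained RocksJava program; the values mirror this PR's defaults and the path is illustrative. These setters exist on org.rocksdb.Options in recent RocksJava releases:

import org.rocksdb.Options;
import org.rocksdb.RocksDB;
import org.rocksdb.RocksDBException;

public final class BlobDbOptionsSketch {
  public static void main(String[] args) throws RocksDBException {
    RocksDB.loadLibrary();
    try (Options options = new Options().setCreateIfMissing(true)) {
      options.setEnableBlobFiles(true); // rocksdb.blob.files.enabled
      options.setEnableBlobGarbageCollection(true);
      options.setMinBlobSize(4 * 1024); // values >= 4KB are separated into blob files
      options.setBlobFileSize(256L * 1024 * 1024); // target size per blob file
      options.setBlobGarbageCollectionAgeCutoff(0.25); // GC considers the oldest 25% of blob files
      options.setBlobGarbageCollectionForceThreshold(0.8); // force compaction at 80% garbage
      options.setBlobFileStartingLevel(0); // separate blobs starting from L0
      try (RocksDB db = RocksDB.open(options, "/tmp/blobdb-sketch")) { // illustrative path
        db.put("key".getBytes(), new byte[8 * 1024]); // large value lands in a blob file
      }
    }
  }
}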

clients/da-vinci-client/src/test/java/com/linkedin/davinci/store/rocksdb/RocksDBStoragePartitionTest.java

Lines changed: 63 additions & 13 deletions

@@ -58,6 +58,13 @@ public class RocksDBStoragePartitionTest {
   private static final String VALUE_PREFIX = "value_";
   private static final RocksDBThrottler ROCKSDB_THROTTLER = new RocksDBThrottler(3);

+  private static final String BLOB_GARBAGE_METRIC = "rocksdb.live-blob-file-garbage-size";
+  private static final List<String> BLOB_METRIC_LIST = Arrays.asList(
+      "rocksdb.num-blob-files",
+      "rocksdb.total-blob-file-size",
+      "rocksdb.live-blob-file-size",
+      BLOB_GARBAGE_METRIC);
+
   private Map<String, String> generateInput(int recordCnt, boolean sorted, int padLength) {
     Map<String, String> records;
     if (sorted) {
@@ -98,15 +105,16 @@ private void removeDir(String path) {

   @DataProvider(name = "testIngestionDataProvider")
   public Object[][] testIngestionDataProvider() {
-    return new Object[][] { { true, false, false, true }, // Sorted input without interruption, with verifyChecksum
-        { true, false, false, false }, // Sorted input without interruption, without verifyChecksum
-        { true, true, true, false }, // Sorted input with interruption, without verifyChecksum
-        { true, true, false, false }, // Sorted input with storage node re-boot, without verifyChecksum
-        { true, true, true, true }, // Sorted input with interruption, with verifyChecksum
-        { true, true, false, true }, // Sorted input with storage node re-boot, with verifyChecksum
-        { false, false, false, false }, // Unsorted input without interruption, without verifyChecksum
-        { false, true, false, false }, // Unsorted input with interruption, without verifyChecksum
-        { false, true, true, false } // Unsorted input with storage node re-boot, without verifyChecksum
+    return new Object[][] { { true, false, false, true, true }, // Sorted input without interruption, with verifyChecksum
+        { true, false, false, false, false }, // Sorted input without interruption, without verifyChecksum
+        { true, true, true, false, true }, // Sorted input with interruption, without verifyChecksum
+        { true, true, false, false, false }, // Sorted input with storage node re-boot, without verifyChecksum
+        { true, true, true, true, true }, // Sorted input with interruption, with verifyChecksum
+        { true, true, false, true, false }, // Sorted input with storage node re-boot, with verifyChecksum
+        { false, false, false, false, true }, // Unsorted input without interruption, without verifyChecksum
+        { false, true, false, false, true }, // Unsorted input with interruption, without verifyChecksum
+        { false, true, true, false, true } // Unsorted input with storage node re-boot, without verifyChecksum
     };
   }

@@ -115,7 +123,8 @@ public void testIngestion(
       boolean sorted,
       boolean interrupted,
       boolean reopenDatabaseDuringInterruption,
-      boolean verifyChecksum) {
+      boolean verifyChecksum,
+      boolean enableBlobFile) {
     CheckSum runningChecksum = CheckSum.getInstance(CheckSumType.MD5);
     String storeName = Version.composeKafkaTopic(Utils.getUniqueString("test_store"), 1);
     String storeDir = getTempDatabaseDir(storeName);
@@ -124,8 +133,27 @@
     partitionConfig.setDeferredWrite(sorted);
     Options options = new Options();
     options.setCreateIfMissing(true);
-    Map<String, String> inputRecords = generateInput(1010, sorted, 0);
-    VeniceProperties veniceServerProperties = AbstractStorageEngineTest.getServerProperties(PersistenceType.ROCKS_DB);
+
+    if (enableBlobFile) {
+      options.setEnableBlobFiles(true);
+      options.setMinBlobSize(1);
+      options.setBlobFileSize(2 * 1024 * 1024);
+      options.setEnableBlobGarbageCollection(true);
+      options.setBlobGarbageCollectionAgeCutoff(0.25);
+      options.setBlobGarbageCollectionForceThreshold(0.8);
+      options.setBlobFileStartingLevel(0);
+    }
+
+    Map<String, String> inputRecords = generateInput(101000, sorted, 0);
+    Properties extraProps = new Properties();
+    if (enableBlobFile) {
+      extraProps.put(ROCKSDB_BLOB_FILES_ENABLED, "true");
+      extraProps.put(ROCKSDB_MIN_BLOB_SIZE_IN_BYTES, "1");
+      extraProps.put(ROCKSDB_BLOB_FILE_SIZE_IN_BYTES, "2097152");
+      extraProps.put(ROCKSDB_BLOB_FILE_STARTING_LEVEL, "0");
+    }
+    VeniceProperties veniceServerProperties =
+        AbstractStorageEngineTest.getServerProperties(PersistenceType.ROCKS_DB, extraProps);
     RocksDBServerConfig rocksDBServerConfig = new RocksDBServerConfig(veniceServerProperties);

     VeniceServerConfig serverConfig = new VeniceServerConfig(veniceServerProperties);
@@ -228,6 +256,27 @@
       Assert.assertEquals(storagePartition.get(entry.getKey().getBytes()), entry.getValue().getBytes());
     }

+    if (sorted) {
+      if (enableBlobFile) {
+        // Verify some Blob file related metrics
+        for (String metric: BLOB_METRIC_LIST) {
+          Assert.assertEquals(storagePartition.getRocksDBStatValue(metric), 0);
+        }
+      }
+    } else {
+      if (enableBlobFile) {
+        // Verify some Blob file related metrics
+        for (String metric: BLOB_METRIC_LIST) {
+          if (!metric.equals(BLOB_GARBAGE_METRIC)) {
+            Assert.assertTrue(storagePartition.getRocksDBStatValue(metric) > 0);
+          } else {
+            // No garbage so far.
+            Assert.assertEquals(storagePartition.getRocksDBStatValue(metric), 0);
+          }
+        }
+      }
+    }
+
     // Verify current ingestion mode is in deferred-write mode
     assertTrue(storagePartition.verifyConfig(partitionConfig));

@@ -436,7 +485,8 @@ public void testIngestionWithClockCache(
       boolean sorted,
       boolean interrupted,
       boolean reopenDatabaseDuringInterruption,
-      boolean verifyChecksum) {
+      boolean verifyChecksum,
+      boolean ignored) {
     CheckSum runningChecksum = CheckSum.getInstance(CheckSumType.MD5);
     String storeName = Version.composeKafkaTopic(Utils.getUniqueString("test_store"), 1);
     String storeDir = getTempDatabaseDir(storeName);
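
The test expects zero garbage because nothing is overwritten during ingestion. A hedged sketch of how rocksdb.live-blob-file-garbage-size would become non-zero, written against raw RocksJava with illustrative counts and sizes:

import org.rocksdb.FlushOptions;
import org.rocksdb.RocksDB;
import org.rocksdb.RocksDBException;

// Overwriting keys turns their previously written blobs into garbage, which
// compaction then reflects in the live-blob-file-garbage-size property.
static void demonstrateBlobGarbage(RocksDB db) throws RocksDBException {
  byte[] bigValue = new byte[8 * 1024]; // above any reasonable min blob size
  for (int i = 0; i < 1000; i++) {
    db.put(("key_" + i).getBytes(), bigValue);
  }
  db.flush(new FlushOptions().setWaitForFlush(true)); // first copies land in blob files
  for (int i = 0; i < 1000; i++) {
    db.put(("key_" + i).getBytes(), bigValue); // overwrite: old blobs become garbage
  }
  db.flush(new FlushOptions().setWaitForFlush(true));
  db.compactRange(); // compaction updates the blob garbage accounting
  System.out.println(db.getLongProperty("rocksdb.live-blob-file-garbage-size"));
}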

internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/TestHybrid.java

Lines changed: 3 additions & 0 deletions

@@ -1,5 +1,6 @@
 package com.linkedin.venice.endToEnd;

+import static com.linkedin.davinci.store.rocksdb.RocksDBServerConfig.ROCKSDB_BLOB_FILES_ENABLED;
 import static com.linkedin.davinci.store.rocksdb.RocksDBServerConfig.ROCKSDB_PLAIN_TABLE_FORMAT_ENABLED;
 import static com.linkedin.venice.ConfigKeys.DEFAULT_MAX_NUMBER_OF_PARTITIONS;
 import static com.linkedin.venice.ConfigKeys.KAFKA_BOOTSTRAP_SERVERS;
@@ -1716,6 +1717,8 @@ private static VeniceClusterWrapper setUpCluster(boolean enablePartitionWiseShar
           KafkaConsumerService.ConsumerAssignmentStrategy.PARTITION_WISE_SHARED_CONSUMER_ASSIGNMENT_STRATEGY.name());
     }
     cluster.addVeniceServer(new Properties(), serverProperties);
+    // Enable blob files in one server
+    serverProperties.setProperty(ROCKSDB_BLOB_FILES_ENABLED, "true");
     cluster.addVeniceServer(new Properties(), serverProperties);

     return cluster;
