Commit

[da-vinci][server] Fixed the race condition during BlobDB rolling out/back (#1500) (#1502)

* [da-vinci][server] Fixed the race condition during BlobDB rolling out/back

Because of the blob-db integration, `SIT` will forcibly set `sorted` to `false` for hybrid stores,
and an inconsistent `sorted` flag can arise while rolling the blob-db feature out or back.
Here is one way it can happen during the rollout of blob-db:
1. P1 processes `SOP` with `sorted=true` and persists it in `StoreVersionState`.
2. P2 hasn't started processing `SOP` yet.
3. Restart the cluster and roll out the blob-db feature.
4. When P2 processes `SOP` with `sorted=true`, it will forcibly set `sorted=false`, which differs
   from the previously persisted `StoreVersionState`.
5. This is fine as long as we just follow the previously persisted `StoreVersionState` (see the sketch below).
6. Here is why step 5 holds:
   a. If the input for a given partition is truly sorted, it can always be ingested as unsorted data.
   b. If the input for a given partition is not sorted, the underlying `SSTFileWriter` will throw an exception
      when we try to ingest it as sorted data.
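
A minimal sketch of the fix, condensed from the `processStartOfPush` diff below (variables such as `sorted`, `partition`, and `partitionConsumptionState` come from the surrounding method): `computeStoreVersionState` now returns whatever state actually got persisted, and `beginBatchWrite` follows that persisted `sorted` flag rather than the one computed locally for the current partition.

    StoreVersionState persistedSVS =
        storageMetadataService.computeStoreVersionState(kafkaVersionTopic, previousSVS -> {
          if (previousSVS == null) {
            // First SOP seen for this topic on this host: initialize from the locally computed flag.
            StoreVersionState newSVS = new StoreVersionState();
            newSVS.sorted = sorted;
            return newSVS;
          }
          // Another partition already persisted the state; follow it even if its 'sorted' flag
          // disagrees with the one computed for this partition (safe per a/b above).
          return previousSVS;
        });
    // Deferred-write (sorted) ingestion is driven by the persisted flag, so every partition of the
    // topic stays consistent with what is on disk.
    beginBatchWrite(partition, persistedSVS.sorted, partitionConsumptionState);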

* Fixed comments
gaojieliu authored Feb 5, 2025
1 parent e37204a commit b993ecd
Showing 8 changed files with 114 additions and 53 deletions.
@@ -76,9 +76,10 @@ public void stopInner() throws Exception {
}

@Override
public void computeStoreVersionState(String topicName, Function<StoreVersionState, StoreVersionState> mapFunction)
throws VeniceException {
topicStoreVersionStateMap.compute(topicName, (key, previousStoreVersionState) -> {
public StoreVersionState computeStoreVersionState(
String topicName,
Function<StoreVersionState, StoreVersionState> mapFunction) throws VeniceException {
return topicStoreVersionStateMap.compute(topicName, (key, previousStoreVersionState) -> {
StoreVersionState newStoreVersionState = mapFunction.apply(previousStoreVersionState);
storeVersionStateSyncer.accept(topicName, newStoreVersionState);

@@ -2935,47 +2935,72 @@ private void processStartOfPush(
} else {
sorted = startOfPush.sorted;
}
beginBatchWrite(partition, sorted, partitionConsumptionState);
partitionConsumptionState.setStartOfPushTimestamp(startOfPushKME.producerMetadata.messageTimestamp);

ingestionNotificationDispatcher.reportStarted(partitionConsumptionState);
storageMetadataService.computeStoreVersionState(kafkaVersionTopic, previousStoreVersionState -> {
if (previousStoreVersionState == null) {
// No other partition of the same topic has started yet, let's initialize the StoreVersionState
StoreVersionState newStoreVersionState = new StoreVersionState();
newStoreVersionState.sorted = sorted;
newStoreVersionState.chunked = startOfPush.chunked;
newStoreVersionState.compressionStrategy = startOfPush.compressionStrategy;
newStoreVersionState.compressionDictionary = startOfPush.compressionDictionary;
if (startOfPush.compressionStrategy == CompressionStrategy.ZSTD_WITH_DICT.getValue()) {
if (startOfPush.compressionDictionary == null) {
StoreVersionState persistedStoreVersionState =
storageMetadataService.computeStoreVersionState(kafkaVersionTopic, previousStoreVersionState -> {
if (previousStoreVersionState == null) {
// No other partition of the same topic has started yet, let's initialize the StoreVersionState
StoreVersionState newStoreVersionState = new StoreVersionState();
newStoreVersionState.sorted = sorted;
newStoreVersionState.chunked = startOfPush.chunked;
newStoreVersionState.compressionStrategy = startOfPush.compressionStrategy;
newStoreVersionState.compressionDictionary = startOfPush.compressionDictionary;
if (startOfPush.compressionStrategy == CompressionStrategy.ZSTD_WITH_DICT.getValue()) {
if (startOfPush.compressionDictionary == null) {
throw new VeniceException(
"compression Dictionary should not be empty if CompressionStrategy is ZSTD_WITH_DICT");
}
}
newStoreVersionState.batchConflictResolutionPolicy = startOfPush.timestampPolicy;
newStoreVersionState.startOfPushTimestamp = startOfPushKME.producerMetadata.messageTimestamp;

LOGGER.info(
"Persisted {} for the first time following a SOP for topic {} with sorted: {}.",
StoreVersionState.class.getSimpleName(),
kafkaVersionTopic,
newStoreVersionState.sorted);
return newStoreVersionState;
} else if (previousStoreVersionState.chunked != startOfPush.chunked) {
// Something very wrong is going on ): ...
throw new VeniceException(
"compression Dictionary should not be empty if CompressionStrategy is ZSTD_WITH_DICT");
"Unexpected: received multiple " + ControlMessageType.START_OF_PUSH.name()
+ " control messages with inconsistent 'chunked' fields within the same topic!");
} else if (previousStoreVersionState.sorted != sorted) {
if (!isHybridMode()) {
// Something very wrong is going on ): ...
throw new VeniceException(
"Unexpected: received multiple " + ControlMessageType.START_OF_PUSH.name()
+ " control messages with inconsistent 'sorted' fields within the same topic!"
+ " And persisted sorted value: " + previousStoreVersionState.sorted
+ " is different from the current one: " + sorted);
}
/**
* Because of the blob-db integration, `SIT` will forcibly set `sorted` to `false` for hybrid stores (check the
* above javadoc), and an inconsistent `sorted` flag can happen while rolling the blob-db feature out or back.
* Here is one case of how it can happen during the rollout of blob-db:
* 1. P1 processes `SOP` with `sorted=true` and persists it in `StoreVersionState`.
* 2. P2 hasn't started processing `SOP` yet.
* 3. Restart the cluster and roll out the blob-db feature.
* 4. When P2 processes `SOP` with `sorted=true`, it will forcibly set `sorted=false`, and it will be different
*    from the previously persisted `StoreVersionState`.
* 5. This is fine as long as we just follow the previously persisted `StoreVersionState`.
* 6. Here are the reasons why step 5 is true:
*    a. If the input for a given partition is truly sorted, it can always be ingested as unsorted data.
*    b. If the input for a given partition is not sorted, the underlying `SSTFileWriter` will throw an exception
*       when we try to ingest it as sorted data.
*/
LOGGER.warn(
"Store version state for topic {} has already been initialized with a different value of 'sorted': {}",
kafkaVersionTopic,
previousStoreVersionState.sorted);
}
}
newStoreVersionState.batchConflictResolutionPolicy = startOfPush.timestampPolicy;
newStoreVersionState.startOfPushTimestamp = startOfPushKME.producerMetadata.messageTimestamp;
// No need to mutate it, so we return it as is
return previousStoreVersionState;
});

LOGGER.info(
"Persisted {} for the first time following a SOP for topic {}.",
StoreVersionState.class.getSimpleName(),
kafkaVersionTopic);
return newStoreVersionState;
} else if (previousStoreVersionState.sorted != sorted) {
// Something very wrong is going on ): ...
throw new VeniceException(
"Unexpected: received multiple " + ControlMessageType.START_OF_PUSH.name()
+ " control messages with inconsistent 'sorted' fields within the same topic!");
} else if (previousStoreVersionState.chunked != startOfPush.chunked) {
// Something very wrong is going on ): ...
throw new VeniceException(
"Unexpected: received multiple " + ControlMessageType.START_OF_PUSH.name()
+ " control messages with inconsistent 'chunked' fields within the same topic!");
} else {
// No need to mutate it, so we return it as is
return previousStoreVersionState;
}
});
ingestionNotificationDispatcher.reportStarted(partitionConsumptionState);
beginBatchWrite(partition, persistedStoreVersionState.sorted, partitionConsumptionState);
partitionConsumptionState.setStartOfPushTimestamp(startOfPushKME.producerMetadata.messageTimestamp);
}

protected void processEndOfPush(
@@ -62,13 +62,15 @@ public void stopInner() throws Exception {
}

@Override
public void computeStoreVersionState(String topicName, Function<StoreVersionState, StoreVersionState> mapFunction)
throws VeniceException {
public StoreVersionState computeStoreVersionState(
String topicName,
Function<StoreVersionState, StoreVersionState> mapFunction) throws VeniceException {
AbstractStorageEngine engine = getStorageEngineOrThrow(topicName);
synchronized (engine) {
StoreVersionState previousSVS = engine.getStoreVersionState();
StoreVersionState newSVS = mapFunction.apply(previousSVS);
engine.putStoreVersionState(newSVS);
return newSVS;
}
}

@@ -11,8 +11,9 @@
* This is a superset of the OffsetManager APIs, which also provide functions for storing store-version level state.
*/
public interface StorageMetadataService extends OffsetManager {
void computeStoreVersionState(String topicName, Function<StoreVersionState, StoreVersionState> mapFunction)
throws VeniceException;
StoreVersionState computeStoreVersionState(
String topicName,
Function<StoreVersionState, StoreVersionState> mapFunction) throws VeniceException;

/**
* This will clear all metadata, including store-version state and partition states, tied to {@param topicName}.
@@ -259,6 +259,7 @@
import org.mockito.ArgumentCaptor;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;
import org.mockito.stubbing.Answer;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.AfterMethod;
@@ -1305,6 +1306,7 @@ void setStoreVersionStateSupplier(StoreVersionState svs) {
setupMockAbstractStorageEngine(metadataPartition);
doReturn(svs).when(mockAbstractStorageEngine).getStoreVersionState();
doReturn(svs).when(mockStorageMetadataService).getStoreVersionState(topic);
doReturn(svs).when(mockStorageMetadataService).computeStoreVersionState(eq(topic), any());
}

void setStoreVersionStateSupplier(boolean sorted) {
@@ -2499,8 +2501,23 @@ public void testVeniceMessagesProcessingWithSortedInput(AAConfig aaConfig) throw
}, aaConfig);
}

@Test(dataProvider = "True-and-False", dataProviderClass = DataProviderUtils.class)
public void testVeniceMessagesProcessingWithSortedInputWithBlobMode(boolean blobMode) throws Exception {
@Test(dataProvider = "Boolean-and-Optional-Boolean", dataProviderClass = DataProviderUtils.class)
public void testVeniceMessagesProcessingWithSortedInputWithBlobMode(boolean blobMode, Boolean sortedFlagInSVS)
throws Exception {
if (sortedFlagInSVS != null) {
setStoreVersionStateSupplier(sortedFlagInSVS);
} else {
doReturn(null).when(mockStorageMetadataService).getStoreVersionState(any());
}
doAnswer((Answer<StoreVersionState>) invocationOnMock -> {
String topicName = invocationOnMock.getArgument(0, String.class);
Function<StoreVersionState, StoreVersionState> mapFunction = invocationOnMock.getArgument(1, Function.class);
StoreVersionState updatedStoreVersionState =
mapFunction.apply(mockStorageMetadataService.getStoreVersionState(topicName));
doReturn(updatedStoreVersionState).when(mockStorageMetadataService).getStoreVersionState(any());
return updatedStoreVersionState;
}).when(mockStorageMetadataService).computeStoreVersionState(anyString(), any());

localVeniceWriter.broadcastStartOfPush(true, new HashMap<>());
PubSubProduceResult putMetadata =
(PubSubProduceResult) localVeniceWriter.put(putKeyFoo, putValue, EXISTING_SCHEMA_ID).get();
@@ -2522,7 +2539,13 @@ public void testVeniceMessagesProcessingWithSortedInputWithBlobMode(boolean blob
.put(topic, PARTITION_FOO, getOffsetRecord(putMetadata.getOffset() - 1));
// Check database mode switches from deferred-write to transactional after EOP control message
StoragePartitionConfig deferredWritePartitionConfig = new StoragePartitionConfig(topic, PARTITION_FOO);
deferredWritePartitionConfig.setDeferredWrite(!blobMode);
boolean deferredWrite;
if (!blobMode) {
deferredWrite = sortedFlagInSVS != null ? sortedFlagInSVS : true;
} else {
deferredWrite = sortedFlagInSVS != null ? sortedFlagInSVS : false;
}
deferredWritePartitionConfig.setDeferredWrite(deferredWrite);
verify(mockAbstractStorageEngine, times(1))
.beginBatchWrite(eq(deferredWritePartitionConfig), any(), eq(Optional.empty()));
StoragePartitionConfig transactionalPartitionConfig = new StoragePartitionConfig(topic, PARTITION_FOO);
@@ -28,9 +28,10 @@ public DeepCopyStorageMetadataService(StorageMetadataService delegate) {
}

@Override
public void computeStoreVersionState(String topicName, Function<StoreVersionState, StoreVersionState> mapFunction)
throws VeniceException {
delegateStorageMetadataService.computeStoreVersionState(topicName, previousStoreVersionState -> {
public StoreVersionState computeStoreVersionState(
String topicName,
Function<StoreVersionState, StoreVersionState> mapFunction) throws VeniceException {
return delegateStorageMetadataService.computeStoreVersionState(topicName, previousStoreVersionState -> {
StoreVersionState newSVS = mapFunction.apply(
previousStoreVersionState == null
? null
@@ -19,10 +19,12 @@ public class InMemoryStorageMetadataService extends InMemoryOffsetManager implem
private final ConcurrentMap<String, StoreVersionState> topicToStoreVersionStateMap = new ConcurrentHashMap<>();

@Override
public void computeStoreVersionState(String topicName, Function<StoreVersionState, StoreVersionState> mapFunction)
throws VeniceException {
public StoreVersionState computeStoreVersionState(
String topicName,
Function<StoreVersionState, StoreVersionState> mapFunction) throws VeniceException {
LOGGER.info("InMemoryStorageMetadataService.compute(StoreVersionState) called for topicName: {}", topicName);
topicToStoreVersionStateMap.compute(topicName, (s, storeVersionState) -> mapFunction.apply(storeVersionState));
return topicToStoreVersionStateMap
.compute(topicName, (s, storeVersionState) -> mapFunction.apply(storeVersionState));
}

@Override
@@ -25,6 +25,7 @@
public class DataProviderUtils {
public static final Object[] BOOLEAN = { false, true };
public static final Object[] BOOLEAN_FALSE = { false };
public static final Object[] OPTIONAL_BOOLEAN = { false, true, null };
public static final Object[] COMPRESSION_STRATEGIES = { NO_OP, GZIP, ZSTD_WITH_DICT };
public static final Object[] PARTITION_COUNTS = { 1, 2, 3, 4, 8, 10, 16, 19, 92, 128 };

@@ -49,6 +50,11 @@ public static Object[][] twoBoolean() {
return allPermutationGenerator(BOOLEAN, BOOLEAN);
}

@DataProvider(name = "Boolean-and-Optional-Boolean")
public static Object[][] booleanAndOptionalBoolean() {
return allPermutationGenerator(BOOLEAN, OPTIONAL_BOOLEAN);
}

@DataProvider(name = "Three-True-and-False")
public static Object[][] threeBoolean() {
return allPermutationGenerator(BOOLEAN, BOOLEAN, BOOLEAN);
