diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/ingestion/main/MainIngestionStorageMetadataService.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/ingestion/main/MainIngestionStorageMetadataService.java index 20d45a52e1d..066440b232c 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/ingestion/main/MainIngestionStorageMetadataService.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/ingestion/main/MainIngestionStorageMetadataService.java @@ -76,9 +76,10 @@ public void stopInner() throws Exception { } @Override - public void computeStoreVersionState(String topicName, Function mapFunction) - throws VeniceException { - topicStoreVersionStateMap.compute(topicName, (key, previousStoreVersionState) -> { + public StoreVersionState computeStoreVersionState( + String topicName, + Function mapFunction) throws VeniceException { + return topicStoreVersionStateMap.compute(topicName, (key, previousStoreVersionState) -> { StoreVersionState newStoreVersionState = mapFunction.apply(previousStoreVersionState); storeVersionStateSyncer.accept(topicName, newStoreVersionState); diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTask.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTask.java index 55c0fed9959..e8200bcdf28 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTask.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTask.java @@ -2935,47 +2935,72 @@ private void processStartOfPush( } else { sorted = startOfPush.sorted; } - beginBatchWrite(partition, sorted, partitionConsumptionState); - partitionConsumptionState.setStartOfPushTimestamp(startOfPushKME.producerMetadata.messageTimestamp); - ingestionNotificationDispatcher.reportStarted(partitionConsumptionState); - storageMetadataService.computeStoreVersionState(kafkaVersionTopic, previousStoreVersionState -> { - if (previousStoreVersionState == null) { - // No other partition of the same topic has started yet, let's initialize the StoreVersionState - StoreVersionState newStoreVersionState = new StoreVersionState(); - newStoreVersionState.sorted = sorted; - newStoreVersionState.chunked = startOfPush.chunked; - newStoreVersionState.compressionStrategy = startOfPush.compressionStrategy; - newStoreVersionState.compressionDictionary = startOfPush.compressionDictionary; - if (startOfPush.compressionStrategy == CompressionStrategy.ZSTD_WITH_DICT.getValue()) { - if (startOfPush.compressionDictionary == null) { + StoreVersionState persistedStoreVersionState = + storageMetadataService.computeStoreVersionState(kafkaVersionTopic, previousStoreVersionState -> { + if (previousStoreVersionState == null) { + // No other partition of the same topic has started yet, let's initialize the StoreVersionState + StoreVersionState newStoreVersionState = new StoreVersionState(); + newStoreVersionState.sorted = sorted; + newStoreVersionState.chunked = startOfPush.chunked; + newStoreVersionState.compressionStrategy = startOfPush.compressionStrategy; + newStoreVersionState.compressionDictionary = startOfPush.compressionDictionary; + if (startOfPush.compressionStrategy == CompressionStrategy.ZSTD_WITH_DICT.getValue()) { + if (startOfPush.compressionDictionary == null) { + throw new VeniceException( + "compression Dictionary should not be empty if CompressionStrategy is ZSTD_WITH_DICT"); + } + } + newStoreVersionState.batchConflictResolutionPolicy = startOfPush.timestampPolicy; + newStoreVersionState.startOfPushTimestamp = startOfPushKME.producerMetadata.messageTimestamp; + + LOGGER.info( + "Persisted {} for the first time following a SOP for topic {} with sorted: {}.", + StoreVersionState.class.getSimpleName(), + kafkaVersionTopic, + newStoreVersionState.sorted); + return newStoreVersionState; + } else if (previousStoreVersionState.chunked != startOfPush.chunked) { + // Something very wrong is going on ): ... throw new VeniceException( - "compression Dictionary should not be empty if CompressionStrategy is ZSTD_WITH_DICT"); + "Unexpected: received multiple " + ControlMessageType.START_OF_PUSH.name() + + " control messages with inconsistent 'chunked' fields within the same topic!"); + } else if (previousStoreVersionState.sorted != sorted) { + if (!isHybridMode()) { + // Something very wrong is going on ): ... + throw new VeniceException( + "Unexpected: received multiple " + ControlMessageType.START_OF_PUSH.name() + + " control messages with inconsistent 'sorted' fields within the same topic!" + + " And persisted sorted value: " + previousStoreVersionState.sorted + + " is different from the current one: " + sorted); + } + /** + * Because of the blob-db integration, `SIT` will forcibly set `sorted` to `false` for hybrid stores (check the + * above javadoc) and inconsistent `sorted` can happen during the rolling out/rolling back blob-db features. + * Here is one case how it can happen during the rolling out of blob-db: + * 1. P1 processes `SOP` with `sorted=true` and persist it in `StoreVersionState`. + * 2. P2 hasn't started processing `SOP` yet. + * 3. Restart the cluster and roll out blob-db feature. + * 4. when P2 processes `SOP` with `sorted=true`, it will forcibly set `sorted=false`, and it will be different + * from the previously persisted `StoreVersionState`. + * 5. This is fine as long as we just follow the previously persisted `StoreVersionState`. + * 6. Here are the reasons why step 5 is true: + * a. If the input for a given partition is truly sorted, it can always be ingested as unsorted data. + * b. If the input for a given partition is not sorted, the underlying `SSTFileWriter` will throw exception + * when we try to ingest it as sorted data. + */ + LOGGER.warn( + "Store version state for topic {} has already been initialized with a different value of 'sorted': {}", + kafkaVersionTopic, + previousStoreVersionState.sorted); } - } - newStoreVersionState.batchConflictResolutionPolicy = startOfPush.timestampPolicy; - newStoreVersionState.startOfPushTimestamp = startOfPushKME.producerMetadata.messageTimestamp; + // No need to mutate it, so we return it as is + return previousStoreVersionState; + }); - LOGGER.info( - "Persisted {} for the first time following a SOP for topic {}.", - StoreVersionState.class.getSimpleName(), - kafkaVersionTopic); - return newStoreVersionState; - } else if (previousStoreVersionState.sorted != sorted) { - // Something very wrong is going on ): ... - throw new VeniceException( - "Unexpected: received multiple " + ControlMessageType.START_OF_PUSH.name() - + " control messages with inconsistent 'sorted' fields within the same topic!"); - } else if (previousStoreVersionState.chunked != startOfPush.chunked) { - // Something very wrong is going on ): ... - throw new VeniceException( - "Unexpected: received multiple " + ControlMessageType.START_OF_PUSH.name() - + " control messages with inconsistent 'chunked' fields within the same topic!"); - } else { - // No need to mutate it, so we return it as is - return previousStoreVersionState; - } - }); + ingestionNotificationDispatcher.reportStarted(partitionConsumptionState); + beginBatchWrite(partition, persistedStoreVersionState.sorted, partitionConsumptionState); + partitionConsumptionState.setStartOfPushTimestamp(startOfPushKME.producerMetadata.messageTimestamp); } protected void processEndOfPush( diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/storage/StorageEngineMetadataService.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/storage/StorageEngineMetadataService.java index 6c8974f7481..dc8ef699ad0 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/storage/StorageEngineMetadataService.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/storage/StorageEngineMetadataService.java @@ -62,13 +62,15 @@ public void stopInner() throws Exception { } @Override - public void computeStoreVersionState(String topicName, Function mapFunction) - throws VeniceException { + public StoreVersionState computeStoreVersionState( + String topicName, + Function mapFunction) throws VeniceException { AbstractStorageEngine engine = getStorageEngineOrThrow(topicName); synchronized (engine) { StoreVersionState previousSVS = engine.getStoreVersionState(); StoreVersionState newSVS = mapFunction.apply(previousSVS); engine.putStoreVersionState(newSVS); + return newSVS; } } diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/storage/StorageMetadataService.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/storage/StorageMetadataService.java index 369733faf82..619eb0cec24 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/storage/StorageMetadataService.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/storage/StorageMetadataService.java @@ -11,8 +11,9 @@ * This is a superset of the OffsetManager APIs, which also provide functions for storing store-version level state. */ public interface StorageMetadataService extends OffsetManager { - void computeStoreVersionState(String topicName, Function mapFunction) - throws VeniceException; + StoreVersionState computeStoreVersionState( + String topicName, + Function mapFunction) throws VeniceException; /** * This will clear all metadata, including store-version state and partition states, tied to {@param topicName}. diff --git a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTaskTest.java b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTaskTest.java index 9a4093012e8..a37c9eb4ae2 100644 --- a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTaskTest.java +++ b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTaskTest.java @@ -259,6 +259,7 @@ import org.mockito.ArgumentCaptor; import org.mockito.ArgumentMatchers; import org.mockito.Mockito; +import org.mockito.stubbing.Answer; import org.testng.Assert; import org.testng.annotations.AfterClass; import org.testng.annotations.AfterMethod; @@ -1305,6 +1306,7 @@ void setStoreVersionStateSupplier(StoreVersionState svs) { setupMockAbstractStorageEngine(metadataPartition); doReturn(svs).when(mockAbstractStorageEngine).getStoreVersionState(); doReturn(svs).when(mockStorageMetadataService).getStoreVersionState(topic); + doReturn(svs).when(mockStorageMetadataService).computeStoreVersionState(eq(topic), any()); } void setStoreVersionStateSupplier(boolean sorted) { @@ -2499,8 +2501,23 @@ public void testVeniceMessagesProcessingWithSortedInput(AAConfig aaConfig) throw }, aaConfig); } - @Test(dataProvider = "True-and-False", dataProviderClass = DataProviderUtils.class) - public void testVeniceMessagesProcessingWithSortedInputWithBlobMode(boolean blobMode) throws Exception { + @Test(dataProvider = "Boolean-and-Optional-Boolean", dataProviderClass = DataProviderUtils.class) + public void testVeniceMessagesProcessingWithSortedInputWithBlobMode(boolean blobMode, Boolean sortedFlagInSVS) + throws Exception { + if (sortedFlagInSVS != null) { + setStoreVersionStateSupplier(sortedFlagInSVS); + } else { + doReturn(null).when(mockStorageMetadataService).getStoreVersionState(any()); + } + doAnswer((Answer) invocationOnMock -> { + String topicName = invocationOnMock.getArgument(0, String.class); + Function mapFunction = invocationOnMock.getArgument(1, Function.class); + StoreVersionState updatedStoreVersionState = + mapFunction.apply(mockStorageMetadataService.getStoreVersionState(topicName)); + doReturn(updatedStoreVersionState).when(mockStorageMetadataService).getStoreVersionState(any()); + return updatedStoreVersionState; + }).when(mockStorageMetadataService).computeStoreVersionState(anyString(), any()); + localVeniceWriter.broadcastStartOfPush(true, new HashMap<>()); PubSubProduceResult putMetadata = (PubSubProduceResult) localVeniceWriter.put(putKeyFoo, putValue, EXISTING_SCHEMA_ID).get(); @@ -2522,7 +2539,13 @@ public void testVeniceMessagesProcessingWithSortedInputWithBlobMode(boolean blob .put(topic, PARTITION_FOO, getOffsetRecord(putMetadata.getOffset() - 1)); // Check database mode switches from deferred-write to transactional after EOP control message StoragePartitionConfig deferredWritePartitionConfig = new StoragePartitionConfig(topic, PARTITION_FOO); - deferredWritePartitionConfig.setDeferredWrite(!blobMode); + boolean deferredWrite; + if (!blobMode) { + deferredWrite = sortedFlagInSVS != null ? sortedFlagInSVS : true; + } else { + deferredWrite = sortedFlagInSVS != null ? sortedFlagInSVS : false; + } + deferredWritePartitionConfig.setDeferredWrite(deferredWrite); verify(mockAbstractStorageEngine, times(1)) .beginBatchWrite(eq(deferredWritePartitionConfig), any(), eq(Optional.empty())); StoragePartitionConfig transactionalPartitionConfig = new StoragePartitionConfig(topic, PARTITION_FOO); diff --git a/internal/venice-test-common/src/main/java/com/linkedin/venice/offsets/DeepCopyStorageMetadataService.java b/internal/venice-test-common/src/main/java/com/linkedin/venice/offsets/DeepCopyStorageMetadataService.java index a1ea0e46e41..48538ecfca8 100644 --- a/internal/venice-test-common/src/main/java/com/linkedin/venice/offsets/DeepCopyStorageMetadataService.java +++ b/internal/venice-test-common/src/main/java/com/linkedin/venice/offsets/DeepCopyStorageMetadataService.java @@ -28,9 +28,10 @@ public DeepCopyStorageMetadataService(StorageMetadataService delegate) { } @Override - public void computeStoreVersionState(String topicName, Function mapFunction) - throws VeniceException { - delegateStorageMetadataService.computeStoreVersionState(topicName, previousStoreVersionState -> { + public StoreVersionState computeStoreVersionState( + String topicName, + Function mapFunction) throws VeniceException { + return delegateStorageMetadataService.computeStoreVersionState(topicName, previousStoreVersionState -> { StoreVersionState newSVS = mapFunction.apply( previousStoreVersionState == null ? null diff --git a/internal/venice-test-common/src/main/java/com/linkedin/venice/offsets/InMemoryStorageMetadataService.java b/internal/venice-test-common/src/main/java/com/linkedin/venice/offsets/InMemoryStorageMetadataService.java index 943d1f62155..0b7ff817fc4 100644 --- a/internal/venice-test-common/src/main/java/com/linkedin/venice/offsets/InMemoryStorageMetadataService.java +++ b/internal/venice-test-common/src/main/java/com/linkedin/venice/offsets/InMemoryStorageMetadataService.java @@ -19,10 +19,12 @@ public class InMemoryStorageMetadataService extends InMemoryOffsetManager implem private final ConcurrentMap topicToStoreVersionStateMap = new ConcurrentHashMap<>(); @Override - public void computeStoreVersionState(String topicName, Function mapFunction) - throws VeniceException { + public StoreVersionState computeStoreVersionState( + String topicName, + Function mapFunction) throws VeniceException { LOGGER.info("InMemoryStorageMetadataService.compute(StoreVersionState) called for topicName: {}", topicName); - topicToStoreVersionStateMap.compute(topicName, (s, storeVersionState) -> mapFunction.apply(storeVersionState)); + return topicToStoreVersionStateMap + .compute(topicName, (s, storeVersionState) -> mapFunction.apply(storeVersionState)); } @Override diff --git a/internal/venice-test-common/src/main/java/com/linkedin/venice/utils/DataProviderUtils.java b/internal/venice-test-common/src/main/java/com/linkedin/venice/utils/DataProviderUtils.java index d32252d4459..f20bccd6644 100644 --- a/internal/venice-test-common/src/main/java/com/linkedin/venice/utils/DataProviderUtils.java +++ b/internal/venice-test-common/src/main/java/com/linkedin/venice/utils/DataProviderUtils.java @@ -25,6 +25,7 @@ public class DataProviderUtils { public static final Object[] BOOLEAN = { false, true }; public static final Object[] BOOLEAN_FALSE = { false }; + public static final Object[] OPTIONAL_BOOLEAN = { false, true, null }; public static final Object[] COMPRESSION_STRATEGIES = { NO_OP, GZIP, ZSTD_WITH_DICT }; public static final Object[] PARTITION_COUNTS = { 1, 2, 3, 4, 8, 10, 16, 19, 92, 128 }; @@ -49,6 +50,11 @@ public static Object[][] twoBoolean() { return allPermutationGenerator(BOOLEAN, BOOLEAN); } + @DataProvider(name = "Boolean-and-Optional-Boolean") + public static Object[][] booleanAndOptionalBoolean() { + return allPermutationGenerator(BOOLEAN, OPTIONAL_BOOLEAN); + } + @DataProvider(name = "Three-True-and-False") public static Object[][] threeBoolean() { return allPermutationGenerator(BOOLEAN, BOOLEAN, BOOLEAN);