From 560b804440e2c658cf810738658e3c4e2d275380 Mon Sep 17 00:00:00 2001 From: Manoj Nagarajan Date: Thu, 18 Apr 2024 12:39:28 -0400 Subject: [PATCH] [dvc] Add config to return Da Vinci specific ExecutionStatus for Errors (#947) * Added a config "use.da.vinci.specific.execution.status.for.error" to enable/disable returning newly added Da Vinci specific ExecutionStatus for Errors in Controllers for DVC_INGESTION_ERROR_TOO_MANY_DEAD_INSTANCES and in Da Vinci clients for other DVC specific statuses. * It's disabled by default and can be enabled back again after the new code is in all components --- .../com/linkedin/davinci/DaVinciBackend.java | 27 +++++--- .../davinci/config/VeniceServerConfig.java | 8 +++ .../linkedin/davinci/DaVinciBackendTest.java | 69 +++++++++++++------ .../java/com/linkedin/venice/ConfigKeys.java | 7 ++ .../endToEnd/DaVinciClientDiskFullTest.java | 28 ++++++-- .../DaVinciClientMemoryLimitTest.java | 57 ++++++++++----- .../controller/VeniceControllerConfig.java | 9 ++- .../venice/controller/VeniceHelixAdmin.java | 3 +- .../pushmonitor/AbstractPushMonitor.java | 3 +- .../venice/pushmonitor/PushMonitorUtils.java | 20 ++++-- .../pushmonitor/PushStatusCollector.java | 8 ++- .../pushmonitor/PushMonitorUtilsTest.java | 37 ++++++---- .../pushmonitor/PushStatusCollectorTest.java | 9 ++- 13 files changed, 206 insertions(+), 79 deletions(-) diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/DaVinciBackend.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/DaVinciBackend.java index f386f42a82b..f40a04277ee 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/DaVinciBackend.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/DaVinciBackend.java @@ -105,6 +105,7 @@ public class DaVinciBackend implements Closeable { private final Optional cacheBackend; private DaVinciIngestionBackend ingestionBackend; private final AggVersionedStorageEngineStats aggVersionedStorageEngineStats; + private final boolean useDaVinciSpecificExecutionStatusForError; public DaVinciBackend( ClientConfig clientConfig, @@ -116,6 +117,7 @@ public DaVinciBackend( LOGGER.info("Creating Da Vinci backend with managed clients: {}", managedClients); try { VeniceServerConfig backendConfig = configLoader.getVeniceServerConfig(); + useDaVinciSpecificExecutionStatusForError = backendConfig.useDaVinciSpecificExecutionStatusForError(); this.configLoader = configLoader; metricsRepository = Optional.ofNullable(clientConfig.getMetricsRepository()) .orElse(TehutiUtils.getMetricsRepository("davinci-client")); @@ -664,7 +666,7 @@ public void error(String kafkaTopic, int partitionId, String message, Exception /** * Report push status needs to be executed before deleting the {@link VersionBackend}. */ - ExecutionStatus status = getDaVinciErrorStatus(e); + ExecutionStatus status = getDaVinciErrorStatus(e, useDaVinciSpecificExecutionStatusForError); reportPushStatus(kafkaTopic, partitionId, status); versionBackend.completePartitionExceptionally(partitionId, e); @@ -740,16 +742,21 @@ public void endOfIncrementalPushReceived( } }; - static ExecutionStatus getDaVinciErrorStatus(Exception e) { - ExecutionStatus status = DVC_INGESTION_ERROR_OTHER; - if (e instanceof VeniceException) { - if (e instanceof MemoryLimitExhaustedException - || (e.getCause() != null && e.getCause() instanceof MemoryLimitExhaustedException)) { - status = DVC_INGESTION_ERROR_MEMORY_LIMIT_REACHED; - } else if (e instanceof DiskLimitExhaustedException - || (e.getCause() != null && e.getCause() instanceof DiskLimitExhaustedException)) { - status = DVC_INGESTION_ERROR_DISK_FULL; + static ExecutionStatus getDaVinciErrorStatus(Exception e, boolean useDaVinciSpecificExecutionStatusForError) { + ExecutionStatus status; + if (useDaVinciSpecificExecutionStatusForError) { + status = DVC_INGESTION_ERROR_OTHER; + if (e instanceof VeniceException) { + if (e instanceof MemoryLimitExhaustedException + || (e.getCause() != null && e.getCause() instanceof MemoryLimitExhaustedException)) { + status = DVC_INGESTION_ERROR_MEMORY_LIMIT_REACHED; + } else if (e instanceof DiskLimitExhaustedException + || (e.getCause() != null && e.getCause() instanceof DiskLimitExhaustedException)) { + status = DVC_INGESTION_ERROR_DISK_FULL; + } } + } else { + status = ExecutionStatus.ERROR; } return status; } diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/config/VeniceServerConfig.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/config/VeniceServerConfig.java index 6b6e106dda9..c2380f15617 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/config/VeniceServerConfig.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/config/VeniceServerConfig.java @@ -129,6 +129,7 @@ import static com.linkedin.venice.ConfigKeys.SYSTEM_SCHEMA_INITIALIZATION_AT_START_TIME_ENABLED; import static com.linkedin.venice.ConfigKeys.UNREGISTER_METRIC_FOR_DELETED_STORE_ENABLED; import static com.linkedin.venice.ConfigKeys.UNSORTED_INPUT_DRAINER_SIZE; +import static com.linkedin.venice.ConfigKeys.USE_DA_VINCI_SPECIFIC_EXECUTION_STATUS_FOR_ERROR; import static com.linkedin.venice.pubsub.PubSubConstants.PUBSUB_TOPIC_MANAGER_METADATA_FETCHER_CONSUMER_POOL_SIZE_DEFAULT_VALUE; import com.linkedin.davinci.helix.LeaderFollowerPartitionStateModelFactory; @@ -458,6 +459,7 @@ public class VeniceServerConfig extends VeniceClusterConfig { private final int nonExistingTopicCheckRetryIntervalSecond; private final boolean dedicatedConsumerPoolForAAWCLeaderEnabled; private final int dedicatedConsumerPoolSizeForAAWCLeader; + private final boolean useDaVinciSpecificExecutionStatusForError; public VeniceServerConfig(VeniceProperties serverProperties) throws ConfigurationException { this(serverProperties, Collections.emptyMap()); @@ -754,6 +756,8 @@ public VeniceServerConfig(VeniceProperties serverProperties, Map { ExecutionStatus status = (ExecutionStatus) permutation[0]; return status.isDVCIngestionError(); - }, ExecutionStatus.values()); + }, ExecutionStatus.values(), BOOLEAN); } - @Test(dataProvider = "DvcErrorExecutionStatus") - public void testGetDaVinciErrorStatus(ExecutionStatus executionStatus) { + @Test(dataProvider = "DvcErrorExecutionStatusAndBoolean") + public void testGetDaVinciErrorStatus( + ExecutionStatus executionStatus, + boolean useDaVinciSpecificExecutionStatusForError) { VeniceException veniceException; switch (executionStatus) { case DVC_INGESTION_ERROR_DISK_FULL: @@ -40,15 +44,23 @@ public void testGetDaVinciErrorStatus(ExecutionStatus executionStatus) { fail("Unexpected execution status: " + executionStatus); return; } - assertEquals( - DaVinciBackend.getDaVinciErrorStatus(veniceException), - executionStatus.equals(ExecutionStatus.DVC_INGESTION_ERROR_TOO_MANY_DEAD_INSTANCES) - ? DVC_INGESTION_ERROR_OTHER - : executionStatus); + if (useDaVinciSpecificExecutionStatusForError) { + assertEquals( + DaVinciBackend.getDaVinciErrorStatus(veniceException, useDaVinciSpecificExecutionStatusForError), + executionStatus.equals(ExecutionStatus.DVC_INGESTION_ERROR_TOO_MANY_DEAD_INSTANCES) + ? DVC_INGESTION_ERROR_OTHER + : executionStatus); + } else { + assertEquals( + DaVinciBackend.getDaVinciErrorStatus(veniceException, useDaVinciSpecificExecutionStatusForError), + ERROR); + } } - @Test(dataProvider = "DvcErrorExecutionStatus") - public void testGetDaVinciErrorStatusNested(ExecutionStatus executionStatus) { + @Test(dataProvider = "DvcErrorExecutionStatusAndBoolean") + public void testGetDaVinciErrorStatusNested( + ExecutionStatus executionStatus, + boolean useDaVinciSpecificExecutionStatusForError) { VeniceException veniceException; switch (executionStatus) { case DVC_INGESTION_ERROR_DISK_FULL: @@ -65,15 +77,23 @@ public void testGetDaVinciErrorStatusNested(ExecutionStatus executionStatus) { fail("Unexpected execution status: " + executionStatus); return; } - assertEquals( - DaVinciBackend.getDaVinciErrorStatus(veniceException), - executionStatus.equals(ExecutionStatus.DVC_INGESTION_ERROR_TOO_MANY_DEAD_INSTANCES) - ? DVC_INGESTION_ERROR_OTHER - : executionStatus); + if (useDaVinciSpecificExecutionStatusForError) { + assertEquals( + DaVinciBackend.getDaVinciErrorStatus(veniceException, useDaVinciSpecificExecutionStatusForError), + executionStatus.equals(ExecutionStatus.DVC_INGESTION_ERROR_TOO_MANY_DEAD_INSTANCES) + ? DVC_INGESTION_ERROR_OTHER + : executionStatus); + } else { + assertEquals( + DaVinciBackend.getDaVinciErrorStatus(veniceException, useDaVinciSpecificExecutionStatusForError), + ERROR); + } } - @Test(dataProvider = "DvcErrorExecutionStatus") - public void testGetDaVinciErrorStatusWithInvalidCases(ExecutionStatus executionStatus) { + @Test(dataProvider = "DvcErrorExecutionStatusAndBoolean") + public void testGetDaVinciErrorStatusWithInvalidCases( + ExecutionStatus executionStatus, + boolean useDaVinciSpecificExecutionStatusForError) { VeniceException veniceException; switch (executionStatus) { case DVC_INGESTION_ERROR_DISK_FULL: @@ -87,7 +107,16 @@ public void testGetDaVinciErrorStatusWithInvalidCases(ExecutionStatus executionS return; } - assertEquals(DaVinciBackend.getDaVinciErrorStatus(veniceException), DVC_INGESTION_ERROR_OTHER); + if (useDaVinciSpecificExecutionStatusForError) { + assertEquals( + DaVinciBackend.getDaVinciErrorStatus(veniceException, useDaVinciSpecificExecutionStatusForError), + DVC_INGESTION_ERROR_OTHER); + + } else { + assertEquals( + DaVinciBackend.getDaVinciErrorStatus(veniceException, useDaVinciSpecificExecutionStatusForError), + ERROR); + } } } diff --git a/internal/venice-common/src/main/java/com/linkedin/venice/ConfigKeys.java b/internal/venice-common/src/main/java/com/linkedin/venice/ConfigKeys.java index 38e260d8558..f5afaee1692 100644 --- a/internal/venice-common/src/main/java/com/linkedin/venice/ConfigKeys.java +++ b/internal/venice-common/src/main/java/com/linkedin/venice/ConfigKeys.java @@ -1756,6 +1756,13 @@ private ConfigKeys() { public static final String PUSH_STATUS_STORE_HEARTBEAT_EXPIRATION_TIME_IN_SECONDS = "push.status.store.heartbeat.expiration.seconds"; + /** + * when enabled, Da Vinci Clients returns specific status codes to indicate the type of ingestion failure + * rather than a generic {@link com.linkedin.venice.pushmonitor.ExecutionStatus.ERROR} + */ + public static final String USE_DA_VINCI_SPECIFIC_EXECUTION_STATUS_FOR_ERROR = + "use.da.vinci.specific.execution.status.for.error"; + /** * Whether to throttle SSL connections between router and client. */ diff --git a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/DaVinciClientDiskFullTest.java b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/DaVinciClientDiskFullTest.java index e9d5c28f76b..5201f366afb 100644 --- a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/DaVinciClientDiskFullTest.java +++ b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/DaVinciClientDiskFullTest.java @@ -10,7 +10,9 @@ import static com.linkedin.venice.ConfigKeys.PUSH_STATUS_STORE_ENABLED; import static com.linkedin.venice.ConfigKeys.SERVER_DISK_FULL_THRESHOLD; import static com.linkedin.venice.ConfigKeys.SERVER_PROMOTION_TO_LEADER_REPLICA_DELAY_SECONDS; +import static com.linkedin.venice.ConfigKeys.USE_DA_VINCI_SPECIFIC_EXECUTION_STATUS_FOR_ERROR; import static com.linkedin.venice.hadoop.VenicePushJob.PushJobCheckpoints.DVC_INGESTION_ERROR_DISK_FULL; +import static com.linkedin.venice.hadoop.VenicePushJob.PushJobCheckpoints.START_JOB_STATUS_POLLING; import static com.linkedin.venice.hadoop.VenicePushJobConstants.PUSH_JOB_STATUS_UPLOAD_ENABLE; import static com.linkedin.venice.integration.utils.ServiceFactory.getVeniceCluster; import static com.linkedin.venice.meta.PersistenceType.ROCKS_DB; @@ -43,6 +45,7 @@ import com.linkedin.venice.integration.utils.VeniceRouterWrapper; import com.linkedin.venice.pushmonitor.ExecutionStatus; import com.linkedin.venice.status.protocol.PushJobDetails; +import com.linkedin.venice.utils.DataProviderUtils; import com.linkedin.venice.utils.PropertyBuilder; import com.linkedin.venice.utils.TestUtils; import com.linkedin.venice.utils.TestWriteUtils; @@ -130,7 +133,8 @@ private double getDiskFullThreshold(int recordCount, int recordSizeMin) throws I return diskFullThreshold; } - private VeniceProperties getDaVinciBackendConfig() throws IOException { + private VeniceProperties getDaVinciBackendConfig(boolean useDaVinciSpecificExecutionStatusForError) + throws IOException { String baseDataPath = Utils.getTempDataDirectory().getAbsolutePath(); PropertyBuilder venicePropertyBuilder = new PropertyBuilder(); @@ -141,6 +145,7 @@ private VeniceProperties getDaVinciBackendConfig() throws IOException { .put(PUSH_STATUS_STORE_ENABLED, true) .put(D2_ZK_HOSTS_ADDRESS, venice.getZk().getAddress()) .put(CLUSTER_DISCOVERY_D2_SERVICE, VeniceRouterWrapper.CLUSTER_DISCOVERY_D2_SERVICE_NAME) + .put(USE_DA_VINCI_SPECIFIC_EXECUTION_STATUS_FOR_ERROR, useDaVinciSpecificExecutionStatusForError) .put(SERVER_DISK_FULL_THRESHOLD, getDiskFullThreshold(largePushRecordCount, largePushRecordMinSize)); return venicePropertyBuilder.build(); } @@ -194,8 +199,8 @@ private PushJobDetails makeCopyOf(PushJobDetails pushJobDetails) { } } - @Test(timeOut = TEST_TIMEOUT) - public void testDaVinciDiskFullFailure() throws Exception { + @Test(timeOut = TEST_TIMEOUT, dataProviderClass = DataProviderUtils.class, dataProvider = "True-and-False") + public void testDaVinciDiskFullFailure(boolean useDaVinciSpecificExecutionStatusForError) throws Exception { String storeName = Utils.getUniqueString("davinci_disk_full_test"); // Test a small push File inputDir = getTempDataDirectory(); @@ -231,7 +236,7 @@ public void testDaVinciDiskFullFailure() throws Exception { }); // Spin up DaVinci client - VeniceProperties backendConfig = getDaVinciBackendConfig(); + VeniceProperties backendConfig = getDaVinciBackendConfig(useDaVinciSpecificExecutionStatusForError); MetricsRepository metricsRepository = new MetricsRepository(); try (CachingDaVinciClientFactory factory = new CachingDaVinciClientFactory( d2Client, @@ -267,18 +272,27 @@ public void testDaVinciDiskFullFailure() throws Exception { VeniceException.class, () -> runVPJ(vpjPropertiesForV2, 2, controllerClient, Optional.of(pushJobDetailsTracker))); assertTrue( - exception.getMessage().contains("status: " + ExecutionStatus.DVC_INGESTION_ERROR_DISK_FULL), + exception.getMessage() + .contains( + "status: " + (useDaVinciSpecificExecutionStatusForError + ? ExecutionStatus.DVC_INGESTION_ERROR_DISK_FULL + : ExecutionStatus.ERROR)), exception.getMessage()); assertTrue( exception.getMessage() - .contains("Found a failed partition replica in Da Vinci due to disk threshold reached"), + .contains( + "Found a failed partition replica in Da Vinci" + + (useDaVinciSpecificExecutionStatusForError ? " due to disk threshold reached" : "")), exception.getMessage()); + assertEquals( pushJobDetailsTracker.getRecordedPushJobDetails() .get(pushJobDetailsTracker.getRecordedPushJobDetails().size() - 1) .getPushJobLatestCheckpoint() .intValue(), - DVC_INGESTION_ERROR_DISK_FULL.getValue()); + useDaVinciSpecificExecutionStatusForError + ? DVC_INGESTION_ERROR_DISK_FULL.getValue() + : START_JOB_STATUS_POLLING.getValue()); } finally { controllerClient.disableAndDeleteStore(storeName); } diff --git a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/DaVinciClientMemoryLimitTest.java b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/DaVinciClientMemoryLimitTest.java index d2aeb971505..1d7f7a09595 100644 --- a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/DaVinciClientMemoryLimitTest.java +++ b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/DaVinciClientMemoryLimitTest.java @@ -17,6 +17,7 @@ import static com.linkedin.venice.ConfigKeys.SERVER_INGESTION_ISOLATION_SERVICE_PORT; import static com.linkedin.venice.ConfigKeys.SERVER_INGESTION_MODE; import static com.linkedin.venice.ConfigKeys.SERVER_PROMOTION_TO_LEADER_REPLICA_DELAY_SECONDS; +import static com.linkedin.venice.ConfigKeys.USE_DA_VINCI_SPECIFIC_EXECUTION_STATUS_FOR_ERROR; import static com.linkedin.venice.meta.PersistenceType.ROCKS_DB; import static com.linkedin.venice.utils.IntegrationTestPushUtils.createStoreForJob; import static com.linkedin.venice.utils.IntegrationTestPushUtils.defaultVPJProps; @@ -101,12 +102,18 @@ public void cleanUp() { Utils.closeQuietlyWithErrorLogged(venice); } - private VeniceProperties getDaVinciBackendConfig(boolean ingestionIsolationEnabledInDaVinci) { - return getDaVinciBackendConfig(ingestionIsolationEnabledInDaVinci, Collections.EMPTY_SET); + private VeniceProperties getDaVinciBackendConfig( + boolean ingestionIsolationEnabledInDaVinci, + boolean useDaVinciSpecificExecutionStatusForError) { + return getDaVinciBackendConfig( + ingestionIsolationEnabledInDaVinci, + useDaVinciSpecificExecutionStatusForError, + Collections.EMPTY_SET); } private VeniceProperties getDaVinciBackendConfig( boolean ingestionIsolationEnabledInDaVinci, + boolean useDaVinciSpecificExecutionStatusForError, Set memoryLimitStores) { String baseDataPath = Utils.getTempDataDirectory().getAbsolutePath(); PropertyBuilder venicePropertyBuilder = new PropertyBuilder(); @@ -119,7 +126,8 @@ private VeniceProperties getDaVinciBackendConfig( .put(CLUSTER_DISCOVERY_D2_SERVICE, VeniceRouterWrapper.CLUSTER_DISCOVERY_D2_SERVICE_NAME) .put(ROCKSDB_MEMTABLE_SIZE_IN_BYTES, "2MB") .put(ROCKSDB_TOTAL_MEMTABLE_USAGE_CAP_IN_BYTES, "10MB") - .put(INGESTION_MEMORY_LIMIT_STORE_LIST, String.join(",", memoryLimitStores)); + .put(INGESTION_MEMORY_LIMIT_STORE_LIST, String.join(",", memoryLimitStores)) + .put(USE_DA_VINCI_SPECIFIC_EXECUTION_STATUS_FOR_ERROR, useDaVinciSpecificExecutionStatusForError); if (ingestionIsolationEnabledInDaVinci) { venicePropertyBuilder.put(SERVER_INGESTION_MODE, IngestionMode.ISOLATED); venicePropertyBuilder.put(SERVER_INGESTION_ISOLATION_APPLICATION_PORT, TestUtils.getFreePort()); @@ -133,9 +141,10 @@ private VeniceProperties getDaVinciBackendConfig( return venicePropertyBuilder.build(); } - @Test(timeOut = TEST_TIMEOUT, dataProviderClass = DataProviderUtils.class, dataProvider = "True-and-False") - public void testDaVinciMemoryLimitShouldFailLargeDataPush(boolean ingestionIsolationEnabledInDaVinci) - throws Exception { + @Test(timeOut = TEST_TIMEOUT, dataProviderClass = DataProviderUtils.class, dataProvider = "Two-True-and-False") + public void testDaVinciMemoryLimitShouldFailLargeDataPush( + boolean ingestionIsolationEnabledInDaVinci, + boolean useDaVinciSpecificExecutionStatusForError) throws Exception { String storeName = Utils.getUniqueString("davinci_memory_limit_test"); // Test a small push File inputDir = getTempDataDirectory(); @@ -173,8 +182,10 @@ public void testDaVinciMemoryLimitShouldFailLargeDataPush(boolean ingestionIsola }); // Spin up DaVinci client - VeniceProperties backendConfig = - getDaVinciBackendConfig(ingestionIsolationEnabledInDaVinci, new HashSet<>(Arrays.asList(storeName))); + VeniceProperties backendConfig = getDaVinciBackendConfig( + ingestionIsolationEnabledInDaVinci, + useDaVinciSpecificExecutionStatusForError, + new HashSet<>(Arrays.asList(storeName))); MetricsRepository metricsRepository = new MetricsRepository(); try (CachingDaVinciClientFactory factory = new CachingDaVinciClientFactory( d2Client, @@ -204,10 +215,16 @@ public void testDaVinciMemoryLimitShouldFailLargeDataPush(boolean ingestionIsola VeniceException exception = expectThrows(VeniceException.class, () -> runVPJ(vpjPropertiesForV2, 2, controllerClient)); assertTrue( - exception.getMessage().contains("status: " + ExecutionStatus.DVC_INGESTION_ERROR_MEMORY_LIMIT_REACHED)); + exception.getMessage() + .contains( + "status: " + (useDaVinciSpecificExecutionStatusForError + ? ExecutionStatus.DVC_INGESTION_ERROR_MEMORY_LIMIT_REACHED + : ExecutionStatus.ERROR))); assertTrue( exception.getMessage() - .contains("Found a failed partition replica in Da Vinci due to memory limit reached")); + .contains( + "Found a failed partition replica in Da Vinci" + + (useDaVinciSpecificExecutionStatusForError ? " due to memory limit reached" : ""))); // Run a bigger push against a non-enforced store should succeed vpjProperties = defaultVPJProps(venice, inputDirPath, storeNameWithoutMemoryEnforcement); @@ -240,9 +257,10 @@ public void testDaVinciMemoryLimitShouldFailLargeDataPush(boolean ingestionIsola } } - @Test(timeOut = TEST_TIMEOUT, dataProviderClass = DataProviderUtils.class, dataProvider = "True-and-False") + @Test(timeOut = TEST_TIMEOUT, dataProviderClass = DataProviderUtils.class, dataProvider = "Two-True-and-False") public void testDaVinciMemoryLimitShouldFailLargeDataPushAndResumeHybridStore( - boolean ingestionIsolationEnabledInDaVinci) throws Exception { + boolean ingestionIsolationEnabledInDaVinci, + boolean useDaVinciSpecificExecutionStatusForError) throws Exception { String batchOnlyStoreName = Utils.getUniqueString("davinci_memory_limit_test_batch_only"); // Test a small push File inputDir = getTempDataDirectory(); @@ -305,7 +323,8 @@ public void testDaVinciMemoryLimitShouldFailLargeDataPushAndResumeHybridStore( }); // Spin up DaVinci client - VeniceProperties backendConfig = getDaVinciBackendConfig(ingestionIsolationEnabledInDaVinci); + VeniceProperties backendConfig = + getDaVinciBackendConfig(ingestionIsolationEnabledInDaVinci, useDaVinciSpecificExecutionStatusForError); MetricsRepository metricsRepository = new MetricsRepository(); try (CachingDaVinciClientFactory factory = new CachingDaVinciClientFactory( d2Client, @@ -361,10 +380,16 @@ public void testDaVinciMemoryLimitShouldFailLargeDataPushAndResumeHybridStore( VeniceException exception = expectThrows(VeniceException.class, () -> runVPJ(vpjPropertiesForV2, 2, controllerClient)); assertTrue( - exception.getMessage().contains("status: " + ExecutionStatus.DVC_INGESTION_ERROR_MEMORY_LIMIT_REACHED)); + exception.getMessage() + .contains( + "status: " + (useDaVinciSpecificExecutionStatusForError + ? ExecutionStatus.DVC_INGESTION_ERROR_MEMORY_LIMIT_REACHED + : ExecutionStatus.ERROR))); assertTrue( exception.getMessage() - .contains("Found a failed partition replica in Da Vinci due to memory limit reached")); + .contains( + "Found a failed partition replica in Da Vinci" + + (useDaVinciSpecificExecutionStatusForError ? " due to memory limit reached" : ""))); // Write more records to the hybrid store. for (; hybridStoreKeyId < 200; ++hybridStoreKeyId) { @@ -462,7 +487,7 @@ public void testHybridStoreHittingMemoryLimiterShouldResumeAfterFreeUpResource( }); // Spin up DaVinci client - VeniceProperties backendConfig = getDaVinciBackendConfig(ingestionIsolationEnabledInDaVinci); + VeniceProperties backendConfig = getDaVinciBackendConfig(ingestionIsolationEnabledInDaVinci, false); MetricsRepository metricsRepository = new MetricsRepository(); try (CachingDaVinciClientFactory factory = new CachingDaVinciClientFactory( d2Client, diff --git a/services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceControllerConfig.java b/services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceControllerConfig.java index 19d06aa3047..44119755117 100644 --- a/services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceControllerConfig.java +++ b/services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceControllerConfig.java @@ -99,6 +99,7 @@ import static com.linkedin.venice.ConfigKeys.TOPIC_DELETION_STATUS_POLL_INTERVAL_MS; import static com.linkedin.venice.ConfigKeys.TOPIC_MANAGER_KAFKA_OPERATION_TIMEOUT_MS; import static com.linkedin.venice.ConfigKeys.UNREGISTER_METRIC_FOR_DELETED_STORE_ENABLED; +import static com.linkedin.venice.ConfigKeys.USE_DA_VINCI_SPECIFIC_EXECUTION_STATUS_FOR_ERROR; import static com.linkedin.venice.ConfigKeys.USE_PUSH_STATUS_STORE_FOR_INCREMENTAL_PUSH; import static com.linkedin.venice.ConfigKeys.VENICE_STORAGE_CLUSTER_LEADER_HAAS; import static com.linkedin.venice.pubsub.PubSubConstants.PUBSUB_TOPIC_DELETION_STATUS_POLL_INTERVAL_MS_DEFAULT_VALUE; @@ -318,7 +319,7 @@ public class VeniceControllerConfig extends VeniceControllerClusterConfig { private final int unusedSchemaCleanupIntervalSeconds; private final int minSchemaCountToKeep; - + private final boolean useDaVinciSpecificExecutionStatusForError; private final PubSubClientsFactory pubSubClientsFactory; public VeniceControllerConfig(VeniceProperties props) { @@ -556,6 +557,8 @@ public VeniceControllerConfig(VeniceProperties props) { props.getBoolean(CONTROLLER_UNUSED_VALUE_SCHEMA_CLEANUP_ENABLED, false); this.unusedSchemaCleanupIntervalSeconds = props.getInt(CONTROLLER_UNUSED_SCHEMA_CLEANUP_INTERVAL_SECONDS, 36000); this.minSchemaCountToKeep = props.getInt(CONTROLLER_MIN_SCHEMA_COUNT_TO_KEEP, 20); + this.useDaVinciSpecificExecutionStatusForError = + props.getBoolean(USE_DA_VINCI_SPECIFIC_EXECUTION_STATUS_FOR_ERROR, false); this.pubSubClientsFactory = new PubSubClientsFactory(props); } @@ -678,6 +681,10 @@ public int getMinSchemaCountToKeep() { return minSchemaCountToKeep; } + public boolean useDaVinciSpecificExecutionStatusForError() { + return useDaVinciSpecificExecutionStatusForError; + } + public Map getChildDataCenterControllerD2Map() { return childDataCenterControllerD2Map; } diff --git a/services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceHelixAdmin.java b/services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceHelixAdmin.java index eebdbe6037c..65f7ae58ea5 100644 --- a/services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceHelixAdmin.java +++ b/services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceHelixAdmin.java @@ -5831,7 +5831,8 @@ private OfflinePushStatusInfo getOfflinePushStatusInfo( version.getPartitionCount(), incrementalPushVersion, multiClusterConfigs.getControllerConfig(clusterName).getDaVinciPushStatusScanMaxOfflineInstanceCount(), - multiClusterConfigs.getControllerConfig(clusterName).getDaVinciPushStatusScanMaxOfflineInstanceRatio()); + multiClusterConfigs.getControllerConfig(clusterName).getDaVinciPushStatusScanMaxOfflineInstanceRatio(), + multiClusterConfigs.getCommonConfig().useDaVinciSpecificExecutionStatusForError()); ExecutionStatus daVinciStatus = daVinciStatusAndDetails.getStatus(); String daVinciDetails = daVinciStatusAndDetails.getDetails(); ExecutionStatus overallExecutionStatus = getOverallPushStatus(executionStatus, daVinciStatus); diff --git a/services/venice-controller/src/main/java/com/linkedin/venice/pushmonitor/AbstractPushMonitor.java b/services/venice-controller/src/main/java/com/linkedin/venice/pushmonitor/AbstractPushMonitor.java index d6a15a5871d..d7eda14df46 100644 --- a/services/venice-controller/src/main/java/com/linkedin/venice/pushmonitor/AbstractPushMonitor.java +++ b/services/venice-controller/src/main/java/com/linkedin/venice/pushmonitor/AbstractPushMonitor.java @@ -130,7 +130,8 @@ public AbstractPushMonitor( controllerConfig.getDaVinciPushStatusScanThreadNumber(), controllerConfig.getDaVinciPushStatusScanNoReportRetryMaxAttempt(), controllerConfig.getDaVinciPushStatusScanMaxOfflineInstanceCount(), - controllerConfig.getDaVinciPushStatusScanMaxOfflineInstanceRatio()); + controllerConfig.getDaVinciPushStatusScanMaxOfflineInstanceRatio(), + controllerConfig.useDaVinciSpecificExecutionStatusForError()); this.isOfflinePushMonitorDaVinciPushStatusEnabled = controllerConfig.isDaVinciPushStatusEnabled(); pushStatusCollector.start(); } diff --git a/services/venice-controller/src/main/java/com/linkedin/venice/pushmonitor/PushMonitorUtils.java b/services/venice-controller/src/main/java/com/linkedin/venice/pushmonitor/PushMonitorUtils.java index 3f5c2a6cd64..3dd9f79065d 100644 --- a/services/venice-controller/src/main/java/com/linkedin/venice/pushmonitor/PushMonitorUtils.java +++ b/services/venice-controller/src/main/java/com/linkedin/venice/pushmonitor/PushMonitorUtils.java @@ -51,7 +51,8 @@ public static ExecutionStatusWithDetails getDaVinciPushStatusAndDetails( int partitionCount, Optional incrementalPushVersion, int maxOfflineInstanceCount, - double maxOfflineInstanceRatio) { + double maxOfflineInstanceRatio, + boolean useDaVinciSpecificExecutionStatusForError) { if (reader == null) { throw new VeniceException("PushStatusStoreReader is null"); } @@ -70,7 +71,8 @@ public static ExecutionStatusWithDetails getDaVinciPushStatusAndDetails( partitionCount, incrementalPushVersion, maxOfflineInstanceCount, - maxOfflineInstanceRatio); + maxOfflineInstanceRatio, + useDaVinciSpecificExecutionStatusForError); } else { // DaVinci starts using new status key format, which contains status for all partitions in one key. // Only batch pushes will use this key; incremental pushes will still use partition level status key. @@ -126,7 +128,9 @@ public static ExecutionStatusWithDetails getDaVinciPushStatusAndDetails( if (lastUpdateTime + TimeUnit.MINUTES.toMillis(daVinciErrorInstanceWaitTime) < System.currentTimeMillis()) { storeVersionToDVCDeadInstanceTimeMap.remove(topicName); return new ExecutionStatusWithDetails( - ExecutionStatus.DVC_INGESTION_ERROR_TOO_MANY_DEAD_INSTANCES, + useDaVinciSpecificExecutionStatusForError + ? ExecutionStatus.DVC_INGESTION_ERROR_TOO_MANY_DEAD_INSTANCES + : ExecutionStatus.ERROR, "Too many dead instances: " + offlineInstanceCount + ", total instances: " + totalInstanceCount + ", example offline instances: " + offlineInstanceList, noDaVinciStatusReported); @@ -165,7 +169,8 @@ public static ExecutionStatusWithDetails getDaVinciPushStatusAndDetails( partitionCount, incrementalPushVersion, maxOfflineInstanceCount, - maxOfflineInstanceRatio); + maxOfflineInstanceRatio, + useDaVinciSpecificExecutionStatusForError); if (partitionLevelStatus.getStatus() != ExecutionStatus.COMPLETED) { // Do not report COMPLETED, instead, report status from the partition level status key. statusDetailStringBuilder.append( @@ -200,7 +205,8 @@ public static ExecutionStatusWithDetails getDaVinciPartitionLevelPushStatusAndDe int partitionCount, Optional incrementalPushVersion, int maxOfflineInstanceCount, - double maxOfflineInstanceRatio) { + double maxOfflineInstanceRatio, + boolean useDaVinciSpecificExecutionStatusForError) { if (reader == null) { throw new VeniceException("PushStatusStoreReader is null"); } @@ -277,7 +283,9 @@ public static ExecutionStatusWithDetails getDaVinciPartitionLevelPushStatusAndDe if (lastUpdateTime + TimeUnit.MINUTES.toMillis(daVinciErrorInstanceWaitTime) < System.currentTimeMillis()) { storeVersionToDVCDeadInstanceTimeMap.remove(topicName); return new ExecutionStatusWithDetails( - ExecutionStatus.DVC_INGESTION_ERROR_TOO_MANY_DEAD_INSTANCES, + useDaVinciSpecificExecutionStatusForError + ? ExecutionStatus.DVC_INGESTION_ERROR_TOO_MANY_DEAD_INSTANCES + : ExecutionStatus.ERROR, "Too many dead instances: " + offlineReplicaCount + ", total instances: " + totalReplicaCount + ", example offline instances: " + offlineInstanceList, noDaVinciStatusReported); diff --git a/services/venice-controller/src/main/java/com/linkedin/venice/pushmonitor/PushStatusCollector.java b/services/venice-controller/src/main/java/com/linkedin/venice/pushmonitor/PushStatusCollector.java index 02750366da0..db0c36ad64e 100644 --- a/services/venice-controller/src/main/java/com/linkedin/venice/pushmonitor/PushStatusCollector.java +++ b/services/venice-controller/src/main/java/com/linkedin/venice/pushmonitor/PushStatusCollector.java @@ -49,6 +49,7 @@ public class PushStatusCollector { private final AtomicBoolean isStarted = new AtomicBoolean(false); private final Map topicToNoDaVinciStatusRetryCountMap = new HashMap<>(); + private final boolean useDaVinciSpecificExecutionStatusForError; public PushStatusCollector( ReadWriteStoreRepository storeRepository, @@ -60,7 +61,8 @@ public PushStatusCollector( int daVinciPushStatusScanThreadNumber, int daVinciPushStatusNoReportRetryMaxAttempts, int daVinciPushStatusScanMaxOfflineInstanceCount, - double daVinciPushStatusScanMaxOfflineInstanceRatio) { + double daVinciPushStatusScanMaxOfflineInstanceRatio, + boolean useDaVinciSpecificExecutionStatusForError) { this.storeRepository = storeRepository; this.pushStatusStoreReader = pushStatusStoreReader; this.pushCompletedHandler = pushCompletedHandler; @@ -71,6 +73,7 @@ public PushStatusCollector( this.daVinciPushStatusNoReportRetryMaxAttempts = daVinciPushStatusNoReportRetryMaxAttempts; this.daVinciPushStatusScanMaxOfflineInstanceCount = daVinciPushStatusScanMaxOfflineInstanceCount; this.daVinciPushStatusScanMaxOfflineInstanceRatio = daVinciPushStatusScanMaxOfflineInstanceRatio; + this.useDaVinciSpecificExecutionStatusForError = useDaVinciSpecificExecutionStatusForError; } public void start() { @@ -135,7 +138,8 @@ private void scanDaVinciPushStatus() { pushStatus.getPartitionCount(), Optional.empty(), daVinciPushStatusScanMaxOfflineInstanceCount, - daVinciPushStatusScanMaxOfflineInstanceRatio); + daVinciPushStatusScanMaxOfflineInstanceRatio, + useDaVinciSpecificExecutionStatusForError); pushStatus.setDaVinciStatus(statusWithDetails); return pushStatus; }, pushStatusStoreScanExecutor)); diff --git a/services/venice-controller/src/test/java/com/linkedin/venice/pushmonitor/PushMonitorUtilsTest.java b/services/venice-controller/src/test/java/com/linkedin/venice/pushmonitor/PushMonitorUtilsTest.java index 04c6d02fa6d..6d3df2d5594 100644 --- a/services/venice-controller/src/test/java/com/linkedin/venice/pushmonitor/PushMonitorUtilsTest.java +++ b/services/venice-controller/src/test/java/com/linkedin/venice/pushmonitor/PushMonitorUtilsTest.java @@ -5,6 +5,7 @@ import static org.mockito.Mockito.mock; import com.linkedin.venice.pushstatushelper.PushStatusStoreReader; +import com.linkedin.venice.utils.DataProviderUtils; import com.linkedin.venice.utils.Utils; import java.util.Collections; import java.util.HashMap; @@ -51,8 +52,8 @@ public void testCompleteStatusCanBeReportedWithOfflineInstancesBelowFailFastThre validatePushStatus(reader, "store_v1", 2, 0.25, ExecutionStatus.COMPLETED); } - @Test - public void testDaVinciPushStatusScan() { + @Test(dataProvider = "True-and-False", dataProviderClass = DataProviderUtils.class) + public void testDaVinciPushStatusScan(boolean useDaVinciSpecificExecutionStatusForError) { PushMonitorUtils.setDaVinciErrorInstanceWaitTime(0); PushStatusStoreReader reader = mock(PushStatusStoreReader.class); doReturn(true).when(reader).isInstanceAlive(eq("store"), eq("a")); @@ -83,7 +84,8 @@ public void testDaVinciPushStatusScan() { 2, 0.25, ExecutionStatus.STARTED, - null); + null, + useDaVinciSpecificExecutionStatusForError); // Expected to fail. validateOfflineReplicaInPushStatusWhenBreachingFailFastThreshold( @@ -91,8 +93,11 @@ public void testDaVinciPushStatusScan() { "store_v1", 1, 0.25, - ExecutionStatus.DVC_INGESTION_ERROR_TOO_MANY_DEAD_INSTANCES, - "Too many dead instances: 2, total instances: 4, example offline instances: " + offlineInstances); + useDaVinciSpecificExecutionStatusForError + ? ExecutionStatus.DVC_INGESTION_ERROR_TOO_MANY_DEAD_INSTANCES + : ExecutionStatus.ERROR, + "Too many dead instances: 2, total instances: 4, example offline instances: " + offlineInstances, + useDaVinciSpecificExecutionStatusForError); /** * Testing ratio-based threshold. @@ -104,15 +109,19 @@ public void testDaVinciPushStatusScan() { 1, 0.5, ExecutionStatus.STARTED, - null); + null, + useDaVinciSpecificExecutionStatusForError); // Expected to fail. validateOfflineReplicaInPushStatusWhenBreachingFailFastThreshold( reader, "store_v2", 1, 0.25, - ExecutionStatus.DVC_INGESTION_ERROR_TOO_MANY_DEAD_INSTANCES, - "Too many dead instances: 2, total instances: 4, example offline instances: " + offlineInstances); + useDaVinciSpecificExecutionStatusForError + ? ExecutionStatus.DVC_INGESTION_ERROR_TOO_MANY_DEAD_INSTANCES + : ExecutionStatus.ERROR, + "Too many dead instances: 2, total instances: 4, example offline instances: " + offlineInstances, + useDaVinciSpecificExecutionStatusForError); } private void validateOfflineReplicaInPushStatusWhenBreachingFailFastThreshold( @@ -121,7 +130,8 @@ private void validateOfflineReplicaInPushStatusWhenBreachingFailFastThreshold( int maxOfflineInstanceCount, double maxOfflineInstanceRatio, ExecutionStatus expectedStatus, - String expectedErrorDetails) { + String expectedErrorDetails, + boolean useDaVinciSpecificExecutionStatusForError) { /** * Even if offline instances number exceed the max offline threshold count it will remain STARTED for the first check, * as we need to wait until daVinciErrorInstanceWaitTime has passed since it first occurs. @@ -132,7 +142,8 @@ private void validateOfflineReplicaInPushStatusWhenBreachingFailFastThreshold( 1, Optional.empty(), maxOfflineInstanceCount, - maxOfflineInstanceRatio); + maxOfflineInstanceRatio, + useDaVinciSpecificExecutionStatusForError); Assert.assertEquals(executionStatusWithDetails.getStatus(), ExecutionStatus.STARTED); // Sleep 1ms and try again. Utils.sleep(1); @@ -142,7 +153,8 @@ private void validateOfflineReplicaInPushStatusWhenBreachingFailFastThreshold( 1, Optional.empty(), maxOfflineInstanceCount, - maxOfflineInstanceRatio); + maxOfflineInstanceRatio, + useDaVinciSpecificExecutionStatusForError); Assert.assertEquals(executionStatusWithDetails.getStatus(), expectedStatus); if (expectedStatus.isError()) { Assert.assertEquals(executionStatusWithDetails.getDetails(), expectedErrorDetails); @@ -161,7 +173,8 @@ private void validatePushStatus( 1, Optional.empty(), maxOfflineInstanceCount, - maxOfflineInstanceRatio); + maxOfflineInstanceRatio, + true); Assert.assertEquals(executionStatusWithDetails.getStatus(), expectedStatus); } } diff --git a/services/venice-controller/src/test/java/com/linkedin/venice/pushmonitor/PushStatusCollectorTest.java b/services/venice-controller/src/test/java/com/linkedin/venice/pushmonitor/PushStatusCollectorTest.java index b3ebd8cf480..1f3edb5be3b 100644 --- a/services/venice-controller/src/test/java/com/linkedin/venice/pushmonitor/PushStatusCollectorTest.java +++ b/services/venice-controller/src/test/java/com/linkedin/venice/pushmonitor/PushStatusCollectorTest.java @@ -72,7 +72,8 @@ public void testPushStatusCollector() { 4, 1, 20, - 1); + 1, + true); pushStatusCollector.start(); pushStatusCollector.subscribeTopic(regularStoreTopicV1, 10); @@ -223,7 +224,8 @@ public void testPushStatusCollectorDaVinciStatusPollingRetry() { 4, 1, 20, - 1); + 1, + true); pushStatusCollector.start(); pushCompletedCount.set(0); @@ -314,7 +316,8 @@ public void testPushStatusCollectorDaVinciStatusPollingRetryWhenEmptyResultUntil 4, 0, 20, - 1); + 1, + true); pushStatusCollector.start(); pushCompletedCount.set(0);