diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/DaVinciBackend.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/DaVinciBackend.java index a4392ccabda..69b85623b4b 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/DaVinciBackend.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/DaVinciBackend.java @@ -1,5 +1,6 @@ package com.linkedin.davinci; +import static com.linkedin.venice.ConfigKeys.PUSH_STATUS_INSTANCE_NAME_SUFFIX; import static com.linkedin.venice.ConfigKeys.VALIDATE_VENICE_INTERNAL_SCHEMA_VERSION; import static java.lang.Thread.currentThread; @@ -190,7 +191,9 @@ public DaVinciBackend( VeniceWriterFactory writerFactory = new VeniceWriterFactory(backendProps.toProperties(), pubSubClientsFactory.getProducerAdapterFactory(), null); String pid = Utils.getPid(); - String instanceName = Utils.getHostName() + "_" + (pid == null ? "NA" : pid); + String instanceSuffix = + configLoader.getCombinedProperties().getString(PUSH_STATUS_INSTANCE_NAME_SUFFIX, (pid == null ? "NA" : pid)); + String instanceName = Utils.getHostName() + "_" + instanceSuffix; // Fetch latest update schema's protocol ID for Push Status Store from Router. ClientConfig pushStatusStoreClientConfig = ClientConfig.cloneConfig(clientConfig) diff --git a/internal/venice-common/src/main/java/com/linkedin/venice/ConfigKeys.java b/internal/venice-common/src/main/java/com/linkedin/venice/ConfigKeys.java index 2f265dcf5fd..18cbaff885c 100644 --- a/internal/venice-common/src/main/java/com/linkedin/venice/ConfigKeys.java +++ b/internal/venice-common/src/main/java/com/linkedin/venice/ConfigKeys.java @@ -1738,6 +1738,12 @@ private ConfigKeys() { public static final String PUSH_STATUS_STORE_HEARTBEAT_INTERVAL_IN_SECONDS = "push.status.store.heartbeat.interval.seconds"; + /** + * Config to control what's the suffix for Da Vinci instance which is reporting push status and heartbeats. By default, + * it is process PID if not specified, but note that PID is subject to change upon instance restart. + */ + public static final String PUSH_STATUS_INSTANCE_NAME_SUFFIX = "push.status.instance.name.suffix"; + /** * The expiration timeout. If an instance not sending heartbeats for over the expiration * time, it will be considered as stale. @@ -1745,11 +1751,6 @@ private ConfigKeys() { public static final String PUSH_STATUS_STORE_HEARTBEAT_EXPIRATION_TIME_IN_SECONDS = "push.status.store.heartbeat.expiration.seconds"; - /** - * Derived schemaId for push status store write compute. - */ - public static final String PUSH_STATUS_STORE_DERIVED_SCHEMA_ID = "push.status.store.derived.schema.id"; - /** * Whether to throttle SSL connections between router and client. */ diff --git a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/PushStatusStoreTest.java b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/PushStatusStoreTest.java index e8f745c6212..e1812496fc9 100644 --- a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/PushStatusStoreTest.java +++ b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/PushStatusStoreTest.java @@ -3,6 +3,7 @@ import static com.linkedin.venice.ConfigKeys.CLIENT_SYSTEM_STORE_REPOSITORY_REFRESH_INTERVAL_SECONDS; import static com.linkedin.venice.ConfigKeys.CLIENT_USE_SYSTEM_STORE_REPOSITORY; import static com.linkedin.venice.ConfigKeys.DAVINCI_PUSH_STATUS_SCAN_INTERVAL_IN_SECONDS; +import static com.linkedin.venice.ConfigKeys.PUSH_STATUS_INSTANCE_NAME_SUFFIX; import static com.linkedin.venice.ConfigKeys.PUSH_STATUS_STORE_ENABLED; import static com.linkedin.venice.ConfigKeys.SERVER_PROMOTION_TO_LEADER_REPLICA_DELAY_SECONDS; import static com.linkedin.venice.ConfigKeys.USE_PUSH_STATUS_STORE_FOR_INCREMENTAL_PUSH; @@ -144,6 +145,8 @@ public void testKafkaPushJob(boolean isIsolated) throws Exception { extraBackendConfigMap.put(CLIENT_SYSTEM_STORE_REPOSITORY_REFRESH_INTERVAL_SECONDS, 10); extraBackendConfigMap.put(PUSH_STATUS_STORE_ENABLED, true); extraBackendConfigMap.put(DAVINCI_PUSH_STATUS_SCAN_INTERVAL_IN_SECONDS, 5); + String expectedInstanceSuffix = "sampleApp_i015"; + extraBackendConfigMap.put(PUSH_STATUS_INSTANCE_NAME_SUFFIX, expectedInstanceSuffix); try (DaVinciClient daVinciClient = ServiceFactory.getGenericAvroDaVinciClientWithRetries( storeName, @@ -152,8 +155,11 @@ public void testKafkaPushJob(boolean isIsolated) throws Exception { extraBackendConfigMap)) { daVinciClient.subscribeAll().get(); runVPJ(vpjProperties, 2, cluster); - TestUtils.waitForNonDeterministicAssertion(TEST_TIMEOUT_MS, TimeUnit.MILLISECONDS, () -> { - assertEquals(reader.getPartitionStatus(storeName, 2, 0, Optional.empty()).size(), 1); + TestUtils.waitForNonDeterministicAssertion(TEST_TIMEOUT_MS, TimeUnit.MILLISECONDS, true, () -> { + Map partitionStatus = reader.getPartitionStatus(storeName, 2, 0, Optional.empty()); + assertEquals(partitionStatus.size(), 1); + String expectedHostName = Utils.getHostName() + "_" + expectedInstanceSuffix; + assertTrue(partitionStatus.containsKey(new Utf8(expectedHostName))); }); } Admin admin = cluster.getVeniceControllers().get(0).getVeniceAdmin();