Skip to content

Commit

Permalink
[dvc] Add a new config to replace pid as instance name suffix (#934)
Browse files Browse the repository at this point in the history
Using the process PID as the instance name suffix does not work for VPJ when an application slice performs a large-scale restart.
This PR introduces a new config that replaces the PID as the instance name suffix. If the config is not specified, we still fall back to the PID (not recommended).
  • Loading branch information
sixpluszero authored Apr 5, 2024
1 parent b275867 commit 4af1744
Show file tree
Hide file tree
Showing 3 changed files with 18 additions and 8 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package com.linkedin.davinci;

import static com.linkedin.venice.ConfigKeys.PUSH_STATUS_INSTANCE_NAME_SUFFIX;
import static com.linkedin.venice.ConfigKeys.VALIDATE_VENICE_INTERNAL_SCHEMA_VERSION;
import static java.lang.Thread.currentThread;

Expand Down Expand Up @@ -190,7 +191,9 @@ public DaVinciBackend(
VeniceWriterFactory writerFactory =
new VeniceWriterFactory(backendProps.toProperties(), pubSubClientsFactory.getProducerAdapterFactory(), null);
String pid = Utils.getPid();
String instanceName = Utils.getHostName() + "_" + (pid == null ? "NA" : pid);
String instanceSuffix =
configLoader.getCombinedProperties().getString(PUSH_STATUS_INSTANCE_NAME_SUFFIX, (pid == null ? "NA" : pid));
String instanceName = Utils.getHostName() + "_" + instanceSuffix;

// Fetch latest update schema's protocol ID for Push Status Store from Router.
ClientConfig pushStatusStoreClientConfig = ClientConfig.cloneConfig(clientConfig)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1738,18 +1738,19 @@ private ConfigKeys() {
public static final String PUSH_STATUS_STORE_HEARTBEAT_INTERVAL_IN_SECONDS =
"push.status.store.heartbeat.interval.seconds";

/**
* Config to control the instance name suffix of the Da Vinci instance that reports push status and heartbeats. If not
* specified, it defaults to the process PID; note that the PID is subject to change upon instance restart.
*/
public static final String PUSH_STATUS_INSTANCE_NAME_SUFFIX = "push.status.instance.name.suffix";

/**
* The expiration timeout. If an instance has not sent heartbeats for longer than the expiration
* time, it will be considered stale.
*/
public static final String PUSH_STATUS_STORE_HEARTBEAT_EXPIRATION_TIME_IN_SECONDS =
"push.status.store.heartbeat.expiration.seconds";

/**
* Derived schemaId for push status store write compute.
*/
public static final String PUSH_STATUS_STORE_DERIVED_SCHEMA_ID = "push.status.store.derived.schema.id";

/**
* Whether to throttle SSL connections between router and client.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import static com.linkedin.venice.ConfigKeys.CLIENT_SYSTEM_STORE_REPOSITORY_REFRESH_INTERVAL_SECONDS;
import static com.linkedin.venice.ConfigKeys.CLIENT_USE_SYSTEM_STORE_REPOSITORY;
import static com.linkedin.venice.ConfigKeys.DAVINCI_PUSH_STATUS_SCAN_INTERVAL_IN_SECONDS;
import static com.linkedin.venice.ConfigKeys.PUSH_STATUS_INSTANCE_NAME_SUFFIX;
import static com.linkedin.venice.ConfigKeys.PUSH_STATUS_STORE_ENABLED;
import static com.linkedin.venice.ConfigKeys.SERVER_PROMOTION_TO_LEADER_REPLICA_DELAY_SECONDS;
import static com.linkedin.venice.ConfigKeys.USE_PUSH_STATUS_STORE_FOR_INCREMENTAL_PUSH;
Expand Down Expand Up @@ -144,6 +145,8 @@ public void testKafkaPushJob(boolean isIsolated) throws Exception {
extraBackendConfigMap.put(CLIENT_SYSTEM_STORE_REPOSITORY_REFRESH_INTERVAL_SECONDS, 10);
extraBackendConfigMap.put(PUSH_STATUS_STORE_ENABLED, true);
extraBackendConfigMap.put(DAVINCI_PUSH_STATUS_SCAN_INTERVAL_IN_SECONDS, 5);
String expectedInstanceSuffix = "sampleApp_i015";
extraBackendConfigMap.put(PUSH_STATUS_INSTANCE_NAME_SUFFIX, expectedInstanceSuffix);

try (DaVinciClient<Integer, Integer> daVinciClient = ServiceFactory.getGenericAvroDaVinciClientWithRetries(
storeName,
Expand All @@ -152,8 +155,11 @@ public void testKafkaPushJob(boolean isIsolated) throws Exception {
extraBackendConfigMap)) {
daVinciClient.subscribeAll().get();
runVPJ(vpjProperties, 2, cluster);
TestUtils.waitForNonDeterministicAssertion(TEST_TIMEOUT_MS, TimeUnit.MILLISECONDS, () -> {
assertEquals(reader.getPartitionStatus(storeName, 2, 0, Optional.empty()).size(), 1);
TestUtils.waitForNonDeterministicAssertion(TEST_TIMEOUT_MS, TimeUnit.MILLISECONDS, true, () -> {
Map<CharSequence, Integer> partitionStatus = reader.getPartitionStatus(storeName, 2, 0, Optional.empty());
assertEquals(partitionStatus.size(), 1);
String expectedHostName = Utils.getHostName() + "_" + expectedInstanceSuffix;
assertTrue(partitionStatus.containsKey(new Utf8(expectedHostName)));
});
}
Admin admin = cluster.getVeniceControllers().get(0).getVeniceAdmin();
Expand Down

0 comments on commit 4af1744

Please sign in to comment.