Skip to content

Commit

Permalink
[dvc] dvc node blob discovery (#980)
Browse files Browse the repository at this point in the history
* dvc blob discovery - router changes

---------

Co-authored-by: “ishwarya-personal” <“[email protected]”>
  • Loading branch information
ishwarya-citro and “ishwarya-personal” authored Jun 10, 2024
1 parent e382726 commit d37e6a9
Show file tree
Hide file tree
Showing 11 changed files with 717 additions and 10 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
package com.linkedin.venice.routerapi;

import com.linkedin.venice.controllerapi.ControllerResponse;
import java.util.List;


public class BlobDiscoveryResponse extends ControllerResponse {
private List<String> liveNodeHostNames;

public void setLiveNodeNames(List<String> liveNodeHostNames) {
this.liveNodeHostNames = liveNodeHostNames;
}

public List<String> getLiveNodeHostNames() {
return liveNodeHostNames;
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package com.linkedin.venice;

import java.util.concurrent.TimeUnit;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

Expand All @@ -12,6 +13,9 @@ public class ConfigConstants {

public static final int UNSPECIFIED_REPLICATION_METADATA_VERSION = -1;

public static final long DEFAULT_PUSH_STATUS_STORE_HEARTBEAT_EXPIRATION_TIME_IN_SECONDS =
TimeUnit.MINUTES.toSeconds(10);

/**
* End of controller config default value
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ public class ControllerApiConstants {
public static final String BATCH_JOB_HEARTBEAT_ENABLED = "batch_job_heartbeat_enabled";

public static final String NAME = "store_name";
public static final String STORE_PARTITION = "store_partition";
public static final String STORE_VERSION = "store_version";
public static final String OWNER = "owner";
public static final String FABRIC = "fabric";
public static final String FABRIC_A = "fabric_a";
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,235 @@
package com.linkedin.venice.router;

import static com.linkedin.venice.ConfigKeys.CLIENT_SYSTEM_STORE_REPOSITORY_REFRESH_INTERVAL_SECONDS;
import static com.linkedin.venice.ConfigKeys.CLIENT_USE_SYSTEM_STORE_REPOSITORY;
import static com.linkedin.venice.ConfigKeys.DATA_BASE_PATH;
import static com.linkedin.venice.ConfigKeys.OFFLINE_JOB_START_TIMEOUT_MS;
import static com.linkedin.venice.ConfigKeys.PERSISTENCE_TYPE;
import static com.linkedin.venice.router.api.VenicePathParser.TYPE_BLOB_DISCOVERY;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.fail;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.linkedin.d2.balancer.D2Client;
import com.linkedin.davinci.client.DaVinciClient;
import com.linkedin.davinci.client.DaVinciConfig;
import com.linkedin.davinci.client.factory.CachingDaVinciClientFactory;
import com.linkedin.venice.D2.D2ClientUtils;
import com.linkedin.venice.common.VeniceSystemStoreType;
import com.linkedin.venice.common.VeniceSystemStoreUtils;
import com.linkedin.venice.controllerapi.ControllerClient;
import com.linkedin.venice.controllerapi.UpdateStoreQueryParams;
import com.linkedin.venice.controllerapi.VersionCreationResponse;
import com.linkedin.venice.exceptions.VeniceException;
import com.linkedin.venice.integration.utils.D2TestUtils;
import com.linkedin.venice.integration.utils.PubSubBrokerWrapper;
import com.linkedin.venice.integration.utils.ServiceFactory;
import com.linkedin.venice.integration.utils.VeniceClusterWrapper;
import com.linkedin.venice.integration.utils.VeniceControllerWrapper;
import com.linkedin.venice.integration.utils.VeniceMultiClusterWrapper;
import com.linkedin.venice.integration.utils.VeniceRouterWrapper;
import com.linkedin.venice.integration.utils.VeniceTwoLayerMultiRegionMultiClusterWrapper;
import com.linkedin.venice.meta.PersistenceType;
import com.linkedin.venice.meta.Version;
import com.linkedin.venice.pubsub.PubSubProducerAdapterFactory;
import com.linkedin.venice.routerapi.BlobDiscoveryResponse;
import com.linkedin.venice.utils.ObjectMapperFactory;
import com.linkedin.venice.utils.PropertyBuilder;
import com.linkedin.venice.utils.TestUtils;
import com.linkedin.venice.utils.Time;
import com.linkedin.venice.utils.Utils;
import com.linkedin.venice.utils.VeniceProperties;
import io.tehuti.metrics.MetricsRepository;
import java.io.InputStream;
import java.nio.charset.Charset;
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.apache.commons.httpclient.HttpStatus;
import org.apache.commons.io.IOUtils;
import org.apache.http.HttpResponse;
import org.apache.http.client.config.RequestConfig;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.impl.nio.client.CloseableHttpAsyncClient;
import org.apache.http.impl.nio.client.HttpAsyncClients;
import org.testng.Assert;
import org.testng.annotations.AfterTest;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;


public class TestBlobDiscovery {
private static final String INT_KEY_SCHEMA = "\"int\"";
private static final String INT_VALUE_SCHEMA = "\"int\"";
String clusterName;
String storeName;
private VeniceMultiClusterWrapper multiClusterVenice;
D2Client daVinciD2;

/**
* Set up a multi-cluster Venice environment with meta system store enabled Venice stores.
*/

@BeforeClass(alwaysRun = true)
public void setUp() {
Utils.thisIsLocalhost();

Properties parentControllerProps = new Properties();
parentControllerProps.put(OFFLINE_JOB_START_TIMEOUT_MS, "180000");

VeniceTwoLayerMultiRegionMultiClusterWrapper multiRegionMultiClusterWrapper =
ServiceFactory.getVeniceTwoLayerMultiRegionMultiClusterWrapper(
1,
2,
1,
1,
3,
1,
3,
Optional.of(parentControllerProps),
Optional.empty(),
Optional.empty(),
false);

multiClusterVenice = multiRegionMultiClusterWrapper.getChildRegions().get(0);
String[] clusterNames = multiClusterVenice.getClusterNames();
String parentControllerURLs = multiRegionMultiClusterWrapper.getParentControllers()
.stream()
.map(VeniceControllerWrapper::getControllerUrl)
.collect(Collectors.joining(","));

for (String cluster: clusterNames) {
try (ControllerClient controllerClient =
new ControllerClient(cluster, multiClusterVenice.getControllerConnectString())) {
// Verify the participant store is up and running in child region
String participantStoreName = VeniceSystemStoreUtils.getParticipantStoreNameForCluster(cluster);
TestUtils.waitForNonDeterministicPushCompletion(
Version.composeKafkaTopic(participantStoreName, 1),
controllerClient,
5,
TimeUnit.MINUTES);
}
}

clusterName = clusterNames[0];
storeName = Utils.getUniqueString("test-store");

List<PubSubBrokerWrapper> pubSubBrokerWrappers = multiClusterVenice.getClusters()
.values()
.stream()
.map(VeniceClusterWrapper::getPubSubBrokerWrapper)
.collect(Collectors.toList());
Map<String, String> additionalPubSubProperties =
PubSubBrokerWrapper.getBrokerDetailsForClients(pubSubBrokerWrappers);

try (ControllerClient parentControllerClient = new ControllerClient(clusterName, parentControllerURLs)) {
assertFalse(
parentControllerClient.createNewStore(storeName, "venice-test", INT_KEY_SCHEMA, INT_VALUE_SCHEMA).isError());

PubSubProducerAdapterFactory pubSubProducerAdapterFactory = multiClusterVenice.getClusters()
.get(clusterName)
.getPubSubBrokerWrapper()
.getPubSubClientsFactory()
.getProducerAdapterFactory();

VersionCreationResponse response = TestUtils.createVersionWithBatchData(
parentControllerClient,
storeName,
INT_KEY_SCHEMA,
INT_VALUE_SCHEMA,
IntStream.range(0, 10).mapToObj(i -> new AbstractMap.SimpleEntry<>(i, 0)),
pubSubProducerAdapterFactory,
additionalPubSubProperties);

// Verify the data can be ingested by classical Venice before proceeding.
TestUtils.waitForNonDeterministicPushCompletion(
response.getKafkaTopic(),
parentControllerClient,
30,
TimeUnit.SECONDS);

makeSureSystemStoresAreOnline(parentControllerClient, storeName);
multiClusterVenice.getClusters().get(clusterName).refreshAllRouterMetaData();
}

VeniceProperties backendConfig =
new PropertyBuilder().put(DATA_BASE_PATH, Utils.getTempDataDirectory().getAbsolutePath())
.put(PERSISTENCE_TYPE, PersistenceType.ROCKS_DB)
.put(CLIENT_USE_SYSTEM_STORE_REPOSITORY, true)
.put(CLIENT_SYSTEM_STORE_REPOSITORY_REFRESH_INTERVAL_SECONDS, 1)
.build();

DaVinciConfig daVinciConfig = new DaVinciConfig();
daVinciD2 = D2TestUtils.getAndStartD2Client(multiClusterVenice.getZkServerWrapper().getAddress());

try (CachingDaVinciClientFactory factory = new CachingDaVinciClientFactory(
daVinciD2,
VeniceRouterWrapper.CLUSTER_DISCOVERY_D2_SERVICE_NAME,
new MetricsRepository(),
backendConfig)) {
List<DaVinciClient<Integer, Object>> clients = new ArrayList<>();
DaVinciClient<Integer, Object> client = factory.getAndStartGenericAvroClient(storeName, daVinciConfig);
client.subscribeAll().get();
clients.add(client);
} catch (ExecutionException | InterruptedException e) {
throw new VeniceException(e);
}
}

@AfterTest
public void tearDown() {
D2ClientUtils.shutdownClient(daVinciD2);
}

@Test(timeOut = 60 * Time.MS_PER_SECOND)
public void testBlobDiscovery() throws Exception {
VeniceClusterWrapper veniceClusterWrapper = multiClusterVenice.getClusters().get(clusterName);
TestUtils.waitForNonDeterministicAssertion(2, TimeUnit.MINUTES, true, () -> {
veniceClusterWrapper.updateStore(storeName, new UpdateStoreQueryParams().setBlobTransferEnabled(true));
});

String routerURL = veniceClusterWrapper.getRandomRouterURL();

try (CloseableHttpAsyncClient client = HttpAsyncClients.custom()
.setDefaultRequestConfig(RequestConfig.custom().setSocketTimeout(4000).build())
.build()) {
client.start();

String uri =
routerURL + "/" + TYPE_BLOB_DISCOVERY + "?store_name=" + storeName + "&store_version=1&store_partition=1";
HttpGet routerRequest = new HttpGet(uri);
HttpResponse response = client.execute(routerRequest, null).get();
String responseBody;
try (InputStream bodyStream = response.getEntity().getContent()) {
responseBody = IOUtils.toString(bodyStream, Charset.defaultCharset());
}
Assert.assertEquals(
response.getStatusLine().getStatusCode(),
HttpStatus.SC_OK,
"Failed to get resource state for " + storeName + ". Response: " + responseBody);
ObjectMapper mapper = ObjectMapperFactory.getInstance();
BlobDiscoveryResponse blobDiscoveryResponse =
mapper.readValue(responseBody.getBytes(), BlobDiscoveryResponse.class);
// TODO: add another testcase to retrieve >= 1 live nodes
Assert.assertEquals(blobDiscoveryResponse.getLiveNodeHostNames().size(), 0);
} catch (Exception e) {
fail("Unexpected exception", e);
}
}

private void makeSureSystemStoresAreOnline(ControllerClient controllerClient, String storeName) {
String metaSystemStoreTopic =
Version.composeKafkaTopic(VeniceSystemStoreType.META_STORE.getSystemStoreName(storeName), 1);
TestUtils.waitForNonDeterministicPushCompletion(metaSystemStoreTopic, controllerClient, 30, TimeUnit.SECONDS);
String daVinciPushStatusStore =
Version.composeKafkaTopic(VeniceSystemStoreType.DAVINCI_PUSH_STATUS_STORE.getSystemStoreName(storeName), 1);
TestUtils.waitForNonDeterministicPushCompletion(daVinciPushStatusStore, controllerClient, 30, TimeUnit.SECONDS);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ public void setUp() throws VeniceClientException, ExecutionException, Interrupte
// To trigger long-tail retry
extraProperties.put(ConfigKeys.ROUTER_LONG_TAIL_RETRY_FOR_SINGLE_GET_THRESHOLD_MS, 1);
extraProperties.put(ConfigKeys.ROUTER_MAX_KEY_COUNT_IN_MULTIGET_REQ, MAX_KEY_LIMIT); // 20 keys at most in a
// batch-get request
// batch-get request
extraProperties.put(ConfigKeys.ROUTER_LONG_TAIL_RETRY_FOR_BATCH_GET_THRESHOLD_MS, "1-:1");
extraProperties.put(ConfigKeys.ROUTER_SMART_LONG_TAIL_RETRY_ENABLED, false);
extraProperties.put(ConfigKeys.ROUTER_STORAGE_NODE_CLIENT_TYPE, getStorageNodeClientType());
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package com.linkedin.venice.controller;

import static com.linkedin.venice.ConfigConstants.DEFAULT_PUSH_STATUS_STORE_HEARTBEAT_EXPIRATION_TIME_IN_SECONDS;
import static com.linkedin.venice.ConfigKeys.ACTIVE_ACTIVE_ENABLED_ON_CONTROLLER;
import static com.linkedin.venice.ConfigKeys.ACTIVE_ACTIVE_REAL_TIME_SOURCE_FABRIC_LIST;
import static com.linkedin.venice.ConfigKeys.ADMIN_CHECK_READ_METHOD_FOR_KAFKA;
Expand Down Expand Up @@ -510,8 +511,9 @@ public VeniceControllerConfig(VeniceProperties props) {
*/
this.zkSharedMetaSystemSchemaStoreAutoCreationEnabled =
props.getBoolean(CONTROLLER_ZK_SHARED_META_SYSTEM_SCHEMA_STORE_AUTO_CREATION_ENABLED, false);
this.pushStatusStoreHeartbeatExpirationTimeInSeconds =
props.getLong(PUSH_STATUS_STORE_HEARTBEAT_EXPIRATION_TIME_IN_SECONDS, TimeUnit.MINUTES.toSeconds(10));
this.pushStatusStoreHeartbeatExpirationTimeInSeconds = props.getLong(
PUSH_STATUS_STORE_HEARTBEAT_EXPIRATION_TIME_IN_SECONDS,
DEFAULT_PUSH_STATUS_STORE_HEARTBEAT_EXPIRATION_TIME_IN_SECONDS);
this.isDaVinciPushStatusStoreEnabled = props.getBoolean(PUSH_STATUS_STORE_ENABLED, false);
this.daVinciPushStatusScanEnabled =
props.getBoolean(DAVINCI_PUSH_STATUS_SCAN_ENABLED, true) && isDaVinciPushStatusStoreEnabled;
Expand Down
Loading

0 comments on commit d37e6a9

Please sign in to comment.