diff --git a/internal/venice-common/src/main/java/com/linkedin/venice/blobtransfer/BlobTransferUtil.java b/internal/venice-common/src/main/java/com/linkedin/venice/blobtransfer/BlobTransferUtil.java index 6f4676a4f2b..f80033e5e4b 100644 --- a/internal/venice-common/src/main/java/com/linkedin/venice/blobtransfer/BlobTransferUtil.java +++ b/internal/venice-common/src/main/java/com/linkedin/venice/blobtransfer/BlobTransferUtil.java @@ -2,6 +2,7 @@ import com.linkedin.venice.blobtransfer.client.NettyFileTransferClient; import com.linkedin.venice.blobtransfer.server.P2PBlobTransferService; +import com.linkedin.venice.client.store.AvroGenericStoreClientImpl; import com.linkedin.venice.client.store.ClientConfig; import com.linkedin.venice.client.store.ClientFactory; import org.apache.logging.log4j.LogManager; @@ -32,10 +33,12 @@ public static BlobTransferManager getP2PBlobTransferManagerAndStart( String baseDir, ClientConfig clientConfig) { try { + AvroGenericStoreClientImpl storeClient = + (AvroGenericStoreClientImpl) ClientFactory.getAndStartGenericAvroClient(clientConfig); BlobTransferManager manager = new NettyP2PBlobTransferManager( new P2PBlobTransferService(p2pTransferServerPort, baseDir), new NettyFileTransferClient(p2pTransferClientPort, baseDir), - new DvcBlobFinder(ClientFactory.getTransportClient(clientConfig))); + new DaVinciBlobFinder(storeClient)); manager.start(); return manager; } catch (Exception e) { diff --git a/internal/venice-common/src/main/java/com/linkedin/venice/blobtransfer/DvcBlobFinder.java b/internal/venice-common/src/main/java/com/linkedin/venice/blobtransfer/DaVinciBlobFinder.java similarity index 59% rename from internal/venice-common/src/main/java/com/linkedin/venice/blobtransfer/DvcBlobFinder.java rename to internal/venice-common/src/main/java/com/linkedin/venice/blobtransfer/DaVinciBlobFinder.java index 809c14aa910..4360e6157c4 100644 --- a/internal/venice-common/src/main/java/com/linkedin/venice/blobtransfer/DvcBlobFinder.java +++ b/internal/venice-common/src/main/java/com/linkedin/venice/blobtransfer/DaVinciBlobFinder.java @@ -5,13 +5,18 @@ import static com.linkedin.venice.controllerapi.ControllerApiConstants.STORE_VERSION; import com.fasterxml.jackson.databind.ObjectMapper; -import com.linkedin.venice.client.store.transport.TransportClient; +import com.linkedin.venice.client.store.AvroGenericStoreClientImpl; +import com.linkedin.venice.exceptions.VeniceException; import com.linkedin.venice.utils.ObjectMapperFactory; +import com.linkedin.venice.utils.RetryUtils; import java.io.IOException; import java.nio.charset.StandardCharsets; +import java.time.Duration; import java.util.ArrayList; +import java.util.Collections; import java.util.List; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; import org.apache.http.NameValuePair; import org.apache.http.client.utils.URLEncodedUtils; import org.apache.http.message.BasicNameValuePair; @@ -22,32 +27,46 @@ /** * DvcBlobFinder discovers live DaVinci peer nodes to facilitate blob transfers necessary for bootstrapping the database */ -public class DvcBlobFinder implements BlobFinder { - private static final Logger LOGGER = LogManager.getLogger(DvcBlobFinder.class); +public class DaVinciBlobFinder implements BlobFinder { + private static final Logger LOGGER = LogManager.getLogger(DaVinciBlobFinder.class); private static final String TYPE_BLOB_DISCOVERY = "blob_discovery"; private static final String ERROR_DISCOVERY_MESSAGE = "Error finding DVC peers for blob transfer in store: %s, version: %d, partition: %d"; - private final TransportClient transportClient; + private final AvroGenericStoreClientImpl storeClient; - public DvcBlobFinder(TransportClient transportClient) { - this.transportClient = transportClient; + public DaVinciBlobFinder(AvroGenericStoreClientImpl storeClient) { + this.storeClient = storeClient; } @Override public BlobPeersDiscoveryResponse discoverBlobPeers(String storeName, int version, int partition) { - String uri = buildUriForBlobDiscovery(storeName, version, partition); + String requestPath = buildUriForBlobDiscovery(storeName, version, partition); + byte[] response = executeRequest(requestPath); - CompletableFuture futureResponse = transportClient.get(uri).thenApply(response -> { - byte[] responseBody = response.getBody(); - ObjectMapper mapper = ObjectMapperFactory.getInstance(); - try { - return mapper.readValue(responseBody, BlobPeersDiscoveryResponse.class); - } catch (IOException e) { - return handleError(ERROR_DISCOVERY_MESSAGE, storeName, version, partition, e); - } - }).exceptionally(throwable -> handleError(ERROR_DISCOVERY_MESSAGE, storeName, version, partition, throwable)); + ObjectMapper mapper = ObjectMapperFactory.getInstance(); + try { + return mapper.readValue(response, BlobPeersDiscoveryResponse.class); + } catch (IOException e) { + return handleError(ERROR_DISCOVERY_MESSAGE, storeName, version, partition, e); + } + } + + private byte[] executeRequest(String requestPath) { + byte[] response; + try { + response = RetryUtils.executeWithMaxAttempt( + () -> ((CompletableFuture) storeClient.getRaw(requestPath)).get(), + 3, + Duration.ofSeconds(5), + Collections.singletonList(ExecutionException.class)); + } catch (Exception e) { + throw new VeniceException("Failed to fetch schema from path " + requestPath, e); + } - return futureResponse.join(); + if (response == null) { + throw new VeniceException("Requested schema(s) doesn't exist for request path: " + requestPath); + } + return response; } private String buildUriForBlobDiscovery(String storeName, int version, int partition) { @@ -76,6 +95,6 @@ private BlobPeersDiscoveryResponse handleError( @Override public void close() throws IOException { - transportClient.close(); + storeClient.close(); } } diff --git a/internal/venice-common/src/test/java/com/linkedin/venice/blobtransfer/DvcBlobFinderTest.java b/internal/venice-common/src/test/java/com/linkedin/venice/blobtransfer/DaVinciBlobFinderTest.java similarity index 77% rename from internal/venice-common/src/test/java/com/linkedin/venice/blobtransfer/DvcBlobFinderTest.java rename to internal/venice-common/src/test/java/com/linkedin/venice/blobtransfer/DaVinciBlobFinderTest.java index 8873383a3c2..0309d2a3819 100644 --- a/internal/venice-common/src/test/java/com/linkedin/venice/blobtransfer/DvcBlobFinderTest.java +++ b/internal/venice-common/src/test/java/com/linkedin/venice/blobtransfer/DaVinciBlobFinderTest.java @@ -10,7 +10,7 @@ import static org.testng.AssertJUnit.assertTrue; import com.fasterxml.jackson.databind.ObjectMapper; -import com.linkedin.venice.client.store.transport.TransportClient; +import com.linkedin.venice.client.store.AvroGenericStoreClientImpl; import com.linkedin.venice.client.store.transport.TransportClientResponse; import com.linkedin.venice.utils.ObjectMapperFactory; import java.io.IOException; @@ -21,17 +21,17 @@ import org.testng.annotations.Test; -public class DvcBlobFinderTest { - private TransportClient transportClient; - private DvcBlobFinder dvcBlobFinder; +public class DaVinciBlobFinderTest { + private AvroGenericStoreClientImpl storeClient; + private DaVinciBlobFinder daVinciBlobFinder; private static final String storeName = "testStore"; private static final int version = 1; private static final int partition = 1; @BeforeMethod public void setUp() { - transportClient = mock(TransportClient.class); - dvcBlobFinder = new DvcBlobFinder(transportClient); + storeClient = mock(AvroGenericStoreClientImpl.class); + daVinciBlobFinder = new DaVinciBlobFinder(storeClient); } @Test @@ -42,9 +42,9 @@ public void testDiscoverBlobPeers_Success() { TransportClientResponse mockResponse = new TransportClientResponse(0, null, responseBody); CompletableFuture futureResponse = CompletableFuture.completedFuture(mockResponse); - when(transportClient.get(anyString())).thenReturn(futureResponse); + when(storeClient.get(anyString())).thenReturn(futureResponse); - BlobPeersDiscoveryResponse response = dvcBlobFinder.discoverBlobPeers(storeName, version, partition); + BlobPeersDiscoveryResponse response = daVinciBlobFinder.discoverBlobPeers(storeName, version, partition); assertEquals(3, response.getDiscoveryResult().size()); } @@ -56,13 +56,13 @@ public void testDiscoverBlobPeers_CallsTransportClientWithCorrectURI() { TransportClientResponse mockResponse = new TransportClientResponse(0, null, responseBody); CompletableFuture futureResponse = CompletableFuture.completedFuture(mockResponse); - when(transportClient.get(anyString())).thenReturn(futureResponse); + when(storeClient.get(anyString())).thenReturn(futureResponse); - dvcBlobFinder.discoverBlobPeers(storeName, version, partition); + daVinciBlobFinder.discoverBlobPeers(storeName, version, partition); // Capture the argument passed to transportClient.get ArgumentCaptor argumentCaptor = ArgumentCaptor.forClass(String.class); - verify(transportClient).get(argumentCaptor.capture()); + verify(storeClient).get(argumentCaptor.capture()); String expectedUri = String .format("blob_discovery?store_name=%s&store_version=%d&store_partition=%d", storeName, version, partition); @@ -76,14 +76,14 @@ public void testDiscoverBlobPeers_IOException() throws Exception { TransportClientResponse mockResponse = new TransportClientResponse(0, null, responseBody); CompletableFuture futureResponse = CompletableFuture.completedFuture(mockResponse); - when(transportClient.get(anyString())).thenReturn(futureResponse); + when(storeClient.get(anyString())).thenReturn(futureResponse); ObjectMapper mapper = ObjectMapperFactory.getInstance(); ObjectMapper mockMapper = spy(mapper); doThrow(new IOException("Test Exception")).when(mockMapper) .readValue(responseBody, BlobPeersDiscoveryResponse.class); - BlobPeersDiscoveryResponse response = dvcBlobFinder.discoverBlobPeers(storeName, version, partition); + BlobPeersDiscoveryResponse response = daVinciBlobFinder.discoverBlobPeers(storeName, version, partition); assertEquals(0, response.getDiscoveryResult().size()); assertEquals(response.getErrorMessage(), "some error"); assertTrue(response.isError()); @@ -93,9 +93,9 @@ public void testDiscoverBlobPeers_IOException() throws Exception { public void testDiscoverBlobPeers_Exceptionally() { CompletableFuture futureResponse = new CompletableFuture<>(); futureResponse.completeExceptionally(new RuntimeException("Test Exception")); - when(transportClient.get(anyString())).thenReturn(futureResponse); + when(storeClient.get(anyString())).thenReturn(futureResponse); - BlobPeersDiscoveryResponse response = dvcBlobFinder.discoverBlobPeers(storeName, version, partition); + BlobPeersDiscoveryResponse response = daVinciBlobFinder.discoverBlobPeers(storeName, version, partition); assertTrue(response.isError()); assertEquals( diff --git a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/DaVinciClientTest.java b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/DaVinciClientTest.java index 5f23c29f874..9f4f4c83ad4 100644 --- a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/DaVinciClientTest.java +++ b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/DaVinciClientTest.java @@ -1002,7 +1002,7 @@ public void testReadCompressedData(CompressionStrategy compressionStrategy) thro * For the local P2P testing, need to setup two different directories and ports for the two Da Vinci clients in order * to avoid conflicts. */ - @Test(timeOut = 2 * TEST_TIMEOUT, enabled = false) + @Test(timeOut = 2 * TEST_TIMEOUT, enabled = true) public void testBlobP2PTransferAmongDVC() throws Exception { Consumer paramsConsumer = params -> params.setBlobTransferEnabled(true); String storeName = Utils.getUniqueString("test-store"); diff --git a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/router/TestBlobDiscovery.java b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/router/TestBlobDiscovery.java index 5ce84061b9d..1750091e183 100644 --- a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/router/TestBlobDiscovery.java +++ b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/router/TestBlobDiscovery.java @@ -16,7 +16,8 @@ import com.linkedin.venice.D2.D2ClientUtils; import com.linkedin.venice.blobtransfer.BlobFinder; import com.linkedin.venice.blobtransfer.BlobPeersDiscoveryResponse; -import com.linkedin.venice.blobtransfer.DvcBlobFinder; +import com.linkedin.venice.blobtransfer.DaVinciBlobFinder; +import com.linkedin.venice.client.store.AvroGenericStoreClientImpl; import com.linkedin.venice.client.store.ClientConfig; import com.linkedin.venice.client.store.ClientFactory; import com.linkedin.venice.common.VeniceSystemStoreType; @@ -181,7 +182,8 @@ public void testBlobDiscovery() throws Exception { .setD2ServiceName(VeniceRouterWrapper.CLUSTER_DISCOVERY_D2_SERVICE_NAME) .setMetricsRepository(new MetricsRepository()); - BlobFinder dvcFinder = new DvcBlobFinder(ClientFactory.getTransportClient(clientConfig)); + BlobFinder dvcFinder = + new DaVinciBlobFinder((AvroGenericStoreClientImpl) ClientFactory.getAndStartGenericAvroClient(clientConfig)); TestUtils.waitForNonDeterministicAssertion(1, TimeUnit.MINUTES, true, () -> { BlobPeersDiscoveryResponse response = dvcFinder.discoverBlobPeers(storeName, 1, 1); diff --git a/services/venice-router/src/main/java/com/linkedin/venice/router/MetaDataHandler.java b/services/venice-router/src/main/java/com/linkedin/venice/router/MetaDataHandler.java index 447484cb87a..0f534ff8f66 100644 --- a/services/venice-router/src/main/java/com/linkedin/venice/router/MetaDataHandler.java +++ b/services/venice-router/src/main/java/com/linkedin/venice/router/MetaDataHandler.java @@ -56,6 +56,7 @@ import com.linkedin.venice.meta.StoreConfig; import com.linkedin.venice.meta.SystemStore; import com.linkedin.venice.meta.Version; +import com.linkedin.venice.pushmonitor.ExecutionStatus; import com.linkedin.venice.pushmonitor.HybridStoreQuotaStatus; import com.linkedin.venice.pushstatushelper.PushStatusStoreReader; import com.linkedin.venice.router.api.RouterResourceType; @@ -552,11 +553,12 @@ private void handleBlobDiscovery(ChannelHandlerContext ctx, VenicePathParserHelp Integer.parseInt(storeVersion), Integer.parseInt(storePartition), Optional.empty()); + List liveNodeHostNames = instances.entrySet() .stream() + .filter(entry -> entry.getValue() == ExecutionStatus.COMPLETED.getValue()) .map(Map.Entry::getKey) .map(CharSequence::toString) - .filter(instanceHostName -> pushStatusStoreReader.isInstanceAlive(storeName, instanceHostName)) .collect(Collectors.toList()); response.setDiscoveryResult(liveNodeHostNames); } catch (VeniceException e) { diff --git a/services/venice-router/src/main/java/com/linkedin/venice/router/RouterServer.java b/services/venice-router/src/main/java/com/linkedin/venice/router/RouterServer.java index 6e7e79142ee..a5e8d8ef31a 100644 --- a/services/venice-router/src/main/java/com/linkedin/venice/router/RouterServer.java +++ b/services/venice-router/src/main/java/com/linkedin/venice/router/RouterServer.java @@ -20,7 +20,6 @@ import com.linkedin.venice.acl.handler.StoreAclHandler; import com.linkedin.venice.authorization.IdentityParser; import com.linkedin.venice.compression.CompressorFactory; -import com.linkedin.venice.d2.D2ClientFactory; import com.linkedin.venice.exceptions.VeniceException; import com.linkedin.venice.helix.HelixAdapterSerializer; import com.linkedin.venice.helix.HelixBaseRoutingRepository; @@ -261,7 +260,8 @@ public RouterServer( serviceDiscoveryAnnouncers, accessController, sslFactory, - TehutiUtils.getMetricsRepository(ROUTER_SERVICE_NAME)); + TehutiUtils.getMetricsRepository(ROUTER_SERVICE_NAME), + null); } // for test purpose @@ -274,7 +274,8 @@ public RouterServer( List serviceDiscoveryAnnouncers, Optional accessController, Optional sslFactory, - MetricsRepository metricsRepository) { + MetricsRepository metricsRepository, + D2Client d2Client) { this(properties, serviceDiscoveryAnnouncers, accessController, sslFactory, metricsRepository, true); HelixReadOnlyZKSharedSystemStoreRepository readOnlyZKSharedSystemStoreRepository = @@ -325,8 +326,7 @@ public RouterServer( config.getRefreshIntervalForZkReconnectInMs()); this.liveInstanceMonitor = new HelixLiveInstanceMonitor(this.zkClient, config.getClusterName()); - D2Client d2Client = D2ClientFactory.getD2Client(config.getZkConnection(), Optional.empty()); - String d2ServiceName = config.getClusterToD2Map().get(config.getClusterName()); + String d2ServiceName = "venice-discovery"; this.pushStatusStoreReader = new PushStatusStoreReader( d2Client, d2ServiceName,