Changes made to fix router-related issues for blob transfer
sebas-inf committed Aug 2, 2024
1 parent 98cdf8e commit 7a8df57
Showing 7 changed files with 69 additions and 43 deletions.
@@ -2,6 +2,7 @@

import com.linkedin.venice.blobtransfer.client.NettyFileTransferClient;
import com.linkedin.venice.blobtransfer.server.P2PBlobTransferService;
import com.linkedin.venice.client.store.AvroGenericStoreClientImpl;
import com.linkedin.venice.client.store.ClientConfig;
import com.linkedin.venice.client.store.ClientFactory;
import org.apache.logging.log4j.LogManager;
@@ -32,10 +33,12 @@ public static BlobTransferManager<Void> getP2PBlobTransferManagerAndStart(
String baseDir,
ClientConfig clientConfig) {
try {
AvroGenericStoreClientImpl storeClient =
(AvroGenericStoreClientImpl) ClientFactory.getAndStartGenericAvroClient(clientConfig);
BlobTransferManager<Void> manager = new NettyP2PBlobTransferManager(
new P2PBlobTransferService(p2pTransferServerPort, baseDir),
new NettyFileTransferClient(p2pTransferClientPort, baseDir),
new DvcBlobFinder(ClientFactory.getTransportClient(clientConfig)));
new DaVinciBlobFinder(storeClient));
manager.start();
return manager;
} catch (Exception e) {
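With this change, the factory derives everything it needs from the supplied ClientConfig: a generic Avro store client is created and started inside the method and handed to DaVinciBlobFinder, replacing the previous DvcBlobFinder built on a bare TransportClient. Below is a minimal caller-side sketch. The enclosing utility class name (BlobTransferUtil), the port values, the base directory, and the ClientConfig setters are illustrative assumptions; only the method name and its trailing (baseDir, clientConfig) parameters are visible in this diff, with the two leading port parameters inferred from the method body.

import com.linkedin.venice.blobtransfer.BlobTransferManager;
import com.linkedin.venice.client.store.ClientConfig;

public class BlobTransferManagerUsageSketch {
  public static void main(String[] args) throws Exception {
    // Hypothetical store name and router/D2 URL.
    ClientConfig clientConfig =
        ClientConfig.defaultGenericClientConfig("test-store").setVeniceURL("http://localhost:7777");

    // "BlobTransferUtil" is an assumed class name for the factory shown above.
    // Ports and base directory are placeholders.
    BlobTransferManager<Void> manager = BlobTransferUtil.getP2PBlobTransferManagerAndStart(
        27015, // p2pTransferServerPort
        27016, // p2pTransferClientPort
        "/tmp/venice-blob-transfer", // baseDir
        clientConfig);

    // The returned manager is already started and ready to serve or fetch blobs for bootstrapping.
    // Per the change above, the DaVinciBlobFinder it wraps now owns the started store client.
  }
}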
@@ -5,13 +5,18 @@
import static com.linkedin.venice.controllerapi.ControllerApiConstants.STORE_VERSION;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.linkedin.venice.client.store.transport.TransportClient;
import com.linkedin.venice.client.store.AvroGenericStoreClientImpl;
import com.linkedin.venice.exceptions.VeniceException;
import com.linkedin.venice.utils.ObjectMapperFactory;
import com.linkedin.venice.utils.RetryUtils;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import org.apache.http.NameValuePair;
import org.apache.http.client.utils.URLEncodedUtils;
import org.apache.http.message.BasicNameValuePair;
@@ -22,32 +27,46 @@
/**
* DvcBlobFinder discovers live DaVinci peer nodes to facilitate blob transfers necessary for bootstrapping the database
*/
public class DvcBlobFinder implements BlobFinder {
private static final Logger LOGGER = LogManager.getLogger(DvcBlobFinder.class);
public class DaVinciBlobFinder implements BlobFinder {
private static final Logger LOGGER = LogManager.getLogger(DaVinciBlobFinder.class);
private static final String TYPE_BLOB_DISCOVERY = "blob_discovery";
private static final String ERROR_DISCOVERY_MESSAGE =
"Error finding DVC peers for blob transfer in store: %s, version: %d, partition: %d";
private final TransportClient transportClient;
private final AvroGenericStoreClientImpl storeClient;

public DvcBlobFinder(TransportClient transportClient) {
this.transportClient = transportClient;
public DaVinciBlobFinder(AvroGenericStoreClientImpl storeClient) {
this.storeClient = storeClient;
}

@Override
public BlobPeersDiscoveryResponse discoverBlobPeers(String storeName, int version, int partition) {
String uri = buildUriForBlobDiscovery(storeName, version, partition);
String requestPath = buildUriForBlobDiscovery(storeName, version, partition);
byte[] response = executeRequest(requestPath);

CompletableFuture<BlobPeersDiscoveryResponse> futureResponse = transportClient.get(uri).thenApply(response -> {
byte[] responseBody = response.getBody();
ObjectMapper mapper = ObjectMapperFactory.getInstance();
try {
return mapper.readValue(responseBody, BlobPeersDiscoveryResponse.class);
} catch (IOException e) {
return handleError(ERROR_DISCOVERY_MESSAGE, storeName, version, partition, e);
}
}).exceptionally(throwable -> handleError(ERROR_DISCOVERY_MESSAGE, storeName, version, partition, throwable));
ObjectMapper mapper = ObjectMapperFactory.getInstance();
try {
return mapper.readValue(response, BlobPeersDiscoveryResponse.class);
} catch (IOException e) {
return handleError(ERROR_DISCOVERY_MESSAGE, storeName, version, partition, e);
}
}

private byte[] executeRequest(String requestPath) {
byte[] response;
try {
response = RetryUtils.executeWithMaxAttempt(
() -> ((CompletableFuture<byte[]>) storeClient.getRaw(requestPath)).get(),
3,
Duration.ofSeconds(5),
Collections.singletonList(ExecutionException.class));
} catch (Exception e) {
throw new VeniceException("Failed to fetch schema from path " + requestPath, e);
}

return futureResponse.join();
if (response == null) {
throw new VeniceException("Requested schema(s) doesn't exist for request path: " + requestPath);
}
return response;
}

private String buildUriForBlobDiscovery(String storeName, int version, int partition) {
@@ -76,6 +95,6 @@ private BlobPeersDiscoveryResponse handleError(

@Override
public void close() throws IOException {
transportClient.close();
storeClient.close();
}
}
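For reference, a hedged end-to-end sketch of driving the renamed finder directly: the store client is obtained the same way the factory above obtains it, and the discovery response exposes the accessors exercised by the unit tests below (getDiscoveryResult, isError, getErrorMessage). Store name, version, partition, and URL are placeholders.

import com.linkedin.venice.blobtransfer.BlobPeersDiscoveryResponse;
import com.linkedin.venice.blobtransfer.DaVinciBlobFinder;
import com.linkedin.venice.client.store.AvroGenericStoreClientImpl;
import com.linkedin.venice.client.store.ClientConfig;
import com.linkedin.venice.client.store.ClientFactory;

public class DaVinciBlobFinderUsageSketch {
  public static void main(String[] args) throws Exception {
    // Placeholder store name and router/D2 URL.
    ClientConfig clientConfig =
        ClientConfig.defaultGenericClientConfig("test-store").setVeniceURL("http://localhost:7777");
    AvroGenericStoreClientImpl storeClient =
        (AvroGenericStoreClientImpl) ClientFactory.getAndStartGenericAvroClient(clientConfig);

    DaVinciBlobFinder finder = new DaVinciBlobFinder(storeClient);
    try {
      // Ask the router's blob_discovery endpoint (via the store client) which live
      // DVC peers host version 1, partition 0 of the store.
      BlobPeersDiscoveryResponse response = finder.discoverBlobPeers("test-store", 1, 0);
      if (response.isError()) {
        System.err.println("Discovery failed: " + response.getErrorMessage());
      } else {
        response.getDiscoveryResult().forEach(host -> System.out.println("Blob peer: " + host));
      }
    } finally {
      finder.close(); // also closes the underlying store client, per the change above
    }
  }
}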
@@ -10,7 +10,7 @@
import static org.testng.AssertJUnit.assertTrue;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.linkedin.venice.client.store.transport.TransportClient;
import com.linkedin.venice.client.store.AvroGenericStoreClientImpl;
import com.linkedin.venice.client.store.transport.TransportClientResponse;
import com.linkedin.venice.utils.ObjectMapperFactory;
import java.io.IOException;
@@ -21,17 +21,17 @@
import org.testng.annotations.Test;


public class DvcBlobFinderTest {
private TransportClient transportClient;
private DvcBlobFinder dvcBlobFinder;
public class DaVinciBlobFinderTest {
private AvroGenericStoreClientImpl storeClient;
private DaVinciBlobFinder daVinciBlobFinder;
private static final String storeName = "testStore";
private static final int version = 1;
private static final int partition = 1;

@BeforeMethod
public void setUp() {
transportClient = mock(TransportClient.class);
dvcBlobFinder = new DvcBlobFinder(transportClient);
storeClient = mock(AvroGenericStoreClientImpl.class);
daVinciBlobFinder = new DaVinciBlobFinder(storeClient);
}

@Test
@@ -42,9 +42,9 @@ public void testDiscoverBlobPeers_Success() {
TransportClientResponse mockResponse = new TransportClientResponse(0, null, responseBody);

CompletableFuture<TransportClientResponse> futureResponse = CompletableFuture.completedFuture(mockResponse);
when(transportClient.get(anyString())).thenReturn(futureResponse);
when(storeClient.get(anyString())).thenReturn(futureResponse);

BlobPeersDiscoveryResponse response = dvcBlobFinder.discoverBlobPeers(storeName, version, partition);
BlobPeersDiscoveryResponse response = daVinciBlobFinder.discoverBlobPeers(storeName, version, partition);
assertEquals(3, response.getDiscoveryResult().size());
}

@@ -56,13 +56,13 @@ public void testDiscoverBlobPeers_CallsTransportClientWithCorrectURI() {
TransportClientResponse mockResponse = new TransportClientResponse(0, null, responseBody);

CompletableFuture<TransportClientResponse> futureResponse = CompletableFuture.completedFuture(mockResponse);
when(transportClient.get(anyString())).thenReturn(futureResponse);
when(storeClient.get(anyString())).thenReturn(futureResponse);

dvcBlobFinder.discoverBlobPeers(storeName, version, partition);
daVinciBlobFinder.discoverBlobPeers(storeName, version, partition);

// Capture the argument passed to transportClient.get
ArgumentCaptor<String> argumentCaptor = ArgumentCaptor.forClass(String.class);
verify(transportClient).get(argumentCaptor.capture());
verify(storeClient).get(argumentCaptor.capture());

String expectedUri = String
.format("blob_discovery?store_name=%s&store_version=%d&store_partition=%d", storeName, version, partition);
@@ -76,14 +76,14 @@ public void testDiscoverBlobPeers_IOException() throws Exception {
TransportClientResponse mockResponse = new TransportClientResponse(0, null, responseBody);

CompletableFuture<TransportClientResponse> futureResponse = CompletableFuture.completedFuture(mockResponse);
when(transportClient.get(anyString())).thenReturn(futureResponse);
when(storeClient.get(anyString())).thenReturn(futureResponse);

ObjectMapper mapper = ObjectMapperFactory.getInstance();
ObjectMapper mockMapper = spy(mapper);
doThrow(new IOException("Test Exception")).when(mockMapper)
.readValue(responseBody, BlobPeersDiscoveryResponse.class);

BlobPeersDiscoveryResponse response = dvcBlobFinder.discoverBlobPeers(storeName, version, partition);
BlobPeersDiscoveryResponse response = daVinciBlobFinder.discoverBlobPeers(storeName, version, partition);
assertEquals(0, response.getDiscoveryResult().size());
assertEquals(response.getErrorMessage(), "some error");
assertTrue(response.isError());
@@ -93,9 +93,9 @@ public void testDiscoverBlobPeers_IOException() throws Exception {
public void testDiscoverBlobPeers_Exceptionally() {
CompletableFuture<TransportClientResponse> futureResponse = new CompletableFuture<>();
futureResponse.completeExceptionally(new RuntimeException("Test Exception"));
when(transportClient.get(anyString())).thenReturn(futureResponse);
when(storeClient.get(anyString())).thenReturn(futureResponse);

BlobPeersDiscoveryResponse response = dvcBlobFinder.discoverBlobPeers(storeName, version, partition);
BlobPeersDiscoveryResponse response = daVinciBlobFinder.discoverBlobPeers(storeName, version, partition);

assertTrue(response.isError());
assertEquals(
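Note that these unit tests still stub storeClient.get(...), while the production path now goes through storeClient.getRaw(...) wrapped in RetryUtils. A hedged sketch of a test aimed at the new path is below; Mockito's doReturn is used because getRaw's declared return type is not visible in this diff, and the peer host names are placeholders.

import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;

import com.linkedin.venice.blobtransfer.BlobPeersDiscoveryResponse;
import com.linkedin.venice.blobtransfer.DaVinciBlobFinder;
import com.linkedin.venice.client.store.AvroGenericStoreClientImpl;
import com.linkedin.venice.utils.ObjectMapperFactory;
import java.util.Arrays;
import java.util.concurrent.CompletableFuture;
import org.testng.annotations.Test;

public class DaVinciBlobFinderRawPathTestSketch {
  @Test
  public void testDiscoverBlobPeersViaGetRaw() throws Exception {
    AvroGenericStoreClientImpl storeClient = mock(AvroGenericStoreClientImpl.class);

    // Serialize a discovery response with three placeholder peers, mirroring the
    // JSON the router's blob_discovery endpoint returns.
    BlobPeersDiscoveryResponse expected = new BlobPeersDiscoveryResponse();
    expected.setDiscoveryResult(Arrays.asList("host1", "host2", "host3"));
    byte[] responseBody = ObjectMapperFactory.getInstance().writeValueAsBytes(expected);

    // Stub the raw request path used by the new executeRequest helper.
    doReturn(CompletableFuture.completedFuture(responseBody)).when(storeClient).getRaw(anyString());

    DaVinciBlobFinder finder = new DaVinciBlobFinder(storeClient);
    BlobPeersDiscoveryResponse actual = finder.discoverBlobPeers("testStore", 1, 1);
    assertFalse(actual.isError());
    assertEquals(actual.getDiscoveryResult().size(), 3);
  }
}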
@@ -1002,7 +1002,7 @@ public void testReadCompressedData(CompressionStrategy compressionStrategy) thro
* For the local P2P testing, need to setup two different directories and ports for the two Da Vinci clients in order
* to avoid conflicts.
*/
@Test(timeOut = 2 * TEST_TIMEOUT, enabled = false)
@Test(timeOut = 2 * TEST_TIMEOUT, enabled = true)
public void testBlobP2PTransferAmongDVC() throws Exception {
Consumer<UpdateStoreQueryParams> paramsConsumer = params -> params.setBlobTransferEnabled(true);
String storeName = Utils.getUniqueString("test-store");
@@ -16,7 +16,8 @@
import com.linkedin.venice.D2.D2ClientUtils;
import com.linkedin.venice.blobtransfer.BlobFinder;
import com.linkedin.venice.blobtransfer.BlobPeersDiscoveryResponse;
import com.linkedin.venice.blobtransfer.DvcBlobFinder;
import com.linkedin.venice.blobtransfer.DaVinciBlobFinder;
import com.linkedin.venice.client.store.AvroGenericStoreClientImpl;
import com.linkedin.venice.client.store.ClientConfig;
import com.linkedin.venice.client.store.ClientFactory;
import com.linkedin.venice.common.VeniceSystemStoreType;
@@ -181,7 +182,8 @@ public void testBlobDiscovery() throws Exception {
.setD2ServiceName(VeniceRouterWrapper.CLUSTER_DISCOVERY_D2_SERVICE_NAME)
.setMetricsRepository(new MetricsRepository());

BlobFinder dvcFinder = new DvcBlobFinder(ClientFactory.getTransportClient(clientConfig));
BlobFinder dvcFinder =
new DaVinciBlobFinder((AvroGenericStoreClientImpl) ClientFactory.getAndStartGenericAvroClient(clientConfig));

TestUtils.waitForNonDeterministicAssertion(1, TimeUnit.MINUTES, true, () -> {
BlobPeersDiscoveryResponse response = dvcFinder.discoverBlobPeers(storeName, 1, 1);
@@ -56,6 +56,7 @@
import com.linkedin.venice.meta.StoreConfig;
import com.linkedin.venice.meta.SystemStore;
import com.linkedin.venice.meta.Version;
import com.linkedin.venice.pushmonitor.ExecutionStatus;
import com.linkedin.venice.pushmonitor.HybridStoreQuotaStatus;
import com.linkedin.venice.pushstatushelper.PushStatusStoreReader;
import com.linkedin.venice.router.api.RouterResourceType;
@@ -552,11 +553,12 @@ private void handleBlobDiscovery(ChannelHandlerContext ctx, VenicePathParserHelp
Integer.parseInt(storeVersion),
Integer.parseInt(storePartition),
Optional.empty());

List<String> liveNodeHostNames = instances.entrySet()
.stream()
.filter(entry -> entry.getValue() == ExecutionStatus.COMPLETED.getValue())
.map(Map.Entry::getKey)
.map(CharSequence::toString)
.filter(instanceHostName -> pushStatusStoreReader.isInstanceAlive(storeName, instanceHostName))
.collect(Collectors.toList());
response.setDiscoveryResult(liveNodeHostNames);
} catch (VeniceException e) {
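The router-side change above returns a host as a blob-transfer peer only if two conditions hold: its partition push status is COMPLETED, and the push status store still reports the instance as alive. A small self-contained sketch of that two-stage filter, using a stand-in liveness predicate in place of pushStatusStoreReader.isInstanceAlive and placeholder host names:

import com.linkedin.venice.pushmonitor.ExecutionStatus;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;
import java.util.stream.Collectors;

public class LiveBlobPeerFilterSketch {
  static List<String> filterLivePeers(Map<CharSequence, Integer> instances, Predicate<String> isAlive) {
    return instances.entrySet()
        .stream()
        // Keep only replicas that have finished bootstrapping this partition.
        .filter(entry -> entry.getValue() == ExecutionStatus.COMPLETED.getValue())
        .map(Map.Entry::getKey)
        .map(CharSequence::toString)
        // Drop hosts the push status store no longer reports as alive.
        .filter(isAlive)
        .collect(Collectors.toList());
  }

  public static void main(String[] args) {
    Map<CharSequence, Integer> instances = new LinkedHashMap<>();
    instances.put("host1_5000", ExecutionStatus.COMPLETED.getValue());
    instances.put("host2_5000", ExecutionStatus.STARTED.getValue()); // still bootstrapping
    instances.put("host3_5000", ExecutionStatus.COMPLETED.getValue());
    // Pretend host3 stopped heartbeating: only host1 passes both filters.
    System.out.println(filterLivePeers(instances, host -> host.startsWith("host1")));
  }
}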
@@ -20,7 +20,6 @@
import com.linkedin.venice.acl.handler.StoreAclHandler;
import com.linkedin.venice.authorization.IdentityParser;
import com.linkedin.venice.compression.CompressorFactory;
import com.linkedin.venice.d2.D2ClientFactory;
import com.linkedin.venice.exceptions.VeniceException;
import com.linkedin.venice.helix.HelixAdapterSerializer;
import com.linkedin.venice.helix.HelixBaseRoutingRepository;
@@ -261,7 +260,8 @@ public RouterServer(
serviceDiscoveryAnnouncers,
accessController,
sslFactory,
TehutiUtils.getMetricsRepository(ROUTER_SERVICE_NAME));
TehutiUtils.getMetricsRepository(ROUTER_SERVICE_NAME),
null);
}

// for test purpose
@@ -274,7 +274,8 @@ public RouterServer(
List<ServiceDiscoveryAnnouncer> serviceDiscoveryAnnouncers,
Optional<DynamicAccessController> accessController,
Optional<SSLFactory> sslFactory,
MetricsRepository metricsRepository) {
MetricsRepository metricsRepository,
D2Client d2Client) {
this(properties, serviceDiscoveryAnnouncers, accessController, sslFactory, metricsRepository, true);

HelixReadOnlyZKSharedSystemStoreRepository readOnlyZKSharedSystemStoreRepository =
@@ -325,8 +326,7 @@ public RouterServer(
config.getRefreshIntervalForZkReconnectInMs());
this.liveInstanceMonitor = new HelixLiveInstanceMonitor(this.zkClient, config.getClusterName());

D2Client d2Client = D2ClientFactory.getD2Client(config.getZkConnection(), Optional.empty());
String d2ServiceName = config.getClusterToD2Map().get(config.getClusterName());
String d2ServiceName = "venice-discovery";
this.pushStatusStoreReader = new PushStatusStoreReader(
d2Client,
d2ServiceName,
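RouterServer now receives its D2Client from the caller instead of building one through D2ClientFactory, and the push status reader's D2 service name is pinned to "venice-discovery". A hedged sketch of the caller-side setup; the ZooKeeper address is a placeholder, and D2ClientUtils.startClient is assumed to be the helper class imported by the integration test above:

import com.linkedin.d2.balancer.D2Client;
import com.linkedin.d2.balancer.D2ClientBuilder;
import com.linkedin.venice.D2.D2ClientUtils;

public class RouterD2ClientSetupSketch {
  public static void main(String[] args) {
    // Placeholder ZooKeeper connection string.
    D2Client d2Client = new D2ClientBuilder().setZkHosts("localhost:2181").build();

    // Assumption: the caller starts the client before handing it over, since the
    // router no longer manages the D2Client lifecycle itself.
    D2ClientUtils.startClient(d2Client);

    // Pass d2Client as the new trailing argument of the test-purpose RouterServer
    // constructor shown above (the production constructor currently forwards null).
  }
}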
