Skip to content

Commit

Permalink
[controller] Add retry thread for serviceDiscoveryAnnouncers that fai…
Browse files Browse the repository at this point in the history
…l to register (#1088)

* Implemented retry thread for serviceDiscoveryAnnouncers that failed to register

* Added test cases for ServiceDiscoveryAnnouncerRetryTask

* Created helper class for register and unregister functions and revised test cases

* Made retryRegisterServiceDiscoveryAnnouncerMS into a config key

* Updated retryRegisterServiceDiscoveryAnnouncerMS from Long to long

* Renamed retryRegisterServiceDiscoveryAnnouncerMS to serviceDiscoveryRegistrationRetryMS

* Type checking for long -> 30L * Time.MS_PER_SECOND

* Created AsyncRetryingServiceDiscoveryAnnouncer class that implements ServiceDiscoveryAnnouncer to refactor code

* Test: Added a check to register() to ensure that the call doesn't throw an exception

* Only start and interrupt thread when queue is not empty + removed extraneous logging

* Removed unused LOGGER in TestServiceDiscoveryAnnouncerRetryTask

* Modified ServiceDiscoveryAnnouncerRetryTask with Nisarg's help

* Implemented exiting retry thread if there are no more failed announcers to register + added testing
  • Loading branch information
bonytoni authored Aug 2, 2024
1 parent 1cc259a commit f43ec45
Show file tree
Hide file tree
Showing 8 changed files with 355 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -223,6 +223,11 @@ private ConfigKeys() {
*/
public static final String DEPRECATED_TOPIC_RETENTION_MS = "deprecated.topic.retention.ms";

/**
* Config key for the interval, in milliseconds, between attempts to re-register service discovery announcers whose
* registration failed. By default, this is set to 30000 ms (30 seconds).
*/
public static final String SERVICE_DISCOVERY_REGISTRATION_RETRY_MS = "service.discovery.registration.retry.ms";

public static final String FATAL_DATA_VALIDATION_FAILURE_TOPIC_RETENTION_MS =
"fatal.data.validation.failure.topic.retention.ms";

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
package com.linkedin.venice.servicediscovery;

import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;


/**
 * Announces and de-announces a list of {@link ServiceDiscoveryAnnouncer}s to a Service Discovery system.
 * Announcers that fail to register are placed on a retry queue and re-attempted asynchronously by a dedicated thread
 * running a {@link ServiceDiscoveryAnnouncerRetryTask}. Exceptions thrown by an individual announcer are logged
 * instead of being propagated to the caller.
 */
public class AsyncRetryingServiceDiscoveryAnnouncer implements ServiceDiscoveryAnnouncer {
  private static final Logger LOGGER = LogManager.getLogger(AsyncRetryingServiceDiscoveryAnnouncer.class);
  private static final String RETRY_THREAD_NAME = "ServiceDiscoveryAnnouncerRetryThread";

  private final List<ServiceDiscoveryAnnouncer> serviceDiscoveryAnnouncers;
  private final BlockingQueue<ServiceDiscoveryAnnouncer> serviceDiscoveryAnnouncerRetryQueue =
      new LinkedBlockingQueue<>();
  private final ServiceDiscoveryAnnouncerRetryTask serviceDiscoveryAnnouncerRetryTask;
  // Replaced by register() when the previous retry thread has already run to completion; volatile so the accessor
  // always observes the current thread.
  private volatile Thread serviceDiscoveryAnnouncerRetryThread;

  /**
   * @param serviceDiscoveryAnnouncers the announcers to register and unregister; the list is used as-is (not copied)
   * @param serviceDiscoveryRegistrationRetryMS interval, in milliseconds, handed to the retry task
   */
  public AsyncRetryingServiceDiscoveryAnnouncer(
      List<ServiceDiscoveryAnnouncer> serviceDiscoveryAnnouncers,
      long serviceDiscoveryRegistrationRetryMS) {
    this.serviceDiscoveryAnnouncers = serviceDiscoveryAnnouncers;
    this.serviceDiscoveryAnnouncerRetryTask =
        new ServiceDiscoveryAnnouncerRetryTask(serviceDiscoveryAnnouncerRetryQueue, serviceDiscoveryRegistrationRetryMS);
    // The thread is created eagerly (but not started) so that the accessor never returns null.
    this.serviceDiscoveryAnnouncerRetryThread = newRetryThread();
  }

  /**
   * Registers each {@link ServiceDiscoveryAnnouncer} in {@code serviceDiscoveryAnnouncers}.
   * If a service discovery announcer fails to register, it is added to the retry queue and registration will be
   * retried asynchronously in a separate thread. The retry thread is only started when at least one announcer is
   * waiting in the queue, and calling this method more than once does not attempt to start the same thread twice.
   */
  @Override
  public void register() {
    for (ServiceDiscoveryAnnouncer serviceDiscoveryAnnouncer: serviceDiscoveryAnnouncers) {
      try {
        serviceDiscoveryAnnouncer.register();
        LOGGER.info("Registered to service discovery: {}", serviceDiscoveryAnnouncer);
      } catch (Exception e) {
        LOGGER.error("Failed to register to service discovery: {}", serviceDiscoveryAnnouncer, e);
        // Avoid queueing the same announcer twice when register() is invoked again while it is still pending.
        if (!serviceDiscoveryAnnouncerRetryQueue.contains(serviceDiscoveryAnnouncer)) {
          serviceDiscoveryAnnouncerRetryQueue.add(serviceDiscoveryAnnouncer);
        }
      }
    }
    if (!serviceDiscoveryAnnouncerRetryQueue.isEmpty()) {
      startRetryThreadIfNotRunning();
    }
  }

  /**
   * Unregisters each {@link ServiceDiscoveryAnnouncer} in {@code serviceDiscoveryAnnouncers}.
   * One reason that a service discovery announcer may fail to unregister is if it was never successfully registered.
   * A running retry thread is interrupted first; failures of individual announcers are logged and do not prevent the
   * remaining announcers from being unregistered.
   */
  @Override
  public void unregister() {
    Thread retryThread = serviceDiscoveryAnnouncerRetryThread;
    if (retryThread.isAlive()) {
      LOGGER.info("Stopping service discovery announcer retry thread");
      // NOTE(review): interrupt() does not wait for the thread to finish, so a registration attempt that is already
      // in flight may still complete after this point - confirm whether a bounded join is wanted here.
      retryThread.interrupt();
    }
    for (ServiceDiscoveryAnnouncer serviceDiscoveryAnnouncer: serviceDiscoveryAnnouncers) {
      try {
        serviceDiscoveryAnnouncer.unregister();
        LOGGER.info("Unregistered from service discovery: {}", serviceDiscoveryAnnouncer);
      } catch (Exception e) {
        LOGGER.error("Failed to unregister from service discovery: {}", serviceDiscoveryAnnouncer, e);
      }
    }
  }

  /** Creates a new, not yet started, thread that runs the retry task. */
  private Thread newRetryThread() {
    return new Thread(serviceDiscoveryAnnouncerRetryTask, RETRY_THREAD_NAME);
  }

  /**
   * Starts the retry thread unless it is already running. A {@link Thread} can only be started once, so a thread that
   * has already terminated is replaced by a fresh one instead of being started again.
   */
  private void startRetryThreadIfNotRunning() {
    Thread retryThread = serviceDiscoveryAnnouncerRetryThread;
    if (retryThread.isAlive()) {
      // The running task drains the retry queue on every pass, so it is expected to pick up the new entries.
      return;
    }
    if (retryThread.getState() != Thread.State.NEW) {
      retryThread = newRetryThread();
      serviceDiscoveryAnnouncerRetryThread = retryThread;
    }
    LOGGER.info("Starting service discovery announcer retry thread");
    retryThread.start();
  }

  /** Package-private accessor for the current retry thread, intended for tests. */
  Thread getServiceDiscoveryAnnouncerRetryThread() {
    return serviceDiscoveryAnnouncerRetryThread;
  }

  /** Package-private accessor for the queue of announcers awaiting a registration retry, intended for tests. */
  BlockingQueue<ServiceDiscoveryAnnouncer> getServiceDiscoveryAnnouncerRetryQueue() {
    return serviceDiscoveryAnnouncerRetryQueue;
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
package com.linkedin.venice.servicediscovery;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;


/**
 * This class is responsible for retrying the registration of a {@link ServiceDiscoveryAnnouncer} in the case of
 * registration failure. It is a runnable task that re-attempts registration of every queued service discovery
 * announcer, pausing {@link ServiceDiscoveryAnnouncerRetryTask#serviceDiscoveryRegistrationRetryMS} milliseconds
 * between passes, and that finishes once no announcer is left in the retry queue.
 */
public class ServiceDiscoveryAnnouncerRetryTask implements Runnable {
  private static final Logger LOGGER = LogManager.getLogger(ServiceDiscoveryAnnouncerRetryTask.class);
  private final BlockingQueue<ServiceDiscoveryAnnouncer> serviceDiscoveryAnnouncerRetryQueue;
  private final long serviceDiscoveryRegistrationRetryMS;

  /**
   * @param serviceDiscoveryAnnouncerRetryQueue queue holding the announcers whose registration must be retried
   * @param serviceDiscoveryRegistrationRetryMS pause, in milliseconds, between two retry passes
   */
  public ServiceDiscoveryAnnouncerRetryTask(
      BlockingQueue<ServiceDiscoveryAnnouncer> serviceDiscoveryAnnouncerRetryQueue,
      long serviceDiscoveryRegistrationRetryMS) {
    this.serviceDiscoveryAnnouncerRetryQueue = serviceDiscoveryAnnouncerRetryQueue;
    this.serviceDiscoveryRegistrationRetryMS = serviceDiscoveryRegistrationRetryMS;
  }

  /**
   * Runs retry passes for as long as the retry queue is not empty. In each pass every queued announcer is removed
   * from the queue and registered again; an announcer that fails is put back at the end of the queue. When at least
   * one announcer failed, the task sleeps for
   * {@link ServiceDiscoveryAnnouncerRetryTask#serviceDiscoveryRegistrationRetryMS} milliseconds before the next pass.
   * The task returns as soon as the queue is empty, or when the executing thread is interrupted (in which case the
   * interrupt flag is restored).
   */
  @Override
  public void run() {
    while (!serviceDiscoveryAnnouncerRetryQueue.isEmpty()) {
      try {
        // Pull the whole batch out of the queue so that announcers re-queued during this pass are only retried in
        // the next pass.
        ServiceDiscoveryAnnouncer availableAnnouncer = serviceDiscoveryAnnouncerRetryQueue.take();
        List<ServiceDiscoveryAnnouncer> announcerList = new ArrayList<>();
        announcerList.add(availableAnnouncer);
        serviceDiscoveryAnnouncerRetryQueue.drainTo(announcerList);
        for (ServiceDiscoveryAnnouncer announcer: announcerList) {
          try {
            announcer.register();
            LOGGER.info("Registered to service discovery: {}", announcer);
          } catch (Exception e) {
            LOGGER.error("Failed to register to service discovery: {}", announcer, e);
            serviceDiscoveryAnnouncerRetryQueue.put(announcer);
          }
        }
        // Only wait when something is left to retry; otherwise the loop guard ends the task right away instead of
        // keeping the thread parked for a full retry interval.
        if (!serviceDiscoveryAnnouncerRetryQueue.isEmpty()) {
          Thread.sleep(serviceDiscoveryRegistrationRetryMS);
        }
      } catch (InterruptedException e) {
        LOGGER.error("ServiceDiscoveryAnnouncerRetryTask interrupted", e);
        Thread.currentThread().interrupt();
        break;
      }
    }
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,193 @@
package com.linkedin.venice.servicediscovery;

import static com.linkedin.venice.utils.Utils.sleep;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.reset;

import com.linkedin.venice.controller.VeniceControllerMultiClusterConfig;
import com.linkedin.venice.utils.Time;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import org.testng.Assert;
import org.testng.annotations.Test;


/**
 * Tests for {@link AsyncRetryingServiceDiscoveryAnnouncer} and the retry thread it starts for announcers that fail
 * to register. The timing-based tests rely on real sleeps around the configured retry interval.
 */
public class TestServiceDiscoveryAnnouncerRetryTask {
  private final ServiceDiscoveryAnnouncer announcer1 = mock(ServiceDiscoveryAnnouncer.class);
  private final ServiceDiscoveryAnnouncer announcer2 = mock(ServiceDiscoveryAnnouncer.class);
  private final ServiceDiscoveryAnnouncer announcer3 = mock(ServiceDiscoveryAnnouncer.class);
  private final VeniceControllerMultiClusterConfig config = mock(VeniceControllerMultiClusterConfig.class);

  /**
   * When every announcer registers on the first attempt, the retry thread must never be started, and unregistering
   * must not interrupt it.
   */
  @Test
  public void testThreadMethodsDoNotRunIfRetryQueueEmpty() {
    reset(announcer1, announcer2, announcer3, config);
    doNothing().when(announcer1).register();
    doNothing().when(announcer2).register();
    doNothing().when(announcer3).register();
    doNothing().when(announcer1).unregister();
    doNothing().when(announcer2).unregister();
    doNothing().when(announcer3).unregister();
    long serviceDiscoveryRegistrationRetryMS = stubRetryIntervalMs();
    AsyncRetryingServiceDiscoveryAnnouncer asyncRetryingServiceDiscoveryAnnouncer =
        newAsyncRetryingAnnouncer(serviceDiscoveryRegistrationRetryMS);
    Thread serviceDiscoveryAnnouncerRetryThread =
        asyncRetryingServiceDiscoveryAnnouncer.getServiceDiscoveryAnnouncerRetryThread();

    registerExpectingNoException(asyncRetryingServiceDiscoveryAnnouncer);
    Assert.assertFalse(serviceDiscoveryAnnouncerRetryThread.isAlive());

    unregisterExpectingNoException(asyncRetryingServiceDiscoveryAnnouncer);
    Assert.assertFalse(serviceDiscoveryAnnouncerRetryThread.isAlive());
    Assert.assertFalse(serviceDiscoveryAnnouncerRetryThread.isInterrupted());
  }

  /**
   * Below is the expected workflow of the test: <br>
   * 1) {@link AsyncRetryingServiceDiscoveryAnnouncer#register()} is called. The result of the call is that announcer3
   * successfully registers, and announcer1 and announcer2 fail to register, so {@code retryQueue=[announcer1, announcer2]} <br>
   * 2) Retry thread starts and retries registering the announcers in the queue <br>
   * 3) announcer1 retries registering and fails, and announcer2 retries registering and succeeds, so {@code retryQueue=[announcer1]} <br>
   * 4) The thread sleeps for 30 seconds <br>
   * 5) announcer1 retries registering and succeeds, so {@code retryQueue=[]} <br>
   * 6) The queue is empty, so the thread exits (at the latest after one more retry interval)
   */
  @Test
  public void testRegisterServiceDiscoveryAnnouncers() {
    reset(announcer1, announcer2, announcer3, config);
    doThrow(new RuntimeException()).doThrow(new RuntimeException()).doNothing().when(announcer1).register();
    doThrow(new RuntimeException()).doNothing().when(announcer2).register();
    doNothing().when(announcer3).register();
    long serviceDiscoveryRegistrationRetryMS = stubRetryIntervalMs();
    AsyncRetryingServiceDiscoveryAnnouncer asyncRetryingServiceDiscoveryAnnouncer =
        newAsyncRetryingAnnouncer(serviceDiscoveryRegistrationRetryMS);
    BlockingQueue<ServiceDiscoveryAnnouncer> retryQueue =
        asyncRetryingServiceDiscoveryAnnouncer.getServiceDiscoveryAnnouncerRetryQueue();
    registerExpectingNoException(asyncRetryingServiceDiscoveryAnnouncer);

    sleep(1000);
    assertRetryQueueHolds(retryQueue, announcer1);

    sleep(serviceDiscoveryRegistrationRetryMS + 1000);
    assertRetryQueueHolds(retryQueue);

    sleep(serviceDiscoveryRegistrationRetryMS + 1000);
    Assert.assertFalse(asyncRetryingServiceDiscoveryAnnouncer.getServiceDiscoveryAnnouncerRetryThread().isAlive());
  }

  /**
   * Below is the expected workflow of the test: <br>
   * 1) {@link AsyncRetryingServiceDiscoveryAnnouncer#register()} is called. The result of the call is that all announcers
   * fail to register, so {@code retryQueue=[announcer1, announcer2, announcer3]} <br>
   * 2) Retry thread starts and retries registering the announcers in the queue <br>
   * 3) All announcers fail to register again, so {@code retryQueue=[announcer1, announcer2, announcer3]} <br>
   * 4) The thread sleeps for 30 seconds <br>
   * 5) announcer1 and announcer2 retry registering and fail, and announcer3 retries registering and succeeds, so
   * {@code retryQueue=[announcer1, announcer2]} <br>
   * 6) The thread sleeps for 30 seconds <br>
   * 7) announcer1 retries registering and succeeds, and announcer2 retries registering and fails, so {@code retryQueue=[announcer2]} <br>
   * 8) The thread sleeps for 30 seconds <br>
   * 9) announcer2 retries registering and succeeds, so {@code retryQueue=[]} <br>
   * 10) The queue is empty, so the thread exits (at the latest after one more retry interval)
   */
  @Test
  public void testAddToRetryQueueMultipleTimes() {
    reset(announcer1, announcer2, announcer3, config);
    doThrow(new RuntimeException()).doThrow(new RuntimeException())
        .doThrow(new RuntimeException())
        .doNothing()
        .when(announcer1)
        .register();
    doThrow(new RuntimeException()).doThrow(new RuntimeException())
        .doThrow(new RuntimeException())
        .doThrow(new RuntimeException())
        .doNothing()
        .when(announcer2)
        .register();
    doThrow(new RuntimeException()).doThrow(new RuntimeException()).doNothing().when(announcer3).register();
    long serviceDiscoveryRegistrationRetryMS = stubRetryIntervalMs();
    AsyncRetryingServiceDiscoveryAnnouncer asyncRetryingServiceDiscoveryAnnouncer =
        newAsyncRetryingAnnouncer(serviceDiscoveryRegistrationRetryMS);
    BlockingQueue<ServiceDiscoveryAnnouncer> retryQueue =
        asyncRetryingServiceDiscoveryAnnouncer.getServiceDiscoveryAnnouncerRetryQueue();
    registerExpectingNoException(asyncRetryingServiceDiscoveryAnnouncer);

    sleep(1000);
    assertRetryQueueHolds(retryQueue, announcer1, announcer2, announcer3);

    sleep(serviceDiscoveryRegistrationRetryMS + 1000);
    assertRetryQueueHolds(retryQueue, announcer1, announcer2);

    sleep(serviceDiscoveryRegistrationRetryMS + 1000);
    assertRetryQueueHolds(retryQueue, announcer2);

    sleep(serviceDiscoveryRegistrationRetryMS + 1000);
    assertRetryQueueHolds(retryQueue);

    sleep(serviceDiscoveryRegistrationRetryMS + 1000);
    Assert.assertFalse(asyncRetryingServiceDiscoveryAnnouncer.getServiceDiscoveryAnnouncerRetryThread().isAlive());
  }

  /**
   * An announcer that throws while unregistering must not make
   * {@link AsyncRetryingServiceDiscoveryAnnouncer#unregister()} itself throw.
   */
  @Test
  public void testUnregisterServiceDiscoveryAnnouncers() {
    reset(announcer1, announcer2, announcer3, config);
    doThrow(new RuntimeException()).when(announcer1).unregister();
    doNothing().when(announcer2).unregister();
    doNothing().when(announcer3).unregister();
    long serviceDiscoveryRegistrationRetryMS = stubRetryIntervalMs();
    AsyncRetryingServiceDiscoveryAnnouncer asyncRetryingServiceDiscoveryAnnouncer =
        newAsyncRetryingAnnouncer(serviceDiscoveryRegistrationRetryMS);
    unregisterExpectingNoException(asyncRetryingServiceDiscoveryAnnouncer);
  }

  /** Stubs the retry interval on the mocked config to 30 seconds and returns the value read back from it. */
  private long stubRetryIntervalMs() {
    doReturn(30L * Time.MS_PER_SECOND).when(config).getServiceDiscoveryRegistrationRetryMS();
    return config.getServiceDiscoveryRegistrationRetryMS();
  }

  /** Builds the announcer under test over announcer1, announcer2 and announcer3, in that order. */
  private AsyncRetryingServiceDiscoveryAnnouncer newAsyncRetryingAnnouncer(long serviceDiscoveryRegistrationRetryMS) {
    List<ServiceDiscoveryAnnouncer> serviceDiscoveryAnnouncers = Arrays.asList(announcer1, announcer2, announcer3);
    return new AsyncRetryingServiceDiscoveryAnnouncer(serviceDiscoveryAnnouncers, serviceDiscoveryRegistrationRetryMS);
  }

  /** Calls register() and fails the test, keeping the original exception as the cause, if it throws. */
  private static void registerExpectingNoException(AsyncRetryingServiceDiscoveryAnnouncer announcer) {
    try {
      announcer.register();
    } catch (Exception e) {
      Assert.fail("The method call should not throw an exception.", e);
    }
  }

  /** Calls unregister() and fails the test, keeping the original exception as the cause, if it throws. */
  private static void unregisterExpectingNoException(AsyncRetryingServiceDiscoveryAnnouncer announcer) {
    try {
      announcer.unregister();
    } catch (Exception e) {
      Assert.fail("The method call should not throw an exception.", e);
    }
  }

  /**
   * Asserts that the retry queue holds exactly the given announcers: each of the three test announcers is present if
   * and only if it is listed, the head of the queue is the first listed announcer, and the sizes match.
   */
  private void assertRetryQueueHolds(
      BlockingQueue<ServiceDiscoveryAnnouncer> retryQueue,
      ServiceDiscoveryAnnouncer... expectedInOrder) {
    List<ServiceDiscoveryAnnouncer> expected = Arrays.asList(expectedInOrder);
    for (ServiceDiscoveryAnnouncer announcer: Arrays.asList(announcer1, announcer2, announcer3)) {
      Assert.assertEquals(retryQueue.contains(announcer), expected.contains(announcer));
    }
    if (!expected.isEmpty()) {
      Assert.assertEquals(retryQueue.peek(), expected.get(0));
    }
    Assert.assertEquals(retryQueue.size(), expected.size());
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import com.linkedin.venice.serialization.avro.AvroProtocolDefinition;
import com.linkedin.venice.service.AbstractVeniceService;
import com.linkedin.venice.service.ICProvider;
import com.linkedin.venice.servicediscovery.AsyncRetryingServiceDiscoveryAnnouncer;
import com.linkedin.venice.servicediscovery.ServiceDiscoveryAnnouncer;
import com.linkedin.venice.system.store.ControllerClientBackedSystemSchemaInitializer;
import com.linkedin.venice.utils.PropertyBuilder;
Expand Down Expand Up @@ -58,6 +59,7 @@ public class VeniceController {
private final VeniceControllerMultiClusterConfig multiClusterConfigs;
private final MetricsRepository metricsRepository;
private final List<ServiceDiscoveryAnnouncer> serviceDiscoveryAnnouncers;
private final AsyncRetryingServiceDiscoveryAnnouncer asyncRetryingServiceDiscoveryAnnouncer;
private final Optional<DynamicAccessController> accessController;
private final Optional<AuthorizerService> authorizerService;
private final D2Client d2Client;
Expand Down Expand Up @@ -127,6 +129,9 @@ public VeniceController(VeniceControllerContext ctx) {
this.icProvider = Optional.ofNullable(ctx.getIcProvider());
this.externalSupersetSchemaGenerator = Optional.ofNullable(ctx.getExternalSupersetSchemaGenerator());
this.pubSubClientsFactory = multiClusterConfigs.getPubSubClientsFactory();
long serviceDiscoveryRegistrationRetryMS = multiClusterConfigs.getServiceDiscoveryRegistrationRetryMS();
this.asyncRetryingServiceDiscoveryAnnouncer =
new AsyncRetryingServiceDiscoveryAnnouncer(serviceDiscoveryAnnouncers, serviceDiscoveryRegistrationRetryMS);
createServices();
}

Expand Down Expand Up @@ -249,10 +254,7 @@ public void start() {
systemStoreRepairService.ifPresent(AbstractVeniceService::start);
disabledPartitionEnablerService.ifPresent(AbstractVeniceService::start);
// register with service discovery at the end
serviceDiscoveryAnnouncers.forEach(serviceDiscoveryAnnouncer -> {
serviceDiscoveryAnnouncer.register();
LOGGER.info("Registered to service discovery: {}", serviceDiscoveryAnnouncer);
});
asyncRetryingServiceDiscoveryAnnouncer.register();
LOGGER.info("Controller is started.");
}

Expand Down Expand Up @@ -302,10 +304,7 @@ private void initializeSystemSchema(Admin admin) {
*/
public void stop() {
// unregister from service discovery first
serviceDiscoveryAnnouncers.forEach(serviceDiscoveryAnnouncer -> {
serviceDiscoveryAnnouncer.unregister();
LOGGER.info("Unregistered from service discovery: {}", serviceDiscoveryAnnouncer);
});
asyncRetryingServiceDiscoveryAnnouncer.unregister();
// TODO: we may want a dependency structure so we ensure services are shutdown in the correct order.
systemStoreRepairService.ifPresent(Utils::closeQuietlyWithErrorLogged);
storeGraveyardCleanupService.ifPresent(Utils::closeQuietlyWithErrorLogged);
Expand Down
Loading

0 comments on commit f43ec45

Please sign in to comment.