From c1d289e8e66db1e7fb8ab9f7c9333f485b7d1ad0 Mon Sep 17 00:00:00 2001
From: Lei Lu
Date: Thu, 25 Jan 2024 19:12:21 -0800
Subject: [PATCH] =?UTF-8?q?[server][controller]=20Try=20ungraceful=20close?=
 =?UTF-8?q?=20when=20the=20graceful=20one=20faile=E2=80=A6=20(#828)?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

* [server][controller] Try ungraceful close when the graceful one failed in VeniceWriter

In VeniceWriter.close(), when gracefulClose is set to true, the Kafka producer
calls the underlying close method, which tries to flush the buffered data
before closing (see ApacheKafkaProducerAdapter.close). However, when an
exception happens (e.g. a timeout), today VeniceWriter only logs the
exception, moves on, and considers itself closed while it actually is not.
(A leaked VeniceWriter can cause several issues, e.g. a stuck consumer
thread.)

This change adds a retry with an ungraceful close when the gracefulClose flag
is set to true and the graceful close failed. In this case, the ungraceful
close skips the flushing step and should always succeed (e.g. when invoked
from StoreIngestionTask.kill()). For cases where gracefulClose is set to
false, this change doesn't alter any behavior.
---
 .../linkedin/venice/writer/VeniceWriter.java  | 144 +++++++++++++-----
 .../venice/writer/VeniceWriterUnitTest.java   |  27 ++++
 .../ApacheKafkaProducerAdapterITest.java      |   4 +-
 .../venice/writer/VeniceWriterTest.java       |  79 ++++++++++
 4 files changed, 211 insertions(+), 43 deletions(-)

diff --git a/internal/venice-common/src/main/java/com/linkedin/venice/writer/VeniceWriter.java b/internal/venice-common/src/main/java/com/linkedin/venice/writer/VeniceWriter.java
index ded4f3fed7e..69023dcdca8 100644
--- a/internal/venice-common/src/main/java/com/linkedin/venice/writer/VeniceWriter.java
+++ b/internal/venice-common/src/main/java/com/linkedin/venice/writer/VeniceWriter.java
@@ -56,6 +56,7 @@
 import com.linkedin.venice.utils.ExceptionUtils;
 import com.linkedin.venice.utils.LatencyUtils;
 import com.linkedin.venice.utils.Time;
+import com.linkedin.venice.utils.Timer;
 import com.linkedin.venice.utils.Utils;
 import com.linkedin.venice.utils.VeniceProperties;
 import com.linkedin.venice.utils.VeniceResourceCloseResult;
@@ -395,70 +396,131 @@ public static ControlMessage generateHeartbeatMessage(CheckSumType checkSumType)
    */
   @Override
   public void close(boolean gracefulClose) {
+    try {
+      closeAsync(gracefulClose).get();
+    } catch (ExecutionException | InterruptedException e) {
+      logger.warn("Future completed exceptionally in closing VeniceWriter for topic: {}", topicName, e);
+    }
+  }
+
+  /**
+   * Close the {@link VeniceWriter}.
+   *
+   * Deprecating this method due to the concern that sending an END_OF_SEGMENT control message to a non-existent
+   * topic can block indefinitely, as it calls
+   * {@link #sendMessage(KeyProvider, KafkaMessageEnvelopeProvider, int, PubSubProducerCallback, boolean)}.get()
+   * without a timeout.
+   *
+   * @param gracefulClose whether to end the segments and send an END_OF_SEGMENT control message.
+   * @param retryOnGracefulCloseFailure whether to retry on graceful close failure.
+   */
+  @Deprecated
+  public void close(boolean gracefulClose, boolean retryOnGracefulCloseFailure) {
     synchronized (closeLock) {
       if (isClosed) {
         return;
       }
-      long startTime = System.currentTimeMillis();
-      logger.info("Closing VeniceWriter for topic: {}", topicName);
-      try {
-        // If {@link #broadcastEndOfPush(Map)} was already called, the {@link #endAllSegments(boolean)}
-        // will not do anything (it's idempotent). Segments should not be ended if there are still data missing.
-        if (gracefulClose) {
-          endAllSegments(true);
-        }
-        // DO NOT call the {@link #PubSubProducerAdapter.close(int) version from here.}
-        // For non-shared producer mode gracefulClose will flush the producer
-        producerAdapter.close(topicName, closeTimeOutInMs, gracefulClose);
-        OPEN_VENICE_WRITER_COUNT.decrementAndGet();
-      } catch (Exception e) {
-        logger.warn("Swallowed an exception while trying to close the VeniceWriter for topic: {}", topicName, e);
-        VENICE_WRITER_CLOSE_FAILED_COUNT.incrementAndGet();
-      }
-      threadPoolExecutor.shutdown();
-      isClosed = true;
-      logger.info("Closed VeniceWriter for topic: {} in {} ms", topicName, System.currentTimeMillis() - startTime);
+      logger.info("Closing VeniceWriter for topic: {}, gracefulness: {}", topicName, gracefulClose);
+      try (Timer ignore = Timer.run(
+          elapsedTimeInMs -> logger.info("Closed VeniceWriter for topic: {} in {} ms", topicName, elapsedTimeInMs))) {
+        try {
+          // If {@link #broadcastEndOfPush(Map)} was already called, the {@link #endAllSegments(boolean)}
+          // will not do anything (it's idempotent). Segments should not be ended if there is still data missing.
+          if (gracefulClose) {
+            endAllSegments(true);
+          }
+          // DO NOT call the {@link PubSubProducerAdapter#close(int)} version from here.
+          // For non-shared producer mode, gracefulClose will flush the producer.
+
+          producerAdapter.close(topicName, closeTimeOutInMs, gracefulClose);
+          OPEN_VENICE_WRITER_COUNT.decrementAndGet();
+        } catch (Exception e) {
+          handleExceptionInClose(e, gracefulClose, retryOnGracefulCloseFailure);
+        } finally {
+          threadPoolExecutor.shutdown();
+          isClosed = true;
+        }
+      }
     }
   }
 
   public CompletableFuture<VeniceResourceCloseResult> closeAsync(boolean gracefulClose) {
+    return closeAsync(gracefulClose, true);
+  }
+
+  public CompletableFuture<VeniceResourceCloseResult> closeAsync(
+      boolean gracefulClose,
+      boolean retryOnGracefulCloseFailure) {
+    /*
+     * If the VeniceWriter is already closed, return a completed future. This avoids a thread pool
+     * RejectedExecutionException when a previous closeAsync has executed and the threadPoolExecutor is already
+     * terminated.
+     */
+    synchronized (closeLock) {
+      if (isClosed) {
+        return CompletableFuture.completedFuture(VeniceResourceCloseResult.ALREADY_CLOSED);
+      }
+    }
+
     return CompletableFuture.supplyAsync(() -> {
       synchronized (closeLock) {
         if (isClosed) {
           return VeniceResourceCloseResult.ALREADY_CLOSED;
         }
-        long startTime = System.currentTimeMillis();
-        logger.info("Closing VeniceWriter for topic: {}", topicName);
-        try {
-          // try to end all segments before closing the producer
-          if (gracefulClose) {
-            CompletableFuture<Void> endSegmentsFuture =
-                CompletableFuture.runAsync(() -> endAllSegments(true), threadPoolExecutor);
-            try {
-              endSegmentsFuture.get(Math.max(100, closeTimeOutInMs / 2), TimeUnit.MILLISECONDS);
-            } catch (Exception e) {
-              // cancel the endSegmentsFuture if it's not done in time
-              if (!endSegmentsFuture.isDone()) {
-                endSegmentsFuture.cancel(true);
-              }
-              logger.warn("Swallowed an exception while trying to end all segments for topic: {}", topicName, e);
-            }
-          }
-          producerAdapter.close(topicName, closeTimeOutInMs, gracefulClose);
-          OPEN_VENICE_WRITER_COUNT.decrementAndGet();
-        } catch (Exception e) {
-          logger.warn("Swallowed an exception while trying to close the VeniceWriter for topic: {}", topicName, e);
-          VENICE_WRITER_CLOSE_FAILED_COUNT.incrementAndGet();
-        }
-        threadPoolExecutor.shutdown();
-        isClosed = true;
-        logger.info("Closed VeniceWriter for topic: {} in {} ms", topicName, System.currentTimeMillis() - startTime);
+        logger.info("Closing VeniceWriter for topic: {}, gracefulness: {}", topicName, gracefulClose);
+        try (Timer ignore = Timer.run(
+            elapsedTimeInMs -> logger.info("Closed VeniceWriter for topic: {} in {} ms", topicName, elapsedTimeInMs))) {
+          try {
+            // try to end all segments before closing the producer.
+            if (gracefulClose) {
+              CompletableFuture<Void> endSegmentsFuture =
+                  CompletableFuture.runAsync(() -> endAllSegments(true), threadPoolExecutor);
+              try {
+                endSegmentsFuture.get(Math.max(100, closeTimeOutInMs / 2), TimeUnit.MILLISECONDS);
+              } catch (Exception e) {
+                // cancel the endSegmentsFuture if it's not done in time.
+                if (!endSegmentsFuture.isDone()) {
+                  endSegmentsFuture.cancel(true);
+                }
+                logger.warn("Swallowed an exception while trying to end all segments for topic: {}", topicName, e);
+              }
+            }
+            producerAdapter.close(topicName, closeTimeOutInMs, gracefulClose);
+            OPEN_VENICE_WRITER_COUNT.decrementAndGet();
+          } catch (Exception e) {
+            handleExceptionInClose(e, gracefulClose, retryOnGracefulCloseFailure);
+          } finally {
+            threadPoolExecutor.shutdown();
+            isClosed = true;
+          }
+        }
         return VeniceResourceCloseResult.SUCCESS;
       }
-    });
+    }, threadPoolExecutor);
   }
 
+  void handleExceptionInClose(Exception e, boolean gracefulClose, boolean retryOnGracefulCloseFailure) {
+    logger.warn("Swallowed an exception while trying to close the VeniceWriter for topic: {}", topicName, e);
+    VENICE_WRITER_CLOSE_FAILED_COUNT.incrementAndGet();
+
+    // For graceful close, swallow the exception and give another try to close it ungracefully.
+    try {
+      if (gracefulClose && retryOnGracefulCloseFailure) {
+        logger.info(
+            "Ungracefully closing the VeniceWriter for topic: {}, closeTimeOut: {} ms",
+            topicName,
+            closeTimeOutInMs);
+        producerAdapter.close(topicName, closeTimeOutInMs, false);
+        OPEN_VENICE_WRITER_COUNT.decrementAndGet();
+      }
+    } catch (Exception ex) {
+      // If even the ungraceful close fails, give up, swallow the exception, and move on.
+      logger.warn("Exception in ungraceful close for topic: {}", topicName, ex);
+      VENICE_WRITER_CLOSE_FAILED_COUNT.incrementAndGet();
+    }
+  }
+
   @Override
   public void close() {
     close(true);
@@ -1561,7 +1623,7 @@ private String getSizeReport(int serializedKeySize, int serializedValueSize, int
    * @param partition the Kafka partition to write to.
    * @param debugInfo arbitrary key/value pairs of information that will be propagated alongside the control message.
    */
-  private void sendStartOfSegment(int partition, Map<String, String> debugInfo) {
+  public void sendStartOfSegment(int partition, Map<String, String> debugInfo) {
     ControlMessage controlMessage = new ControlMessage();
     controlMessage.controlMessageType = ControlMessageType.START_OF_SEGMENT.getValue();
     StartOfSegment startOfSegment = new StartOfSegment();
diff --git a/internal/venice-common/src/test/java/com/linkedin/venice/writer/VeniceWriterUnitTest.java b/internal/venice-common/src/test/java/com/linkedin/venice/writer/VeniceWriterUnitTest.java
index 512b3f793e6..6d99884dc8d 100644
--- a/internal/venice-common/src/test/java/com/linkedin/venice/writer/VeniceWriterUnitTest.java
+++ b/internal/venice-common/src/test/java/com/linkedin/venice/writer/VeniceWriterUnitTest.java
@@ -9,10 +9,12 @@
 import static com.linkedin.venice.writer.VeniceWriter.DEFAULT_MAX_SIZE_FOR_USER_PAYLOAD_PER_MESSAGE_IN_BYTES;
 import static com.linkedin.venice.writer.VeniceWriter.VENICE_DEFAULT_LOGICAL_TS;
 import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyInt;
 import static org.mockito.ArgumentMatchers.anyString;
 import static org.mockito.ArgumentMatchers.eq;
 import static org.mockito.Mockito.atLeast;
 import static org.mockito.Mockito.doCallRealMethod;
+import static org.mockito.Mockito.doThrow;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
@@ -48,12 +50,15 @@
 import com.linkedin.venice.storage.protocol.ChunkedValueManifest;
 import com.linkedin.venice.utils.DataProviderUtils;
 import com.linkedin.venice.utils.SystemTime;
+import com.linkedin.venice.utils.Time;
 import com.linkedin.venice.utils.VeniceProperties;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Properties;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import org.apache.kafka.common.errors.TimeoutException;
 import org.mockito.ArgumentCaptor;
 import org.testng.Assert;
 import org.testng.annotations.DataProvider;
@@ -533,4 +538,26 @@ public void testSendHeartbeat(boolean addLeaderCompleteHeader, LeaderCompleteSta
       }
     }
   }
+
+  // Unit test for the retry mechanism in the VeniceWriter.close(true) code path.
+  @Test(dataProvider = "True-and-False", dataProviderClass = DataProviderUtils.class, timeOut = 10 * Time.MS_PER_SECOND)
+  public void testVeniceWriterCloseRetry(boolean gracefulClose) throws ExecutionException, InterruptedException {
+    PubSubProducerAdapter mockedProducer = mock(PubSubProducerAdapter.class);
+    doThrow(new TimeoutException()).when(mockedProducer).close(anyString(), anyInt(), eq(true));
+
+    String testTopic = "test";
+    VeniceWriterOptions veniceWriterOptions = new VeniceWriterOptions.Builder(testTopic).setPartitionCount(1).build();
+    VeniceWriter<Object, Object, Object> writer =
+        new VeniceWriter<>(veniceWriterOptions, VeniceProperties.empty(), mockedProducer);
+
+    // Verify that if the producer throws a TimeoutException, the VeniceWriter will retry the close() call
+    // with doFlush = false, for both the close() and closeAsync() methods.
+    writer.close(gracefulClose);
+
+    writer = new VeniceWriter<>(veniceWriterOptions, VeniceProperties.empty(), mockedProducer);
+    writer.closeAsync(gracefulClose).get();
+
+    // Verify that the close(false) method will be called twice.
+    verify(mockedProducer, times(2)).close(anyString(), anyInt(), eq(false));
+  }
 }
diff --git a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/pubsub/adapter/kafka/producer/ApacheKafkaProducerAdapterITest.java b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/pubsub/adapter/kafka/producer/ApacheKafkaProducerAdapterITest.java
index fc933c311e1..d572b3ca33e 100644
--- a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/pubsub/adapter/kafka/producer/ApacheKafkaProducerAdapterITest.java
+++ b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/pubsub/adapter/kafka/producer/ApacheKafkaProducerAdapterITest.java
@@ -114,11 +114,11 @@ private void createTopic(String topicName, int numPartitions, int replicationFac
     }
   }
 
-  private KafkaKey getDummyKey() {
+  public static KafkaKey getDummyKey() {
     return new KafkaKey(MessageType.PUT, Utils.getUniqueString("key-").getBytes());
   }
 
-  private KafkaMessageEnvelope getDummyVal() {
+  public static KafkaMessageEnvelope getDummyVal() {
     KafkaMessageEnvelope messageEnvelope = new KafkaMessageEnvelope();
     messageEnvelope.producerMetadata = new ProducerMetadata();
     messageEnvelope.producerMetadata.messageTimestamp = 0;
diff --git a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/writer/VeniceWriterTest.java b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/writer/VeniceWriterTest.java
index fc0dca8f811..e1ede61ebc3 100644
--- a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/writer/VeniceWriterTest.java
+++ b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/writer/VeniceWriterTest.java
@@ -1,8 +1,13 @@
 package com.linkedin.venice.writer;
 
 import static com.linkedin.venice.kafka.TopicManager.DEFAULT_KAFKA_OPERATION_TIMEOUT_MS;
+import static com.linkedin.venice.utils.Time.MS_PER_SECOND;
+import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertTrue;
+import static org.testng.Assert.fail;
 
 import com.linkedin.venice.ConfigKeys;
+import com.linkedin.venice.exceptions.VeniceException;
 import com.linkedin.venice.integration.utils.PubSubBrokerWrapper;
 import com.linkedin.venice.integration.utils.ServiceFactory;
 import com.linkedin.venice.kafka.TopicManager;
@@ -22,6 +27,8 @@
 import com.linkedin.venice.pubsub.api.PubSubTopicPartition;
 import com.linkedin.venice.serialization.avro.KafkaValueSerializer;
 import com.linkedin.venice.serialization.avro.OptimizedKafkaValueSerializer;
+import com.linkedin.venice.utils.DataProviderUtils;
+import com.linkedin.venice.utils.ExceptionUtils;
 import com.linkedin.venice.utils.IntegrationTestPushUtils;
 import com.linkedin.venice.utils.TestUtils;
 import com.linkedin.venice.utils.Time;
@@ -33,10 +40,13 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
+import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
 import org.testng.Assert;
 import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
@@ -45,6 +55,7 @@
 
 @Test
 public class VeniceWriterTest {
+  private static final Logger LOGGER = LogManager.getLogger(VeniceWriterTest.class);
   private PubSubBrokerWrapper pubSubBrokerWrapper;
   private TopicManager topicManager;
   private PubSubConsumerAdapterFactory pubSubConsumerAdapterFactory;
@@ -142,4 +153,72 @@ public void testThreadSafetyForPutMessages() throws ExecutionException, Interrup
         100,
         veniceWriter -> veniceWriter.put(new KafkaKey(MessageType.PUT, "blah".getBytes()), "blah".getBytes(), 1, null));
   }
+
+  /**
+   * This test does the following steps:
+   * 1. Creates a VeniceWriter with a topic that does not exist.
+   * 2. Creates a new thread to send a SOS control message to this non-existent topic.
+   * 3. The new thread should block on the sendMessage() call.
+   * 4. The main thread closes the VeniceWriter (whether the 'doFlush' flag is true or false) and
+   *    expects the 'sendMessageThread' to unblock.
+   */
+  @Test(timeOut = 60 * MS_PER_SECOND, dataProvider = "True-and-False", dataProviderClass = DataProviderUtils.class)
+  public void testVeniceWriterClose(boolean doFlush) {
+    String topicName = Utils.getUniqueString("version-topic");
+    int partitionCount = 1;
+
+    // Intentionally do not create the topic "version-topic", so that the control message send will be blocked.
+    // topicManager.createTopic(pubSubTopic, partitionCount, 1, true);
+
+    CountDownLatch countDownLatch = new CountDownLatch(1);
+
+    Properties properties = new Properties();
+    properties.put(ConfigKeys.KAFKA_BOOTSTRAP_SERVERS, pubSubBrokerWrapper.getAddress());
+    properties.put(ConfigKeys.PARTITIONER_CLASS, DefaultVenicePartitioner.class.getName());
+
+    try (VeniceWriter<KafkaKey, byte[], byte[]> veniceWriter =
+        TestUtils.getVeniceWriterFactory(properties, pubSubProducerAdapterFactory)
+            .createVeniceWriter(
+                new VeniceWriterOptions.Builder(topicName).setUseKafkaKeySerializer(true)
+                    .setPartitionCount(partitionCount)
+                    .build())) {
+      ExecutorService executor = Executors.newSingleThreadExecutor();
+
+      Future<?> sendMessageFuture = executor.submit(() -> {
+        Thread.currentThread().setName("sendMessageThread");
+        countDownLatch.countDown();
+        try {
+          // Send to the non-existent topic and block.
+          veniceWriter.sendStartOfSegment(0, null);
+          fail("sendMessage on non-existent topic should have blocked the executing thread");
+        } catch (VeniceException e) {
+          LOGGER.info("As expected, an exception has been received from sendMessage()", e);
+          assertNotNull(e.getMessage(), "Exception thrown by sendMessage does not have a message");
+          assertTrue(
+              e.getMessage()
+                  .contains(
+                      String.format(
+                          "Got an error while trying to produce message into Kafka. Topic: '%s'",
+                          veniceWriter.getTopicName())));
+          assertTrue(ExceptionUtils.recursiveMessageContains(e, "Producer closed while send in progress"));
+          assertTrue(ExceptionUtils.recursiveMessageContains(e, "Requested metadata update after close"));
+          LOGGER.info("All expectations were met in thread: {}", Thread.currentThread().getName());
+        }
+      });
+
+      try {
+        countDownLatch.await();
+        // Still wait for some time to make sure the blocking sendMessage call is inside the Kafka producer before closing it.
+        Utils.sleep(5000);
+        veniceWriter.close(doFlush);
+
+        // This is necessary to check whether the expectations in the sendMessage thread were met.
+        sendMessageFuture.get();
+      } catch (Exception e) {
+        fail("Producer closing should have succeeded without an exception", e);
+      } finally {
+        executor.shutdownNow();
+      }
+    }
+  }
 }
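
Note: the retry contract that handleExceptionInClose() implements boils down to the sketch
below. This is a simplified, hypothetical illustration: the Producer interface here merely
stands in for PubSubProducerAdapter and is not the real Venice API; the authoritative
implementation is the VeniceWriter diff above.

  import java.util.concurrent.TimeoutException;

  public class CloseWithRetrySketch {
    /** Hypothetical stand-in for PubSubProducerAdapter; not the real Venice interface. */
    interface Producer {
      // When doFlush is true, buffered records are flushed before closing, which can time out.
      void close(String topic, int timeoutMs, boolean doFlush) throws TimeoutException;
    }

    static void close(Producer producer, String topic, int timeoutMs, boolean graceful) {
      try {
        // First attempt honors the caller's gracefulness: a graceful close flushes
        // buffered data and may block until the timeout elapses.
        producer.close(topic, timeoutMs, graceful);
      } catch (Exception e) {
        if (!graceful) {
          return; // The caller asked for an ungraceful close; nothing further to try.
        }
        try {
          // Retry ungracefully: skip the flush so the producer is torn down even when
          // flushing would block (e.g. the target topic does not exist).
          producer.close(topic, timeoutMs, false);
        } catch (Exception ex) {
          // Even the ungraceful close failed: swallow and move on, as the patch does.
        }
      }
    }
  }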