diff --git a/internal/venice-common/src/main/java/com/linkedin/venice/writer/VeniceWriter.java b/internal/venice-common/src/main/java/com/linkedin/venice/writer/VeniceWriter.java
index ded4f3fed7e..69023dcdca8 100644
--- a/internal/venice-common/src/main/java/com/linkedin/venice/writer/VeniceWriter.java
+++ b/internal/venice-common/src/main/java/com/linkedin/venice/writer/VeniceWriter.java
@@ -56,6 +56,7 @@
 import com.linkedin.venice.utils.ExceptionUtils;
 import com.linkedin.venice.utils.LatencyUtils;
 import com.linkedin.venice.utils.Time;
+import com.linkedin.venice.utils.Timer;
 import com.linkedin.venice.utils.Utils;
 import com.linkedin.venice.utils.VeniceProperties;
 import com.linkedin.venice.utils.VeniceResourceCloseResult;
@@ -395,70 +396,131 @@ public static ControlMessage generateHeartbeatMessage(CheckSumType checkSumType)
    */
   @Override
   public void close(boolean gracefulClose) {
+    try {
+      closeAsync(gracefulClose).get();
+    } catch (ExecutionException | InterruptedException e) {
+      logger.warn("Future completed exceptionally in closing VeniceWriter for topic: {}", topicName, e);
+    }
+  }
+
+  /**
+   * Close the {@link VeniceWriter}.
+   *
+   * This overload is deprecated because sending an END_OF_SEGMENT control message to a non-existent topic can block
+   * indefinitely: it calls
+   * {@link #sendMessage(KeyProvider, KafkaMessageEnvelopeProvider, int, PubSubProducerCallback, boolean)}.get()
+   * without a timeout.
+   *
+   * @param gracefulClose whether to end the segments and send an END_OF_SEGMENT control message.
+   * @param retryOnGracefulCloseFailure whether to retry with an ungraceful close if the graceful close fails.
+   */
+  @Deprecated
+  public void close(boolean gracefulClose, boolean retryOnGracefulCloseFailure) {
     synchronized (closeLock) {
       if (isClosed) {
         return;
       }
-      long startTime = System.currentTimeMillis();
-      logger.info("Closing VeniceWriter for topic: {}", topicName);
-      try {
-        // If {@link #broadcastEndOfPush(Map)} was already called, the {@link #endAllSegments(boolean)}
-        // will not do anything (it's idempotent). Segments should not be ended if there are still data missing.
-        if (gracefulClose) {
-          endAllSegments(true);
-        }
-        // DO NOT call the {@link #PubSubProducerAdapter.close(int) version from here.}
-        // For non-shared producer mode gracefulClose will flush the producer
-        producerAdapter.close(topicName, closeTimeOutInMs, gracefulClose);
-        OPEN_VENICE_WRITER_COUNT.decrementAndGet();
-      } catch (Exception e) {
-        logger.warn("Swallowed an exception while trying to close the VeniceWriter for topic: {}", topicName, e);
-        VENICE_WRITER_CLOSE_FAILED_COUNT.incrementAndGet();
+      logger.info("Closing VeniceWriter for topic: {}, gracefulness: {}", topicName, gracefulClose);
+      try (Timer ignore = Timer.run(
+          elapsedTimeInMs -> logger.info("Closed VeniceWriter for topic: {} in {} ms", topicName, elapsedTimeInMs))) {
+        try {
+          // If {@link #broadcastEndOfPush(Map)} was already called, the {@link #endAllSegments(boolean)}
+          // will not do anything (it's idempotent). Segments should not be ended if data is still missing.
+          if (gracefulClose) {
+            endAllSegments(true);
+          }
+          // DO NOT call the {@link PubSubProducerAdapter#close(int)} version from here.
+          // For non-shared producer mode, gracefulClose will flush the producer.
+
+          producerAdapter.close(topicName, closeTimeOutInMs, gracefulClose);
+          OPEN_VENICE_WRITER_COUNT.decrementAndGet();
+        } catch (Exception e) {
+          handleExceptionInClose(e, gracefulClose, retryOnGracefulCloseFailure);
+        } finally {
+          threadPoolExecutor.shutdown();
+          isClosed = true;
+        }
       }
-      threadPoolExecutor.shutdown();
-      isClosed = true;
-      logger.info("Closed VeniceWriter for topic: {} in {} ms", topicName, System.currentTimeMillis() - startTime);
     }
   }
 
   public CompletableFuture<VeniceResourceCloseResult> closeAsync(boolean gracefulClose) {
+    return closeAsync(gracefulClose, true);
+  }
+
+  public CompletableFuture<VeniceResourceCloseResult> closeAsync(
+      boolean gracefulClose,
+      boolean retryOnGracefulCloseFailure) {
+    /*
+     * If the VeniceWriter is already closed, return a completed future. This avoids a RejectedExecutionException
+     * from the thread pool when a previous closeAsync call has already run and the threadPoolExecutor is terminated.
+     */
+    synchronized (closeLock) {
+      if (isClosed) {
+        return CompletableFuture.completedFuture(VeniceResourceCloseResult.ALREADY_CLOSED);
+      }
+    }
+
     return CompletableFuture.supplyAsync(() -> {
       synchronized (closeLock) {
         if (isClosed) {
           return VeniceResourceCloseResult.ALREADY_CLOSED;
         }
-        long startTime = System.currentTimeMillis();
-        logger.info("Closing VeniceWriter for topic: {}", topicName);
-        try {
-          // try to end all segments before closing the producer
-          if (gracefulClose) {
-            CompletableFuture<Void> endSegmentsFuture =
-                CompletableFuture.runAsync(() -> endAllSegments(true), threadPoolExecutor);
-            try {
-              endSegmentsFuture.get(Math.max(100, closeTimeOutInMs / 2), TimeUnit.MILLISECONDS);
-            } catch (Exception e) {
-              // cancel the endSegmentsFuture if it's not done in time
-              if (!endSegmentsFuture.isDone()) {
-                endSegmentsFuture.cancel(true);
+        logger.info("Closing VeniceWriter for topic: {}, gracefulness: {}", topicName, gracefulClose);
+        try (Timer ignore = Timer.run(
+            elapsedTimeInMs -> logger.info("Closed VeniceWriter for topic: {} in {} ms", topicName, elapsedTimeInMs))) {
+          try {
+            // Try to end all segments before closing the producer.
+            if (gracefulClose) {
+              CompletableFuture<Void> endSegmentsFuture =
+                  CompletableFuture.runAsync(() -> endAllSegments(true), threadPoolExecutor);
+              try {
+                endSegmentsFuture.get(Math.max(100, closeTimeOutInMs / 2), TimeUnit.MILLISECONDS);
+              } catch (Exception e) {
+                // Cancel the endSegmentsFuture if it's not done in time.
+                if (!endSegmentsFuture.isDone()) {
+                  endSegmentsFuture.cancel(true);
+                }
+                logger.warn("Swallowed an exception while trying to end all segments for topic: {}", topicName, e);
               }
-              logger.warn("Swallowed an exception while trying to end all segments for topic: {}", topicName, e);
             }
+            producerAdapter.close(topicName, closeTimeOutInMs, gracefulClose);
+            OPEN_VENICE_WRITER_COUNT.decrementAndGet();
+          } catch (Exception e) {
+            handleExceptionInClose(e, gracefulClose, retryOnGracefulCloseFailure);
+          } finally {
+            threadPoolExecutor.shutdown();
+            isClosed = true;
           }
-          producerAdapter.close(topicName, closeTimeOutInMs, gracefulClose);
-          OPEN_VENICE_WRITER_COUNT.decrementAndGet();
-        } catch (Exception e) {
-          logger.warn("Swallowed an exception while trying to close the VeniceWriter for topic: {}", topicName, e);
-          VENICE_WRITER_CLOSE_FAILED_COUNT.incrementAndGet();
         }
-        threadPoolExecutor.shutdown();
-        isClosed = true;
-        logger.info("Closed VeniceWriter for topic: {} in {} ms", topicName, System.currentTimeMillis() - startTime);
         return VeniceResourceCloseResult.SUCCESS;
       }
-    });
+    }, threadPoolExecutor);
   }
 
+  void handleExceptionInClose(Exception e, boolean gracefulClose, boolean retryOnGracefulCloseFailure) {
+    logger.warn("Swallowed an exception while trying to close the VeniceWriter for topic: {}", topicName, e);
+    VENICE_WRITER_CLOSE_FAILED_COUNT.incrementAndGet();
+
+    // For a graceful close, swallow the exception and make another attempt to close ungracefully.
+    try {
+      if (gracefulClose && retryOnGracefulCloseFailure) {
+        logger.info(
+            "Ungracefully closing the VeniceWriter for topic: {}, closeTimeOut: {} ms",
+            topicName,
+            closeTimeOutInMs);
+        producerAdapter.close(topicName, closeTimeOutInMs, false);
+        OPEN_VENICE_WRITER_COUNT.decrementAndGet();
+      }
+    } catch (Exception ex) {
+      // If even the ungraceful close fails, give up, swallow the exception, and move on.
+      logger.warn("Exception in ungraceful close for topic: {}", topicName, ex);
+      VENICE_WRITER_CLOSE_FAILED_COUNT.incrementAndGet();
+    }
+  }
+
   @Override
   public void close() {
     close(true);
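
The caller-facing effect of the new overloads above is that close latency can finally be bounded by the caller. Below is a minimal sketch, not part of this patch; the `closeWithBudget` wrapper, the generic parameters, and the five-second budget are illustrative assumptions.

```java
import com.linkedin.venice.utils.VeniceResourceCloseResult;
import com.linkedin.venice.writer.VeniceWriter;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

final class BoundedCloseSketch {
  // Close gracefully, but give up after a fixed budget instead of blocking forever.
  static void closeWithBudget(VeniceWriter<byte[], byte[], byte[]> writer) throws Exception {
    try {
      // Returns SUCCESS, or ALREADY_CLOSED if another thread won the race to close.
      VeniceResourceCloseResult result = writer.closeAsync(true).get(5, TimeUnit.SECONDS);
    } catch (TimeoutException e) {
      // The graceful close did not finish within the budget: fall back to an ungraceful
      // close with retryOnGracefulCloseFailure=false, since the producer is already stuck.
      writer.closeAsync(false, false);
    }
  }
}
```
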
@@ -1561,7 +1623,7 @@ private String getSizeReport(int serializedKeySize, int serializedValueSize, int
    * @param partition the Kafka partition to write to.
    * @param debugInfo arbitrary key/value pairs of information that will be propagated alongside the control message.
    */
-  private void sendStartOfSegment(int partition, Map<String, String> debugInfo) {
+  public void sendStartOfSegment(int partition, Map<String, String> debugInfo) {
     ControlMessage controlMessage = new ControlMessage();
     controlMessage.controlMessageType = ControlMessageType.START_OF_SEGMENT.getValue();
     StartOfSegment startOfSegment = new StartOfSegment();
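
Since sendStartOfSegment is now public (the integration test below depends on this), a START_OF_SEGMENT control message can be emitted directly from test code. A hedged usage sketch follows; the debugInfo contents are illustrative, and passing null instead is also fine, as the test below does.

```java
import com.linkedin.venice.writer.VeniceWriter;
import java.util.Collections;
import java.util.Map;

final class StartOfSegmentSketch {
  // Emit a START_OF_SEGMENT control message to partition 0 of the writer's topic.
  static void emitStartOfSegment(VeniceWriter<byte[], byte[], byte[]> writer) {
    Map<String, String> debugInfo = Collections.singletonMap("source", "integration-test");
    writer.sendStartOfSegment(0, debugInfo);
  }
}
```
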
diff --git a/internal/venice-common/src/test/java/com/linkedin/venice/writer/VeniceWriterUnitTest.java b/internal/venice-common/src/test/java/com/linkedin/venice/writer/VeniceWriterUnitTest.java
index 512b3f793e6..6d99884dc8d 100644
--- a/internal/venice-common/src/test/java/com/linkedin/venice/writer/VeniceWriterUnitTest.java
+++ b/internal/venice-common/src/test/java/com/linkedin/venice/writer/VeniceWriterUnitTest.java
@@ -9,10 +9,12 @@
 import static com.linkedin.venice.writer.VeniceWriter.DEFAULT_MAX_SIZE_FOR_USER_PAYLOAD_PER_MESSAGE_IN_BYTES;
 import static com.linkedin.venice.writer.VeniceWriter.VENICE_DEFAULT_LOGICAL_TS;
 import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyInt;
 import static org.mockito.ArgumentMatchers.anyString;
 import static org.mockito.ArgumentMatchers.eq;
 import static org.mockito.Mockito.atLeast;
 import static org.mockito.Mockito.doCallRealMethod;
+import static org.mockito.Mockito.doThrow;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
@@ -48,12 +50,15 @@
 import com.linkedin.venice.storage.protocol.ChunkedValueManifest;
 import com.linkedin.venice.utils.DataProviderUtils;
 import com.linkedin.venice.utils.SystemTime;
+import com.linkedin.venice.utils.Time;
 import com.linkedin.venice.utils.VeniceProperties;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Properties;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import org.apache.kafka.common.errors.TimeoutException;
 import org.mockito.ArgumentCaptor;
 import org.testng.Assert;
 import org.testng.annotations.DataProvider;
@@ -533,4 +538,26 @@ public void testSendHeartbeat(boolean addLeaderCompleteHeader, LeaderCompleteSta
       }
     }
   }
+
+  // Unit test for the retry mechanism in the VeniceWriter close paths.
+  @Test(dataProvider = "True-and-False", dataProviderClass = DataProviderUtils.class, timeOut = 10 * Time.MS_PER_SECOND)
+  public void testVeniceWriterCloseRetry(boolean gracefulClose) throws ExecutionException, InterruptedException {
+    PubSubProducerAdapter mockedProducer = mock(PubSubProducerAdapter.class);
+    doThrow(new TimeoutException()).when(mockedProducer).close(anyString(), anyInt(), eq(true));
+
+    String testTopic = "test";
+    VeniceWriterOptions veniceWriterOptions = new VeniceWriterOptions.Builder(testTopic).setPartitionCount(1).build();
+    VeniceWriter writer =
+        new VeniceWriter(veniceWriterOptions, VeniceProperties.empty(), mockedProducer);
+
+    // Verify that if the producer throws a TimeoutException, the VeniceWriter retries the close
+    // with doFlush = false, for both the close() and closeAsync() paths.
+    writer.close(gracefulClose);
+
+    writer = new VeniceWriter(veniceWriterOptions, VeniceProperties.empty(), mockedProducer);
+    writer.closeAsync(gracefulClose).get();
+
+    // Verify that close(false) is called on the producer exactly twice (once per writer).
+    verify(mockedProducer, times(2)).close(anyString(), anyInt(), eq(false));
+  }
 }
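
A natural companion to testVeniceWriterCloseRetry, not included in this patch, is the negative case: with retryOnGracefulCloseFailure set to false, a failed graceful close should not fall back to an ungraceful one. A sketch reusing the same file's setup (only org.mockito.Mockito.never would be a new import):

```java
@Test(timeOut = 10 * Time.MS_PER_SECOND)
public void testVeniceWriterCloseWithoutRetry() throws ExecutionException, InterruptedException {
  PubSubProducerAdapter mockedProducer = mock(PubSubProducerAdapter.class);
  doThrow(new TimeoutException()).when(mockedProducer).close(anyString(), anyInt(), eq(true));
  VeniceWriterOptions options = new VeniceWriterOptions.Builder("test").setPartitionCount(1).build();
  VeniceWriter writer = new VeniceWriter(options, VeniceProperties.empty(), mockedProducer);

  // The graceful close fails with a TimeoutException, but since the retry is disabled,
  // the producer must never see an ungraceful close(..., false).
  writer.closeAsync(true, false).get();
  verify(mockedProducer, never()).close(anyString(), anyInt(), eq(false));
}
```
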
diff --git a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/pubsub/adapter/kafka/producer/ApacheKafkaProducerAdapterITest.java b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/pubsub/adapter/kafka/producer/ApacheKafkaProducerAdapterITest.java
index fc933c311e1..d572b3ca33e 100644
--- a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/pubsub/adapter/kafka/producer/ApacheKafkaProducerAdapterITest.java
+++ b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/pubsub/adapter/kafka/producer/ApacheKafkaProducerAdapterITest.java
@@ -114,11 +114,11 @@ private void createTopic(String topicName, int numPartitions, int replicationFac
     }
   }
 
-  private KafkaKey getDummyKey() {
+  public static KafkaKey getDummyKey() {
     return new KafkaKey(MessageType.PUT, Utils.getUniqueString("key-").getBytes());
   }
 
-  private KafkaMessageEnvelope getDummyVal() {
+  public static KafkaMessageEnvelope getDummyVal() {
     KafkaMessageEnvelope messageEnvelope = new KafkaMessageEnvelope();
     messageEnvelope.producerMetadata = new ProducerMetadata();
     messageEnvelope.producerMetadata.messageTimestamp = 0;
diff --git a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/writer/VeniceWriterTest.java b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/writer/VeniceWriterTest.java
index fc0dca8f811..e1ede61ebc3 100644
--- a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/writer/VeniceWriterTest.java
+++ b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/writer/VeniceWriterTest.java
@@ -1,8 +1,13 @@
 package com.linkedin.venice.writer;
 
 import static com.linkedin.venice.kafka.TopicManager.DEFAULT_KAFKA_OPERATION_TIMEOUT_MS;
+import static com.linkedin.venice.utils.Time.MS_PER_SECOND;
+import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertTrue;
+import static org.testng.Assert.fail;
 
 import com.linkedin.venice.ConfigKeys;
+import com.linkedin.venice.exceptions.VeniceException;
 import com.linkedin.venice.integration.utils.PubSubBrokerWrapper;
 import com.linkedin.venice.integration.utils.ServiceFactory;
 import com.linkedin.venice.kafka.TopicManager;
@@ -22,6 +27,8 @@
 import com.linkedin.venice.pubsub.api.PubSubTopicPartition;
 import com.linkedin.venice.serialization.avro.KafkaValueSerializer;
 import com.linkedin.venice.serialization.avro.OptimizedKafkaValueSerializer;
+import com.linkedin.venice.utils.DataProviderUtils;
+import com.linkedin.venice.utils.ExceptionUtils;
 import com.linkedin.venice.utils.IntegrationTestPushUtils;
 import com.linkedin.venice.utils.TestUtils;
 import com.linkedin.venice.utils.Time;
@@ -33,10 +40,13 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
+import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
 import org.testng.Assert;
 import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
@@ -45,6 +55,7 @@
 
 @Test
 public class VeniceWriterTest {
+  private static final Logger LOGGER = LogManager.getLogger(VeniceWriterTest.class);
   private PubSubBrokerWrapper pubSubBrokerWrapper;
   private TopicManager topicManager;
   private PubSubConsumerAdapterFactory pubSubConsumerAdapterFactory;
@@ -142,4 +153,72 @@ public void testThreadSafetyForPutMessages() throws ExecutionException, Interrup
         100,
         veniceWriter -> veniceWriter.put(new KafkaKey(MessageType.PUT, "blah".getBytes()), "blah".getBytes(), 1, null));
   }
+
+  /**
+   * This test does the following steps:
+   * 1. Creates a VeniceWriter with a topic that does not exist.
+   * 2. Creates a new thread to send a SOS control message to this non-existent topic.
+   * 3. The new thread should block on the sendMessage() call.
+   * 4. The main thread closes the VeniceWriter (whether the 'doFlush' flag is true or false) and
+   *    expects the 'sendMessageThread' to unblock.
+   */
+  @Test(timeOut = 60 * MS_PER_SECOND, dataProvider = "True-and-False", dataProviderClass = DataProviderUtils.class)
+  public void testVeniceWriterClose(boolean doFlush) {
+    String topicName = Utils.getUniqueString("version-topic");
+    int partitionCount = 1;
+
+    // Intentionally do not create the topic "version-topic", so that the control message send will also be blocked.
+    // topicManager.createTopic(pubSubTopic, partitionCount, 1, true);
+
+    CountDownLatch countDownLatch = new CountDownLatch(1);
+
+    Properties properties = new Properties();
+    properties.put(ConfigKeys.KAFKA_BOOTSTRAP_SERVERS, pubSubBrokerWrapper.getAddress());
+    properties.put(ConfigKeys.PARTITIONER_CLASS, DefaultVenicePartitioner.class.getName());
+
+    try (VeniceWriter<KafkaKey, byte[], byte[]> veniceWriter =
+        TestUtils.getVeniceWriterFactory(properties, pubSubProducerAdapterFactory)
+            .createVeniceWriter(
+                new VeniceWriterOptions.Builder(topicName).setUseKafkaKeySerializer(true)
+                    .setPartitionCount(partitionCount)
+                    .build())) {
+      ExecutorService executor = Executors.newSingleThreadExecutor();
+
+      Future<?> sendMessageFuture = executor.submit(() -> {
+        Thread.currentThread().setName("sendMessageThread");
+        countDownLatch.countDown();
+        try {
+          // Send to the non-existent topic and block.
+          veniceWriter.sendStartOfSegment(0, null);
+          fail("sendMessage on a non-existent topic should have blocked the executing thread");
+        } catch (VeniceException e) {
+          LOGGER.info("As expected, an exception has been received from sendMessage()", e);
+          assertNotNull(e.getMessage(), "Exception thrown by sendMessage does not have a message");
+          assertTrue(
+              e.getMessage()
+                  .contains(
+                      String.format(
+                          "Got an error while trying to produce message into Kafka. Topic: '%s'",
+                          veniceWriter.getTopicName())));
+          assertTrue(ExceptionUtils.recursiveMessageContains(e, "Producer closed while send in progress"));
+          assertTrue(ExceptionUtils.recursiveMessageContains(e, "Requested metadata update after close"));
+          LOGGER.info("All expectations were met in thread: {}", Thread.currentThread().getName());
+        }
+      });
+
+      try {
+        countDownLatch.await();
+        // Wait a bit longer, to make sure the blocking sendMessage call has reached the Kafka producer before closing.
+        Utils.sleep(5000);
+        veniceWriter.close(doFlush);
+
+        // This is necessary to check whether the expectations in the sendMessage thread were met.
+        sendMessageFuture.get();
+      } catch (Exception e) {
+        fail("Producer closing should have succeeded without an exception", e);
+      } finally {
+        executor.shutdownNow();
+      }
+    }
+  }
 }
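
For completeness, the unblocking contract exercised by testVeniceWriterClose can also be asserted with a bounded wait on the future itself rather than relying solely on the test-level timeOut. A sketch follows, assuming the same writer/executor wiring as above, a test method declared `throws Exception`, an added java.util.concurrent.TimeUnit import, and a submitted task that lets the VeniceException propagate instead of catching it:

```java
// The pending send blocks inside the producer until close() tears it down.
Future<?> pendingSend = executor.submit(() -> veniceWriter.sendStartOfSegment(0, null));
Utils.sleep(5000); // as in the test: let the send reach the producer first
veniceWriter.close(doFlush);
try {
  // Would throw java.util.concurrent.TimeoutException if close() never unblocked the send.
  pendingSend.get(10, TimeUnit.SECONDS);
  fail("The pending send should have completed exceptionally after close()");
} catch (ExecutionException e) {
  assertTrue(ExceptionUtils.recursiveMessageContains(e, "Producer closed while send in progress"));
}
```
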