diff --git a/internal/venice-client-common/src/main/java/com/linkedin/venice/compression/GzipCompressor.java b/internal/venice-client-common/src/main/java/com/linkedin/venice/compression/GzipCompressor.java
index c1e68428e6..4ace2a69ce 100644
--- a/internal/venice-client-common/src/main/java/com/linkedin/venice/compression/GzipCompressor.java
+++ b/internal/venice-client-common/src/main/java/com/linkedin/venice/compression/GzipCompressor.java
@@ -24,7 +24,7 @@ public GzipCompressor() {
   }
 
   @Override
-  public byte[] compress(byte[] data) throws IOException {
+  protected byte[] compressInternal(byte[] data) throws IOException {
     ReusableGzipOutputStream out = gzipPool.getReusableGzipOutputStream();
     try {
       out.writeHeader();
@@ -37,7 +37,7 @@ public byte[] compress(byte[] data) throws IOException {
   }
 
   @Override
-  public void close() throws IOException {
+  protected void closeInternal() throws IOException {
     try {
       gzipPool.close();
     } catch (Exception e) {
@@ -47,7 +47,7 @@ public void close() throws IOException {
   }
 
   @Override
-  public ByteBuffer compress(ByteBuffer data, int startPositionOfOutput) throws IOException {
+  protected ByteBuffer compressInternal(ByteBuffer data, int startPositionOfOutput) throws IOException {
     /**
      * N.B.: We initialize the size of buffer in this output stream at the size of the deflated payload, which is not
      * ideal, but not necessarily bad either. The assumption is that GZIP usually doesn't compress our payloads that
@@ -74,7 +74,7 @@ public ByteBuffer compress(ByteBuffer data, int startPositionOfOutput) throws IO
   }
 
   @Override
-  public ByteBuffer decompress(ByteBuffer data) throws IOException {
+  protected ByteBuffer decompressInternal(ByteBuffer data) throws IOException {
     if (data.hasRemaining()) {
       if (data.hasArray()) {
         return decompress(data.array(), data.position(), data.remaining());
@@ -89,14 +89,14 @@ public ByteBuffer decompress(ByteBuffer data) throws IOException {
   }
 
   @Override
-  public ByteBuffer decompress(byte[] data, int offset, int length) throws IOException {
+  protected ByteBuffer decompressInternal(byte[] data, int offset, int length) throws IOException {
     try (InputStream gis = decompress(new ByteArrayInputStream(data, offset, length))) {
       return ByteBuffer.wrap(IOUtils.toByteArray(gis));
     }
   }
 
   @Override
-  public ByteBuffer decompressAndPrependSchemaHeader(byte[] data, int offset, int length, int schemaHeader)
+  protected ByteBuffer decompressAndPrependSchemaHeaderInternal(byte[] data, int offset, int length, int schemaHeader)
       throws IOException {
     byte[] decompressedByteArray;
     try (InputStream gis = decompress(new ByteArrayInputStream(data, offset, length))) {
@@ -111,7 +111,7 @@ public ByteBuffer decompressAndPrependSchemaHeader(byte[] data, int offset, int
   }
 
   @Override
-  public InputStream decompress(InputStream inputStream) throws IOException {
+  protected InputStream decompressInternal(InputStream inputStream) throws IOException {
     return new GZIPInputStream(inputStream);
   }
 
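The renames above follow a template-method split: the public compress/decompress entry points now live in the VeniceCompressor base class (further below) and delegate to these protected *Internal hooks. For orientation, the GZIP hooks are thin wrappers over java.util.zip; a minimal stand-alone round-trip with the plain JDK streams (not Venice's pooled ReusableGzipOutputStream) looks roughly like this:

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.util.zip.GZIPInputStream;
    import java.util.zip.GZIPOutputStream;

    public final class GzipRoundTrip {
      public static void main(String[] args) throws Exception {
        byte[] input = "hello gzip".getBytes();

        // Compress: GZIPOutputStream writes the GZIP header/trailer for us.
        ByteArrayOutputStream compressed = new ByteArrayOutputStream();
        try (GZIPOutputStream gos = new GZIPOutputStream(compressed)) {
          gos.write(input);
        }

        // Decompress: mirrors the shape of decompressInternal(InputStream) above.
        ByteArrayOutputStream restored = new ByteArrayOutputStream();
        try (GZIPInputStream gis =
            new GZIPInputStream(new ByteArrayInputStream(compressed.toByteArray()))) {
          byte[] buf = new byte[4096];
          for (int n; (n = gis.read(buf)) != -1;) {
            restored.write(buf, 0, n);
          }
        }
        System.out.println(restored); // hello gzip
      }
    }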
diff --git a/internal/venice-client-common/src/main/java/com/linkedin/venice/compression/NoopCompressor.java b/internal/venice-client-common/src/main/java/com/linkedin/venice/compression/NoopCompressor.java
index baa3e80bd8..3fde4e7d00 100644
--- a/internal/venice-client-common/src/main/java/com/linkedin/venice/compression/NoopCompressor.java
+++ b/internal/venice-client-common/src/main/java/com/linkedin/venice/compression/NoopCompressor.java
@@ -6,6 +6,9 @@
 import java.nio.ByteBuffer;
 
 
+/**
+ * Locking is not necessary for {@link NoopCompressor}, so this class overrides all the public APIs to avoid locking.
+ */
 public class NoopCompressor extends VeniceCompressor {
   public NoopCompressor() {
     super(CompressionStrategy.NO_OP);
@@ -16,6 +19,11 @@ public byte[] compress(byte[] data) throws IOException {
     return data;
   }
 
+  @Override
+  protected byte[] compressInternal(byte[] data) throws IOException {
+    throw new UnsupportedOperationException("compressInternal");
+  }
+
   @Override
   public ByteBuffer compress(ByteBuffer data, int startPositionOfOutput) throws IOException {
     if (startPositionOfOutput != 0) {
@@ -24,6 +32,11 @@ public ByteBuffer compress(ByteBuffer data, int startPositionOfOutput) throws IO
     return data;
   }
 
+  @Override
+  protected ByteBuffer compressInternal(ByteBuffer src, int startPositionOfOutput) throws IOException {
+    throw new UnsupportedOperationException("compressInternal");
+  }
+
   @Override
   public int hashCode() {
     return super.hashCode();
@@ -34,11 +47,21 @@ public ByteBuffer decompress(ByteBuffer data) throws IOException {
     return data;
   }
 
+  @Override
+  protected ByteBuffer decompressInternal(ByteBuffer data) throws IOException {
+    throw new UnsupportedOperationException("decompressInternal");
+  }
+
   @Override
   public ByteBuffer decompress(byte[] data, int offset, int length) throws IOException {
     return ByteBuffer.wrap(data, offset, length);
   }
 
+  @Override
+  protected ByteBuffer decompressInternal(byte[] data, int offset, int length) throws IOException {
+    throw new UnsupportedOperationException("decompressInternal");
+  }
+
   @Override
   public ByteBuffer decompressAndPrependSchemaHeader(byte[] data, int offset, int length, int schemaHeader)
       throws IOException {
@@ -50,11 +73,32 @@ public ByteBuffer decompressAndPrependSchemaHeader(byte[] data, int offset, int
     return bb;
   }
 
+  @Override
+  protected ByteBuffer decompressAndPrependSchemaHeaderInternal(byte[] data, int offset, int length, int schemaHeader)
+      throws IOException {
+    throw new UnsupportedOperationException("decompressAndPrependSchemaHeaderInternal");
+  }
+
   @Override
   public InputStream decompress(InputStream inputStream) throws IOException {
     return inputStream;
   }
 
+  @Override
+  protected InputStream decompressInternal(InputStream inputStream) throws IOException {
+    throw new UnsupportedOperationException("decompressInternal");
+  }
+
+  @Override
+  public void close() throws IOException {
+    // do nothing
+  }
+
+  @Override
+  protected void closeInternal() throws IOException {
+    throw new UnsupportedOperationException("closeInternal");
+  }
+
   @Override
   public boolean equals(Object o) {
     if (o == this) {
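NoopCompressor is a pure pass-through, so it keeps overriding the public methods directly and never reaches the base-class lock; its *Internal hooks are unreachable and throw so that an accidental direct call fails fast. Distilled to a toy example (LockedBase/NoopTransform are illustrative names, not the Venice API):

    abstract class LockedBase {
      private final Object lock = new Object();

      public byte[] transform(byte[] data) {
        synchronized (lock) { // guarded slow path used by real implementations
          return transformInternal(data);
        }
      }

      protected abstract byte[] transformInternal(byte[] data);
    }

    class NoopTransform extends LockedBase {
      @Override
      public byte[] transform(byte[] data) {
        return data; // fast path: no lock acquired at all
      }

      @Override
      protected byte[] transformInternal(byte[] data) {
        // Unreachable, since transform() is overridden; throwing surfaces misuse.
        throw new UnsupportedOperationException("transformInternal");
      }
    }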
diff --git a/internal/venice-client-common/src/main/java/com/linkedin/venice/compression/VeniceCompressor.java b/internal/venice-client-common/src/main/java/com/linkedin/venice/compression/VeniceCompressor.java
index bf7b30639d..6b18eb94de 100644
--- a/internal/venice-client-common/src/main/java/com/linkedin/venice/compression/VeniceCompressor.java
+++ b/internal/venice-client-common/src/main/java/com/linkedin/venice/compression/VeniceCompressor.java
@@ -1,27 +1,66 @@
 package com.linkedin.venice.compression;
 
+import com.linkedin.venice.exceptions.VeniceException;
 import com.linkedin.venice.utils.ByteUtils;
 import java.io.Closeable;
 import java.io.IOException;
 import java.io.InputStream;
 import java.nio.ByteBuffer;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 
 public abstract class VeniceCompressor implements Closeable {
   protected static final int SCHEMA_HEADER_LENGTH = ByteUtils.SIZE_OF_INT;
   private final CompressionStrategy compressionStrategy;
+  private boolean isClosed = false;
+  /**
+   * To avoid the race condition between 'compress'/'decompress' operations and 'close'.
+   */
+  private final ReentrantReadWriteLock readWriteLock = new ReentrantReadWriteLock();
 
   protected VeniceCompressor(CompressionStrategy compressionStrategy) {
     this.compressionStrategy = compressionStrategy;
   }
 
-  public abstract byte[] compress(byte[] data) throws IOException;
+  interface CompressionRunnable<R> {
+    R run() throws IOException;
+  }
+
+  private <R> R executeWithSafeGuard(CompressionRunnable<R> runnable) throws IOException {
+    readWriteLock.readLock().lock();
+    try {
+      if (isClosed) {
+        throw new VeniceException("Compressor for " + getCompressionStrategy() + " has been closed");
+      }
+      return runnable.run();
+    } finally {
+      readWriteLock.readLock().unlock();
+    }
+  }
+
+  public byte[] compress(byte[] data) throws IOException {
+    return executeWithSafeGuard(() -> compressInternal(data));
+  }
 
-  public abstract ByteBuffer compress(ByteBuffer src, int startPositionOfOutput) throws IOException;
+  protected abstract byte[] compressInternal(byte[] data) throws IOException;
 
-  public abstract ByteBuffer decompress(ByteBuffer data) throws IOException;
+  public ByteBuffer compress(ByteBuffer src, int startPositionOfOutput) throws IOException {
+    return executeWithSafeGuard(() -> compressInternal(src, startPositionOfOutput));
+  }
 
-  public abstract ByteBuffer decompress(byte[] data, int offset, int length) throws IOException;
+  protected abstract ByteBuffer compressInternal(ByteBuffer src, int startPositionOfOutput) throws IOException;
+
+  public ByteBuffer decompress(ByteBuffer data) throws IOException {
+    return executeWithSafeGuard(() -> decompressInternal(data));
+  }
+
+  protected abstract ByteBuffer decompressInternal(ByteBuffer data) throws IOException;
+
+  public ByteBuffer decompress(byte[] data, int offset, int length) throws IOException {
+    return executeWithSafeGuard(() -> decompressInternal(data, offset, length));
+  }
+
+  protected abstract ByteBuffer decompressInternal(byte[] data, int offset, int length) throws IOException;
 
   /**
    * This method tries to decompress data and maybe prepend the schema header.
@@ -29,15 +68,36 @@ protected VeniceCompressor(CompressionStrategy compressionStrategy) {
    * decompressed data. The ByteBuffer will be positioned at the beginning of the decompressed data and the remaining of
    * the ByteBuffer will be the length of the decompressed data.
   */
-  public abstract ByteBuffer decompressAndPrependSchemaHeader(byte[] data, int offset, int length, int schemaHeader)
-      throws IOException;
+  public ByteBuffer decompressAndPrependSchemaHeader(byte[] data, int offset, int length, int schemaHeader)
+      throws IOException {
+    return executeWithSafeGuard(() -> decompressAndPrependSchemaHeaderInternal(data, offset, length, schemaHeader));
+  }
+
+  protected abstract ByteBuffer decompressAndPrependSchemaHeaderInternal(
+      byte[] data,
+      int offset,
+      int length,
+      int schemaHeader) throws IOException;
 
   public CompressionStrategy getCompressionStrategy() {
     return compressionStrategy;
   }
 
-  public abstract InputStream decompress(InputStream inputStream) throws IOException;
+  public InputStream decompress(InputStream inputStream) throws IOException {
+    return executeWithSafeGuard(() -> decompressInternal(inputStream));
+  }
+
+  protected abstract InputStream decompressInternal(InputStream inputStream) throws IOException;
 
   public void close() throws IOException {
+    readWriteLock.writeLock().lock();
+    try {
+      isClosed = true;
+      closeInternal();
+    } finally {
+      readWriteLock.writeLock().unlock();
+    }
   }
+
+  protected abstract void closeInternal() throws IOException;
 }
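This is the heart of the change: every public operation runs under the shared read lock, so concurrent compress/decompress calls do not serialize against each other, while close() takes the exclusive write lock, waits out in-flight operations, and flips isClosed so later calls fail fast with a VeniceException instead of touching already-released resources. Note that isClosed needs no volatile: it is only written under the write lock and read under the read lock, and the lock hand-off supplies the happens-before edge. The idiom in isolation (a sketch with toy names, not the Venice API):

    import java.io.IOException;
    import java.util.concurrent.locks.ReentrantReadWriteLock;

    // Execute-around sketch: shared lock for operations, exclusive lock for close.
    abstract class GuardedResource implements AutoCloseable {
      private final ReentrantReadWriteLock rwLock = new ReentrantReadWriteLock();
      private boolean closed = false;

      interface Op<R> {
        R run() throws IOException;
      }

      protected <R> R withGuard(Op<R> op) throws IOException {
        rwLock.readLock().lock(); // many operations may hold this concurrently
        try {
          if (closed) {
            throw new IllegalStateException("already closed");
          }
          return op.run();
        } finally {
          rwLock.readLock().unlock();
        }
      }

      @Override
      public void close() throws IOException {
        rwLock.writeLock().lock(); // blocks until in-flight operations drain
        try {
          closed = true;
          closeInternal();
        } finally {
          rwLock.writeLock().unlock();
        }
      }

      protected abstract void closeInternal() throws IOException;
    }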
diff --git a/internal/venice-client-common/src/main/java/com/linkedin/venice/compression/ZstdWithDictCompressor.java b/internal/venice-client-common/src/main/java/com/linkedin/venice/compression/ZstdWithDictCompressor.java
index 0755320486..3b67a2dfda 100644
--- a/internal/venice-client-common/src/main/java/com/linkedin/venice/compression/ZstdWithDictCompressor.java
+++ b/internal/venice-client-common/src/main/java/com/linkedin/venice/compression/ZstdWithDictCompressor.java
@@ -47,12 +47,12 @@ public ZstdWithDictCompressor(final byte[] dictionary, int level) {
   }
 
   @Override
-  public byte[] compress(byte[] data) {
+  protected byte[] compressInternal(byte[] data) {
     return compressor.get().compress(data);
   }
 
   @Override
-  public ByteBuffer compress(ByteBuffer data, int startPositionOfOutput) throws IOException {
+  protected ByteBuffer compressInternal(ByteBuffer data, int startPositionOfOutput) throws IOException {
     long maxDstSize = Zstd.compressBound(data.remaining());
     if (maxDstSize + startPositionOfOutput > Integer.MAX_VALUE) {
       throw new ZstdException(Zstd.errGeneric(), "Max output size is greater than Integer.MAX_VALUE");
@@ -87,7 +87,7 @@ public ByteBuffer compress(ByteBuffer data, int startPositionOfOutput) throws IO
   }
 
   @Override
-  public ByteBuffer decompress(ByteBuffer data) throws IOException {
+  protected ByteBuffer decompressInternal(ByteBuffer data) throws IOException {
     if (data.hasRemaining()) {
       if (data.hasArray()) {
         return decompress(data.array(), data.position(), data.remaining());
@@ -107,7 +107,7 @@ public ByteBuffer decompress(ByteBuffer data) throws IOException {
   }
 
   @Override
-  public ByteBuffer decompress(byte[] data, int offset, int length) throws IOException {
+  protected ByteBuffer decompressInternal(byte[] data, int offset, int length) throws IOException {
     int expectedSize = validateExpectedDecompressedSize(Zstd.decompressedSize(data, offset, length));
     ByteBuffer returnedData = ByteBuffer.allocate(expectedSize);
     int actualSize = decompressor.get()
@@ -124,7 +124,7 @@ public ByteBuffer decompress(byte[] data, int offset, int length) throws IOExcep
   }
 
   @Override
-  public ByteBuffer decompressAndPrependSchemaHeader(byte[] data, int offset, int length, int schemaHeader)
+  protected ByteBuffer decompressAndPrependSchemaHeaderInternal(byte[] data, int offset, int length, int schemaHeader)
       throws IOException {
     int expectedDecompressedDataSize = validateExpectedDecompressedSize(Zstd.decompressedSize(data, offset, length));
 
@@ -138,12 +138,12 @@ public ByteBuffer decompressAndPrependSchemaHeader(byte[] data, int offset, int
   }
 
   @Override
-  public InputStream decompress(InputStream inputStream) throws IOException {
+  protected InputStream decompressInternal(InputStream inputStream) throws IOException {
    return new ZstdInputStream(inputStream).setDict(this.dictDecompress);
   }
 
   @Override
-  public void close() throws IOException {
+  protected void closeInternal() throws IOException {
     this.compressor.close();
     this.decompressor.close();
     IOUtils.closeQuietly(this.dictCompress);
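The buffer sizing above leans on two zstd-jni calls: Zstd.compressBound() yields a worst-case compressed size before compressing, and Zstd.decompressedSize() reads the original size out of the frame header so the destination buffer can be allocated exactly once. A stand-alone illustration with the plain static API (no shared dictionary, which the Venice class layers on top):

    import com.github.luben.zstd.Zstd;

    public final class ZstdSizing {
      public static void main(String[] args) {
        byte[] input = "hello zstd, hello zstd, hello zstd".getBytes();

        // Worst-case output size; a destination this large always suffices.
        long bound = Zstd.compressBound(input.length);
        System.out.println("compressBound = " + bound);

        byte[] compressed = Zstd.compress(input, 3); // compression level 3

        // The frame header records the original size (0 if unknown).
        long expected = Zstd.decompressedSize(compressed);
        byte[] restored = Zstd.decompress(compressed, (int) expected);
        System.out.println(new String(restored));
      }
    }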
diff --git a/internal/venice-client-common/src/test/java/com/linkedin/venice/compression/TestVeniceCompressor.java b/internal/venice-client-common/src/test/java/com/linkedin/venice/compression/TestVeniceCompressor.java
index 596e3642b3..ba015d4834 100644
--- a/internal/venice-client-common/src/test/java/com/linkedin/venice/compression/TestVeniceCompressor.java
+++ b/internal/venice-client-common/src/test/java/com/linkedin/venice/compression/TestVeniceCompressor.java
@@ -1,6 +1,11 @@
 package com.linkedin.venice.compression;
 
+import static org.testng.Assert.assertThrows;
+import static org.testng.Assert.assertTrue;
+import static org.testng.Assert.expectThrows;
+
 import com.github.luben.zstd.Zstd;
+import com.linkedin.venice.exceptions.VeniceException;
 import com.linkedin.venice.utils.ByteUtils;
 import com.linkedin.venice.utils.TestUtils;
 import com.linkedin.venice.utils.Time;
@@ -14,6 +19,7 @@
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
+import org.apache.commons.lang3.RandomStringUtils;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 import org.testng.Assert;
@@ -173,7 +179,7 @@ private enum SourceDataType {
 
   @Test
   public void testZSTDThrowsExceptionOnNullDictionary() {
-    Assert.assertThrows(
+    assertThrows(
         () -> new CompressorFactory()
             .createVersionSpecificCompressorIfNotExist(CompressionStrategy.ZSTD_WITH_DICT, "foo_v1", null));
   }
@@ -205,4 +211,15 @@ public void testCompressorEqual() {
       }
     }
   }
+
+  @Test
+  public void testCompressorClose() throws IOException {
+    VeniceCompressor compressor = new ZstdWithDictCompressor("abc".getBytes(), Zstd.maxCompressionLevel());
+    String largePayload = RandomStringUtils.randomAlphabetic(500000);
+    compressor.compress(largePayload.getBytes());
+    compressor.close();
+    VeniceException exception =
+        expectThrows(VeniceException.class, () -> compressor.compress(ByteBuffer.wrap(largePayload.getBytes()), 4));
+    assertTrue(exception.getMessage().contains("has been closed"));
+  }
 }
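The new test covers the sequential case: compress, close, then another compress that must fail with the "has been closed" VeniceException. The race the lock actually defends against is concurrent, where an operation overlapping close() could otherwise touch an already-released zstd context. A stress-style variant along these lines could complement it (a sketch only, not part of this change; testCompressorCloseConcurrency is a made-up name):

    @Test
    public void testCompressorCloseConcurrency() throws Exception {
      VeniceCompressor compressor = new ZstdWithDictCompressor("abc".getBytes(), Zstd.maxCompressionLevel());
      byte[] payload = RandomStringUtils.randomAlphabetic(500000).getBytes();
      ExecutorService pool = Executors.newFixedThreadPool(8);
      try {
        List<Future<?>> futures = new ArrayList<>();
        for (int i = 0; i < 8; i++) {
          futures.add(pool.submit(() -> {
            for (int j = 0; j < 100; j++) {
              try {
                compressor.compress(payload);
              } catch (VeniceException e) {
                // Expected once close() wins the race; a clean failure, never a crash.
                assertTrue(e.getMessage().contains("has been closed"));
                return null;
              }
            }
            return null;
          }));
        }
        compressor.close();
        for (Future<?> f: futures) {
          f.get(30, TimeUnit.SECONDS);
        }
      } finally {
        pool.shutdownNow();
      }
    }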
diff --git a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/fastclient/FastClientIndividualFeatureConfigurationTest.java b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/fastclient/FastClientIndividualFeatureConfigurationTest.java
index 91740a0ed6..f7f2d1cc5e 100644
--- a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/fastclient/FastClientIndividualFeatureConfigurationTest.java
+++ b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/fastclient/FastClientIndividualFeatureConfigurationTest.java
@@ -186,6 +186,10 @@ public void testServerReadQuota() throws Exception {
       LOGGER.info("RESTARTING servers");
       veniceCluster.stopAndRestartVeniceServer(veniceServerWrapper.getPort());
     }
+    serverMetrics.clear();
+    for (int i = 0; i < veniceCluster.getVeniceServers().size(); i++) {
+      serverMetrics.add(veniceCluster.getVeniceServers().get(i).getMetricsRepository());
+    }
     for (int j = 0; j < 5; j++) {
       for (int i = 0; i < recordCnt; i++) {
         String key = keyPrefix + i;
@@ -198,7 +202,7 @@ public void testServerReadQuota() throws Exception {
       quotaRequestedQPSSum += serverMetric.getMetric(readQuotaRequestedQPSString).value();
       assertEquals(serverMetric.getMetric(readQuotaAllowedUnintentionally).value(), 0d);
     }
-    assertTrue(quotaRequestedQPSSum >= 0, "Quota request sum: " + quotaRequestedQPSSum);
+    assertTrue(quotaRequestedQPSSum > 0, "Quota request sum: " + quotaRequestedQPSSum);
   }
 
   @Test(timeOut = TIME_OUT)
diff --git a/services/venice-server/src/main/java/com/linkedin/venice/listener/ReadQuotaEnforcementHandler.java b/services/venice-server/src/main/java/com/linkedin/venice/listener/ReadQuotaEnforcementHandler.java
index 72dc406434..444041d59c 100644
--- a/services/venice-server/src/main/java/com/linkedin/venice/listener/ReadQuotaEnforcementHandler.java
+++ b/services/venice-server/src/main/java/com/linkedin/venice/listener/ReadQuotaEnforcementHandler.java
@@ -173,6 +173,8 @@ public final void init() {
       for (Version version: versions) {
        customizedViewRepository.subscribeRoutingDataChange(version.kafkaTopicName(), this);
       }
+      // Also invoke handleStoreChanged to ensure the corresponding token bucket and stats are initialized.
+      handleStoreChanged(store);
     }
     this.initializedVolatile = true;
   }
diff --git a/services/venice-server/src/main/java/com/linkedin/venice/stats/ServerReadQuotaUsageStats.java b/services/venice-server/src/main/java/com/linkedin/venice/stats/ServerReadQuotaUsageStats.java
index f4e5324638..e2857ebdb9 100644
--- a/services/venice-server/src/main/java/com/linkedin/venice/stats/ServerReadQuotaUsageStats.java
+++ b/services/venice-server/src/main/java/com/linkedin/venice/stats/ServerReadQuotaUsageStats.java
@@ -79,6 +79,14 @@ public void setBackupVersion(int version) {
     }
   }
 
+  public int getCurrentVersion() {
+    return currentVersion.get();
+  }
+
+  public int getBackupVersion() {
+    return backupVersion.get();
+  }
+
   public void removeVersion(int version) {
     versionedStats.remove(version);
   }
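Two related fixes here: the integration test re-fetches the MetricsRepository handles after the servers restart (the old references pointed at the stopped instances, which is also why the assertion could be tightened from >= 0 to > 0), and init() now calls handleStoreChanged(store) so a store that already exists at startup gets its token bucket and per-version stats immediately rather than on the next store-change event. The new getters expose the versioned bookkeeping so the test below can assert on it; a condensed sketch of the shape those getters imply (field types assumed from the currentVersion.get() / versionedStats.remove() calls above, class name illustrative):

    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.atomic.AtomicInteger;

    // Version bookkeeping as implied by the diff: atomic current/backup version
    // markers plus a concurrent map of per-version stats objects.
    final class VersionedStatsSketch {
      private final AtomicInteger currentVersion = new AtomicInteger(0);
      private final AtomicInteger backupVersion = new AtomicInteger(0);
      private final Map<Integer, Object> versionedStats = new ConcurrentHashMap<>();

      int getCurrentVersion() {
        return currentVersion.get(); // 0 until a version is registered
      }

      int getBackupVersion() {
        return backupVersion.get();
      }

      void removeVersion(int version) {
        versionedStats.remove(version);
      }
    }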
diff --git a/services/venice-server/src/test/java/com/linkedin/venice/listener/ReadQuotaEnforcementHandlerTest.java b/services/venice-server/src/test/java/com/linkedin/venice/listener/ReadQuotaEnforcementHandlerTest.java
index ccf1e52604..1cae4ef4f3 100644
--- a/services/venice-server/src/test/java/com/linkedin/venice/listener/ReadQuotaEnforcementHandlerTest.java
+++ b/services/venice-server/src/test/java/com/linkedin/venice/listener/ReadQuotaEnforcementHandlerTest.java
@@ -85,6 +85,7 @@ public class ReadQuotaEnforcementHandlerTest {
   private MetricsRepository metricsRepository;
   private RouterRequest routerRequest;
   private VeniceServerGrpcHandler mockNextHandler;
+  private VeniceServerConfig serverConfig;
 
   @BeforeMethod
   public void setUp() {
@@ -97,7 +98,7 @@ public void setUp() {
     customizedViewRepository = mock(HelixCustomizedViewOfflinePushRepository.class);
     stats = mock(AggServerQuotaUsageStats.class);
     metricsRepository = new MetricsRepository();
-    VeniceServerConfig serverConfig = mock(VeniceServerConfig.class);
+    serverConfig = mock(VeniceServerConfig.class);
     when(serverConfig.getQuotaEnforcementIntervalInMs()).thenReturn(10000);
     when(serverConfig.getQuotaEnforcementCapacityMultiple()).thenReturn(5);
     doReturn(nodeCapacity).when(serverConfig).getNodeCapacityInRcu();
@@ -366,6 +367,15 @@ public void testGetRCU() {
 
   @Test
   public void testInitWithPreExistingResource() {
+    // Initialize the quota enforcement handler with real stats for this test
+    quotaEnforcementHandler = new ReadQuotaEnforcementHandler(
+        serverConfig,
+        storeRepository,
+        CompletableFuture.completedFuture(customizedViewRepository),
+        thisNodeId,
+        new AggServerQuotaUsageStats(metricsRepository),
+        metricsRepository,
+        clock);
     String storeName = "testStore";
     String topic = Version.composeKafkaTopic(storeName, 1);
     Version version = mock(Version.class);
@@ -373,11 +383,18 @@ public void testInitWithPreExistingResource() {
     Store store = setUpStoreMock(storeName, 1, Collections.singletonList(version), 100, true);
     doReturn(store).when(storeRepository).getStore(storeName);
     doReturn(Collections.singletonList(store)).when(storeRepository).getAllStores();
-
+    Instance thisInstance = mock(Instance.class);
+    doReturn(thisNodeId).when(thisInstance).getNodeId();
+    Partition partition = setUpPartitionMock(topic, thisInstance, true, 0);
+    doReturn(0).when(partition).getId();
+    PartitionAssignment pa = setUpPartitionAssignmentMock(topic, Collections.singletonList(partition));
+    doReturn(pa).when(customizedViewRepository).getPartitionAssignments(topic);
     quotaEnforcementHandler.init();
 
     verify(storeRepository, atLeastOnce()).registerStoreDataChangedListener(any());
     verify(customizedViewRepository, atLeastOnce()).subscribeRoutingDataChange(eq(topic), any());
+    Assert.assertEquals(quotaEnforcementHandler.getStats().getStoreStats(storeName).getCurrentVersion(), 1);
+    Assert.assertEquals(quotaEnforcementHandler.getStats().getStoreStats(storeName).getBackupVersion(), 0);
   }
 
   /**