From 09600c77c62494dc9623d3b22b82077c0ba8c9b5 Mon Sep 17 00:00:00 2001 From: Rong Ma Date: Thu, 27 Feb 2025 13:41:14 +0000 Subject: [PATCH] address comments & refine test --- velox/common/compression/Compression.cpp | 30 +++------ velox/common/compression/Compression.h | 41 +++++++----- velox/common/compression/Lz4Compression.cpp | 16 ++++- velox/common/compression/Lz4Compression.h | 10 ++- .../compression/tests/CompressionTest.cpp | 63 ++++++++++++------- 5 files changed, 95 insertions(+), 65 deletions(-) diff --git a/velox/common/compression/Compression.cpp b/velox/common/compression/Compression.cpp index 16151a4e17c6..08f4df055eae 100644 --- a/velox/common/compression/Compression.cpp +++ b/velox/common/compression/Compression.cpp @@ -112,17 +112,6 @@ bool Codec::supportsGetUncompressedLength(CompressionKind kind) { return false; } -bool Codec::supportsStreamingCompression(CompressionKind kind) { - switch (kind) { -#ifdef VELOX_ENABLE_COMPRESSION_LZ4 - case CompressionKind_LZ4: - return true; -#endif - default: - return false; - } -} - bool Codec::supportsCompressFixedLength(CompressionKind kind) { // TODO: Return true if it's supported by compression kind. return false; @@ -133,9 +122,6 @@ Expected> Codec::create( const CodecOptions& codecOptions) { if (!isAvailable(kind)) { auto name = compressionKindToString(kind); - VELOX_RETURN_UNEXPECTED_IF( - folly::StringPiece({name}).startsWith("unknown"), - Status::Invalid("Unrecognized codec: ", name)); return folly::makeUnexpected(Status::Invalid( "Support for codec '{}' is either not built or not implemented.", name)); @@ -158,9 +144,10 @@ Expected> Codec::create( codec = makeLz4HadoopCodec(); break; } + } else { + // By default, create LZ4 Frame codec. + codec = makeLz4FrameCodec(compressionLevel); } - // By default, create LZ4 Frame codec. 
- codec = makeLz4FrameCodec(compressionLevel); break; #endif default: @@ -185,8 +172,6 @@ Expected> Codec::create( bool Codec::isAvailable(CompressionKind kind) { switch (kind) { - case CompressionKind_NONE: - return true; #ifdef VELOX_ENABLE_COMPRESSION_LZ4 case CompressionKind_LZ4: return true; @@ -196,10 +181,11 @@ bool Codec::isAvailable(CompressionKind kind) { } } -std::optional Codec::getUncompressedLength( +Expected Codec::getUncompressedLength( const uint8_t* input, uint64_t inputLength) const { - return std::nullopt; + return folly::makeUnexpected(Status::Invalid( + "getUncompressedLength is unsupported with {} format.", name())); } Expected Codec::compressFixedLength( @@ -211,6 +197,10 @@ Expected Codec::compressFixedLength( Status::Invalid("'{}' doesn't support fixed-length compression", name())); } +bool Codec::supportsStreamingCompression() const { + return false; +} + Expected> Codec::makeStreamingCompressor() { return folly::makeUnexpected(Status::Invalid( diff --git a/velox/common/compression/Compression.h b/velox/common/compression/Compression.h index b0bb7bb81aec..c29d2196e0a6 100644 --- a/velox/common/compression/Compression.h +++ b/velox/common/compression/Compression.h @@ -73,7 +73,7 @@ class StreamingCompressor { /// Compress some input. /// If CompressResult.outputTooSmall is true on return, compress() should be - /// called again with a larger output buffer. + /// called again with a larger output buffer, such as doubling its size. virtual Expected compress( const uint8_t* input, uint64_t inputLength, @@ -82,15 +82,16 @@ class StreamingCompressor { /// Flush part of the compressed output. /// If FlushResult.outputTooSmall is true on return, flush() should be called - /// again with a larger output buffer. + /// again with a larger output buffer, such as doubling its size. virtual Expected flush( uint8_t* output, uint64_t outputLength) = 0; - /// End compressing, doing whatever is necessary to end the stream. 
+ /// End compressing, doing whatever is necessary to end the stream, and + /// flushing the compressed output. /// If EndResult.outputTooSmall is true on return, end() should be called - /// again with a larger output buffer. Otherwise, the StreamingCompressor - /// should not be used anymore. end() will flush the compressed output. + /// again with a larger output buffer, such as doubling its size. + /// Otherwise, the StreamingCompressor should not be used anymore. virtual Expected end(uint8_t* output, uint64_t outputLength) = 0; }; @@ -106,7 +107,7 @@ class StreamingDecompressor { /// Decompress some input. /// If DecompressResult.outputTooSmall is true on return, decompress() should - /// be called again with a larger output buffer. + /// be called again with a larger output buffer, such as doubling its size. virtual Expected decompress( const uint8_t* input, uint64_t inputLength, @@ -154,9 +155,6 @@ class Codec { /// compressed length. static bool supportsCompressFixedLength(CompressionKind kind); - // Return true if indicated kind supports creating streaming de/compressor. - static bool supportsStreamingCompression(CompressionKind kind); - /// Return the smallest supported compression level. /// If the codec doesn't support compression level, /// `kUseDefaultCompressionLevel` will be returned. @@ -214,19 +212,28 @@ class Codec { // Maximum compressed length of given input length. virtual uint64_t maxCompressedLength(uint64_t inputLength) = 0; - /// Retrieves the actual uncompressed length of data using the specified - /// compression library. - /// Note: This functionality is not universally supported by all compression - /// libraries. If not supported, `std::nullopt` will be returned. - virtual std::optional getUncompressedLength( + /// Retrieves the uncompressed length of the given compressed data using the + /// specified compression library. + /// If the input data is corrupted, returns `Unexpected` with + /// `Status::IOError`. 
Not all compression libraries support this + /// functionality. Use supportsGetUncompressedLength() to check before + /// calling. If unsupported, returns `Unexpected` with `Status::Invalid`. + virtual Expected getUncompressedLength( const uint8_t* input, uint64_t inputLength) const; - // Create a streaming compressor instance. + // Return true if this codec supports creating streaming de/compressor. + virtual bool supportsStreamingCompression() const; + + /// Create a streaming compressor instance. + /// Use supportsStreamingCompression() to check before calling. + /// If unsupported, returns `Unexpected` with `Status::Invalid`. virtual Expected> makeStreamingCompressor(); - // Create a streaming compressor instance. + /// Create a streaming decompressor instance. + /// Use supportsStreamingCompression() to check before calling. + /// If unsupported, returns `Unexpected` with `Status::Invalid`. virtual Expected> makeStreamingDecompressor(); @@ -237,7 +244,7 @@ class Codec { virtual int32_t compressionLevel() const; // The name of this Codec's compression type. - std::string name() const; + virtual std::string name() const; private: // Initializes the codec's resources. 
diff --git a/velox/common/compression/Lz4Compression.cpp b/velox/common/compression/Lz4Compression.cpp index 463e2963b634..8a991521114d 100644 --- a/velox/common/compression/Lz4Compression.cpp +++ b/velox/common/compression/Lz4Compression.cpp @@ -230,7 +230,7 @@ Status LZ4Compressor::compressBegin( return Status::OK(); } -Status common::LZ4Decompressor::init() { +Status LZ4Decompressor::init() { finished_ = false; auto ret = LZ4F_createDecompressionContext(&ctx_, LZ4F_VERSION); VELOX_RETURN_IF(LZ4F_isError(ret), lz4Error("LZ4 init failed: ", ret)); @@ -304,7 +304,7 @@ int32_t Lz4CodecBase::compressionLevel() const { } CompressionKind Lz4CodecBase::compressionKind() const { - return CompressionKind::CompressionKind_LZ4; + return CompressionKind_LZ4; } Lz4FrameCodec::Lz4FrameCodec(int32_t compressionLevel) @@ -363,6 +363,10 @@ Expected Lz4FrameCodec::decompress( }); } +bool Lz4FrameCodec::supportsStreamingCompression() const { + return true; +} + Expected> Lz4FrameCodec::makeStreamingCompressor() { auto ptr = std::make_shared(compressionLevel_); @@ -430,6 +434,10 @@ Expected Lz4RawCodec::decompress( return static_cast(decompressedSize); } +std::string Lz4RawCodec::name() const { + return "lz4_raw"; +} + Lz4HadoopCodec::Lz4HadoopCodec() : Lz4RawCodec(kLz4DefaultCompressionLevel) {} uint64_t Lz4HadoopCodec::maxCompressedLength(uint64_t inputLength) { @@ -491,6 +499,10 @@ int32_t Lz4HadoopCodec::defaultCompressionLevel() const { return kUseDefaultCompressionLevel; } +std::string Lz4HadoopCodec::name() const { + return "lz4_hadoop"; +} + Expected Lz4HadoopCodec::decompressInternal( const uint8_t* input, uint64_t inputLength, diff --git a/velox/common/compression/Lz4Compression.h b/velox/common/compression/Lz4Compression.h index 0d0f238d979d..1b4edc015ec0 100644 --- a/velox/common/compression/Lz4Compression.h +++ b/velox/common/compression/Lz4Compression.h @@ -29,11 +29,11 @@ struct Lz4CodecOptions : CodecOptions { enum Type { kLz4Frame, kLz4Raw, kLz4Hadoop }; 
Lz4CodecOptions( - Lz4CodecOptions::Type type, + Type type, int32_t compressionLevel = kUseDefaultCompressionLevel) : CodecOptions(compressionLevel), type(type) {} - Lz4CodecOptions::Type type; + Type type; }; class Lz4CodecBase : public Codec { @@ -72,6 +72,8 @@ class Lz4FrameCodec : public Lz4CodecBase { uint8_t* output, uint64_t outputLength) override; + bool supportsStreamingCompression() const override; + Expected> makeStreamingCompressor() override; @@ -99,6 +101,8 @@ class Lz4RawCodec : public Lz4CodecBase { uint64_t inputLength, uint8_t* output, uint64_t outputLength) override; + + std::string name() const override; }; /// The Hadoop Lz4Codec source code can be found here: @@ -127,6 +131,8 @@ class Lz4HadoopCodec : public Lz4RawCodec, public HadoopCompressionFormat { int32_t defaultCompressionLevel() const override; + std::string name() const override; + private: Expected decompressInternal( const uint8_t* input, diff --git a/velox/common/compression/tests/CompressionTest.cpp b/velox/common/compression/tests/CompressionTest.cpp index 7be796a62225..3a4fe3891031 100644 --- a/velox/common/compression/tests/CompressionTest.cpp +++ b/velox/common/compression/tests/CompressionTest.cpp @@ -57,9 +57,10 @@ std::vector generateLz4TestParams() { Lz4CodecOptions::kLz4Frame, Lz4CodecOptions::kLz4Hadoop}) { params.emplace_back( - CompressionKind::CompressionKind_LZ4, - std::make_shared(type)); + CompressionKind_LZ4, std::make_shared(type)); } + // Add default CodecOptions. + params.emplace_back(CompressionKind_LZ4); return params; } @@ -386,14 +387,8 @@ class CodecTest : public ::testing::TestWithParam { TEST_P(CodecTest, specifyCompressionLevel) { std::vector data = makeRandomData(2000); const auto kind = getCompressionKind(); - if (!Codec::isAvailable(kind)) { - // Support for this codec hasn't been built. 
- VELOX_ASSERT_THROW( - Codec::create(kind, kUseDefaultCompressionLevel), - "Support for codec '" + compressionKindToString(kind) + - "' is either not built or not implemented."); - return; - } + ASSERT_TRUE(Codec::isAvailable(kind)); + auto codecDefault = Codec::create(kind).thenOrThrow(folly::identity, throwsNotOk); checkCodecRoundtrip(codecDefault, data); @@ -421,14 +416,22 @@ TEST_P(CodecTest, getUncompressedLength) { compressed.resize(compressedLength); if (Codec::supportsGetUncompressedLength(getCompressionKind())) { - ASSERT_EQ( - codec->getUncompressedLength(compressed.data(), compressedLength), - inputLength); + auto uncompressedLength = + codec->getUncompressedLength(compressed.data(), compressedLength) + .thenOrThrow(folly::identity, throwsNotOk); + ASSERT_EQ(uncompressedLength, inputLength); } else { - ASSERT_EQ( - codec->getUncompressedLength(compressed.data(), compressedLength), - std::nullopt); + VELOX_ASSERT_ERROR_STATUS( + codec->getUncompressedLength(compressed.data(), compressedLength) + .error(), + StatusCode::kInvalid, + fmt::format( + "getUncompressedLength is unsupported with {} format.", + codec->name())); } + + // TODO: For codecs that support getUncompressedLength(), verify the error + // message for corrupted data. 
} TEST_P(CodecTest, codecRoundtrip) { @@ -440,47 +443,47 @@ TEST_P(CodecTest, codecRoundtrip) { } TEST_P(CodecTest, streamingCompressor) { - if (!Codec::supportsStreamingCompression(getCompressionKind())) { + const auto codec = makeCodec(); + if (!codec->supportsStreamingCompression()) { return; } for (auto dataSize : {0, 10, 10000, 100000}) { - auto codec = makeCodec(); checkStreamingCompressor(codec.get(), makeRandomData(dataSize)); checkStreamingCompressor(codec.get(), makeCompressibleData(dataSize)); } } TEST_P(CodecTest, streamingDecompressor) { - if (!Codec::supportsStreamingCompression(getCompressionKind())) { + const auto codec = makeCodec(); + if (!codec->supportsStreamingCompression()) { return; } for (auto dataSize : {0, 10, 10000, 100000}) { - auto codec = makeCodec(); checkStreamingDecompressor(codec.get(), makeRandomData(dataSize)); checkStreamingDecompressor(codec.get(), makeCompressibleData(dataSize)); } } TEST_P(CodecTest, streamingRoundtrip) { - if (!Codec::supportsStreamingCompression(getCompressionKind())) { + const auto codec = makeCodec(); + if (!codec->supportsStreamingCompression()) { return; } for (auto dataSize : {0, 10, 10000, 100000}) { - auto codec = makeCodec(); checkStreamingRoundtrip(codec.get(), makeRandomData(dataSize)); checkStreamingRoundtrip(codec.get(), makeCompressibleData(dataSize)); } } TEST_P(CodecTest, streamingDecompressorReuse) { - if (!Codec::supportsStreamingCompression(getCompressionKind())) { + const auto codec = makeCodec(); + if (!codec->supportsStreamingCompression()) { return; } - auto codec = makeCodec(); const auto& decompressor = makeStreamingDecompressor(codec.get()); checkStreamingRoundtrip( makeStreamingCompressor(codec.get()), decompressor, makeRandomData(100)); @@ -512,4 +515,16 @@ TEST(CodecLZ4HadoopTest, compatibility) { checkCodecRoundtrip(c1, c2, makeRandomData(dataSize)); } } + +TEST(CodecTestInvalid, invalidKind) { + CompressionKind kind = CompressionKind_NONE; + 
ASSERT_FALSE(Codec::isAvailable(kind)); + + VELOX_ASSERT_ERROR_STATUS( + Codec::create(kind, kUseDefaultCompressionLevel).error(), + StatusCode::kInvalid, + fmt::format( + "Support for codec '{}' is either not built or not implemented.", + compressionKindToString(kind))); +} } // namespace facebook::velox::common