diff --git a/CMakeLists.txt b/CMakeLists.txt index 1e0f8e943d084..019adcd29219b 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -116,6 +116,7 @@ option(VELOX_ENABLE_PARQUET "Enable Parquet support" OFF) option(VELOX_ENABLE_ARROW "Enable Arrow support" OFF) option(VELOX_ENABLE_REMOTE_FUNCTIONS "Enable remote function support" OFF) option(VELOX_ENABLE_CCACHE "Use ccache if installed." ON) +option(VELOX_ENABLE_COMPRESSION_LZ4 "Enable Lz4 compression support." OFF) option(VELOX_BUILD_TEST_UTILS "Builds Velox test utilities" OFF) option(VELOX_BUILD_VECTOR_TEST_UTILS "Builds Velox vector test utilities" OFF) @@ -148,6 +149,10 @@ if(${VELOX_BUILD_MINIMAL} OR ${VELOX_BUILD_MINIMAL_WITH_DWIO}) set(VELOX_ENABLE_SUBSTRAIT OFF) endif() +if(${VELOX_BUILD_MINIMAL_WITH_DWIO} OR ${VELOX_ENABLE_HIVE_CONNECTOR}) + set(VELOX_ENABLE_COMPRESSION_LZ4 ON) +endif() + if(${VELOX_BUILD_TESTING}) # Enable all components to build testing binaries set(VELOX_ENABLE_PRESTO_FUNCTIONS ON) @@ -159,6 +164,7 @@ if(${VELOX_BUILD_TESTING}) set(VELOX_ENABLE_SPARK_FUNCTIONS ON) set(VELOX_ENABLE_EXAMPLES ON) set(VELOX_ENABLE_PARQUET ON) + set(VELOX_ENABLE_COMPRESSION_LZ4 ON) endif() if(${VELOX_ENABLE_BENCHMARKS}) @@ -272,6 +278,10 @@ if(VELOX_ENABLE_PARQUET) set(VELOX_ENABLE_ARROW ON) endif() +if(VELOX_ENABLE_COMPRESSION_LZ4) + add_definitions(-DVELOX_ENABLE_COMPRESSION_LZ4) +endif() + # make buildPartitionBounds_ a vector int64 instead of int32 to avoid integer # overflow if(${VELOX_ENABLE_INT64_BUILD_PARTITION_BOUND}) @@ -450,7 +460,9 @@ resolve_dependency(glog) set_source(fmt) resolve_dependency(fmt 9.0.0) -find_package(lz4 REQUIRED) +if(VELOX_ENABLE_COMPRESSION_LZ4) + find_package(lz4 REQUIRED) +endif() if(${VELOX_BUILD_MINIMAL_WITH_DWIO} OR ${VELOX_ENABLE_HIVE_CONNECTOR}) # DWIO needs all sorts of stream compression libraries. diff --git a/velox/common/compression/CMakeLists.txt b/velox/common/compression/CMakeLists.txt index ef7ae4af3a4c0..c94c86a617140 100644 --- a/velox/common/compression/CMakeLists.txt +++ b/velox/common/compression/CMakeLists.txt @@ -16,13 +16,17 @@ if(${VELOX_BUILD_TESTING}) add_subdirectory(tests) endif() -velox_add_library( - velox_common_compression - Compression.cpp - LzoDecompressor.cpp - Lz4Compression.cpp - HadoopCompressionFormat.cpp) +set(VELOX_COMMON_COMPRESSION_SRCS Compression.cpp LzoDecompressor.cpp) +set(VELOX_COMMON_COMPRESSION_LINK_LIBS velox_status Folly::folly) + +if(VELOX_ENABLE_COMPRESSION_LZ4) + list(APPEND VELOX_COMMON_COMPRESSION_SRCS Lz4Compression.cpp + HadoopCompressionFormat.cpp) + list(APPEND VELOX_COMMON_COMPRESSION_LINK_LIBS lz4::lz4) +endif() + +velox_add_library(velox_common_compression ${VELOX_COMMON_COMPRESSION_SRCS}) velox_link_libraries( velox_common_compression - PUBLIC velox_status Folly::folly lz4::lz4 + PUBLIC ${VELOX_COMMON_COMPRESSION_LINK_LIBS} PRIVATE velox_exception) diff --git a/velox/common/compression/Compression.cpp b/velox/common/compression/Compression.cpp index 2e13a0cc600a4..381bc29a64d61 100644 --- a/velox/common/compression/Compression.cpp +++ b/velox/common/compression/Compression.cpp @@ -16,7 +16,9 @@ #include "velox/common/compression/Compression.h" #include "velox/common/base/Exceptions.h" +#ifdef VELOX_ENABLE_COMPRESSION_LZ4 #include "velox/common/compression/Lz4Compression.h" +#endif #include @@ -109,8 +111,10 @@ bool Codec::supportsGetUncompressedLength(CompressionKind kind) { bool Codec::supportsStreamingCompression(CompressionKind kind) { switch (kind) { +#ifdef VELOX_ENABLE_COMPRESSION_LZ4 case CompressionKind::CompressionKind_LZ4: return true; +#endif default: return false; } @@ -121,7 +125,7 @@ bool Codec::supportsCompressFixedLength(CompressionKind kind) { return false; } -folly::Expected, Status> Codec::create( +Expected> Codec::create( CompressionKind kind, const CodecOptions& codecOptions) { if (!isAvailable(kind)) { @@ -137,6 +141,7 @@ folly::Expected, Status> Codec::create( auto compressionLevel = codecOptions.compressionLevel; std::unique_ptr codec; switch (kind) { +#ifdef VELOX_ENABLE_COMPRESSION_LZ4 case CompressionKind::CompressionKind_LZ4: if (auto options = dynamic_cast(&codecOptions)) { switch (options->type) { @@ -154,6 +159,7 @@ folly::Expected, Status> Codec::create( // By default, create LZ4 Frame codec. codec = makeLz4FrameCodec(compressionLevel); break; +#endif default: break; } @@ -169,7 +175,7 @@ folly::Expected, Status> Codec::create( return codec; } -folly::Expected, Status> Codec::create( +Expected> Codec::create( CompressionKind kind, int32_t compressionLevel) { return create(kind, CodecOptions{compressionLevel}); @@ -178,13 +184,11 @@ folly::Expected, Status> Codec::create( bool Codec::isAvailable(CompressionKind kind) { switch (kind) { case CompressionKind::CompressionKind_NONE: + return true; +#ifdef VELOX_ENABLE_COMPRESSION_LZ4 case CompressionKind::CompressionKind_LZ4: return true; - case CompressionKind::CompressionKind_SNAPPY: - case CompressionKind::CompressionKind_GZIP: - case CompressionKind::CompressionKind_ZLIB: - case CompressionKind::CompressionKind_ZSTD: - case CompressionKind::CompressionKind_LZO: +#endif default: return false; } @@ -196,7 +200,7 @@ std::optional Codec::getUncompressedLength( return std::nullopt; } -folly::Expected Codec::compressFixedLength( +Expected Codec::compressFixedLength( const uint8_t* input, uint64_t inputLength, uint8_t* output, @@ -205,13 +209,13 @@ folly::Expected Codec::compressFixedLength( Status::Invalid("'{}' doesn't support fixed-length compression", name())); } -folly::Expected, Status> +Expected> Codec::makeStreamingCompressor() { return folly::makeUnexpected(Status::Invalid( "Streaming compression is unsupported with {} format.", name())); } -folly::Expected, Status> +Expected> Codec::makeStreamingDecompressor() { return folly::makeUnexpected(Status::Invalid( "Streaming decompression is unsupported with {} format.", name())); diff --git a/velox/common/compression/Compression.h b/velox/common/compression/Compression.h index 509b102066d4f..bf9c4b46a1a43 100644 --- a/velox/common/compression/Compression.h +++ b/velox/common/compression/Compression.h @@ -96,7 +96,7 @@ class StreamingCompressor { /// Compress some input. /// If CompressResult.outputTooSmall is true on return, then a larger output /// buffer should be supplied. - virtual folly::Expected compress( + virtual Expected compress( const uint8_t* input, uint64_t inputLength, uint8_t* output, @@ -105,7 +105,7 @@ class StreamingCompressor { /// Flush part of the compressed output. /// If FlushResult.outputTooSmall is true on return, flush() should be called /// again with a larger buffer. - virtual folly::Expected flush( + virtual Expected flush( uint8_t* output, uint64_t outputLength) = 0; @@ -113,7 +113,7 @@ class StreamingCompressor { /// If EndResult.outputTooSmall is true on return, end() should be called /// again with a larger buffer. Otherwise, the StreamingCompressor should not /// be used anymore. end() will flush the compressed output. - virtual folly::Expected end( + virtual Expected end( uint8_t* output, uint64_t outputLength) = 0; }; @@ -131,7 +131,7 @@ class StreamingDecompressor { /// Decompress some input. /// If outputTooSmall is true on return, a larger output buffer needs /// to be supplied. - virtual folly::Expected decompress( + virtual Expected decompress( const uint8_t* input, uint64_t inputLength, uint8_t* output, @@ -158,12 +158,12 @@ class Codec { virtual ~Codec() = default; // Create a kind for the given compression algorithm with CodecOptions. - static folly::Expected, Status> create( + static Expected> create( CompressionKind kind, const CodecOptions& codecOptions = CodecOptions{}); // Create a kind for the given compression algorithm. - static folly::Expected, Status> create( + static Expected> create( CompressionKind kind, int32_t compressionLevel); @@ -202,7 +202,7 @@ class Codec { /// Note: One-shot compression is not always compatible with streaming /// decompression. Depending on the codec (e.g. LZ4), different formats may /// be used. - virtual folly::Expected compress( + virtual Expected compress( const uint8_t* input, uint64_t inputLength, uint8_t* output, @@ -214,7 +214,7 @@ class Codec { /// Note: One-shot decompression is not always compatible with streaming /// compression. Depending on the codec (e.g. LZ4), different formats may /// be used. - virtual folly::Expected decompress( + virtual Expected decompress( const uint8_t* input, uint64_t inputLength, uint8_t* output, @@ -229,7 +229,7 @@ class Codec { /// function. This is useful when fixed-length compression blocks are required /// by the caller. /// Note: Only Gzip and Zstd codec supports this function. - virtual folly::Expected compressFixedLength( + virtual Expected compressFixedLength( const uint8_t* input, uint64_t inputLength, uint8_t* output, @@ -247,11 +247,11 @@ class Codec { const uint8_t* input) const; // Create a streaming compressor instance. - virtual folly::Expected, Status> + virtual Expected> makeStreamingCompressor(); // Create a streaming compressor instance. - virtual folly::Expected, Status> + virtual Expected> makeStreamingDecompressor(); // This Codec's compression type. diff --git a/velox/common/compression/HadoopCompressionFormat.cpp b/velox/common/compression/HadoopCompressionFormat.cpp index 9b2983c2d0cff..91731fc226eab 100644 --- a/velox/common/compression/HadoopCompressionFormat.cpp +++ b/velox/common/compression/HadoopCompressionFormat.cpp @@ -27,16 +27,6 @@ bool HadoopCompressionFormat::tryDecompressHadoop( uint8_t* output, uint64_t outputLength, uint64_t& actualDecompressedSize) { - // Parquet files written with the Hadoop Lz4RawCodec use their own framing. - // The input buffer can contain an arbitrary number of "frames", each - // with the following structure: - // - bytes 0..3: big-endian uint32_t representing the frame decompressed - // size - // - bytes 4..7: big-endian uint32_t representing the frame compressed size - // - bytes 8...: frame compressed data - // - // The Hadoop Lz4Codec source code can be found here: - // https://github.com/apache/hadoop/blob/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/codec/Lz4Codec.cc uint64_t totalDecompressedSize = 0; while (inputLength >= kPrefixLength) { diff --git a/velox/common/compression/HadoopCompressionFormat.h b/velox/common/compression/HadoopCompressionFormat.h index f58a118d13872..39bdb960cb9bc 100644 --- a/velox/common/compression/HadoopCompressionFormat.h +++ b/velox/common/compression/HadoopCompressionFormat.h @@ -22,8 +22,18 @@ namespace facebook::velox::common { +/// Parquet files written with the Hadoop compression codecs use their own +/// framing. +/// The input buffer can contain an arbitrary number of "frames", each +/// with the following structure: +/// - bytes 0..3: big-endian uint32_t representing the frame decompressed +/// size +/// - bytes 4..7: big-endian uint32_t representing the frame compressed size +/// - bytes 8...: frame compressed data class HadoopCompressionFormat { protected: + /// Try to decompress input data in Hadoop's compression format. + /// Returns true if decompression is successful, false otherwise. bool tryDecompressHadoop( const uint8_t* input, uint64_t inputLength, @@ -31,7 +41,10 @@ class HadoopCompressionFormat { uint64_t outputLength, uint64_t& actualDecompressedSize); - virtual folly::Expected decompressInternal( + /// Called by tryDecompressHadoop to decompress a single frame and + /// should be implemented based on the specific compression format. + /// E.g. Lz4HadoopCodec uses Lz4RawCodec::decompress to decompress a frame. + virtual Expected decompressInternal( const uint8_t* input, uint64_t inputLength, uint8_t* output, diff --git a/velox/common/compression/Lz4Compression.cpp b/velox/common/compression/Lz4Compression.cpp index 872dfbb8c2f73..53f441d66ceb8 100644 --- a/velox/common/compression/Lz4Compression.cpp +++ b/velox/common/compression/Lz4Compression.cpp @@ -54,18 +54,15 @@ class LZ4Compressor : public StreamingCompressor { Status init(); - folly::Expected compress( + Expected compress( const uint8_t* input, uint64_t inputLength, uint8_t* output, uint64_t outputLength) override; - folly::Expected flush( - uint8_t* output, - uint64_t outputLength) override; + Expected flush(uint8_t* output, uint64_t outputLength) override; - folly::Expected end(uint8_t* output, uint64_t outputLength) - override; + Expected end(uint8_t* output, uint64_t outputLength) override; protected: Status @@ -87,7 +84,7 @@ class LZ4Decompressor : public StreamingDecompressor { } } - folly::Expected decompress( + Expected decompress( const uint8_t* input, uint64_t inputLength, uint8_t* output, @@ -123,8 +120,7 @@ Status LZ4Compressor::init() { return Status::OK(); } -folly::Expected -LZ4Compressor::compress( +Expected LZ4Compressor::compress( const uint8_t* input, uint64_t inputLength, uint8_t* output, @@ -157,7 +153,7 @@ LZ4Compressor::compress( return CompressResult{inputLength, bytesWritten, false}; } -folly::Expected LZ4Compressor::flush( +Expected LZ4Compressor::flush( uint8_t* output, uint64_t outputLength) { auto outputSize = static_cast(outputLength); @@ -188,7 +184,7 @@ folly::Expected LZ4Compressor::flush( return FlushResult{bytesWritten, false}; } -folly::Expected LZ4Compressor::end( +Expected LZ4Compressor::end( uint8_t* output, uint64_t outputLength) { auto outputSize = static_cast(outputLength); @@ -258,8 +254,7 @@ Status LZ4Decompressor::reset() { #endif } -folly::Expected -LZ4Decompressor::decompress( +Expected LZ4Decompressor::decompress( const uint8_t* input, uint64_t inputLength, uint8_t* output, @@ -321,7 +316,7 @@ uint64_t Lz4FrameCodec::maxCompressedLength(uint64_t inputLen) { LZ4F_compressFrameBound(static_cast(inputLen), &prefs_)); } -folly::Expected Lz4FrameCodec::compress( +Expected Lz4FrameCodec::compress( const uint8_t* input, uint64_t inputLength, uint8_t* output, @@ -337,13 +332,13 @@ folly::Expected Lz4FrameCodec::compress( return static_cast(ret); } -folly::Expected Lz4FrameCodec::decompress( +Expected Lz4FrameCodec::decompress( const uint8_t* input, uint64_t inputLength, uint8_t* output, uint64_t outputLength) { return makeStreamingDecompressor().then( - [&](const auto& decompressor) -> folly::Expected { + [&](const auto& decompressor) -> Expected { uint64_t bytesWritten = 0; while (!decompressor->isFinished() && inputLength != 0) { auto maybeResult = decompressor->decompress( @@ -368,14 +363,14 @@ folly::Expected Lz4FrameCodec::decompress( }); } -folly::Expected, Status> +Expected> Lz4FrameCodec::makeStreamingCompressor() { auto ptr = std::make_shared(compressionLevel_); VELOX_RETURN_UNEXPECTED_NOT_OK(ptr->init()); return ptr; } -folly::Expected, Status> +Expected> Lz4FrameCodec::makeStreamingDecompressor() { auto ptr = std::make_shared(); VELOX_RETURN_UNEXPECTED_NOT_OK(ptr->init()); @@ -390,7 +385,7 @@ uint64_t Lz4RawCodec::maxCompressedLength(uint64_t inputLength) { LZ4_compressBound(static_cast(inputLength))); } -folly::Expected Lz4RawCodec::compress( +Expected Lz4RawCodec::compress( const uint8_t* input, uint64_t inputLength, uint8_t* output, @@ -420,7 +415,7 @@ folly::Expected Lz4RawCodec::compress( return static_cast(compressedSize); } -folly::Expected Lz4RawCodec::decompress( +Expected Lz4RawCodec::decompress( const uint8_t* input, uint64_t inputLength, uint8_t* output, @@ -441,7 +436,7 @@ uint64_t Lz4HadoopCodec::maxCompressedLength(uint64_t inputLength) { return kPrefixLength + Lz4RawCodec::maxCompressedLength(inputLength); } -folly::Expected Lz4HadoopCodec::compress( +Expected Lz4HadoopCodec::compress( const uint8_t* input, uint64_t inputLength, uint8_t* output, @@ -469,7 +464,7 @@ folly::Expected Lz4HadoopCodec::compress( }); } -folly::Expected Lz4HadoopCodec::decompress( +Expected Lz4HadoopCodec::decompress( const uint8_t* input, uint64_t inputLength, uint8_t* output, @@ -496,7 +491,7 @@ int32_t Lz4HadoopCodec::defaultCompressionLevel() const { return kUseDefaultCompressionLevel; } -folly::Expected Lz4HadoopCodec::decompressInternal( +Expected Lz4HadoopCodec::decompressInternal( const uint8_t* input, uint64_t inputLength, uint8_t* output, diff --git a/velox/common/compression/Lz4Compression.h b/velox/common/compression/Lz4Compression.h index 7c0328ce28d68..c91af85082d2b 100644 --- a/velox/common/compression/Lz4Compression.h +++ b/velox/common/compression/Lz4Compression.h @@ -60,23 +60,23 @@ class Lz4FrameCodec : public Lz4CodecBase { uint64_t maxCompressedLength(uint64_t inputLength) override; - folly::Expected compress( + Expected compress( const uint8_t* input, uint64_t inputLength, uint8_t* output, uint64_t outputLength) override; - folly::Expected decompress( + Expected decompress( const uint8_t* input, uint64_t inputLength, uint8_t* output, uint64_t outputLength) override; - folly::Expected, Status> - makeStreamingCompressor() override; + Expected> makeStreamingCompressor() + override; - folly::Expected, Status> - makeStreamingDecompressor() override; + Expected> makeStreamingDecompressor() + override; protected: const LZ4F_preferences_t prefs_; @@ -88,32 +88,34 @@ class Lz4RawCodec : public Lz4CodecBase { uint64_t maxCompressedLength(uint64_t inputLength) override; - folly::Expected compress( + Expected compress( const uint8_t* input, uint64_t inputLength, uint8_t* output, uint64_t outputLength) override; - folly::Expected decompress( + Expected decompress( const uint8_t* input, uint64_t inputLength, uint8_t* output, uint64_t outputLength) override; }; +/// The Hadoop Lz4Codec source code can be found here: +/// https://github.com/apache/hadoop/blob/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/codec/Lz4Codec.cc class Lz4HadoopCodec : public Lz4RawCodec, public HadoopCompressionFormat { public: Lz4HadoopCodec(); uint64_t maxCompressedLength(uint64_t inputLength) override; - folly::Expected compress( + Expected compress( const uint8_t* input, uint64_t inputLength, uint8_t* output, uint64_t outputLength) override; - folly::Expected decompress( + Expected decompress( const uint8_t* input, uint64_t inputLength, uint8_t* output, @@ -126,7 +128,7 @@ class Lz4HadoopCodec : public Lz4RawCodec, public HadoopCompressionFormat { int32_t defaultCompressionLevel() const override; private: - folly::Expected decompressInternal( + Expected decompressInternal( const uint8_t* input, uint64_t inputLength, uint8_t* output,