diff --git a/CMakeLists.txt b/CMakeLists.txt index 13d2c92a9c25..d624044b20a6 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -144,6 +144,7 @@ option(VELOX_ENABLE_COMPRESSION_LZ4 "Enable Lz4 compression support." OFF) option(VELOX_ENABLE_COMPRESSION_ZSTD "Enable Zstd compression support." OFF) option(VELOX_ENABLE_COMPRESSION_ZLIB "Enable Zlib/Gzip compression support." OFF) +option(VELOX_ENABLE_COMPRESSION_SNAPPY "Enable Snappy compression support." OFF) option(VELOX_BUILD_TEST_UTILS "Builds Velox test utilities" OFF) option(VELOX_BUILD_VECTOR_TEST_UTILS "Builds Velox vector test utilities" OFF) @@ -181,6 +182,7 @@ if(${VELOX_BUILD_MINIMAL_WITH_DWIO} OR ${VELOX_ENABLE_HIVE_CONNECTOR}) set(VELOX_ENABLE_COMPRESSION_LZ4 ON) set(VELOX_ENABLE_COMPRESSION_ZSTD ON) set(VELOX_ENABLE_COMPRESSION_ZLIB ON) + set(VELOX_ENABLE_COMPRESSION_SNAPPY ON) endif() if(${VELOX_BUILD_TESTING}) @@ -197,6 +199,7 @@ if(${VELOX_BUILD_TESTING}) set(VELOX_ENABLE_COMPRESSION_LZ4 ON) set(VELOX_ENABLE_COMPRESSION_ZSTD ON) set(VELOX_ENABLE_COMPRESSION_ZLIB ON) + set(VELOX_ENABLE_COMPRESSION_SNAPPY ON) endif() if(${VELOX_ENABLE_BENCHMARKS}) @@ -502,12 +505,15 @@ if(VELOX_ENABLE_COMPRESSION_ZLIB) find_package(ZLIB REQUIRED) endif() +if(VELOX_ENABLE_COMPRESSION_SNAPPY) + find_package(Snappy REQUIRED) +endif() + if(${VELOX_BUILD_MINIMAL_WITH_DWIO} OR ${VELOX_ENABLE_HIVE_CONNECTOR}) # DWIO needs all sorts of stream compression libraries. # # TODO: make these optional and pluggable. 
find_package(lzo2 REQUIRED) - find_package(Snappy REQUIRED) endif() velox_set_source(re2) diff --git a/velox/common/compression/CMakeLists.txt b/velox/common/compression/CMakeLists.txt index 7a4a5ffb3743..381a2a701a1d 100644 --- a/velox/common/compression/CMakeLists.txt +++ b/velox/common/compression/CMakeLists.txt @@ -43,3 +43,10 @@ if(VELOX_ENABLE_COMPRESSION_ZLIB) velox_compile_definitions(velox_common_compression PRIVATE VELOX_ENABLE_COMPRESSION_ZLIB) endif() + +if(VELOX_ENABLE_COMPRESSION_SNAPPY) + velox_sources(velox_common_compression PRIVATE SnappyCompression.cpp) + velox_link_libraries(velox_common_compression PUBLIC Snappy::snappy) + velox_compile_definitions(velox_common_compression + PRIVATE VELOX_ENABLE_COMPRESSION_SNAPPY) +endif() diff --git a/velox/common/compression/Compression.cpp b/velox/common/compression/Compression.cpp index 730a3cdc68d8..2634d22a3a09 100644 --- a/velox/common/compression/Compression.cpp +++ b/velox/common/compression/Compression.cpp @@ -25,6 +25,9 @@ #ifdef VELOX_ENABLE_COMPRESSION_ZLIB #include "velox/common/compression/ZlibCompression.h" #endif +#ifdef VELOX_ENABLE_COMPRESSION_SNAPPY +#include "velox/common/compression/SnappyCompression.h" +#endif #include @@ -117,6 +120,10 @@ bool Codec::supportsGetUncompressedLength(CompressionKind kind) { #ifdef VELOX_ENABLE_COMPRESSION_ZSTD case CompressionKind::CompressionKind_ZSTD: return true; +#endif +#ifdef VELOX_ENABLE_COMPRESSION_SNAPPY + case CompressionKind::CompressionKind_SNAPPY: + return true; #endif default: return false; @@ -221,6 +228,11 @@ Expected> Codec::create( codec = makeGzipCodec(compressionLevel); break; } +#endif +#ifdef VELOX_ENABLE_COMPRESSION_SNAPPY + case CompressionKind::CompressionKind_SNAPPY: + codec = makeSnappyCodec(); + break; #endif default: break; @@ -260,6 +272,10 @@ bool Codec::isAvailable(CompressionKind kind) { [[fallthrough]]; case CompressionKind::CompressionKind_GZIP: return true; +#endif +#ifdef VELOX_ENABLE_COMPRESSION_SNAPPY + case 
CompressionKind::CompressionKind_SNAPPY:
+      return true;
 #endif
     default:
       return false;
diff --git a/velox/common/compression/SnappyCompression.cpp b/velox/common/compression/SnappyCompression.cpp
new file mode 100644
index 000000000000..972a7b2de8a5
--- /dev/null
+++ b/velox/common/compression/SnappyCompression.cpp
@@ -0,0 +1,119 @@
+/*
+ * Copyright (c) Facebook, Inc. and its affiliates.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "velox/common/compression/SnappyCompression.h"
+#include "velox/common/base/Exceptions.h"
+
+namespace facebook::velox::common {
+
+uint64_t SnappyCodec::maxCompressedLength(uint64_t inputLength) {
+  return static_cast<uint64_t>(
+      snappy::MaxCompressedLength(static_cast<size_t>(inputLength)));
+}
+
+Expected<uint64_t> SnappyCodec::compress(
+    const uint8_t* input,
+    uint64_t inputLength,
+    uint8_t* output,
+    uint64_t outputLength) {
+  // snappy::RawCompress() is not told the capacity of 'output' and may write
+  // up to snappy::MaxCompressedLength(inputLength) bytes, so reject a smaller
+  // buffer up front instead of writing past its end.
+  VELOX_RETURN_UNEXPECTED_IF(
+      outputLength < maxCompressedLength(inputLength),
+      Status::IOError("Output length is too small"));
+  size_t compressedSize{0};
+  snappy::RawCompress(
+      reinterpret_cast<const char*>(input),
+      static_cast<size_t>(inputLength),
+      reinterpret_cast<char*>(output),
+      &compressedSize);
+  return static_cast<uint64_t>(compressedSize);
+}
+
+Expected<uint64_t> SnappyCodec::decompress(
+    const uint8_t* input,
+    uint64_t inputLength,
+    uint8_t* output,
+    uint64_t outputLength) {
+  // Query the uncompressed size first so that the capacity of 'output' can be
+  // validated before snappy::RawUncompress() writes to it.
+  size_t decompressedSize{0};
+  VELOX_RETURN_UNEXPECTED_IF(
+      !snappy::GetUncompressedLength(
+          reinterpret_cast<const char*>(input),
+          static_cast<size_t>(inputLength),
+          &decompressedSize),
+      Status::IOError("Corrupt snappy compressed data."));
+  VELOX_RETURN_UNEXPECTED_IF(
+      outputLength < decompressedSize,
+      Status::IOError("Output length is too small"));
+  VELOX_RETURN_UNEXPECTED_IF(
+      !snappy::RawUncompress(
+          reinterpret_cast<const char*>(input),
+          static_cast<size_t>(inputLength),
+          reinterpret_cast<char*>(output)),
+      Status::IOError("Corrupt snappy compressed data."));
+  return static_cast<uint64_t>(decompressedSize);
+}
+
+Expected<std::shared_ptr<StreamingCompressor>>
+SnappyCodec::makeStreamingCompressor() {
+  return folly::makeUnexpected(
+      Status::NotImplemented("Streaming compression unsupported with Snappy"));
+}
+
+Expected<std::shared_ptr<StreamingDecompressor>>
+SnappyCodec::makeStreamingDecompressor() {
+  return folly::makeUnexpected(Status::NotImplemented(
+      "Streaming decompression unsupported with Snappy"));
+}
+
+CompressionKind SnappyCodec::compressionKind() const {
+  return CompressionKind_SNAPPY;
+}
+
+// No compression level can be configured on this codec: the minimum, maximum
+// and default levels all report kUseDefaultCompressionLevel.
+int32_t SnappyCodec::minimumCompressionLevel() const {
+  return kUseDefaultCompressionLevel;
+}
+
+int32_t SnappyCodec::maximumCompressionLevel() const {
+  return kUseDefaultCompressionLevel;
+}
+
+int32_t SnappyCodec::defaultCompressionLevel() const {
+  return kUseDefaultCompressionLevel;
+}
+
+std::optional<uint64_t> SnappyCodec::getUncompressedLength(
+    const uint8_t* input,
+    uint64_t inputLength) const {
+  size_t decompressedSize{0};
+  if (!snappy::GetUncompressedLength(
+          reinterpret_cast<const char*>(input),
+          static_cast<size_t>(inputLength),
+          &decompressedSize)) {
+    return std::nullopt;
+  }
+  return static_cast<uint64_t>(decompressedSize);
+}
+
+std::unique_ptr<Codec> makeSnappyCodec() {
+  return std::make_unique<SnappyCodec>();
+}
+} // namespace facebook::velox::common
diff --git a/velox/common/compression/SnappyCompression.h b/velox/common/compression/SnappyCompression.h
new file mode 100644
index 000000000000..dceef73cfc73
--- /dev/null
+++ b/velox/common/compression/SnappyCompression.h
@@ -0,0 +1,83 @@
+/*
+ * Copyright (c) Facebook, Inc. and its affiliates.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+// Derived from Apache Arrow.
+
+#include <snappy.h>
+#include <cstdint>
+#include <memory>
+#include <optional>
+#include "velox/common/compression/Compression.h"
+
+namespace facebook::velox::common {
+
+/// Codec backed by the snappy library's one-shot RawCompress() and
+/// RawUncompress() functions. Streaming is not supported: both streaming
+/// factories return a NotImplemented status.
+class SnappyCodec : public Codec {
+ public:
+  /// Returns snappy::MaxCompressedLength() for 'inputLength' input bytes.
+  uint64_t maxCompressedLength(uint64_t inputLength) override;
+
+  /// Compresses 'input' into 'output' and returns the compressed size.
+  /// Returns an IOError status without writing to 'output' if 'outputLength'
+  /// is smaller than maxCompressedLength(inputLength).
+  Expected<uint64_t> compress(
+      const uint8_t* input,
+      uint64_t inputLength,
+      uint8_t* output,
+      uint64_t outputLength) override;
+
+  /// Decompresses 'input' into 'output' and returns the decompressed size.
+  /// Returns an IOError status if snappy rejects 'input' or if 'outputLength'
+  /// is smaller than the decompressed size.
+  Expected<uint64_t> decompress(
+      const uint8_t* input,
+      uint64_t inputLength,
+      uint8_t* output,
+      uint64_t outputLength) override;
+
+  /// Always returns a NotImplemented status.
+  Expected<std::shared_ptr<StreamingCompressor>> makeStreamingCompressor()
+      override;
+
+  /// Always returns a NotImplemented status.
+  Expected<std::shared_ptr<StreamingDecompressor>> makeStreamingDecompressor()
+      override;
+
+  /// Returns CompressionKind_SNAPPY.
+  CompressionKind compressionKind() const override;
+
+  /// The level accessors below all return kUseDefaultCompressionLevel.
+  int32_t minimumCompressionLevel() const override;
+
+  int32_t maximumCompressionLevel() const override;
+
+  int32_t defaultCompressionLevel() const override;
+
+  /// Returns the uncompressed size reported by snappy for 'input', or
+  /// std::nullopt if snappy cannot determine it.
+  std::optional<uint64_t> getUncompressedLength(
+      const uint8_t* input,
+      uint64_t inputLength) const override;
+};
+
+/// Creates a SnappyCodec.
+std::unique_ptr<Codec> makeSnappyCodec();
+
+} // namespace facebook::velox::common
diff --git a/velox/common/compression/tests/CompressionTest.cpp b/velox/common/compression/tests/CompressionTest.cpp
index 5029aeb45625..98e306891a50 100644
--- a/velox/common/compression/tests/CompressionTest.cpp
+++ b/velox/common/compression/tests/CompressionTest.cpp
@@ -525,6 +525,11 @@ INSTANTIATE_TEST_SUITE_P(
     CodecTest,
     ::testing::ValuesIn(generateZlibTestParams()));
 
+INSTANTIATE_TEST_SUITE_P(
+    TestSnappy,
+    CodecTest,
+    ::testing::Values(CompressionKind::CompressionKind_SNAPPY));
+
 TEST(CodecLZ4HadoopTest,
compatibility) { // LZ4 Hadoop codec should be able to read back LZ4 raw blocks. auto c1 = Codec::create(