Skip to content

Commit

Permalink
add snappy codec
Browse files Browse the repository at this point in the history
  • Loading branch information
marin-ma committed Dec 11, 2024
1 parent 6d58cc0 commit 2dc70d5
Show file tree
Hide file tree
Showing 6 changed files with 209 additions and 1 deletion.
8 changes: 7 additions & 1 deletion CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,7 @@ option(VELOX_ENABLE_COMPRESSION_LZ4 "Enable Lz4 compression support." OFF)
option(VELOX_ENABLE_COMPRESSION_ZSTD "Enable Zstd compression support." OFF)
option(VELOX_ENABLE_COMPRESSION_ZLIB "Enable Zlib/Gzip compression support."
OFF)
option(VELOX_ENABLE_COMPRESSION_SNAPPY "Enable Snappy compression support." OFF)

option(VELOX_BUILD_TEST_UTILS "Builds Velox test utilities" OFF)
option(VELOX_BUILD_VECTOR_TEST_UTILS "Builds Velox vector test utilities" OFF)
Expand Down Expand Up @@ -181,6 +182,7 @@ if(${VELOX_BUILD_MINIMAL_WITH_DWIO} OR ${VELOX_ENABLE_HIVE_CONNECTOR})
set(VELOX_ENABLE_COMPRESSION_LZ4 ON)
set(VELOX_ENABLE_COMPRESSION_ZSTD ON)
set(VELOX_ENABLE_COMPRESSION_ZLIB ON)
set(VELOX_ENABLE_COMPRESSION_SNAPPY ON)
endif()

if(${VELOX_BUILD_TESTING})
Expand All @@ -197,6 +199,7 @@ if(${VELOX_BUILD_TESTING})
set(VELOX_ENABLE_COMPRESSION_LZ4 ON)
set(VELOX_ENABLE_COMPRESSION_ZSTD ON)
set(VELOX_ENABLE_COMPRESSION_ZLIB ON)
set(VELOX_ENABLE_COMPRESSION_SNAPPY ON)
endif()

if(${VELOX_ENABLE_BENCHMARKS})
Expand Down Expand Up @@ -502,12 +505,15 @@ if(VELOX_ENABLE_COMPRESSION_ZLIB)
find_package(ZLIB REQUIRED)
endif()

if(VELOX_ENABLE_COMPRESSION_SNAPPY)
find_package(Snappy REQUIRED)
endif()

if(${VELOX_BUILD_MINIMAL_WITH_DWIO} OR ${VELOX_ENABLE_HIVE_CONNECTOR})
# DWIO needs all sorts of stream compression libraries.
#
# TODO: make these optional and pluggable.
find_package(lzo2 REQUIRED)
find_package(Snappy REQUIRED)
endif()

velox_set_source(re2)
Expand Down
7 changes: 7 additions & 0 deletions velox/common/compression/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -43,3 +43,10 @@ if(VELOX_ENABLE_COMPRESSION_ZLIB)
velox_compile_definitions(velox_common_compression
PRIVATE VELOX_ENABLE_COMPRESSION_ZLIB)
endif()

if(VELOX_ENABLE_COMPRESSION_SNAPPY)
velox_sources(velox_common_compression PRIVATE SnappyCompression.cpp)
velox_link_libraries(velox_common_compression PUBLIC Snappy::snappy)
velox_compile_definitions(velox_common_compression
PRIVATE VELOX_ENABLE_COMPRESSION_SNAPPY)
endif()
16 changes: 16 additions & 0 deletions velox/common/compression/Compression.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,9 @@
#ifdef VELOX_ENABLE_COMPRESSION_ZLIB
#include "velox/common/compression/ZlibCompression.h"
#endif
#ifdef VELOX_ENABLE_COMPRESSION_SNAPPY
#include "velox/common/compression/SnappyCompression.h"
#endif

#include <folly/Conv.h>

Expand Down Expand Up @@ -117,6 +120,10 @@ bool Codec::supportsGetUncompressedLength(CompressionKind kind) {
#ifdef VELOX_ENABLE_COMPRESSION_ZSTD
case CompressionKind::CompressionKind_ZSTD:
return true;
#endif
#ifdef VELOX_ENABLE_COMPRESSION_SNAPPY
case CompressionKind::CompressionKind_SNAPPY:
return true;
#endif
default:
return false;
Expand Down Expand Up @@ -221,6 +228,11 @@ Expected<std::unique_ptr<Codec>> Codec::create(
codec = makeGzipCodec(compressionLevel);
break;
}
#endif
#ifdef VELOX_ENABLE_COMPRESSION_SNAPPY
case CompressionKind::CompressionKind_SNAPPY:
codec = makeSnappyCodec();
break;
#endif
default:
break;
Expand Down Expand Up @@ -260,6 +272,10 @@ bool Codec::isAvailable(CompressionKind kind) {
[[fallthrough]];
case CompressionKind::CompressionKind_GZIP:
return true;
#endif
#ifdef VELOX_ENABLE_COMPRESSION_SNAPPY
case CompressionKind::CompressionKind_SNAPPY:
return true;
#endif
default:
return false;
Expand Down
110 changes: 110 additions & 0 deletions velox/common/compression/SnappyCompression.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
/*
* Copyright (c) Facebook, Inc. and its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#include "velox/common/compression/SnappyCompression.h"
#include "velox/common/base/Exceptions.h"

namespace facebook::velox::common {

uint64_t SnappyCodec::maxCompressedLength(uint64_t inputLength) {
DCHECK_GE(inputLength, 0);
return static_cast<uint64_t>(
snappy::MaxCompressedLength(static_cast<size_t>(inputLength)));
}

Expected<uint64_t> SnappyCodec::compress(
const uint8_t* input,
uint64_t inputLength,
uint8_t* output,
uint64_t outputLength) {
size_t output_size;
snappy::RawCompress(
reinterpret_cast<const char*>(input),
static_cast<size_t>(inputLength),
reinterpret_cast<char*>(output),
&output_size);
return static_cast<uint64_t>(output_size);
}

Expected<uint64_t> SnappyCodec::decompress(
const uint8_t* input,
uint64_t inputLength,
uint8_t* output,
uint64_t outputLength) {
size_t decompressedSize;
VELOX_RETURN_UNEXPECTED_IF(
!snappy::GetUncompressedLength(
reinterpret_cast<const char*>(input),
static_cast<size_t>(inputLength),
&decompressedSize),
Status::IOError("Corrupt snappy compressed data."));
VELOX_RETURN_UNEXPECTED_IF(
outputLength < decompressedSize,
Status::IOError("Output length is too small"));
VELOX_RETURN_UNEXPECTED_IF(
!snappy::RawUncompress(
reinterpret_cast<const char*>(input),
static_cast<size_t>(inputLength),
reinterpret_cast<char*>(output)),
Status::IOError("Corrupt snappy compressed data."));
return static_cast<uint64_t>(decompressedSize);
}

Expected<std::shared_ptr<StreamingCompressor>>
SnappyCodec::makeStreamingCompressor() {
return folly::makeUnexpected(
Status::NotImplemented("Streaming compression unsupported with Snappy"));
}

Expected<std::shared_ptr<StreamingDecompressor>>
SnappyCodec::makeStreamingDecompressor() {
return folly::makeUnexpected(Status::NotImplemented(
"Streaming decompression unsupported with Snappy"));
}

CompressionKind SnappyCodec::compressionKind() const {
return CompressionKind_SNAPPY;
}

int32_t SnappyCodec::minimumCompressionLevel() const {
return kUseDefaultCompressionLevel;
}

int32_t SnappyCodec::maximumCompressionLevel() const {
return kUseDefaultCompressionLevel;
}

int32_t SnappyCodec::defaultCompressionLevel() const {
return kUseDefaultCompressionLevel;
}

std::optional<uint64_t> SnappyCodec::getUncompressedLength(
const uint8_t* input,
uint64_t inputLength) const {
size_t decompressedSize;
if (!snappy::GetUncompressedLength(
reinterpret_cast<const char*>(input),
static_cast<size_t>(inputLength),
&decompressedSize)) {
return std::nullopt;
}
return static_cast<uint64_t>(decompressedSize);
}

std::unique_ptr<Codec> makeSnappyCodec() {
return std::make_unique<SnappyCodec>();
}
} // namespace facebook::velox::common
64 changes: 64 additions & 0 deletions velox/common/compression/SnappyCompression.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
/*
* Copyright (c) Facebook, Inc. and its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

// Derived from Apache Arrow.

#include <snappy.h>
#include <cstddef>
#include <cstdint>
#include <memory>
#include "velox/common/compression/Compression.h"

namespace facebook::velox::common {

class SnappyCodec : public Codec {
public:
uint64_t maxCompressedLength(uint64_t inputLength) override;

Expected<uint64_t> compress(
const uint8_t* input,
uint64_t inputLength,
uint8_t* output,
uint64_t outputLength) override;

Expected<uint64_t> decompress(
const uint8_t* input,
uint64_t inputLength,
uint8_t* output,
uint64_t outputLength) override;

Expected<std::shared_ptr<StreamingCompressor>> makeStreamingCompressor()
override;

Expected<std::shared_ptr<StreamingDecompressor>> makeStreamingDecompressor()
override;

CompressionKind compressionKind() const override;

int32_t minimumCompressionLevel() const override;

int32_t maximumCompressionLevel() const override;

int32_t defaultCompressionLevel() const override;

std::optional<uint64_t> getUncompressedLength(
const uint8_t* input,
uint64_t inputLength) const override;
};

std::unique_ptr<Codec> makeSnappyCodec();

} // namespace facebook::velox::common
5 changes: 5 additions & 0 deletions velox/common/compression/tests/CompressionTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -525,6 +525,11 @@ INSTANTIATE_TEST_SUITE_P(
CodecTest,
::testing::ValuesIn(generateZlibTestParams()));

INSTANTIATE_TEST_SUITE_P(
TestSnappy,
CodecTest,
::testing::Values(CompressionKind::CompressionKind_SNAPPY));

TEST(CodecLZ4HadoopTest, compatibility) {
// LZ4 Hadoop codec should be able to read back LZ4 raw blocks.
auto c1 = Codec::create(
Expand Down

0 comments on commit 2dc70d5

Please sign in to comment.