Skip to content

Commit

Permalink
address comments & refine test
Browse files Browse the repository at this point in the history
  • Loading branch information
marin-ma committed Feb 27, 2025
1 parent 59ad345 commit 09600c7
Show file tree
Hide file tree
Showing 5 changed files with 95 additions and 65 deletions.
30 changes: 10 additions & 20 deletions velox/common/compression/Compression.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -112,17 +112,6 @@ bool Codec::supportsGetUncompressedLength(CompressionKind kind) {
return false;
}

/// Reports whether the given compression kind can create streaming
/// compressors and decompressors. Only LZ4 qualifies, and only when LZ4
/// support is compiled in (VELOX_ENABLE_COMPRESSION_LZ4); every other kind
/// yields false.
bool Codec::supportsStreamingCompression(CompressionKind kind) {
#ifdef VELOX_ENABLE_COMPRESSION_LZ4
  const bool streamingCapable = (kind == CompressionKind_LZ4);
#else
  // Without LZ4 support no kind offers streaming; keep the parameter "used".
  static_cast<void>(kind);
  const bool streamingCapable = false;
#endif
  return streamingCapable;
}

bool Codec::supportsCompressFixedLength(CompressionKind kind) {
// TODO: Return true if it's supported by compression kind.
return false;
Expand All @@ -133,9 +122,6 @@ Expected<std::unique_ptr<Codec>> Codec::create(
const CodecOptions& codecOptions) {
if (!isAvailable(kind)) {
auto name = compressionKindToString(kind);
VELOX_RETURN_UNEXPECTED_IF(
folly::StringPiece({name}).startsWith("unknown"),
Status::Invalid("Unrecognized codec: ", name));
return folly::makeUnexpected(Status::Invalid(
"Support for codec '{}' is either not built or not implemented.",
name));
Expand All @@ -158,9 +144,10 @@ Expected<std::unique_ptr<Codec>> Codec::create(
codec = makeLz4HadoopCodec();
break;
}
} else {
// By default, create LZ4 Frame codec.
codec = makeLz4FrameCodec(compressionLevel);
}
// By default, create LZ4 Frame codec.
codec = makeLz4FrameCodec(compressionLevel);
break;
#endif
default:
Expand All @@ -185,8 +172,6 @@ Expected<std::unique_ptr<Codec>> Codec::create(

bool Codec::isAvailable(CompressionKind kind) {
switch (kind) {
case CompressionKind_NONE:
return true;
#ifdef VELOX_ENABLE_COMPRESSION_LZ4
case CompressionKind_LZ4:
return true;
Expand All @@ -196,10 +181,11 @@ bool Codec::isAvailable(CompressionKind kind) {
}
}

std::optional<uint64_t> Codec::getUncompressedLength(
Expected<uint64_t> Codec::getUncompressedLength(
const uint8_t* input,
uint64_t inputLength) const {
return std::nullopt;
return folly::makeUnexpected(Status::Invalid(
"getUncompressedLength is unsupported with {} format.", name()));
}

Expected<uint64_t> Codec::compressFixedLength(
Expand All @@ -211,6 +197,10 @@ Expected<uint64_t> Codec::compressFixedLength(
Status::Invalid("'{}' doesn't support fixed-length compression", name()));
}

/// Default implementation: reports that this codec does not support creating
/// streaming de/compressors. Codecs that do support it (for example
/// Lz4FrameCodec) override this to return true.
bool Codec::supportsStreamingCompression() const {
return false;
}

Expected<std::shared_ptr<StreamingCompressor>>
Codec::makeStreamingCompressor() {
return folly::makeUnexpected(Status::Invalid(
Expand Down
41 changes: 24 additions & 17 deletions velox/common/compression/Compression.h
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ class StreamingCompressor {

/// Compress some input.
/// If CompressResult.outputTooSmall is true on return, compress() should be
/// called again with a larger output buffer.
/// called again with a larger output buffer, such as doubling its size.
virtual Expected<CompressResult> compress(
const uint8_t* input,
uint64_t inputLength,
Expand All @@ -82,15 +82,16 @@ class StreamingCompressor {

/// Flush part of the compressed output.
/// If FlushResult.outputTooSmall is true on return, flush() should be called
/// again with a larger output buffer.
/// again with a larger output buffer, such as doubling its size.
virtual Expected<FlushResult> flush(
uint8_t* output,
uint64_t outputLength) = 0;

/// End compressing, doing whatever is necessary to end the stream.
/// End compressing, doing whatever is necessary to end the stream, and
/// flushing the compressed output.
/// If EndResult.outputTooSmall is true on return, end() should be called
/// again with a larger output buffer. Otherwise, the StreamingCompressor
/// should not be used anymore. end() will flush the compressed output.
/// again with a larger output buffer, such as doubling its size.
/// Otherwise, the StreamingCompressor should not be used anymore.
virtual Expected<EndResult> end(uint8_t* output, uint64_t outputLength) = 0;
};

Expand All @@ -106,7 +107,7 @@ class StreamingDecompressor {

/// Decompress some input.
/// If DecompressResult.outputTooSmall is true on return, decompress() should
/// be called again with a larger output buffer.
/// be called again with a larger output buffer, such as doubling its size.
virtual Expected<DecompressResult> decompress(
const uint8_t* input,
uint64_t inputLength,
Expand Down Expand Up @@ -154,9 +155,6 @@ class Codec {
/// compressed length.
static bool supportsCompressFixedLength(CompressionKind kind);

// Return true if indicated kind supports creating streaming de/compressor.
static bool supportsStreamingCompression(CompressionKind kind);

/// Return the smallest supported compression level.
/// If the codec doesn't support compression level,
/// `kUseDefaultCompressionLevel` will be returned.
Expand Down Expand Up @@ -214,19 +212,28 @@ class Codec {
// Maximum compressed length of given input length.
virtual uint64_t maxCompressedLength(uint64_t inputLength) = 0;

/// Retrieves the actual uncompressed length of data using the specified
/// compression library.
/// Note: This functionality is not universally supported by all compression
/// libraries. If not supported, `std::nullopt` will be returned.
virtual std::optional<uint64_t> getUncompressedLength(
/// Retrieves the uncompressed length of the given compressed data using the
/// specified compression library.
/// If the input data is corrupted, returns `Unexpected` with
/// `Status::IOError`. Not all compression libraries support this
/// functionality. Use supportsGetUncompressedLength() to check before
/// calling. If unsupported, returns `Unexpected` with `Status::Invalid`.
virtual Expected<uint64_t> getUncompressedLength(
const uint8_t* input,
uint64_t inputLength) const;

// Create a streaming compressor instance.
/// Returns true if this codec supports creating a streaming de/compressor.
virtual bool supportsStreamingCompression() const;

/// Create a streaming compressor instance.
/// Use supportsStreamingCompression() to check before calling.
/// If unsupported, returns `Unexpected` with `Status::Invalid`.
virtual Expected<std::shared_ptr<StreamingCompressor>>
makeStreamingCompressor();

// Create a streaming compressor instance.
/// Create a streaming decompressor instance.
/// Use supportsStreamingCompression() to check before calling.
/// If unsupported, returns `Unexpected` with `Status::Invalid`.
virtual Expected<std::shared_ptr<StreamingDecompressor>>
makeStreamingDecompressor();

Expand All @@ -237,7 +244,7 @@ class Codec {
virtual int32_t compressionLevel() const;

// The name of this Codec's compression type.
std::string name() const;
virtual std::string name() const;

private:
// Initializes the codec's resources.
Expand Down
16 changes: 14 additions & 2 deletions velox/common/compression/Lz4Compression.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -230,7 +230,7 @@ Status LZ4Compressor::compressBegin(
return Status::OK();
}

Status common::LZ4Decompressor::init() {
Status LZ4Decompressor::init() {
finished_ = false;
auto ret = LZ4F_createDecompressionContext(&ctx_, LZ4F_VERSION);
VELOX_RETURN_IF(LZ4F_isError(ret), lz4Error("LZ4 init failed: ", ret));
Expand Down Expand Up @@ -304,7 +304,7 @@ int32_t Lz4CodecBase::compressionLevel() const {
}

CompressionKind Lz4CodecBase::compressionKind() const {
return CompressionKind::CompressionKind_LZ4;
return CompressionKind_LZ4;
}

Lz4FrameCodec::Lz4FrameCodec(int32_t compressionLevel)
Expand Down Expand Up @@ -363,6 +363,10 @@ Expected<uint64_t> Lz4FrameCodec::decompress(
});
}

/// Overrides the Codec default: the LZ4 frame codec reports streaming support
/// and provides makeStreamingCompressor().
bool Lz4FrameCodec::supportsStreamingCompression() const {
return true;
}

Expected<std::shared_ptr<StreamingCompressor>>
Lz4FrameCodec::makeStreamingCompressor() {
auto ptr = std::make_shared<LZ4Compressor>(compressionLevel_);
Expand Down Expand Up @@ -430,6 +434,10 @@ Expected<uint64_t> Lz4RawCodec::decompress(
return static_cast<uint64_t>(decompressedSize);
}

/// Overrides Codec::name() to return the fixed identifier "lz4_raw" for the
/// raw LZ4 codec.
std::string Lz4RawCodec::name() const {
return "lz4_raw";
}

Lz4HadoopCodec::Lz4HadoopCodec() : Lz4RawCodec(kLz4DefaultCompressionLevel) {}

uint64_t Lz4HadoopCodec::maxCompressedLength(uint64_t inputLength) {
Expand Down Expand Up @@ -491,6 +499,10 @@ int32_t Lz4HadoopCodec::defaultCompressionLevel() const {
return kUseDefaultCompressionLevel;
}

/// Overrides Lz4RawCodec::name() to return the fixed identifier "lz4_hadoop"
/// for the Hadoop-framed LZ4 codec.
std::string Lz4HadoopCodec::name() const {
return "lz4_hadoop";
}

Expected<uint64_t> Lz4HadoopCodec::decompressInternal(
const uint8_t* input,
uint64_t inputLength,
Expand Down
10 changes: 8 additions & 2 deletions velox/common/compression/Lz4Compression.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,11 +29,11 @@ struct Lz4CodecOptions : CodecOptions {
enum Type { kLz4Frame, kLz4Raw, kLz4Hadoop };

Lz4CodecOptions(
Lz4CodecOptions::Type type,
Type type,
int32_t compressionLevel = kUseDefaultCompressionLevel)
: CodecOptions(compressionLevel), type(type) {}

Lz4CodecOptions::Type type;
Type type;
};

class Lz4CodecBase : public Codec {
Expand Down Expand Up @@ -72,6 +72,8 @@ class Lz4FrameCodec : public Lz4CodecBase {
uint8_t* output,
uint64_t outputLength) override;

bool supportsStreamingCompression() const override;

Expected<std::shared_ptr<StreamingCompressor>> makeStreamingCompressor()
override;

Expand Down Expand Up @@ -99,6 +101,8 @@ class Lz4RawCodec : public Lz4CodecBase {
uint64_t inputLength,
uint8_t* output,
uint64_t outputLength) override;

std::string name() const override;
};

/// The Hadoop Lz4Codec source code can be found here:
Expand Down Expand Up @@ -127,6 +131,8 @@ class Lz4HadoopCodec : public Lz4RawCodec, public HadoopCompressionFormat {

int32_t defaultCompressionLevel() const override;

std::string name() const override;

private:
Expected<uint64_t> decompressInternal(
const uint8_t* input,
Expand Down
63 changes: 39 additions & 24 deletions velox/common/compression/tests/CompressionTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -57,9 +57,10 @@ std::vector<TestParams> generateLz4TestParams() {
Lz4CodecOptions::kLz4Frame,
Lz4CodecOptions::kLz4Hadoop}) {
params.emplace_back(
CompressionKind::CompressionKind_LZ4,
std::make_shared<Lz4CodecOptions>(type));
CompressionKind_LZ4, std::make_shared<Lz4CodecOptions>(type));
}
// Add default CodecOptions.
params.emplace_back(CompressionKind_LZ4);
return params;
}

Expand Down Expand Up @@ -386,14 +387,8 @@ class CodecTest : public ::testing::TestWithParam<TestParams> {
TEST_P(CodecTest, specifyCompressionLevel) {
std::vector<uint8_t> data = makeRandomData(2000);
const auto kind = getCompressionKind();
if (!Codec::isAvailable(kind)) {
// Support for this codec hasn't been built.
VELOX_ASSERT_THROW(
Codec::create(kind, kUseDefaultCompressionLevel),
"Support for codec '" + compressionKindToString(kind) +
"' is either not built or not implemented.");
return;
}
ASSERT_TRUE(Codec::isAvailable(kind));

auto codecDefault =
Codec::create(kind).thenOrThrow(folly::identity, throwsNotOk);
checkCodecRoundtrip(codecDefault, data);
Expand Down Expand Up @@ -421,14 +416,22 @@ TEST_P(CodecTest, getUncompressedLength) {
compressed.resize(compressedLength);

if (Codec::supportsGetUncompressedLength(getCompressionKind())) {
ASSERT_EQ(
codec->getUncompressedLength(compressed.data(), compressedLength),
inputLength);
auto uncompressedLength =
codec->getUncompressedLength(compressed.data(), compressedLength)
.thenOrThrow(folly::identity, throwsNotOk);
ASSERT_EQ(uncompressedLength, inputLength);
} else {
ASSERT_EQ(
codec->getUncompressedLength(compressed.data(), compressedLength),
std::nullopt);
VELOX_ASSERT_ERROR_STATUS(
codec->getUncompressedLength(compressed.data(), compressedLength)
.error(),
StatusCode::kInvalid,
fmt::format(
"getUncompressedLength is unsupported with {} format.",
codec->name()));
}

// TODO: For codecs that support getUncompressedLength(), verify the error
// message for corrupted data.
}

TEST_P(CodecTest, codecRoundtrip) {
Expand All @@ -440,47 +443,47 @@ TEST_P(CodecTest, codecRoundtrip) {
}

TEST_P(CodecTest, streamingCompressor) {
if (!Codec::supportsStreamingCompression(getCompressionKind())) {
const auto codec = makeCodec();
if (!codec->supportsStreamingCompression()) {
return;
}

for (auto dataSize : {0, 10, 10000, 100000}) {
auto codec = makeCodec();
checkStreamingCompressor(codec.get(), makeRandomData(dataSize));
checkStreamingCompressor(codec.get(), makeCompressibleData(dataSize));
}
}

TEST_P(CodecTest, streamingDecompressor) {
if (!Codec::supportsStreamingCompression(getCompressionKind())) {
const auto codec = makeCodec();
if (!codec->supportsStreamingCompression()) {
return;
}

for (auto dataSize : {0, 10, 10000, 100000}) {
auto codec = makeCodec();
checkStreamingDecompressor(codec.get(), makeRandomData(dataSize));
checkStreamingDecompressor(codec.get(), makeCompressibleData(dataSize));
}
}

TEST_P(CodecTest, streamingRoundtrip) {
if (!Codec::supportsStreamingCompression(getCompressionKind())) {
const auto codec = makeCodec();
if (!codec->supportsStreamingCompression()) {
return;
}

for (auto dataSize : {0, 10, 10000, 100000}) {
auto codec = makeCodec();
checkStreamingRoundtrip(codec.get(), makeRandomData(dataSize));
checkStreamingRoundtrip(codec.get(), makeCompressibleData(dataSize));
}
}

TEST_P(CodecTest, streamingDecompressorReuse) {
if (!Codec::supportsStreamingCompression(getCompressionKind())) {
const auto codec = makeCodec();
if (!codec->supportsStreamingCompression()) {
return;
}

auto codec = makeCodec();
const auto& decompressor = makeStreamingDecompressor(codec.get());
checkStreamingRoundtrip(
makeStreamingCompressor(codec.get()), decompressor, makeRandomData(100));
Expand Down Expand Up @@ -512,4 +515,16 @@ TEST(CodecLZ4HadoopTest, compatibility) {
checkCodecRoundtrip(c1, c2, makeRandomData(dataSize));
}
}

// Verifies that a compression kind with no available codec is rejected:
// Codec::isAvailable() must report false, and Codec::create() must return an
// error with StatusCode::kInvalid and the "not built or not implemented"
// message.
TEST(CodecTestInvalid, invalidKind) {
// CompressionKind_NONE serves as the kind that has no codec.
CompressionKind kind = CompressionKind_NONE;
ASSERT_FALSE(Codec::isAvailable(kind));

// create() returns an Expected; inspect its error() rather than expecting a
// thrown exception.
VELOX_ASSERT_ERROR_STATUS(
Codec::create(kind, kUseDefaultCompressionLevel).error(),
StatusCode::kInvalid,
fmt::format(
"Support for codec '{}' is either not built or not implemented.",
compressionKindToString(kind)));
}
} // namespace facebook::velox::common

0 comments on commit 09600c7

Please sign in to comment.