-
Notifications
You must be signed in to change notification settings - Fork 59
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add QAT support in zfile #250
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -25,7 +25,10 @@ | |
#include "photon/fs/filesystem.h" | ||
|
||
#ifdef ENABLE_QAT | ||
#include "lz4/lz4-qat.h" | ||
#include <qatzip.h> | ||
#include <condition_variable> | ||
#include <mutex> | ||
#include <optional> | ||
extern "C" { | ||
#include <pci/pci.h> | ||
} | ||
|
@@ -46,7 +49,7 @@ class BaseCompressor : public ICompressor { | |
vector<unsigned char *> compressed_data; | ||
vector<unsigned char *> uncompressed_data; | ||
|
||
const int DEFAULT_N_BATCH = 256; | ||
const int DEFAULT_N_BATCH = 1; | ||
|
||
virtual int init(const CompressArgs *args) { | ||
auto opt = &args->opt; | ||
|
@@ -129,20 +132,24 @@ class BaseCompressor : public ICompressor { | |
} | ||
}; | ||
|
||
class LZ4Compressor : public BaseCompressor { | ||
public: | ||
bool qat_enable = false; | ||
#ifdef ENABLE_QAT | ||
LZ4_qat_param *pQat = nullptr; | ||
struct QzSessionDeleter { | ||
void operator()(struct QzSession_S *session); | ||
}; | ||
|
||
void QzSessionDeleter::operator()(struct QzSession_S *session) { | ||
qzTeardownSession(session); | ||
delete session; | ||
} | ||
#endif | ||
|
||
~LZ4Compressor() { | ||
class LZ4Compressor : public BaseCompressor { | ||
public: | ||
#ifdef ENABLE_QAT | ||
if (pQat) { | ||
qat_uninit(pQat); | ||
delete pQat; | ||
} | ||
using session_ptr = std::unique_ptr<struct QzSession_S, QzSessionDeleter>; | ||
#endif | ||
bool qat_enable = false; | ||
~LZ4Compressor() { | ||
} | ||
|
||
bool check_qat() { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Does check_qat() need root permission? |
||
|
@@ -179,8 +186,6 @@ class LZ4Compressor : public BaseCompressor { | |
max_dst_size = LZ4_compressBound(src_blk_size); | ||
#ifdef ENABLE_QAT | ||
if (check_qat()) { | ||
pQat = new LZ4_qat_param(); | ||
qat_init(pQat); | ||
qat_enable = true; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We may need to create a lot of decompressor instances, we should integrate check_qat() and session cache into a global qat object. |
||
} | ||
#endif | ||
|
@@ -192,69 +197,173 @@ class LZ4Compressor : public BaseCompressor { | |
return (qat_enable ? DEFAULT_N_BATCH : 1); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Is it better to use max_batch() instead of nbatch()? And the number is fixed in all QAT hardware, is it? |
||
} | ||
|
||
virtual int do_compress(size_t *src_chunk_len, size_t *dst_chunk_len, | ||
size_t dst_buffer_capacity, size_t nblock) override { | ||
int do_compress(size_t *src_chunk_len, size_t *dst_chunk_len, size_t dst_buffer_capacity, | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Usually we don't need to put the function body backward. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I don't feel necessary to have separate do_compress() and do_decompress(). Simply realizes the logic in batch_compress() / batch_decompress(). I do feel necessary to have qat-related logic separated as a singleton object shared by all lz4_decompressor objects (and threads). The qat wrapper object simply provides 2 functions: batch_compress() / batch_decompress(). Whenever they fail, we fall back to the software-only implementation. |
||
size_t nblock) override; | ||
|
||
int ret = 0; | ||
int do_decompress(size_t *src_chunk_len, size_t *dst_chunk_len, size_t dst_buffer_capacity, | ||
size_t n) override; | ||
#ifdef ENABLE_QAT | ||
if (qat_enable) { | ||
ret = LZ4_compress_qat(pQat, &raw_data[0], src_chunk_len, &compressed_data[0], | ||
dst_chunk_len, n); | ||
if (ret < 0) { | ||
LOG_ERROR_RETURN(EFAULT, -1, "LZ4 compress data failed. (retcode: `).", ret); | ||
} | ||
return ret; | ||
private: | ||
session_ptr get_session(); | ||
friend struct cached_session_t; | ||
std::vector<session_ptr> sessions; | ||
std::mutex mutex; | ||
#endif | ||
}; | ||
|
||
#ifdef ENABLE_QAT | ||
static bool setup_session(LZ4Compressor::session_ptr &session) { | ||
int rc; | ||
rc = qzInit(session.get(), QZ_SW_BACKUP_DEFAULT); | ||
if (rc != QZ_OK && rc != QZ_DUPLICATE) | ||
return false; | ||
QzSessionParamsLZ4_T params; | ||
rc = qzGetDefaultsLZ4(¶ms); | ||
if (rc != QZ_OK) | ||
return false; | ||
// params.data_fmt = QZ_LZ4; | ||
params.common_params.comp_algorithm = QZ_LZ4; | ||
params.common_params.comp_lvl = 1; | ||
params.common_params.direction = QZ_DIR_BOTH; | ||
rc = qzSetupSessionLZ4(session.get(), ¶ms); | ||
if (rc != QZ_OK) | ||
return false; | ||
|
||
return true; | ||
} | ||
|
||
// put the session back to the session pool in a RAII manner | ||
struct cached_session_t { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Use photon::object_cache instead. It is more optimized for concurrency & performance. And it automatically releases objects as they get cold. |
||
cached_session_t(const cached_session_t &); | ||
|
||
cached_session_t(LZ4Compressor *accel, LZ4Compressor::session_ptr &&sess) | ||
: accel{accel}, session{std::move(sess)} { | ||
} | ||
|
||
~cached_session_t() { | ||
std::lock_guard<std::mutex> lock{accel->mutex}; | ||
// if the cache size is still under its upper bound, the current session is put into | ||
// accel->sessions. otherwise it's released right | ||
uint64_t sessions_num = 256; | ||
// g_ceph_context->_conf.get_val<uint64_t>("qat_compressor_session_max_number"); | ||
if (accel->sessions.size() < sessions_num) { | ||
accel->sessions.push_back(std::move(session)); | ||
} | ||
} | ||
|
||
struct QzSession_S *get() { | ||
assert(static_cast<bool>(session)); | ||
return session.get(); | ||
} | ||
|
||
LZ4Compressor *accel; | ||
LZ4Compressor::session_ptr session; | ||
}; | ||
|
||
LZ4Compressor::session_ptr LZ4Compressor::get_session() { | ||
{ | ||
std::lock_guard<std::mutex> lock{mutex}; | ||
if (!sessions.empty()) { | ||
auto session = std::move(sessions.back()); | ||
sessions.pop_back(); | ||
return session; | ||
} | ||
} | ||
|
||
// If there are no available session to use, we try allocate a new | ||
// session. | ||
session_ptr session(new struct QzSession_S()); | ||
memset(session.get(), 0, sizeof(struct QzSession_S)); | ||
if (setup_session(session)) { | ||
return session; | ||
} else { | ||
return nullptr; | ||
} | ||
} | ||
#endif | ||
for (size_t i = 0; i < nblock; i++) { | ||
ret = | ||
LZ4_compress_default((const char *)uncompressed_data[i], (char *)compressed_data[i], | ||
src_chunk_len[i], dst_buffer_capacity / nblock); | ||
|
||
dst_chunk_len[i] = ret; | ||
if (ret < 0) { | ||
LOG_ERROR_RETURN(EFAULT, -1, "LZ4 compress data failed. (retcode: `).", ret); | ||
} | ||
if (ret == 0) { | ||
LOG_ERROR_RETURN( | ||
EFAULT, -1, | ||
"Compression worked, but was stopped because the *dst couldn't hold all the information."); | ||
int LZ4Compressor::do_compress(size_t *src_chunk_len, size_t *dst_chunk_len, | ||
size_t dst_buffer_capacity, size_t nblock) { | ||
|
||
int ret = 0; | ||
#ifdef ENABLE_QAT | ||
if (qat_enable) { | ||
auto s = get_session(); // get a session from the pool | ||
if (!s) { | ||
LOG_INFO("get_session failed!"); | ||
return -1; // session initialization failed | ||
} | ||
auto session = | ||
cached_session_t{this, std::move(s)}; // returns to the session pool on destruction | ||
|
||
for (size_t i = 0; i < nblock; i++) { | ||
unsigned int len = src_chunk_len[i]; | ||
unsigned int out_len = qzMaxCompressedLength(len, session.get()); | ||
int rc = qzCompress(session.get(), uncompressed_data[i], (&len), compressed_data[i], | ||
(&out_len), 1); | ||
dst_chunk_len[i] = out_len; | ||
if (rc != QZ_OK) { | ||
LOG_INFO("qzCompress failed!, rc: `", rc); | ||
return -1; | ||
} | ||
} | ||
return 0; | ||
} | ||
#endif | ||
for (size_t i = 0; i < nblock; i++) { | ||
ret = LZ4_compress_default((const char *)uncompressed_data[i], (char *)compressed_data[i], | ||
src_chunk_len[i], dst_buffer_capacity / nblock); | ||
|
||
int do_decompress(size_t *src_chunk_len, size_t *dst_chunk_len, size_t dst_buffer_capacity, | ||
size_t n) override { | ||
dst_chunk_len[i] = ret; | ||
if (ret < 0) { | ||
LOG_ERROR_RETURN(EFAULT, -1, "LZ4 compress data failed. (retcode: `).", ret); | ||
} | ||
if (ret == 0) { | ||
LOG_ERROR_RETURN( | ||
EFAULT, -1, | ||
"Compression worked, but was stopped because the *dst couldn't hold all the information."); | ||
} | ||
} | ||
return 0; | ||
} | ||
int LZ4Compressor::do_decompress(size_t *src_chunk_len, size_t *dst_chunk_len, | ||
size_t dst_buffer_capacity, size_t n) { | ||
|
||
int ret = 0; | ||
int ret = 0; | ||
#ifdef ENABLE_QAT | ||
if (qat_enable) { | ||
ret = LZ4_decompress_qat(pQat, &compressed_data[0], src_chunk_len, | ||
&uncompressed_data[0], dst_chunk_len, n); | ||
if (ret < 0) { | ||
LOG_ERROR_RETURN(EFAULT, -1, "LZ4 decompress data failed. (retcode: `).", ret); | ||
} | ||
return ret; | ||
if (qat_enable) { | ||
auto s = get_session(); // get a session from the pool | ||
if (!s) { | ||
return -1; // session initialization failed | ||
} | ||
#endif | ||
for (size_t i = 0; i < n; i++) { | ||
ret = | ||
LZ4_decompress_safe((const char *)compressed_data[i], (char *)uncompressed_data[i], | ||
src_chunk_len[i], dst_buffer_capacity / n); | ||
auto session = | ||
cached_session_t{this, std::move(s)}; // returns to the session pool on destruction | ||
|
||
dst_chunk_len[i] = ret; | ||
if (ret < 0) { | ||
LOG_ERROR_RETURN(EFAULT, -1, "LZ4 decompress data failed. (retcode: `).", ret); | ||
} | ||
if (ret == 0) { | ||
LOG_ERROR_RETURN(EFAULT, -1, | ||
"LZ4 decompress returns 0. THIS SHOULD BE NEVER HAPPEN!"); | ||
} | ||
for (size_t i = 0; i < n; i++) { | ||
int rc = qzDecompress(session.get(), compressed_data[i], | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It seems that batch_decompress() is not used yet in zfile.cpp |
||
reinterpret_cast<unsigned int *>(&src_chunk_len[i]), | ||
uncompressed_data[i], | ||
reinterpret_cast<unsigned int *>(&dst_chunk_len[i])); | ||
if (rc != QZ_OK) | ||
return -1; | ||
} | ||
return 0; | ||
} | ||
}; | ||
#endif | ||
for (size_t i = 0; i < n; i++) { | ||
ret = LZ4_decompress_safe((const char *)compressed_data[i], (char *)uncompressed_data[i], | ||
src_chunk_len[i], dst_buffer_capacity / n); | ||
|
||
dst_chunk_len[i] = ret; | ||
if (ret < 0) { | ||
LOG_ERROR_RETURN(EFAULT, -1, "LZ4 decompress data failed. (retcode: `).", ret); | ||
} | ||
if (ret == 0) { | ||
LOG_ERROR_RETURN(EFAULT, -1, "LZ4 decompress returns 0. THIS SHOULD BE NEVER HAPPEN!"); | ||
} | ||
} | ||
return 0; | ||
} | ||
|
||
class Compressor_zstd : public BaseCompressor { | ||
public: | ||
|
This file was deleted.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It seems that the content of compressed_data[] / uncompressed_data[] are generated in compress_batch() / decompress_bath, which get used in do_compress() / do_decompress().
I believe these 2 vectors are useless. One can use function-local arrays instead.