Skip to content

Commit

Permalink
[SYS-6179] Add MultiReadAsync file API. (#290)
Browse files Browse the repository at this point in the history
* Detaching CloudFileSystemEnv from CloudFileSystem class hierarchy.

* [SYS-6179] Add MultiReadAsync file API.

Extending the file API to provide an option to implement asynchronous
multi read operation. The default implementation delegates to AsyncRead.
Updated the AsyncFileReader class implementation to use MultiReadAsync.

* Switch to sync default implementation of AsyncMultiRead.

* Add CloudManifest handling API to CloudFileSystem.

Expanidng the CloudFileSystem API to include methods
for handling CloudManifest. This allows us to better hide
CloudFileSystemImpl and allow different implementations of
CloudFileSystem.
  • Loading branch information
dpetrov4 authored Feb 14, 2024
1 parent 40f265a commit 61223e2
Show file tree
Hide file tree
Showing 10 changed files with 302 additions and 82 deletions.
17 changes: 16 additions & 1 deletion cloud/cloud_file_system.cc
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@

#include "cloud/aws/aws_file_system.h"
#include "cloud/cloud_log_controller_impl.h"
#include "cloud/cloud_manifest.h"
#include "cloud/db_cloud_impl.h"
#include "cloud/filename.h"
#include "env/composite_env_wrapper.h"
Expand Down Expand Up @@ -495,7 +496,7 @@ Status CloudFileSystemEnv::CreateFromString(
std::string id;
std::unordered_map<std::string, std::string> options;
Status s;
if (value.find("=") == std::string::npos) {
if (value.find('=') == std::string::npos) {
id = value;
} else {
s = StringToMap(value, &options);
Expand Down Expand Up @@ -633,5 +634,19 @@ std::unique_ptr<Env> CloudFileSystemEnv::NewCompositeEnv(
return std::make_unique<CompositeEnvWrapper>(env, fs);
}

IOStatus CloudFileSystemEnv::LoadCloudManifest(
const std::string& dbname, const std::shared_ptr<FileSystem>& fs,
const std::string& cookie, std::unique_ptr<CloudManifest>* cloud_manifest) {
std::unique_ptr<SequentialFileReader> reader;
auto cloud_manifest_file_name = MakeCloudManifestFile(dbname, cookie);
auto s = SequentialFileReader::Create(fs, cloud_manifest_file_name,
FileOptions(), &reader, nullptr /*dbg*/,
nullptr /* rate_limiter */);
if (s.ok()) {
s = CloudManifest::LoadFromLog(std::move(reader), cloud_manifest);
}
return s;
}

} // namespace ROCKSDB_NAMESPACE
#endif // ROCKSDB_LITE
35 changes: 8 additions & 27 deletions cloud/cloud_file_system_impl.cc
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
// Copyright (c) 2017 Rockset.
#include "rocksdb/utilities/options_type.h"
#ifndef ROCKSDB_LITE

#include "rocksdb/cloud/cloud_file_system_impl.h"

#include <cinttypes>

#include "cloud/cloud_log_controller_impl.h"
Expand All @@ -14,14 +15,14 @@
#include "file/writable_file_writer.h"
#include "port/port_posix.h"
#include "rocksdb/cloud/cloud_file_deletion_scheduler.h"
#include "rocksdb/cloud/cloud_file_system_impl.h"
#include "rocksdb/cloud/cloud_log_controller.h"
#include "rocksdb/cloud/cloud_storage_provider.h"
#include "rocksdb/db.h"
#include "rocksdb/env.h"
#include "rocksdb/io_status.h"
#include "rocksdb/options.h"
#include "rocksdb/status.h"
#include "rocksdb/utilities/options_type.h"
#include "test_util/sync_point.h"
#include "util/xxhash.h"

Expand Down Expand Up @@ -850,22 +851,8 @@ IOStatus CloudFileSystemImpl::LoadLocalCloudManifest(
if (cloud_manifest_) {
cloud_manifest_.reset();
}
return CloudFileSystemImpl::LoadLocalCloudManifest(
dbname, GetBaseFileSystem(), cookie, &cloud_manifest_);
}

IOStatus CloudFileSystemImpl::LoadLocalCloudManifest(
const std::string& dbname, const std::shared_ptr<FileSystem>& base_fs,
const std::string& cookie, std::unique_ptr<CloudManifest>* cloud_manifest) {
std::unique_ptr<SequentialFileReader> reader;
auto cloud_manifest_file_name = MakeCloudManifestFile(dbname, cookie);
auto s = SequentialFileReader::Create(base_fs, cloud_manifest_file_name,
FileOptions(), &reader, nullptr /*dbg*/,
nullptr /* rate_limiter */);
if (s.ok()) {
s = CloudManifest::LoadFromLog(std::move(reader), cloud_manifest);
}
return s;
return CloudFileSystemEnv::LoadCloudManifest(dbname, GetBaseFileSystem(),
cookie, &cloud_manifest_);
}

std::string RemapFilenameWithCloudManifest(const std::string& logical_path,
Expand Down Expand Up @@ -1349,7 +1336,7 @@ IOStatus CloudFileSystemImpl::NeedsReinitialization(
// If the local MANIFEST is not compatible with local CLOUDMANIFEST, we will
// need to reinitialize the entire directory.
std::unique_ptr<CloudManifest> cloud_manifest;
auto load_status = LoadLocalCloudManifest(
auto load_status = CloudFileSystemEnv::LoadCloudManifest(
local_dir, base_fs, cloud_fs_options.cookie_on_open, &cloud_manifest);
if (load_status.ok()) {
std::string current_epoch = cloud_manifest->GetCurrentEpoch();
Expand Down Expand Up @@ -1616,9 +1603,8 @@ IOStatus CloudFileSystemImpl::LoadCloudManifest(const std::string& local_dbname,
//
// Create appropriate files in the clone dir
//
IOStatus CloudFileSystemImpl::SanitizeDirectory(const DBOptions& options,
const std::string& local_name,
bool read_only) {
IOStatus CloudFileSystemImpl::SanitizeLocalDirectory(
const DBOptions& options, const std::string& local_name, bool read_only) {
const auto& local_fs = GetBaseFileSystem();
const IOOptions io_opts;
IODebugContext* dbg = nullptr;
Expand Down Expand Up @@ -1913,11 +1899,6 @@ IOStatus CloudFileSystemImpl::FetchManifest(const std::string& local_dbname,
return IOStatus::NotFound();
}

IOStatus CloudFileSystemImpl::CreateCloudManifest(
const std::string& local_dbname) {
return CreateCloudManifest(local_dbname, cloud_fs_options.cookie_on_open);
}

IOStatus CloudFileSystemImpl::CreateCloudManifest(
const std::string& local_dbname, const std::string& cookie) {
// No cloud manifest, create an empty one
Expand Down
24 changes: 11 additions & 13 deletions cloud/db_cloud_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,14 @@

#include <cinttypes>

#include "rocksdb/cloud/cloud_file_system_impl.h"
#include "cloud/cloud_manifest.h"
#include "cloud/filename.h"
#include "cloud/manifest_reader.h"
#include "env/composite_env_wrapper.h"
#include "file/file_util.h"
#include "file/sst_file_manager_impl.h"
#include "logging/auto_roll_logger.h"
#include "rocksdb/cloud/cloud_file_system_impl.h"
#include "rocksdb/cloud/cloud_storage_provider.h"
#include "rocksdb/db.h"
#include "rocksdb/env.h"
Expand Down Expand Up @@ -45,8 +45,8 @@ class ConstantSizeSstFileManager : public SstFileManagerImpl {
}

Status OnAddFile(const std::string& file_path) override {
return SstFileManagerImpl::OnAddFile(
file_path, uint64_t(constant_file_size_));
return SstFileManagerImpl::OnAddFile(file_path,
uint64_t(constant_file_size_));
}

private:
Expand Down Expand Up @@ -97,10 +97,10 @@ Status DBCloud::Open(const Options& opt, const std::string& local_dbname,
}

auto* cfs =
dynamic_cast<CloudFileSystemImpl*>(options.env->GetFileSystem().get());
dynamic_cast<CloudFileSystem*>(options.env->GetFileSystem().get());
assert(cfs);
if (!cfs->info_log_) {
cfs->info_log_ = options.info_log;
if (!cfs->GetLogger()) {
cfs->SetLogger(options.info_log);
}
// Use a constant sized SST File Manager if necesary.
// NOTE: if user already passes in an SST File Manager, we will respect user's
Expand Down Expand Up @@ -131,7 +131,7 @@ Status DBCloud::Open(const Options& opt, const std::string& local_dbname,
// If cloud manifest is already loaded, this means the directory has been
// sanitized (possibly by the call to ListColumnFamilies())
if (cfs->GetCloudManifest() == nullptr) {
st = cfs->SanitizeDirectory(options, local_dbname, read_only);
st = cfs->SanitizeLocalDirectory(options, local_dbname, read_only);

if (st.ok()) {
st = cfs->LoadCloudManifest(local_dbname, read_only);
Expand Down Expand Up @@ -324,8 +324,7 @@ Status DBCloudImpl::DoCheckpointToCloud(
const BucketOptions& destination, const CheckpointToCloudOptions& options) {
std::vector<std::string> live_files;
uint64_t manifest_file_size{0};
auto* cfs =
dynamic_cast<CloudFileSystemImpl*>(GetEnv()->GetFileSystem().get());
auto* cfs = dynamic_cast<CloudFileSystem*>(GetEnv()->GetFileSystem().get());
assert(cfs);
const auto& local_fs = cfs->GetBaseFileSystem();

Expand All @@ -334,7 +333,7 @@ Status DBCloudImpl::DoCheckpointToCloud(
if (!st.ok()) {
return st;
}

// Create a temp MANIFEST file first as this captures all the files we need
auto current_epoch = cfs->GetCloudManifest()->GetCurrentEpoch();
auto manifest_fname = ManifestFileWithEpoch("", current_epoch);
Expand All @@ -346,7 +345,6 @@ Status DBCloudImpl::DoCheckpointToCloud(
return st;
}


std::vector<std::pair<std::string, std::string>> files_to_copy;
for (auto& f : live_files) {
uint64_t number = 0;
Expand Down Expand Up @@ -453,13 +451,13 @@ Status DBCloud::ListColumnFamilies(const DBOptions& db_options,
const std::string& name,
std::vector<std::string>* column_families) {
auto* cfs =
dynamic_cast<CloudFileSystemImpl*>(db_options.env->GetFileSystem().get());
dynamic_cast<CloudFileSystem*>(db_options.env->GetFileSystem().get());
assert(cfs);

cfs->GetBaseFileSystem()->CreateDirIfMissing(name, IOOptions(),
nullptr /*dbg*/);

auto st = cfs->SanitizeDirectory(db_options, name, false);
auto st = cfs->SanitizeLocalDirectory(db_options, name, false);
if (st.ok()) {
st = cfs->LoadCloudManifest(name, false);
}
Expand Down
101 changes: 101 additions & 0 deletions file/random_access_file_reader.cc
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,9 @@
#include "file/random_access_file_reader.h"

#include <algorithm>
#include <cstddef>
#include <mutex>
#include <utility>

#include "file/file_util.h"
#include "monitoring/histogram.h"
Expand Down Expand Up @@ -599,4 +601,103 @@ void RandomAccessFileReader::ReadAsyncCallback(const FSReadRequest& req,
RecordIOStats(stats_, file_temperature_, is_last_level_, req.result.size());
delete read_async_info;
}

// RocksDB-Cloud contribution begin

// Callback data for non-direct IO version of MultiReadAsync.
struct MultiReadAsyncCbInfo {
MultiReadAsyncCbInfo(
std::function<void(const FSReadRequest*, size_t, void*)> cb, void* cb_arg,
uint64_t start_time)
: cb_(cb), cb_arg_(cb_arg), start_time_(start_time) {}

std::function<void(const FSReadRequest*, size_t, void*)> cb_;
void* cb_arg_;
uint64_t start_time_;
FileOperationInfo::StartTimePoint fs_start_ts_;
};

IOStatus RandomAccessFileReader::MultiReadAsync(
FSReadRequest* reqs, size_t num_reqs, const IOOptions& opts,
std::function<void(const FSReadRequest*, size_t, void*)> cb, void* cb_arg,
void** io_handles, size_t* num_io_handles, IOHandleDeleter* del_fns,
AlignedBuf* /* aligned_buf */) {
IOStatus s;
uint64_t elapsed = 0;

if (use_direct_io()) {
return IOStatus::InvalidArgument(
"DirectIO support not implemented for MultiReadAsync");
}

// Create a callback and populate info.
auto read_async_callback =
std::bind_front(&RandomAccessFileReader::MultiReadAsyncCallback, this);

auto cb_info =
new MultiReadAsyncCbInfo(std::move(cb), cb_arg, clock_->NowMicros());
if (ShouldNotifyListeners()) {
cb_info->fs_start_ts_ = FileOperationInfo::StartNow();
}

StopWatch sw(clock_, nullptr /*stats*/, 0 /*hist_type*/, &elapsed,
true /*overwrite*/, true /*delay_enabled*/);
s = file_->MultiReadAsync(reqs, num_reqs, opts, read_async_callback, cb_info,
io_handles, num_io_handles, del_fns, nullptr);

RecordTick(stats_, READ_ASYNC_MICROS, elapsed);

// Suppress false positive clang analyzer warnings.
// Memory is not released if file_->ReadAsync returns !s.ok(), because
// ReadAsyncCallback is never called in that case. If ReadAsyncCallback is
// called then ReadAsync should always return IOStatus::OK().
#ifndef __clang_analyzer__
if (!s.ok()) {
delete cb_info;
}
#endif // __clang_analyzer__

return s;
}

void RandomAccessFileReader::MultiReadAsyncCallback(const FSReadRequest* reqs,
size_t n_reqs,
void* cb_arg) {
auto cb_info = static_cast<MultiReadAsyncCbInfo*>(cb_arg);
assert(cb_info);
assert(cb_info->cb_);

cb_info->cb_(reqs, n_reqs, cb_info->cb_arg_);

// Update stats and notify listeners.
if (stats_ != nullptr && file_read_hist_ != nullptr) {
// elapsed doesn't take into account delay and overwrite as StopWatch does
// in Read.
uint64_t elapsed = clock_->NowMicros() - cb_info->start_time_;
file_read_hist_->Add(elapsed);
}

for (size_t idx = 0; idx < n_reqs; idx++) {
auto& req = reqs[idx];
if (req.status.ok()) {
RecordInHistogram(stats_, ASYNC_READ_BYTES, req.result.size());
} else if (!req.status.IsAborted()) {
RecordTick(stats_, ASYNC_READ_ERROR_COUNT, 1);
}
if (ShouldNotifyListeners()) {
auto finish_ts = FileOperationInfo::FinishNow();
NotifyOnFileReadFinish(req.offset, req.result.size(),
cb_info->fs_start_ts_, finish_ts, req.status);
}
if (!req.status.ok()) {
NotifyOnIOError(req.status, FileOperationType::kRead, file_name(),
req.result.size(), req.offset);
}
RecordIOStats(stats_, file_temperature_, is_last_level_, req.result.size());
}
delete cb_info;
}

// RocksDB-Cloud contribution end

} // namespace ROCKSDB_NAMESPACE
14 changes: 13 additions & 1 deletion file/random_access_file_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@

#pragma once
#include <atomic>
#include <cstddef>
#include <sstream>
#include <string>

Expand Down Expand Up @@ -211,7 +212,18 @@ class RandomAccessFileReader {
std::function<void(const FSReadRequest&, void*)> cb,
void* cb_arg, void** io_handle, IOHandleDeleter* del_fn,
AlignedBuf* aligned_buf);

void ReadAsyncCallback(const FSReadRequest& req, void* cb_arg);

// RocksDB-Cloud contribution begin
IOStatus MultiReadAsync(
FSReadRequest* reqs, size_t num_reqs, const IOOptions& opts,
std::function<void(const FSReadRequest*, size_t, void*)> cb, void* cb_arg,
void** io_handles, size_t* num_io_handles, IOHandleDeleter* del_fns,
AlignedBuf* aligned_buf);

// Callback for non-directIO MultiReadAsync.
void MultiReadAsyncCallback(const FSReadRequest*, size_t, void*);
// RocksDB-Cloud contribution end
};
} // namespace ROCKSDB_NAMESPACE
Loading

0 comments on commit 61223e2

Please sign in to comment.