diff --git a/cloud/cloud_file_system.cc b/cloud/cloud_file_system.cc index ff9a4f9924d..0d55313577e 100644 --- a/cloud/cloud_file_system.cc +++ b/cloud/cloud_file_system.cc @@ -11,6 +11,7 @@ #include "cloud/aws/aws_file_system.h" #include "cloud/cloud_log_controller_impl.h" +#include "cloud/cloud_manifest.h" #include "cloud/db_cloud_impl.h" #include "cloud/filename.h" #include "env/composite_env_wrapper.h" @@ -495,7 +496,7 @@ Status CloudFileSystemEnv::CreateFromString( std::string id; std::unordered_map options; Status s; - if (value.find("=") == std::string::npos) { + if (value.find('=') == std::string::npos) { id = value; } else { s = StringToMap(value, &options); @@ -633,5 +634,19 @@ std::unique_ptr CloudFileSystemEnv::NewCompositeEnv( return std::make_unique(env, fs); } +IOStatus CloudFileSystemEnv::LoadCloudManifest( + const std::string& dbname, const std::shared_ptr& fs, + const std::string& cookie, std::unique_ptr* cloud_manifest) { + std::unique_ptr reader; + auto cloud_manifest_file_name = MakeCloudManifestFile(dbname, cookie); + auto s = SequentialFileReader::Create(fs, cloud_manifest_file_name, + FileOptions(), &reader, nullptr /*dbg*/, + nullptr /* rate_limiter */); + if (s.ok()) { + s = CloudManifest::LoadFromLog(std::move(reader), cloud_manifest); + } + return s; +} + } // namespace ROCKSDB_NAMESPACE #endif // ROCKSDB_LITE diff --git a/cloud/cloud_file_system_impl.cc b/cloud/cloud_file_system_impl.cc index 3d064242237..d534e86c1ec 100644 --- a/cloud/cloud_file_system_impl.cc +++ b/cloud/cloud_file_system_impl.cc @@ -1,7 +1,8 @@ // Copyright (c) 2017 Rockset. 
-#include "rocksdb/utilities/options_type.h" #ifndef ROCKSDB_LITE +#include "rocksdb/cloud/cloud_file_system_impl.h" + #include #include "cloud/cloud_log_controller_impl.h" @@ -14,7 +15,6 @@ #include "file/writable_file_writer.h" #include "port/port_posix.h" #include "rocksdb/cloud/cloud_file_deletion_scheduler.h" -#include "rocksdb/cloud/cloud_file_system_impl.h" #include "rocksdb/cloud/cloud_log_controller.h" #include "rocksdb/cloud/cloud_storage_provider.h" #include "rocksdb/db.h" @@ -22,6 +22,7 @@ #include "rocksdb/io_status.h" #include "rocksdb/options.h" #include "rocksdb/status.h" +#include "rocksdb/utilities/options_type.h" #include "test_util/sync_point.h" #include "util/xxhash.h" @@ -850,22 +851,8 @@ IOStatus CloudFileSystemImpl::LoadLocalCloudManifest( if (cloud_manifest_) { cloud_manifest_.reset(); } - return CloudFileSystemImpl::LoadLocalCloudManifest( - dbname, GetBaseFileSystem(), cookie, &cloud_manifest_); -} - -IOStatus CloudFileSystemImpl::LoadLocalCloudManifest( - const std::string& dbname, const std::shared_ptr& base_fs, - const std::string& cookie, std::unique_ptr* cloud_manifest) { - std::unique_ptr reader; - auto cloud_manifest_file_name = MakeCloudManifestFile(dbname, cookie); - auto s = SequentialFileReader::Create(base_fs, cloud_manifest_file_name, - FileOptions(), &reader, nullptr /*dbg*/, - nullptr /* rate_limiter */); - if (s.ok()) { - s = CloudManifest::LoadFromLog(std::move(reader), cloud_manifest); - } - return s; + return CloudFileSystemEnv::LoadCloudManifest(dbname, GetBaseFileSystem(), + cookie, &cloud_manifest_); } std::string RemapFilenameWithCloudManifest(const std::string& logical_path, @@ -1349,7 +1336,7 @@ IOStatus CloudFileSystemImpl::NeedsReinitialization( // If the local MANIFEST is not compatible with local CLOUDMANIFEST, we will // need to reinitialize the entire directory. 
std::unique_ptr cloud_manifest; - auto load_status = LoadLocalCloudManifest( + auto load_status = CloudFileSystemEnv::LoadCloudManifest( local_dir, base_fs, cloud_fs_options.cookie_on_open, &cloud_manifest); if (load_status.ok()) { std::string current_epoch = cloud_manifest->GetCurrentEpoch(); @@ -1616,9 +1603,8 @@ IOStatus CloudFileSystemImpl::LoadCloudManifest(const std::string& local_dbname, // // Create appropriate files in the clone dir // -IOStatus CloudFileSystemImpl::SanitizeDirectory(const DBOptions& options, - const std::string& local_name, - bool read_only) { +IOStatus CloudFileSystemImpl::SanitizeLocalDirectory( + const DBOptions& options, const std::string& local_name, bool read_only) { const auto& local_fs = GetBaseFileSystem(); const IOOptions io_opts; IODebugContext* dbg = nullptr; @@ -1913,11 +1899,6 @@ IOStatus CloudFileSystemImpl::FetchManifest(const std::string& local_dbname, return IOStatus::NotFound(); } -IOStatus CloudFileSystemImpl::CreateCloudManifest( - const std::string& local_dbname) { - return CreateCloudManifest(local_dbname, cloud_fs_options.cookie_on_open); -} - IOStatus CloudFileSystemImpl::CreateCloudManifest( const std::string& local_dbname, const std::string& cookie) { // No cloud manifest, create an empty one diff --git a/cloud/db_cloud_impl.cc b/cloud/db_cloud_impl.cc index e85a6d9c986..c31c17d02f1 100644 --- a/cloud/db_cloud_impl.cc +++ b/cloud/db_cloud_impl.cc @@ -5,7 +5,6 @@ #include -#include "rocksdb/cloud/cloud_file_system_impl.h" #include "cloud/cloud_manifest.h" #include "cloud/filename.h" #include "cloud/manifest_reader.h" @@ -13,6 +12,7 @@ #include "file/file_util.h" #include "file/sst_file_manager_impl.h" #include "logging/auto_roll_logger.h" +#include "rocksdb/cloud/cloud_file_system_impl.h" #include "rocksdb/cloud/cloud_storage_provider.h" #include "rocksdb/db.h" #include "rocksdb/env.h" @@ -45,8 +45,8 @@ class ConstantSizeSstFileManager : public SstFileManagerImpl { } Status OnAddFile(const std::string& file_path) 
override { - return SstFileManagerImpl::OnAddFile( - file_path, uint64_t(constant_file_size_)); + return SstFileManagerImpl::OnAddFile(file_path, + uint64_t(constant_file_size_)); } private: @@ -97,10 +97,10 @@ Status DBCloud::Open(const Options& opt, const std::string& local_dbname, } auto* cfs = - dynamic_cast(options.env->GetFileSystem().get()); + dynamic_cast(options.env->GetFileSystem().get()); assert(cfs); - if (!cfs->info_log_) { - cfs->info_log_ = options.info_log; + if (!cfs->GetLogger()) { + cfs->SetLogger(options.info_log); } // Use a constant sized SST File Manager if necesary. // NOTE: if user already passes in an SST File Manager, we will respect user's @@ -131,7 +131,7 @@ Status DBCloud::Open(const Options& opt, const std::string& local_dbname, // If cloud manifest is already loaded, this means the directory has been // sanitized (possibly by the call to ListColumnFamilies()) if (cfs->GetCloudManifest() == nullptr) { - st = cfs->SanitizeDirectory(options, local_dbname, read_only); + st = cfs->SanitizeLocalDirectory(options, local_dbname, read_only); if (st.ok()) { st = cfs->LoadCloudManifest(local_dbname, read_only); @@ -324,8 +324,7 @@ Status DBCloudImpl::DoCheckpointToCloud( const BucketOptions& destination, const CheckpointToCloudOptions& options) { std::vector live_files; uint64_t manifest_file_size{0}; - auto* cfs = - dynamic_cast(GetEnv()->GetFileSystem().get()); + auto* cfs = dynamic_cast(GetEnv()->GetFileSystem().get()); assert(cfs); const auto& local_fs = cfs->GetBaseFileSystem(); @@ -334,7 +333,7 @@ Status DBCloudImpl::DoCheckpointToCloud( if (!st.ok()) { return st; } - + // Create a temp MANIFEST file first as this captures all the files we need auto current_epoch = cfs->GetCloudManifest()->GetCurrentEpoch(); auto manifest_fname = ManifestFileWithEpoch("", current_epoch); @@ -346,7 +345,6 @@ Status DBCloudImpl::DoCheckpointToCloud( return st; } - std::vector> files_to_copy; for (auto& f : live_files) { uint64_t number = 0; @@ -453,13 
+451,13 @@ Status DBCloud::ListColumnFamilies(const DBOptions& db_options, const std::string& name, std::vector* column_families) { auto* cfs = - dynamic_cast(db_options.env->GetFileSystem().get()); + dynamic_cast(db_options.env->GetFileSystem().get()); assert(cfs); cfs->GetBaseFileSystem()->CreateDirIfMissing(name, IOOptions(), nullptr /*dbg*/); - auto st = cfs->SanitizeDirectory(db_options, name, false); + auto st = cfs->SanitizeLocalDirectory(db_options, name, false); if (st.ok()) { st = cfs->LoadCloudManifest(name, false); } diff --git a/file/random_access_file_reader.cc b/file/random_access_file_reader.cc index 030cd8d07a2..0077febf888 100644 --- a/file/random_access_file_reader.cc +++ b/file/random_access_file_reader.cc @@ -10,7 +10,9 @@ #include "file/random_access_file_reader.h" #include +#include #include +#include #include "file/file_util.h" #include "monitoring/histogram.h" @@ -599,4 +601,106 @@ void RandomAccessFileReader::ReadAsyncCallback(const FSReadRequest& req, RecordIOStats(stats_, file_temperature_, is_last_level_, req.result.size()); delete read_async_info; } + +// RocksDB-Cloud contribution begin + +// Callback data for non-direct IO version of MultiReadAsync. +struct MultiReadAsyncCbInfo { + MultiReadAsyncCbInfo( + std::function cb, void* cb_arg, + uint64_t start_time) + : cb_(std::move(cb)), cb_arg_(cb_arg), start_time_(start_time) {} + + std::function cb_; + void* cb_arg_; + uint64_t start_time_; + FileOperationInfo::StartTimePoint fs_start_ts_; +}; + +IOStatus RandomAccessFileReader::MultiReadAsync( + FSReadRequest* reqs, size_t num_reqs, const IOOptions& opts, + std::function cb, void* cb_arg, + void** io_handles, size_t* num_io_handles, IOHandleDeleter* del_fns, + AlignedBuf* /* aligned_buf */) { + IOStatus s; + uint64_t elapsed = 0; + + if (use_direct_io()) { + // Nothing is submitted on this path, so report that no io_handles (and + // del_fns) were populated before bailing out. + *num_io_handles = 0; + return IOStatus::InvalidArgument( + "DirectIO support not implemented for MultiReadAsync"); + } + + // Create a callback and populate info. 
+ auto read_async_callback = + std::bind_front(&RandomAccessFileReader::MultiReadAsyncCallback, this); + + auto cb_info = + new MultiReadAsyncCbInfo(std::move(cb), cb_arg, clock_->NowMicros()); + if (ShouldNotifyListeners()) { + cb_info->fs_start_ts_ = FileOperationInfo::StartNow(); + } + + StopWatch sw(clock_, nullptr /*stats*/, 0 /*hist_type*/, &elapsed, + true /*overwrite*/, true /*delay_enabled*/); + s = file_->MultiReadAsync(reqs, num_reqs, opts, read_async_callback, cb_info, + io_handles, num_io_handles, del_fns, nullptr); + + RecordTick(stats_, READ_ASYNC_MICROS, elapsed); + +// Suppress false positive clang analyzer warnings. +// Memory is not released if file_->MultiReadAsync returns !s.ok(), because +// MultiReadAsyncCallback is never called in that case. If it is called then +// MultiReadAsync should always return IOStatus::OK(). +#ifndef __clang_analyzer__ + if (!s.ok()) { + delete cb_info; + } +#endif // __clang_analyzer__ + + return s; +} + +void RandomAccessFileReader::MultiReadAsyncCallback(const FSReadRequest* reqs, + size_t n_reqs, + void* cb_arg) { + auto cb_info = static_cast(cb_arg); + assert(cb_info); + assert(cb_info->cb_); + + cb_info->cb_(reqs, n_reqs, cb_info->cb_arg_); + + // Update stats and notify listeners. + if (stats_ != nullptr && file_read_hist_ != nullptr) { + // elapsed doesn't take into account delay and overwrite as StopWatch does + // in Read. 
+ uint64_t elapsed = clock_->NowMicros() - cb_info->start_time_; + file_read_hist_->Add(elapsed); + } + + for (size_t idx = 0; idx < n_reqs; idx++) { + auto& req = reqs[idx]; + if (req.status.ok()) { + RecordInHistogram(stats_, ASYNC_READ_BYTES, req.result.size()); + } else if (!req.status.IsAborted()) { + RecordTick(stats_, ASYNC_READ_ERROR_COUNT, 1); + } + if (ShouldNotifyListeners()) { + auto finish_ts = FileOperationInfo::FinishNow(); + NotifyOnFileReadFinish(req.offset, req.result.size(), + cb_info->fs_start_ts_, finish_ts, req.status); + } + if (!req.status.ok()) { + NotifyOnIOError(req.status, FileOperationType::kRead, file_name(), + req.result.size(), req.offset); + } + RecordIOStats(stats_, file_temperature_, is_last_level_, req.result.size()); + } + delete cb_info; +} + +// RocksDB-Cloud contribution end + } // namespace ROCKSDB_NAMESPACE diff --git a/file/random_access_file_reader.h b/file/random_access_file_reader.h index ea7cfd234f9..4069c4cb3f1 100644 --- a/file/random_access_file_reader.h +++ b/file/random_access_file_reader.h @@ -9,6 +9,7 @@ #pragma once #include +#include #include #include @@ -211,7 +212,18 @@ class RandomAccessFileReader { std::function cb, void* cb_arg, void** io_handle, IOHandleDeleter* del_fn, AlignedBuf* aligned_buf); - + void ReadAsyncCallback(const FSReadRequest& req, void* cb_arg); + +// RocksDB-Cloud contribution begin + IOStatus MultiReadAsync( + FSReadRequest* reqs, size_t num_reqs, const IOOptions& opts, + std::function cb, void* cb_arg, + void** io_handles, size_t* num_io_handles, IOHandleDeleter* del_fns, + AlignedBuf* aligned_buf); + + // Callback for non-directIO MultiReadAsync. 
+ void MultiReadAsyncCallback(const FSReadRequest*, size_t, void*); +// RocksDB-Cloud contribution end }; } // namespace ROCKSDB_NAMESPACE diff --git a/include/rocksdb/cloud/cloud_file_system.h b/include/rocksdb/cloud/cloud_file_system.h index 3a6c6e3531e..ff2e7dbab0c 100644 --- a/include/rocksdb/cloud/cloud_file_system.h +++ b/include/rocksdb/cloud/cloud_file_system.h @@ -29,6 +29,7 @@ namespace ROCKSDB_NAMESPACE { class CloudFileSystem; class CloudLogController; +class CloudManifest; class CloudStorageProvider; enum CloudType : unsigned char { @@ -530,6 +531,27 @@ class CloudFileSystem : public FileSystem { virtual IOStatus CopyLocalFileToDest(const std::string& local_name, const std::string& cloud_name) = 0; + // Returns CloudManifest file name for a given db. + virtual std::string CloudManifestFile(const std::string& dbname) = 0; + + virtual CloudManifest* GetCloudManifest() = 0; + + // TODO(wei): this function is used to temporarily support open db and switch + // cookie. Change it to use the cookie from options once migration complete. + virtual IOStatus CreateCloudManifest(const std::string& local_dbname, + const std::string& cookie) = 0; + + virtual IOStatus LoadCloudManifest(const std::string& local_dbname, + bool read_only) = 0; + + virtual IOStatus UploadCloudManifest(const std::string& local_dbname, + const std::string& cookie) const = 0; + + // Prepare a local directory for use as a clone of the cloud storage + virtual IOStatus SanitizeLocalDirectory(const DBOptions& options, + const std::string& local_name, + bool read_only) = 0; + // Transfers the filename from RocksDB's domain to the physical domain, based // on information stored in CLOUDMANIFEST. 
// For example, it will map 00010.sst to 00010.sst-[epoch] where [epoch] is @@ -617,6 +639,7 @@ class CloudFileSystem : public FileSystem { const = 0; virtual Logger* GetLogger() const = 0; + virtual void SetLogger(std::shared_ptr) = 0; }; // @@ -671,6 +694,12 @@ class CloudFileSystemEnv { // calls to env, and all file operations to fs static std::unique_ptr NewCompositeEnv( Env* env, const std::shared_ptr& fs); + + // Load CloudManifest from a given fs/db. + static IOStatus LoadCloudManifest( + const std::string& dbname, const std::shared_ptr& fs, + const std::string& cookie, + std::unique_ptr* cloud_manifest); }; } // namespace ROCKSDB_NAMESPACE diff --git a/include/rocksdb/cloud/cloud_file_system_impl.h b/include/rocksdb/cloud/cloud_file_system_impl.h index cb3d58ca3ce..cb672b10e8a 100644 --- a/include/rocksdb/cloud/cloud_file_system_impl.h +++ b/include/rocksdb/cloud/cloud_file_system_impl.h @@ -141,9 +141,11 @@ class CloudFileSystemImpl : public CloudFileSystem { IOStatus DeleteDbid(const std::string& bucket, const std::string& dbid) override; - IOStatus SanitizeDirectory(const DBOptions& options, - const std::string& clone_name, bool read_only); - IOStatus LoadCloudManifest(const std::string& local_dbname, bool read_only); + IOStatus SanitizeLocalDirectory(const DBOptions& options, + const std::string& local_name, + bool read_only) override; + IOStatus LoadCloudManifest(const std::string& local_dbname, + bool read_only) override; // The separator used to separate dbids while creating the dbid of a clone static constexpr const char* DBID_SEPARATOR = "rockset"; @@ -180,17 +182,10 @@ class CloudFileSystemImpl : public CloudFileSystem { IOStatus LoadLocalCloudManifest(const std::string& dbname, const std::string& cookie); - // Local CLOUDMANIFEST from `base_env` into `cloud_manifest`. 
- static IOStatus LoadLocalCloudManifest( - const std::string& dbname, const std::shared_ptr& base_fs, - const std::string& cookie, - std::unique_ptr* cloud_manifest); - - IOStatus CreateCloudManifest(const std::string& local_dbname); // TODO(wei): this function is used to temporarily support open db and switch - // cookie. Remove it once that's not needed + // cookie. Change it to use the cookie from options once migration complete. IOStatus CreateCloudManifest(const std::string& local_dbname, - const std::string& cookie); + const std::string& cookie) override; // Transfers the filename from RocksDB's domain to the physical domain, based // on information stored in CLOUDMANIFEST. @@ -243,7 +238,7 @@ class CloudFileSystemImpl : public CloudFileSystem { "CloudFileSystemImpl::IsDirectory() not supported."); } - CloudManifest* GetCloudManifest() { return cloud_manifest_.get(); } + CloudManifest* GetCloudManifest() override { return cloud_manifest_.get(); } IOStatus DeleteCloudFileFromDest(const std::string& fname) override; IOStatus CopyLocalFileToDest(const std::string& local_name, @@ -253,7 +248,7 @@ class CloudFileSystemImpl : public CloudFileSystem { Status ValidateOptions(const DBOptions& /*db_opts*/, const ColumnFamilyOptions& /*cf_opts*/) const override; - std::string CloudManifestFile(const std::string& dbname); + std::string CloudManifestFile(const std::string& dbname) override; // Apply cloud manifest delta to in-memory cloud manifest. Does not change the // on-disk state. @@ -275,7 +270,7 @@ class CloudFileSystemImpl : public CloudFileSystem { // Upload local CLOUDMANIFEST-cookie file only. // REQURIES: the file exists locally IOStatus UploadCloudManifest(const std::string& local_dbname, - const std::string& cookie) const; + const std::string& cookie) const override; // Delete invisible files in cloud. 
// @@ -418,6 +413,9 @@ class CloudFileSystemImpl : public CloudFileSystem { } Logger* GetLogger() const override { return info_log_.get(); } + void SetLogger(std::shared_ptr l) override { + info_log_ = std::move(l); + } private: // Files are invisibile if: diff --git a/include/rocksdb/file_system.h b/include/rocksdb/file_system.h index 91ad47218e6..453ff6560f5 100644 --- a/include/rocksdb/file_system.h +++ b/include/rocksdb/file_system.h @@ -20,6 +20,7 @@ #include #include +#include #include #include #include @@ -672,7 +673,7 @@ class FileSystem : public Customizable { // // Default implementation is to return IOStatus::OK. - virtual IOStatus Poll(std::vector& /*io_handles*/, + virtual IOStatus Poll(std::vector& /*io_handles*/, size_t /*min_completions*/) { return IOStatus::OK(); } @@ -891,7 +892,7 @@ class FSRandomAccessFile { // // When the read request is completed, callback function specified in cb // should be called with arguments cb_arg and the result populated in - // FSReadRequest with result and status fileds updated by FileSystem. + // FSReadRequest with result and status fields updated by FileSystem. // cb_arg should be used by the callback to track the original request // submitted. // @@ -918,6 +919,58 @@ class FSRandomAccessFile { return IOStatus::OK(); } + // RocksDB-Cloud contribution begin + + // This API reads the requested data in a set of FSReadRequest asynchronously. + // This is an asynchronous call, i.e. it should return after submitting the + // request. + // + // When the read request is completed, callback function specified in cb + // should be called with arguments cb_arg and the result populated in + // FSReadRequest with result and status fields updated by FileSystem. + // cb_arg should be used by the callback to track the original request + // submitted. 
+ // This API should also populate io_handles which should be used by + // underlying FileSystem to store the context in order to distinguish the read + // requests at their side and provide the custom deletion functions in + // del_fns. RocksDB guarantees that the del_fn for io_handle will be called + // after receiving the callback. Furthermore, RocksDB guarantees that if it + // calls the Poll API for this io_handle, del_fn will be called after the Poll + // returns. RocksDB is responsible for managing the lifetime of io_handles. + // + // The caller preallocates io_handles and del_fns arrays to be the same + // size as the number of requests (num_reqs). num_io_handles parameter is + // used to pass out the information about how many io_handles (and + // corresponding del_fns) were populated during the call. num_io_handles + // must be pre-initialized to the maximum size of io_handles/del_fns arrays + // (num_reqs) on the function call. + // + // reqs contains the request offset and size passed as input parameter of read + // request and result and status fields are output parameter set by underlying + // FileSystem. The data should always be read into scratch field. + // + // Default implementation is synchronous and delegates to MultiRead. + virtual IOStatus MultiReadAsync( + FSReadRequest* reqs, size_t num_reqs, const IOOptions& opts, + std::function cb, void* cb_arg, + void** /*io_handles*/, size_t* num_io_handles, + IOHandleDeleter* /*del_fns*/, IODebugContext* dbg) { + assert(*num_io_handles == num_reqs); + *num_io_handles = 0; + + auto status = MultiRead(reqs, num_reqs, opts, dbg); + if (!status.ok()) { + return status; + } + + // the operation has completed successfully, execute callbacks + cb(reqs, num_reqs, cb_arg); + return IOStatus::OK(); + } + + // RocksDB-Cloud contribution end + // EXPERIMENTAL // When available, returns the actual temperature for the file. 
This is // useful in case some outside process moves a file from one tier to another, @@ -1616,6 +1669,16 @@ class FSRandomAccessFileWrapper : public FSRandomAccessFile { IODebugContext* dbg) override { return target()->ReadAsync(req, opts, cb, cb_arg, io_handle, del_fn, dbg); } + // RocksDB-Cloud contribution begin + IOStatus MultiReadAsync( + FSReadRequest* reqs, size_t num_reqs, const IOOptions& opts, + std::function cb, void* cb_arg, + void** io_handles, size_t* num_io_handles, IOHandleDeleter* del_fns, + IODebugContext* dbg) override { + return target()->MultiReadAsync(reqs, num_reqs, opts, cb, cb_arg, + io_handles, num_io_handles, del_fns, dbg); + } + // RocksDB-Cloud contribution end Temperature GetTemperature() const override { return target_->GetTemperature(); } diff --git a/util/async_file_reader.cc b/util/async_file_reader.cc index 080c1ae9668..64241a44f1f 100644 --- a/util/async_file_reader.cc +++ b/util/async_file_reader.cc @@ -7,6 +7,8 @@ #if USE_COROUTINES #include "util/async_file_reader.h" +// RocksDB-Cloud contribution: restructured to use MultiReadAsync + namespace ROCKSDB_NAMESPACE { bool AsyncFileReader::MultiReadAsyncImpl(ReadAwaiter* awaiter) { if (tail_) { @@ -16,25 +18,37 @@ bool AsyncFileReader::MultiReadAsyncImpl(ReadAwaiter* awaiter) { if (!head_) { head_ = awaiter; } + awaiter->next_ = nullptr; + num_reqs_ += awaiter->num_reqs_; awaiter->io_handle_.resize(awaiter->num_reqs_); awaiter->del_fn_.resize(awaiter->num_reqs_); - for (size_t i = 0; i < awaiter->num_reqs_; ++i) { - IOStatus s = awaiter->file_->ReadAsync( - awaiter->read_reqs_[i], awaiter->opts_, - [](const FSReadRequest& req, void* cb_arg) { - FSReadRequest* read_req = static_cast(cb_arg); - read_req->status = req.status; - read_req->result = req.result; - }, - &awaiter->read_reqs_[i], &awaiter->io_handle_[i], &awaiter->del_fn_[i], - /*aligned_buf=*/nullptr); - if (!s.ok()) { - // For any non-ok status, the FileSystem will not call the callback - // So let's update the status 
ourselves + size_t num_io_handles = awaiter->num_reqs_; + IOStatus s = awaiter->file_->MultiReadAsync( + awaiter->read_reqs_, awaiter->num_reqs_, awaiter->opts_, + [](const FSReadRequest* reqs, size_t n_reqs, void* cb_arg) { + FSReadRequest* read_reqs = static_cast(cb_arg); + if (read_reqs != reqs) { + for (size_t idx = 0; idx < n_reqs; idx++) { + read_reqs[idx].status = reqs[idx].status; + read_reqs[idx].result = reqs[idx].result; + } + } + }, + awaiter->read_reqs_, awaiter->io_handle_.data(), &num_io_handles, + awaiter->del_fn_.data(), + /*aligned_buf=*/nullptr); + if (!s.ok()) { + assert(num_io_handles == 0); + // For any non-ok status, the FileSystem will not call the callback + // So let's update the status ourselves assuming the whole batch failed. + for (size_t i = 0; i < awaiter->num_reqs_; ++i) { awaiter->read_reqs_[i].status = s; } } + assert(num_io_handles <= awaiter->num_reqs_); + awaiter->io_handle_.resize(num_io_handles); + awaiter->del_fn_.resize(num_io_handles); return true; } @@ -42,33 +56,42 @@ void AsyncFileReader::Wait() { if (!head_) { return; } - ReadAwaiter* waiter; + + // TODO: No need to copy if we have 1 awaiter. + // Poll API seems to encourage inefficiency. 
std::vector io_handles; - IOStatus s; io_handles.reserve(num_reqs_); - waiter = head_; + + ReadAwaiter* waiter = head_; do { - for (size_t i = 0; i < waiter->num_reqs_; ++i) { + for (size_t i = 0; i < waiter->io_handle_.size(); ++i) { if (waiter->io_handle_[i]) { io_handles.push_back(waiter->io_handle_[i]); } } } while (waiter != tail_ && (waiter = waiter->next_)); + + IOStatus s = IOStatus::OK(); if (io_handles.size() > 0) { StopWatch sw(SystemClock::Default().get(), stats_, POLL_WAIT_MICROS); s = fs_->Poll(io_handles, io_handles.size()); } + do { waiter = head_; head_ = waiter->next_; - for (size_t i = 0; i < waiter->num_reqs_; ++i) { + for (size_t i = 0; i < waiter->io_handle_.size(); ++i) { if (waiter->io_handle_[i] && waiter->del_fn_[i]) { waiter->del_fn_[i](waiter->io_handle_[i]); } - if (waiter->read_reqs_[i].status.ok() && !s.ok()) { - // Override the request status with the Poll error - waiter->read_reqs_[i].status = s; + } + if (!s.ok()) { + for (size_t i = 0; i < waiter->num_reqs_; ++i) { + if (waiter->read_reqs_[i].status.ok()) { + // Override the request status with the Poll error + waiter->read_reqs_[i].status = s; + } } } waiter->awaiting_coro_.resume(); diff --git a/util/async_file_reader.h b/util/async_file_reader.h index df69a840ebb..4ed9b4c3246 100644 --- a/util/async_file_reader.h +++ b/util/async_file_reader.h @@ -82,8 +82,8 @@ class AsyncFileReader { const IOOptions& opts_; FSReadRequest* read_reqs_; size_t num_reqs_; - autovector io_handle_; - autovector del_fn_; + std::vector io_handle_; + std::vector del_fn_; folly::coro::impl::coroutine_handle<> awaiting_coro_; // Use this to link to the next ReadAwaiter in the suspended coroutine // list. The head and tail of the list are tracked by AsyncFileReader.