Skip to content

Commit

Permalink
Detaching CloudFileSystemEnv from CloudFileSystem class hierarchy.
Browse files Browse the repository at this point in the history
  • Loading branch information
dpetrov4 committed Jan 14, 2024
1 parent a5e883e commit b2bf52d
Show file tree
Hide file tree
Showing 6 changed files with 83 additions and 101 deletions.
13 changes: 6 additions & 7 deletions cloud/aws/aws_file_system.cc
Original file line number Diff line number Diff line change
@@ -1,22 +1,22 @@
// Copyright (c) 2016-present, Rockset, Inc. All rights reserved.
//
#include "rocksdb/cloud/cloud_file_system.h"
#ifndef ROCKSDB_LITE
#include "cloud/aws/aws_file_system.h"

#include <chrono>
#include <cinttypes>
#include <fstream>
#include <iostream>
#include <memory>
#include <set>

#include "cloud/aws/aws_file_system.h"
#include "cloud/cloud_log_controller_impl.h"
#include "cloud/cloud_scheduler.h"
#include "rocksdb/cloud/cloud_storage_provider_impl.h"
#include "cloud/filename.h"
#include "port/port.h"
#include "rocksdb/cloud/cloud_log_controller.h"
#include "rocksdb/cloud/cloud_storage_provider.h"
#include "rocksdb/cloud/cloud_storage_provider_impl.h"
#include "rocksdb/env.h"
#include "rocksdb/status.h"
#include "rocksdb/utilities/object_registry.h"
Expand Down Expand Up @@ -175,8 +175,7 @@ Status AwsCloudAccessCredentials::GetCredentialsProvider(
AwsFileSystem::AwsFileSystem(const std::shared_ptr<FileSystem>& underlying_fs,
const CloudFileSystemOptions& _cloud_fs_options,
const std::shared_ptr<Logger>& info_log)
: CloudFileSystemImpl(_cloud_fs_options, underlying_fs, info_log) {
}
: CloudFileSystemImpl(_cloud_fs_options, underlying_fs, info_log) {}

// If you do not specify a region, then S3 buckets are created in the
// standard-region which might not satisfy read-your-own-writes. So,
Expand Down Expand Up @@ -223,8 +222,8 @@ Status AwsFileSystem::NewAwsFileSystem(
}
std::unique_ptr<AwsFileSystem> afs(
new AwsFileSystem(fs, cloud_options, info_log));

auto env = afs->NewCompositeEnvFromThis(Env::Default());
auto env =
CloudFileSystemEnv::NewCompositeEnvFromFs(afs.get(), Env::Default());
ConfigOptions config_options;
config_options.env = env.get();
status = afs->PrepareOptions(config_options);
Expand Down
27 changes: 8 additions & 19 deletions cloud/cloud_file_system.cc
Original file line number Diff line number Diff line change
Expand Up @@ -417,18 +417,6 @@ Status CloudFileSystemOptions::Serialize(const ConfigOptions& config_options,
reinterpret_cast<const char*>(this), value);
}

CloudFileSystemEnv::CloudFileSystemEnv(const CloudFileSystemOptions& options,
const std::shared_ptr<FileSystem>& base,
const std::shared_ptr<Logger>& logger)
: cloud_fs_options(options), base_fs_(base), info_log_(logger) {
RegisterOptions(&cloud_fs_options, &cloud_fs_option_type_info);
}

CloudFileSystemEnv::~CloudFileSystemEnv() {
cloud_fs_options.cloud_log_controller.reset();
cloud_fs_options.storage_provider.reset();
}

Status CloudFileSystemEnv::NewAwsFileSystem(
const std::shared_ptr<FileSystem>& base_fs,
const std::string& src_cloud_bucket, const std::string& src_cloud_object,
Expand Down Expand Up @@ -492,12 +480,13 @@ void CloudFileSystemEnv::RegisterCloudObjects(const std::string& arg) {
});
}

std::unique_ptr<Env> CloudFileSystemEnv::NewCompositeEnvFromThis(Env* env) {
std::unique_ptr<Env> CloudFileSystemEnv::NewCompositeEnvFromFs(FileSystem* fs,
Env* env) {
// We need a shared_ptr<FileSystem> pointing to "this", to initialize the
// env wrapper, but we don't want that shared_ptr to own the lifecycle for
// "this". Creating a shared_ptr with a custom no-op deleter instead.
std::shared_ptr<FileSystem> fs(this, [](auto* /*p*/) { /*noop*/ });
return std::make_unique<CompositeEnvWrapper>(env, fs);
std::shared_ptr<FileSystem> fsPtr(fs, [](auto* /*p*/) { /*noop*/ });
return std::make_unique<CompositeEnvWrapper>(env, fsPtr);
}

Status CloudFileSystemEnv::CreateFromString(
Expand Down Expand Up @@ -529,13 +518,13 @@ Status CloudFileSystemEnv::CreateFromString(
copy.invoke_prepare_options = false; // Prepare here, not there
s = ObjectRegistry::NewInstance()->NewUniqueObject<FileSystem>(id, &fs);
if (s.ok()) {
auto* cfs = dynamic_cast<CloudFileSystemEnv*>(fs.get());
auto* cfs = dynamic_cast<CloudFileSystemImpl*>(fs.get());
assert(cfs);
if (!options.empty()) {
s = cfs->ConfigureFromMap(copy, options);
}
if (s.ok() && config_options.invoke_prepare_options) {
auto env = cfs->NewCompositeEnvFromThis(copy.env);
auto env = NewCompositeEnvFromFs(cfs, copy.env);
copy.invoke_prepare_options = config_options.invoke_prepare_options;
copy.env = env.get();
s = cfs->PrepareOptions(copy);
Expand Down Expand Up @@ -583,15 +572,15 @@ Status CloudFileSystemEnv::CreateFromString(
copy.invoke_prepare_options = false; // Prepare here, not there
s = ObjectRegistry::NewInstance()->NewUniqueObject<FileSystem>(id, &fs);
if (s.ok()) {
auto* cfs = dynamic_cast<CloudFileSystemEnv*>(fs.get());
auto* cfs = dynamic_cast<CloudFileSystemImpl*>(fs.get());
assert(cfs);
auto copts = cfs->GetOptions<CloudFileSystemOptions>();
*copts = cloud_options;
if (!options.empty()) {
s = cfs->ConfigureFromMap(copy, options);
}
if (s.ok() && config_options.invoke_prepare_options) {
auto env = cfs->NewCompositeEnvFromThis(copy.env);
auto env = NewCompositeEnvFromFs(cfs, copy.env);
copy.invoke_prepare_options = config_options.invoke_prepare_options;
copy.env = env.get();
s = cfs->PrepareOptions(copy);
Expand Down
9 changes: 7 additions & 2 deletions cloud/cloud_file_system_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,11 @@ namespace ROCKSDB_NAMESPACE {

CloudFileSystemImpl::CloudFileSystemImpl(
const CloudFileSystemOptions& opts, const std::shared_ptr<FileSystem>& base,
const std::shared_ptr<Logger>& l)
: CloudFileSystemEnv(opts, base, l), purger_is_running_(true) {
const std::shared_ptr<Logger>& logger)
: info_log_(logger),
cloud_fs_options(opts),
base_fs_(base),
purger_is_running_(true) {
if (opts.cloud_file_deletion_delay) {
cloud_file_deletion_scheduler_ = CloudFileDeletionScheduler::Create(
CloudScheduler::Get(), *opts.cloud_file_deletion_delay);
Expand All @@ -45,6 +48,8 @@ CloudFileSystemImpl::~CloudFileSystemImpl() {
cloud_fs_options.cloud_log_controller->StopTailingStream();
}
StopPurger();
cloud_fs_options.cloud_log_controller.reset();
cloud_fs_options.storage_provider.reset();
}

IOStatus CloudFileSystemImpl::ExistsCloudObject(const std::string& fname) {
Expand Down
11 changes: 5 additions & 6 deletions cloud/cloud_file_system_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,9 @@
#include "rocksdb/cloud/cloud_file_system.h"

#include "cloud/cloud_log_controller_impl.h"
#include "rocksdb/cloud/cloud_storage_provider_impl.h"
#include "rocksdb/cloud/cloud_log_controller.h"
#include "rocksdb/cloud/cloud_storage_provider.h"
#include "rocksdb/cloud/cloud_storage_provider_impl.h"
#include "rocksdb/convenience.h"
#include "rocksdb/env.h"
#include "test_util/testharness.h"
Expand Down Expand Up @@ -214,8 +214,8 @@ TEST(CloudFileSystemTest, DISABLED_ConfigureKinesisController) {
ASSERT_OK(CloudFileSystem::CreateFromString(
config_options, "id=aws; controller=kinesis; TEST=dbcloud:/test", &cfs));
ASSERT_STREQ(cfs->Name(), "aws");
ASSERT_NE(cfs->GetLogController(), nullptr);
ASSERT_STREQ(cfs->GetLogController()->Name(),
ASSERT_NE(cfs->GetCloudFileSystemOptions().cloud_log_controller, nullptr);
ASSERT_STREQ(cfs->GetCloudFileSystemOptions().cloud_log_controller->Name(),
CloudLogControllerImpl::kKinesis());
#endif
}
Expand All @@ -229,8 +229,8 @@ TEST(CloudFileSystemTest, ConfigureKafkaController) {
#ifdef USE_KAFKA
ASSERT_OK(s);
ASSERT_NE(cfs, nullptr);
ASSERT_NE(cfs->GetLogController(), nullptr);
ASSERT_STREQ(cfs->GetLogController()->Name(),
ASSERT_NE(cfs->GetCloudFileSystemOptions().cloud_log_controller, nullptr);
ASSERT_STREQ(cfs->GetCloudFileSystemOptions().cloud_log_controller->Name(),
CloudLogControllerImpl::kKafka());
#else
ASSERT_NOK(s);
Expand All @@ -244,4 +244,3 @@ int main(int argc, char** argv) {
::testing::InitGoogleTest(&argc, argv);
return RUN_ALL_TESTS();
}

69 changes: 4 additions & 65 deletions include/rocksdb/cloud/cloud_file_system.h
Original file line number Diff line number Diff line change
Expand Up @@ -616,29 +616,18 @@ class CloudFileSystem : public FileSystem {
};

//
// The Cloud file system
// The Cloud File System initialization/construction.
//
// NOTE: The AWS SDK must be initialized before the CloudFileSystem is
// constructed, and remain active (Aws::ShutdownAPI() not called) as long as any
// CloudFileSystem objects exist.
class CloudFileSystemEnv : public CloudFileSystem {
protected:
CloudFileSystemOptions cloud_fs_options;
std::shared_ptr<FileSystem> base_fs_; // The underlying file system

class CloudFileSystemEnv {
public:
// Creates a new CompositeEnv from "env" and "this".
// The returned Env must not outlive "this"
std::unique_ptr<Env> NewCompositeEnvFromThis(Env* env);

CloudFileSystemEnv(const CloudFileSystemOptions& options,
const std::shared_ptr<FileSystem>& base,
const std::shared_ptr<Logger>& logger);
static std::unique_ptr<Env> NewCompositeEnvFromFs(FileSystem* fs, Env* env);

public:
mutable std::shared_ptr<Logger> info_log_; // informational messages

virtual ~CloudFileSystemEnv();

static void RegisterCloudObjects(const std::string& mode = "");
static Status CreateFromString(const ConfigOptions& config_options,
const std::string& id,
Expand All @@ -648,56 +637,6 @@ class CloudFileSystemEnv : public CloudFileSystem {
const CloudFileSystemOptions& cloud_options,
std::unique_ptr<CloudFileSystem>* fs);

const char* Name() const override { return "cloud-env"; }

const std::shared_ptr<FileSystem>& GetBaseFileSystem() const override {
return base_fs_;
}

Logger* GetLogger() const override { return info_log_.get(); }

const std::shared_ptr<CloudStorageProvider>& GetStorageProvider()
const override {
return cloud_fs_options.storage_provider;
}

const std::shared_ptr<CloudLogController>& GetLogController() const {
return cloud_fs_options.cloud_log_controller;
}

const std::string& GetSrcBucketName() const override {
return cloud_fs_options.src_bucket.GetBucketName();
}
const std::string& GetSrcObjectPath() const override {
return cloud_fs_options.src_bucket.GetObjectPath();
}
bool HasSrcBucket() const override {
return cloud_fs_options.src_bucket.IsValid();
}

const std::string& GetDestBucketName() const override {
return cloud_fs_options.dest_bucket.GetBucketName();
}
const std::string& GetDestObjectPath() const override {
return cloud_fs_options.dest_bucket.GetObjectPath();
}

bool HasDestBucket() const override {
return cloud_fs_options.dest_bucket.IsValid();
}
bool SrcMatchesDest() const override {
if (HasSrcBucket() && HasDestBucket()) {
return cloud_fs_options.src_bucket == cloud_fs_options.dest_bucket;
} else {
return false;
}
}

// returns the options used to create this object
const CloudFileSystemOptions& GetCloudFileSystemOptions() const override {
return cloud_fs_options;
}

// Create a new AWS file system.
// src_bucket_name: bucket name suffix where db data is read from
// src_object_prefix: all db objects in source bucket are prepended with this
Expand Down
55 changes: 53 additions & 2 deletions include/rocksdb/cloud/cloud_file_system_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,12 @@ class CloudFileDeletionScheduler;
//
// The Cloud file system
//
class CloudFileSystemImpl : public CloudFileSystemEnv {
class CloudFileSystemImpl : public CloudFileSystem {
friend class CloudFileSystemEnv;

public:
mutable std::shared_ptr<Logger> info_log_; // informational messages

static int RegisterAwsObjects(ObjectLibrary& library, const std::string& arg);
// Constructor
CloudFileSystemImpl(const CloudFileSystemOptions& options,
Expand All @@ -34,7 +36,12 @@ class CloudFileSystemImpl : public CloudFileSystemEnv {

virtual ~CloudFileSystemImpl();
static const char* kClassName() { return kCloud(); }
virtual const char* Name() const override { return kClassName(); }

const std::shared_ptr<FileSystem>& GetBaseFileSystem() const override {
return base_fs_;
}

const char* Name() const override { return kClassName(); }

IOStatus NewSequentialFile(const std::string& fname,
const FileOptions& file_opts,
Expand Down Expand Up @@ -285,6 +292,9 @@ class CloudFileSystemImpl : public CloudFileSystemEnv {
#endif

protected:
CloudFileSystemOptions cloud_fs_options;
std::shared_ptr<FileSystem> base_fs_; // The underlying file system

Status CheckValidity() const;
// Status TEST_Initialize(const std::string& name) override;
// The pathname that contains a list of all db's inside a bucket.
Expand Down Expand Up @@ -368,6 +378,47 @@ class CloudFileSystemImpl : public CloudFileSystemEnv {
const std::string& dbname,
const std::vector<std::string>& active_cookies) override;

public:
// returns the options used to create this object
const CloudFileSystemOptions& GetCloudFileSystemOptions() const override {
return cloud_fs_options;
}

const std::string& GetSrcBucketName() const override {
return cloud_fs_options.src_bucket.GetBucketName();
}
const std::string& GetSrcObjectPath() const override {
return cloud_fs_options.src_bucket.GetObjectPath();
}
bool HasSrcBucket() const override {
return cloud_fs_options.src_bucket.IsValid();
}

const std::string& GetDestBucketName() const override {
return cloud_fs_options.dest_bucket.GetBucketName();
}
const std::string& GetDestObjectPath() const override {
return cloud_fs_options.dest_bucket.GetObjectPath();
}

bool HasDestBucket() const override {
return cloud_fs_options.dest_bucket.IsValid();
}
bool SrcMatchesDest() const override {
if (HasSrcBucket() && HasDestBucket()) {
return cloud_fs_options.src_bucket == cloud_fs_options.dest_bucket;
} else {
return false;
}
}

const std::shared_ptr<CloudStorageProvider>& GetStorageProvider()
const override {
return cloud_fs_options.storage_provider;
}

Logger* GetLogger() const override { return info_log_.get(); }

private:
// Files are invisibile if:
// - It's CLOUDMANFIEST file and cookie is not active. NOTE: empty cookie is
Expand Down

0 comments on commit b2bf52d

Please sign in to comment.