Commit c1b12ca

apacheGH-38333: [C++][FS][Azure] Implement file writes (apache#38780)
### Rationale for this change

Writing files is an important part of the filesystem.

### What changes are included in this PR?

Implements `OpenOutputStream` and `OpenAppendStream` for Azure (a usage sketch follows this commit message).

- Initially I started with the implementation from apache#12914, but I made quite a few changes:
  - Removed the different code path for hierarchical namespace accounts. There should not be any performance advantage to using special APIs only available on hierarchical namespace accounts.
  - Only implement `ObjectAppendStream`, not `ObjectOutputStream`. `OpenOutputStream` is implemented by truncating the existing file and then returning an `ObjectAppendStream`.
  - More precise use of `try`/`catch`. Every call to Azure is wrapped in a `try`/`catch` and should return a descriptive error status.
  - Avoid unnecessary calls to Azure. For example, we now maintain the block list in memory and commit it only once, on flush. apache#12914 committed the block list after each block that was staged and, on flush, queried Azure to get the list of uncommitted blocks. The new approach is consistent with the Azure fsspec implementation: https://github.com/fsspec/adlfs/blob/092685f102c5cd215550d10e8347e5bce0e2b93d/adlfs/spec.py#L2009
  - Adjust the `block_ids` slightly to minimise the risk of them conflicting with blocks written by other blob storage clients.
- Implement metadata writes, including adding default metadata to `AzureOptions`.
- Tests are based on `gcsfs_test.cc`, but I added a couple of extra ones.
- Handle the TODO(apacheGH-38780) comments about using the Azure filesystem to write data in tests.

### Are these changes tested?

Yes. Everything should be covered by the Azurite tests.

### Are there any user-facing changes?

Yes. The Azure filesystem now supports file writes.

* Closes: apache#38333

Lead-authored-by: Thomas Newton <[email protected]>
Co-authored-by: Sutou Kouhei <[email protected]>
Signed-off-by: Sutou Kouhei <[email protected]>
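Below is a minimal usage sketch of the write path added here, assuming an account configured with account-key credentials (for example against Azurite, which the tests use). The container and blob names are placeholders, and the exact argument lists of `AzureFileSystem::Make` and `ConfigureAccountKeyCredentials` are assumptions, since the diff below truncates those signatures.

```cpp
// Sketch only: placeholder names and the Make()/ConfigureAccountKeyCredentials()
// argument lists are assumptions, not taken verbatim from this commit.
#include "arrow/filesystem/azurefs.h"

#include "arrow/result.h"
#include "arrow/status.h"

arrow::Status WriteThenAppend() {
  arrow::fs::AzureOptions options;
  // Assumed signature: (account_name, account_key).
  ARROW_RETURN_NOT_OK(
      options.ConfigureAccountKeyCredentials("<account-name>", "<account-key>"));

  ARROW_ASSIGN_OR_RAISE(auto fs, arrow::fs::AzureFileSystem::Make(options));

  // OpenOutputStream truncates (or creates) the block blob, then stages one block
  // per Write(); Close() flushes, committing the accumulated block list.
  ARROW_ASSIGN_OR_RAISE(auto output, fs->OpenOutputStream("container/data.txt"));
  ARROW_RETURN_NOT_OK(output->Write("hello ", 6));
  ARROW_RETURN_NOT_OK(output->Close());

  // OpenAppendStream loads the committed block list and appends after it.
  ARROW_ASSIGN_OR_RAISE(auto append, fs->OpenAppendStream("container/data.txt"));
  ARROW_RETURN_NOT_OK(append->Write("world", 5));
  return append->Close();
}
```

Note that, per the implementation in this commit, the block list is only committed on `Flush()`/`Close()`, so staged data is not visible in the blob until the stream is flushed or closed.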
1 parent 5a0e8b6 commit c1b12ca

File tree

3 files changed: +429 -23 lines changed

cpp/src/arrow/filesystem/azurefs.cc

Lines changed: 251 additions & 4 deletions
@@ -24,6 +24,7 @@
 #include "arrow/buffer.h"
 #include "arrow/filesystem/path_util.h"
 #include "arrow/filesystem/util_internal.h"
+#include "arrow/io/util_internal.h"
 #include "arrow/result.h"
 #include "arrow/util/checked_cast.h"
 #include "arrow/util/formatting.h"
@@ -43,7 +44,8 @@ AzureOptions::AzureOptions() {}
 bool AzureOptions::Equals(const AzureOptions& other) const {
   return (account_dfs_url == other.account_dfs_url &&
           account_blob_url == other.account_blob_url &&
-          credentials_kind == other.credentials_kind);
+          credentials_kind == other.credentials_kind &&
+          default_metadata == other.default_metadata);
 }
 
 Status AzureOptions::ConfigureAccountKeyCredentials(const std::string& account_name,
@@ -461,6 +463,225 @@ class ObjectInputFile final : public io::RandomAccessFile {
   int64_t content_length_ = kNoSize;
   std::shared_ptr<const KeyValueMetadata> metadata_;
 };
+
+Status CreateEmptyBlockBlob(
+    std::shared_ptr<Azure::Storage::Blobs::BlockBlobClient> block_blob_client) {
+  try {
+    block_blob_client->UploadFrom(nullptr, 0);
+  } catch (const Azure::Storage::StorageException& exception) {
+    return internal::ExceptionToStatus(
+        "UploadFrom failed for '" + block_blob_client->GetUrl() +
+            "' with an unexpected Azure error. There is no existing blob at this "
+            "location or the existing blob must be replaced so ObjectAppendStream must "
+            "create a new empty block blob.",
+        exception);
+  }
+  return Status::OK();
+}
+
+Result<Azure::Storage::Blobs::Models::GetBlockListResult> GetBlockList(
+    std::shared_ptr<Azure::Storage::Blobs::BlockBlobClient> block_blob_client) {
+  try {
+    return block_blob_client->GetBlockList().Value;
+  } catch (Azure::Storage::StorageException& exception) {
+    return internal::ExceptionToStatus(
+        "GetBlockList failed for '" + block_blob_client->GetUrl() +
+            "' with an unexpected Azure error. Cannot write to a file without first "
+            "fetching the existing block list.",
+        exception);
+  }
+}
+
+Azure::Storage::Metadata ArrowMetadataToAzureMetadata(
+    const std::shared_ptr<const KeyValueMetadata>& arrow_metadata) {
+  Azure::Storage::Metadata azure_metadata;
+  for (auto key_value : arrow_metadata->sorted_pairs()) {
+    azure_metadata[key_value.first] = key_value.second;
+  }
+  return azure_metadata;
+}
+
+Status CommitBlockList(
+    std::shared_ptr<Azure::Storage::Blobs::BlockBlobClient> block_blob_client,
+    const std::vector<std::string>& block_ids, const Azure::Storage::Metadata& metadata) {
+  Azure::Storage::Blobs::CommitBlockListOptions options;
+  options.Metadata = metadata;
+  try {
+    // CommitBlockList puts all block_ids in the latest element. That means in the case of
+    // overlapping block_ids the newly staged block ids will always replace the
+    // previously committed blocks.
+    // https://learn.microsoft.com/en-us/rest/api/storageservices/put-block-list?tabs=microsoft-entra-id#request-body
+    block_blob_client->CommitBlockList(block_ids, options);
+  } catch (const Azure::Storage::StorageException& exception) {
+    return internal::ExceptionToStatus(
+        "CommitBlockList failed for '" + block_blob_client->GetUrl() +
+            "' with an unexpected Azure error. Committing is required to flush an "
+            "output/append stream.",
+        exception);
+  }
+  return Status::OK();
+}
+
+class ObjectAppendStream final : public io::OutputStream {
+ public:
+  ObjectAppendStream(
+      std::shared_ptr<Azure::Storage::Blobs::BlockBlobClient> block_blob_client,
+      const io::IOContext& io_context, const AzureLocation& location,
+      const std::shared_ptr<const KeyValueMetadata>& metadata,
+      const AzureOptions& options, int64_t size = kNoSize)
+      : block_blob_client_(std::move(block_blob_client)),
+        io_context_(io_context),
+        location_(location),
+        content_length_(size) {
+    if (metadata && metadata->size() != 0) {
+      metadata_ = ArrowMetadataToAzureMetadata(metadata);
+    } else if (options.default_metadata && options.default_metadata->size() != 0) {
+      metadata_ = ArrowMetadataToAzureMetadata(options.default_metadata);
+    }
+  }
+
+  ~ObjectAppendStream() override {
+    // For compliance with the rest of the IO stack, Close rather than Abort,
+    // even though it may be more expensive.
+    io::internal::CloseFromDestructor(this);
+  }
+
+  Status Init() {
+    if (content_length_ != kNoSize) {
+      DCHECK_GE(content_length_, 0);
+      pos_ = content_length_;
+    } else {
+      try {
+        auto properties = block_blob_client_->GetProperties();
+        content_length_ = properties.Value.BlobSize;
+        pos_ = content_length_;
+      } catch (const Azure::Storage::StorageException& exception) {
+        if (exception.StatusCode == Azure::Core::Http::HttpStatusCode::NotFound) {
+          RETURN_NOT_OK(CreateEmptyBlockBlob(block_blob_client_));
+        } else {
+          return internal::ExceptionToStatus(
+              "GetProperties failed for '" + block_blob_client_->GetUrl() +
+                  "' with an unexpected Azure error. Can not initialise an "
+                  "ObjectAppendStream without knowing whether a file already exists at "
+                  "this path, and if it exists, its size.",
+              exception);
+        }
+        content_length_ = 0;
+      }
+    }
+    if (content_length_ > 0) {
+      ARROW_ASSIGN_OR_RAISE(auto block_list, GetBlockList(block_blob_client_));
+      for (auto block : block_list.CommittedBlocks) {
+        block_ids_.push_back(block.Name);
+      }
+    }
+    return Status::OK();
+  }
+
+  Status Abort() override {
+    if (closed_) {
+      return Status::OK();
+    }
+    block_blob_client_ = nullptr;
+    closed_ = true;
+    return Status::OK();
+  }
+
+  Status Close() override {
+    if (closed_) {
+      return Status::OK();
+    }
+    RETURN_NOT_OK(Flush());
+    block_blob_client_ = nullptr;
+    closed_ = true;
+    return Status::OK();
+  }
+
+  bool closed() const override { return closed_; }
+
+  Status CheckClosed(const char* action) const {
+    if (closed_) {
+      return Status::Invalid("Cannot ", action, " on closed stream.");
+    }
+    return Status::OK();
+  }
+
+  Result<int64_t> Tell() const override {
+    RETURN_NOT_OK(CheckClosed("tell"));
+    return pos_;
+  }
+
+  Status Write(const std::shared_ptr<Buffer>& buffer) override {
+    return DoAppend(buffer->data(), buffer->size(), buffer);
+  }
+
+  Status Write(const void* data, int64_t nbytes) override {
+    return DoAppend(data, nbytes);
+  }
+
+  Status Flush() override {
+    RETURN_NOT_OK(CheckClosed("flush"));
+    return CommitBlockList(block_blob_client_, block_ids_, metadata_);
+  }
+
+ private:
+  Status DoAppend(const void* data, int64_t nbytes,
+                  std::shared_ptr<Buffer> owned_buffer = nullptr) {
+    RETURN_NOT_OK(CheckClosed("append"));
+    auto append_data = reinterpret_cast<const uint8_t*>(data);
+    Azure::Core::IO::MemoryBodyStream block_content(append_data, nbytes);
+    if (block_content.Length() == 0) {
+      return Status::OK();
+    }
+
+    const auto n_block_ids = block_ids_.size();
+
+    // New block ID must always be distinct from the existing block IDs. Otherwise we
+    // will accidentally replace the content of existing blocks, causing corruption.
+    // We will use monotonically increasing integers.
+    auto new_block_id = std::to_string(n_block_ids);
+
+    // Pad to 5 digits, because Azure allows a maximum of 50,000 blocks.
+    const size_t target_number_of_digits = 5;
+    const auto required_padding_digits =
+        target_number_of_digits - std::min(target_number_of_digits, new_block_id.size());
+    new_block_id.insert(0, required_padding_digits, '0');
+    // There is a small risk when appending to a blob created by another client that
+    // `new_block_id` may overlapping with an existing block id. Adding the `-arrow`
+    // suffix significantly reduces the risk, but does not 100% eliminate it. For example
+    // if the blob was previously created with one block, with id `00001-arrow` then the
+    // next block we append will conflict with that, and cause corruption.
+    new_block_id += "-arrow";
+    new_block_id = Azure::Core::Convert::Base64Encode(
+        std::vector<uint8_t>(new_block_id.begin(), new_block_id.end()));
+
+    try {
+      block_blob_client_->StageBlock(new_block_id, block_content);
+    } catch (const Azure::Storage::StorageException& exception) {
+      return internal::ExceptionToStatus(
+          "StageBlock failed for '" + block_blob_client_->GetUrl() + "' new_block_id: '" +
+              new_block_id +
+              "' with an unexpected Azure error. Staging new blocks is fundamental to "
+              "streaming writes to blob storage.",
+          exception);
+    }
+    block_ids_.push_back(new_block_id);
+    pos_ += nbytes;
+    content_length_ += nbytes;
+    return Status::OK();
+  }
+
+  std::shared_ptr<Azure::Storage::Blobs::BlockBlobClient> block_blob_client_;
+  const io::IOContext io_context_;
+  const AzureLocation location_;
+
+  bool closed_ = false;
+  int64_t pos_ = 0;
+  int64_t content_length_ = kNoSize;
+  std::vector<std::string> block_ids_;
+  Azure::Storage::Metadata metadata_;
+};
+
 }  // namespace
 
 // -----------------------------------------------------------------------
@@ -724,6 +945,30 @@ class AzureFileSystem::Impl {
 
     return Status::OK();
   }
+
+  Result<std::shared_ptr<ObjectAppendStream>> OpenAppendStream(
+      const AzureLocation& location,
+      const std::shared_ptr<const KeyValueMetadata>& metadata, const bool truncate,
+      AzureFileSystem* fs) {
+    RETURN_NOT_OK(ValidateFileLocation(location));
+    ARROW_RETURN_NOT_OK(internal::AssertNoTrailingSlash(location.path));
+
+    auto block_blob_client = std::make_shared<Azure::Storage::Blobs::BlockBlobClient>(
+        blob_service_client_->GetBlobContainerClient(location.container)
+            .GetBlockBlobClient(location.path));
+
+    std::shared_ptr<ObjectAppendStream> stream;
+    if (truncate) {
+      RETURN_NOT_OK(CreateEmptyBlockBlob(block_blob_client));
+      stream = std::make_shared<ObjectAppendStream>(block_blob_client, fs->io_context(),
+                                                    location, metadata, options_, 0);
+    } else {
+      stream = std::make_shared<ObjectAppendStream>(block_blob_client, fs->io_context(),
+                                                    location, metadata, options_);
+    }
+    RETURN_NOT_OK(stream->Init());
+    return stream;
+  }
 };
 
 const AzureOptions& AzureFileSystem::options() const { return impl_->options(); }
@@ -805,12 +1050,14 @@ Result<std::shared_ptr<io::RandomAccessFile>> AzureFileSystem::OpenInputFile(
 
 Result<std::shared_ptr<io::OutputStream>> AzureFileSystem::OpenOutputStream(
     const std::string& path, const std::shared_ptr<const KeyValueMetadata>& metadata) {
-  return Status::NotImplemented("The Azure FileSystem is not fully implemented");
+  ARROW_ASSIGN_OR_RAISE(auto location, AzureLocation::FromString(path));
+  return impl_->OpenAppendStream(location, metadata, true, this);
 }
 
 Result<std::shared_ptr<io::OutputStream>> AzureFileSystem::OpenAppendStream(
-    const std::string&, const std::shared_ptr<const KeyValueMetadata>&) {
-  return Status::NotImplemented("The Azure FileSystem is not fully implemented");
+    const std::string& path, const std::shared_ptr<const KeyValueMetadata>& metadata) {
+  ARROW_ASSIGN_OR_RAISE(auto location, AzureLocation::FromString(path));
+  return impl_->OpenAppendStream(location, metadata, false, this);
 }
 
 Result<std::shared_ptr<AzureFileSystem>> AzureFileSystem::Make(
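As a standalone illustration of the block-ID scheme used in `ObjectAppendStream::DoAppend` above, the sketch below reproduces the pad-to-five-digits, `-arrow` suffix, and Base64 steps. The helper name `MakeArrowBlockId` is invented for this sketch; only `Azure::Core::Convert::Base64Encode` is the Azure SDK call the commit itself uses.

```cpp
#include <algorithm>
#include <string>
#include <vector>

#include <azure/core/base64.hpp>  // Azure::Core::Convert::Base64Encode

// Mirrors the block-ID construction in ObjectAppendStream::DoAppend: zero-pad the
// block index to 5 digits (Azure allows at most 50,000 blocks per blob), append an
// "-arrow" suffix to reduce collisions with blocks staged by other clients, then
// Base64-encode, since Azure requires block IDs to be Base64 strings.
std::string MakeArrowBlockId(size_t block_index) {
  auto id = std::to_string(block_index);
  const size_t target_number_of_digits = 5;
  id.insert(0, target_number_of_digits - std::min(target_number_of_digits, id.size()),
            '0');
  id += "-arrow";
  return Azure::Core::Convert::Base64Encode(std::vector<uint8_t>(id.begin(), id.end()));
}
```

For example, `MakeArrowBlockId(1)` yields the Base64 encoding of `00001-arrow`. Keeping all Arrow-generated IDs the same pre-encoding length also matters because Azure requires every block ID within a single blob to have the same length.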

cpp/src/arrow/filesystem/azurefs.h

Lines changed: 5 additions & 0 deletions
@@ -77,6 +77,11 @@ struct ARROW_EXPORT AzureOptions {
   std::shared_ptr<Azure::Core::Credentials::TokenCredential>
       service_principle_credentials_provider;
 
+  /// \brief Default metadata for OpenOutputStream.
+  ///
+  /// This will be ignored if non-empty metadata is passed to OpenOutputStream.
+  std::shared_ptr<const KeyValueMetadata> default_metadata;
+
   AzureOptions();
 
   Status ConfigureAccountKeyCredentials(const std::string& account_name,
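A short sketch of how the new `default_metadata` option interacts with explicitly passed metadata, per the doc comment above (explicit non-empty metadata replaces the default entirely). The container and blob paths are illustrative, and the options object is assumed to be already configured with credentials.

```cpp
#include "arrow/filesystem/azurefs.h"

#include "arrow/result.h"
#include "arrow/status.h"
#include "arrow/util/key_value_metadata.h"

arrow::Status WriteWithMetadata(arrow::fs::AzureOptions options) {
  // Applied to blobs written through OpenOutputStream when no metadata is given.
  options.default_metadata = arrow::key_value_metadata({"project"}, {"arrow"});
  ARROW_ASSIGN_OR_RAISE(auto fs, arrow::fs::AzureFileSystem::Make(options));

  // No explicit metadata: the blob gets default_metadata when the block list commits.
  ARROW_ASSIGN_OR_RAISE(auto with_default,
                        fs->OpenOutputStream("container/with-default.txt"));
  ARROW_RETURN_NOT_OK(with_default->Close());

  // Non-empty explicit metadata: default_metadata is ignored for this blob.
  auto explicit_metadata = arrow::key_value_metadata({"owner"}, {"analytics"});
  ARROW_ASSIGN_OR_RAISE(
      auto with_explicit,
      fs->OpenOutputStream("container/with-explicit.txt", explicit_metadata));
  ARROW_RETURN_NOT_OK(with_explicit->Close());
  return arrow::Status::OK();
}
```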
