From 587966675ebab29ffe68393307bd3354d886b70c Mon Sep 17 00:00:00 2001 From: Gerrit Renker Date: Sat, 4 Mar 2023 17:16:48 -0500 Subject: [PATCH 1/9] Initial set of files (compile-tested only) --- cmake/sdks.cmake | 2 +- .../include/aws/transfer-crt/DownloadStream.h | 96 +++++++++ .../include/aws/transfer-crt/Metadata.h | 99 +++++++++ .../include/aws/transfer-crt/TransferHandle.h | 200 ++++++++++++++++++ .../source/transfer-crt/DownloadStream.cpp | 153 ++++++++++++++ .../source/transfer-crt/TransferHandle.cpp | 110 ++++++++++ 6 files changed, 659 insertions(+), 1 deletion(-) create mode 100644 src/aws-cpp-sdk-transfer-crt/include/aws/transfer-crt/DownloadStream.h create mode 100644 src/aws-cpp-sdk-transfer-crt/include/aws/transfer-crt/Metadata.h create mode 100644 src/aws-cpp-sdk-transfer-crt/include/aws/transfer-crt/TransferHandle.h create mode 100644 src/aws-cpp-sdk-transfer-crt/source/transfer-crt/DownloadStream.cpp create mode 100644 src/aws-cpp-sdk-transfer-crt/source/transfer-crt/TransferHandle.cpp diff --git a/cmake/sdks.cmake b/cmake/sdks.cmake index 1c6dd8c8c98..dc601a49b81 100644 --- a/cmake/sdks.cmake +++ b/cmake/sdks.cmake @@ -2,7 +2,7 @@ include(sdksCommon) set(SDK_DEPENDENCY_BUILD_LIST "") -set(NON_GENERATED_CLIENT_LIST access-management text-to-speech core queues s3-encryption identity-management transfer) ## Manually generated code with a name mimicking client name +set(NON_GENERATED_CLIENT_LIST access-management text-to-speech core queues s3-encryption identity-management transfer transfer-crt) ## Manually generated code with a name mimicking client name if(REGENERATE_CLIENTS OR REGENERATE_DEFAULTS) message(STATUS "Checking for SDK generation requirements") diff --git a/src/aws-cpp-sdk-transfer-crt/include/aws/transfer-crt/DownloadStream.h b/src/aws-cpp-sdk-transfer-crt/include/aws/transfer-crt/DownloadStream.h new file mode 100644 index 00000000000..f5fb696240f --- /dev/null +++ b/src/aws-cpp-sdk-transfer-crt/include/aws/transfer-crt/DownloadStream.h @@ -0,0 +1,96 @@ +/** + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * SPDX-License-Identifier: Apache-2.0. + */ +#pragma once + +#include +#include +#include +#include +#include +#include +#include +#include + +namespace Aws { +namespace TransferCrt { + +// Used by the classes below to notify the receiver of low-level file errors. +using ErrorCallback = std::function; + +// Default size for the put buffer (which is bypassed when xsputn is used). +// Measurements in "The Linux Programming Interface" show that a minimum 4096B is +// required when O_SYNC is enabled. Use a larger value to aggregate small writes. +constexpr size_t DEFAULT_BUFSIZE = 1 << 20; + +// Helper class for DownloadStream. +// +// This implements only what DownloadStream needs: a simple, file-descriptor based streambuf. +// Hence many std::streambuf operations, such as seekoff/pos, are not supported. +// The expected use-case is that mostly xsputn(const char *, size_t) will be called. +// +// The ErrorCallback that is passed into the constructor is invoked when encountering a +// low-level write error, receiving a string describing the error cause (based on errno). +class FileDescriptorBuf : public std::streambuf { + public: + // Class does not own the file descriptor @fd - caller is responsible for closing it. + FileDescriptorBuf(int fd, ErrorCallback errorCallback, size_t bufsize = DEFAULT_BUFSIZE) + : fd_{fd}, errorCallback_{errorCallback}, buffer_{Aws::MakeUniqueArray(bufsize, "FdBuf")} { + setp(buffer_.get(), buffer_.get() + bufsize); + } + + protected: + int sync() override; + int overflow(int_type c) override; + std::streamsize xsputn(const char *data, std::streamsize datalen) override; + + private: + int fd_; + ErrorCallback errorCallback_; + Aws::UniqueArrayPtr buffer_; +}; + +// Download output stream class for a given @dstPath. +// +// This takes an Error Callback which gets invoked with descriptive error message when a failure +// occurs in either this class, or the contained FileDescriptorBuf. +// +// The constructor does the following: +// 1. Create any missing directory components of @dstPath. +// 2. Generate a temporary .partial file to write to. This file will be renamed into @dstPath +// upon successful completion, or removed on failure. The implementation uses mkostemp(3), +// which is the reason we are using a file-descriptor based backend. +// 3. Open a file descriptor to the temporary file and advise the kernel about its use. +// 4. Complete the construction of the iostream, using a FileDescriptorBuf as rdbuf. +// +// The Error Callback @ec may be invoked already before the constructor call returns. +// It is also invoked by the contained FileDescriptorBuf, and during close(). +class DownloadStream final : public std::iostream { + public: + // Create a DownloadStream for @dstPath, calling @ec if any failure happens. + // Enabling O_SYNC via @sync_always is optional, as it degrades download performance. + DownloadStream(const Aws::String &dstPath, ErrorCallback ec, bool sync_always = false); + ~DownloadStream(); + + // Set eof, close the temporary file and atomically rename it into @dstPath. + void close() noexcept; + + private: + void _error(Aws::String msg) { + setstate(std::ios::badbit); + errorCallback_(std::move(msg)); + } + + private: + const Aws::String dstPath_; + Aws::String dstTempPath_; + ErrorCallback errorCallback_; + + int fd_ = -1; + Aws::UniquePtr buf_; + std::mutex close_mutex_; +}; + +} // namespace TransferCrt +} // namespace Aws diff --git a/src/aws-cpp-sdk-transfer-crt/include/aws/transfer-crt/Metadata.h b/src/aws-cpp-sdk-transfer-crt/include/aws/transfer-crt/Metadata.h new file mode 100644 index 00000000000..468f31ec35c --- /dev/null +++ b/src/aws-cpp-sdk-transfer-crt/include/aws/transfer-crt/Metadata.h @@ -0,0 +1,99 @@ +/** + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * SPDX-License-Identifier: Apache-2.0. + */ + +#pragma once + +#include +#include +#include +#include + +namespace Aws { +namespace TransferCrt { + +// WriteMetadata specifies blob metadata information. +// @uri: destination URI of the blob, in :/// format. +// @content_type: MIME type of the @uri content. +// @content_encoding: content encoding that was applied. +// @metadata: metadata key/value pairs. +struct WriteMetadata { + // Constructor for the default case - just create a blob at @uri. + explicit WriteMetadata(Aws::String uri) : WriteMetadata(uri, "", "") {} + + WriteMetadata(Aws::String uri, + Aws::String content_type, + Aws::String content_encoding, + Aws::Map metadata = {}) + : uri{uri}, + content_type{content_type}, + content_encoding{content_encoding}, + metadata{metadata} {} + + // Destination URI of the blob, in :/// format. + Aws::String uri; + + // Content-Type (MIME type) of @uri. + Aws::String content_type; + + // Content-Encoding (if any) of @uri. + Aws::String content_encoding; + + // Metadata key/value pairs. + Aws::Map metadata; + + // S3 Object Tagging key/value pairs (S3 objects only). + // These require s3:PutObjectTagging permissions on @uri, otherwise requests fail with 403. + // The tags also have to satisfy the following syntax restrictions and limits: + // * https://docs.aws.amazon.com/AmazonS3/latest/userguide/tagging-managing.html + // * https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/Using_Tags.html + // * https://docs.aws.amazon.com/awsaccountbilling/latest/aboutv2/allocation-tag-restrictions.html + Aws::Map tags; +}; + +// ReadMetadata encapsulates the metadata associated with a given blob. +struct ReadMetadata { + // URI of the blob. + Aws::String uri; + + // Size of @path in bytes. + size_t size = 0; + + // Date/time the blob was last modified. + std::chrono::system_clock::time_point last_modified; + + // MIME type of the blob. + Aws::String content_type; + + // Indicates whether the data at @path is stored in compressed format (RFC 7231, 3.1.2.2). + Aws::String content_encoding; + + // ETag value. + Aws::String etag; + + // Metadata key/value pairs. + Aws::Map metadata; +}; + +static inline std::ostream &operator<<(std::ostream &os, const ReadMetadata &md) { + os << "ReadMetadata(\"" << md.uri << "\", " << md.size; + + time_t lm = std::chrono::system_clock::to_time_t(md.last_modified); + if (lm) { // Format: "Wed Jun 30 21:49:08 1993\n" - truncate before " 1993\n": + os << ", " << Aws::String{ctime(&lm), 19}; + } + if (!md.etag.empty()) os << ", " << md.etag; + if (!md.content_type.empty()) { + os << ", " << md.content_type; + if (!md.content_encoding.empty()) os << " (" << md.content_encoding << ")"; + } + if (!md.metadata.empty()) { + os << ","; + for (const auto &e : md.metadata) os << " " << e.first << "=" << e.second; + } + return os << ")"; +} + +} // namespace TransferCrt +} // namespace Aws diff --git a/src/aws-cpp-sdk-transfer-crt/include/aws/transfer-crt/TransferHandle.h b/src/aws-cpp-sdk-transfer-crt/include/aws/transfer-crt/TransferHandle.h new file mode 100644 index 00000000000..547fc7c537d --- /dev/null +++ b/src/aws-cpp-sdk-transfer-crt/include/aws/transfer-crt/TransferHandle.h @@ -0,0 +1,200 @@ +/** + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * SPDX-License-Identifier: Apache-2.0. + */ + +#pragma once + +#include +#include +#include +#include +#include +#include +#include + +#include +#include +#include +#include +#include +#include +#include +#include +#include + +namespace Aws { +namespace TransferCrt { + +enum class TransferStatus : uint8_t { + // + // TransferHandle states: + // + // The three terminal states are CANCELED, FAILED, and COMPLETED. + // + // NOT_STARTED indicates that no S3CrtClient API call has been made yet. This means that no + // status/progress callbacks will be invoked. Unlike IN_PROGRESS, we do not have to wait for + // these when transitioning to FAILED or CANCELED. + // + // IN_PROGRESS indicates that an API call was made, which means we must wait until its final + // .shutdown_callback (Handle{Put,Get}ObjectResponse()) is called. The AWS SDK and aws-c-s3 + // expect all request data structures to still be alive until then (in particular the response + // body, which may be updated several times in order to record errors). + // + // FAILING is an intermediate state that is entered from IN_PROGRESS to record that a failure + // (e.g. local write error) occurred while the transfer was progressing. It will be converted + // into FAILED once the .shutdown_callback is called. + // + NOT_STARTED, // No S3CrtClient API call has been made yet. + IN_PROGRESS, // An API call was made, and the transfer is running. + FAILING, // A failure occurred while the transfer was running. + CANCELED, // Transfer was canceled. + FAILED, // Transfer failed. + COMPLETED, // Transfer completed successfully. +}; +Aws::String StatusToString(TransferStatus status); +Aws::OStream &operator<<(Aws::OStream &s, TransferStatus status); + +// Interface for interacting with asynchronous UPLOAD/DOWNLOAD transfers. +class TransferHandle final { + public: + // Upload to the destination specified in @md. + TransferHandle(const WriteMetadata &md, + const std::shared_ptr &context); + + // Download from @srcUri. + TransferHandle(const Aws::String &srcUri, + const std::shared_ptr &context); + + /* + * Thread-safe Getter methods (values set only at initialization time). + */ + + // Bucket/key part of blob location in Amazon S3. + Aws::String GetBucket() const; + Aws::String GetKey() const; + + // S3 storage tags (key/value pairs). + const Aws::String &GetTagging() const { return tagging_; } + + // Get the user-provided context that was passed at construction time. + std::shared_ptr GetContext() const { return m_context; } + + /* + * Get/set methods called after initialization that are based on atomic variables. + * These methods synchronize-with each other via the affected atomic variable. + */ + // Get/Set the CANCEL flag, which cancels any further processing. + bool ShouldContinue() const { return !m_cancel.load(); } + void Cancel() { m_cancel.store(true); } + + // Get/update the cumulative byte count transferred since start of the transfer. + uint64_t GetBytesTransferred() const { return m_bytesTransferred.load(); } + void UpdateBytesTransferred(uint64_t amount) { m_bytesTransferred.fetch_add(amount); } + + /* + * Getters/setters called after initialization that synchronize via @m_getterSetterLock. + */ + + // Return the blob metadata information (populated for both download and upload). + // Pre-condition: function may only be called if BytesTotalSizeHasBeenSet() returns true. + const ReadMetadata &GetReadMetadata() const { + std::lock_guard guard{m_getterSetterLock}; + assert(total_size_has_been_set_); + return rmd_; + } + + // Set the total size of the object being transferred. May be called at most once. + void SetBytesTotalSize(uint64_t size) { + std::lock_guard guard{m_getterSetterLock}; + assert(!total_size_has_been_set_); + total_size_has_been_set_ = true; + rmd_.size = size; + } + // Check whether SetBytesTotalSize has been called (see TransferManager::GetObject for details). + bool BytesTotalSizeHasBeenSet() const { return total_size_has_been_set_; } + + // Set the LastModified time of the blob. + void SetLastModified(const Aws::Utils::DateTime &lastDateTime) { + std::lock_guard guard{m_getterSetterLock}; + rmd_.last_modified = lastDateTime.UnderlyingTimestamp(); + } + + // Set the ETag of the blob. + void SetETag(const Aws::String &etag) { + std::lock_guard guard{m_getterSetterLock}; + rmd_.etag = etag; + } + + // Set the Content-Type of the blob. + void SetContentType(const Aws::String &contentType) { + std::lock_guard guard{m_getterSetterLock}; + rmd_.content_type = contentType; + } + + // Set the Content-Encoding of the blob (e.g. "gzip" when compressing content). + void SetContentEncoding(const Aws::String &encoding) { + std::lock_guard guard{m_getterSetterLock}; + rmd_.content_encoding = encoding; + } + + // Set the metadata key/value pairs associated with the blob. + void SetMetadata(const Aws::Map &metadata) { + std::lock_guard guard{m_getterSetterLock}; + rmd_.metadata = metadata; + } + + // Get/set the last error that was encountered (if any). + // Check GetStatus() first, as the default value is S3Crt::S3CrtErrors::UNKNOWN. + const Aws::Client::AWSError GetLastError() const { + std::lock_guard guard{m_getterSetterLock}; + return m_lastError; + } + void SetError(const Aws::Client::AWSError &error) { + std::lock_guard guard{m_getterSetterLock}; + m_lastError = error; + } + + /* + * Modifiers that only synchronize via @m_statusLock. + */ + // Get current TransferStatus of this handle. + // Synchronizes-with UpdateStatus. + TransferStatus GetStatus() const; + + // Block on (internal) condition variable until handle reaches a 'finished' status. + // Synchronizes-with UpdateStatus. + void WaitUntilFinished() const; + + // Conditionally transition into @status. + void UpdateStatus(TransferStatus status); + + // Return true if @value equals one of the terminal states. + static bool IsFinishedStatus(TransferStatus value) { + return value == TransferStatus::COMPLETED || value == TransferStatus::FAILED || + value == TransferStatus::CANCELED; + } + + private: + std::atomic m_bytesTransferred{0}; + std::atomic m_cancel{false}; + const std::shared_ptr m_context{nullptr}; + // S3 storage tags (upload only). + const Aws::String tagging_; + + // Variables that are protected by @m_getterSetterLock: + mutable std::mutex m_getterSetterLock; + // Blob metadata (uri, size, ...) - used for both upload and download blobs. + ReadMetadata rmd_{}; + // Flag that indicates whether @rmd_.size has been initialized. + std::atomic total_size_has_been_set_{false}; + Aws::Client::AWSError m_lastError{S3Crt::S3CrtErrors::UNKNOWN, true}; + + // Variables that are protected by @m_statusLock: + mutable std::mutex m_statusLock; + mutable std::promise m_finishedSignal; + TransferStatus m_status = TransferStatus::NOT_STARTED; +}; + +} // namespace TransferCrt +} // namespace Aws diff --git a/src/aws-cpp-sdk-transfer-crt/source/transfer-crt/DownloadStream.cpp b/src/aws-cpp-sdk-transfer-crt/source/transfer-crt/DownloadStream.cpp new file mode 100644 index 00000000000..5aa25c3b878 --- /dev/null +++ b/src/aws-cpp-sdk-transfer-crt/source/transfer-crt/DownloadStream.cpp @@ -0,0 +1,153 @@ +/** + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * SPDX-License-Identifier: Apache-2.0. + */ + +#include +#include +#include + +#ifndef _GNU_SOURCE /* mkostemp() */ +#define _GNU_SOURCE +#endif +#include +#include +#include +#include +#include +#include + +#include + +namespace Aws { +namespace TransferCrt { + +namespace { +// Return the parent directory of @path, or an empty string if not possible. +Aws::String parent_directory(const Aws::String &path) { + const size_t n = path.find_last_of(Aws::FileSystem::PATH_DELIM); + return n == Aws::String::npos ? "" : path.substr(0, n); +} +} // namespace + +/* + * FileDescriptorBuf methods. + */ +int FileDescriptorBuf::overflow(int_type c) { + return sync() == EOF ? EOF : (c == EOF ? 0 : sputc(c)); +} + +int FileDescriptorBuf::sync() { + if (pbase() && pptr() > pbase()) { + std::streamsize n = xsputn(pbase(), pptr() - pbase()); + if (n == EOF) { + return EOF; + } + pbump(-n); + } + return 0; +} + +// On failure, invoke the error callback with the description of the errno value. +// It also throws an ios::failure in case exceptions() has been called on the stream. +std::streamsize FileDescriptorBuf::xsputn(const char *data, std::streamsize datalen) { + for (std::streamsize written = 0, n = 0; written < datalen; written += n) { + n = ::write(fd_, data + written, datalen - written); + if (n < 0 && errno != EINTR && errno != EAGAIN) { + Aws::StringStream ss; + ss << "write error: " << ::strerror(errno); + errorCallback_(ss.str()); + return EOF; + } + } + return datalen; +} + +/* + * DownloadStream methods. + */ +DownloadStream::DownloadStream(const Aws::String &dstPath, ErrorCallback ec, bool sync_always) + : std::iostream{nullptr}, + dstPath_{dstPath}, + dstTempPath_{dstPath + ".partial.XXXXXX"}, + errorCallback_{ec} { + const Aws::String parent_path = parent_directory(dstPath_); + Aws::StringStream ss; + + assert(!dstPath_.empty()); + assert(errorCallback_); + + // Generate any missing directory components. + if (!parent_path.empty() && Aws::FileSystem::CreateDirectoryIfNotExists(parent_path.c_str(), true)) { + ss << "Failed to create " << dstPath << " parent directories."; + _error(ss.str()); + return; + } + + // Produce unique temporary-file suffix. Use O_SYNC to ensure data gets written out to disk. + fd_ = ::mkostemp(&dstTempPath_[0], sync_always ? O_SYNC : 0); + if (fd_ < 0) { + ss << "Failed to create " << dstTempPath_ << ": " << ::strerror(errno); + _error(ss.str()); + return; + } + + // Advise the kernel that the data used by @fd_ will not be accessed in the near time. + if (::posix_fadvise(fd_, 0, 0, POSIX_FADV_DONTNEED) < 0) { + ss << "Failed to posix_fadvise('" << dstTempPath_ << "'): " << ::strerror(errno); + _error(ss.str()); + return; + } + + buf_ = Aws::MakeUnique("FdBuf", fd_, [this](Aws::String writeError) { + Aws::StringStream ss; + ss << "Failed to write " << dstTempPath_ << ": " << std::move(writeError); + _error(ss.str()); + return; + }); + rdbuf(buf_.get()); +} + +void DownloadStream::close() noexcept { + Aws::StringStream ss; + std::lock_guard closer(close_mutex_); + + if (eof()) { // Idempotent. + return; + } + + // Call rdbuf()->pubsync() one last time, to empty the put-buffer: + flush(); + + setstate(std::ios::eofbit); + + if (fd_ > 0 && ::close(fd_) < 0) { + ss << "Failed to close " << dstTempPath_ << ": " << ::strerror(errno); + setstate(std::ios::failbit); + _error(ss.str()); + } + fd_ = -1; + + if (bad()) { + _error("Stream is corrupt on close"); + } else if (!dstTempPath_.empty()) { + if (::rename(dstTempPath_.c_str(), dstPath_.c_str())) { + ss << "Failed to rename " << dstTempPath_ << ": " << ::strerror(errno); + _error(ss.str()); + } + dstTempPath_.clear(); + } +} + +DownloadStream::~DownloadStream() { + if (fd_ > 0) { + ::close(fd_); + } + + if (!dstTempPath_.empty()) { + ::unlink(dstTempPath_.c_str()); + } +} + +} // namespace TransferCrt +} // namespace Aws diff --git a/src/aws-cpp-sdk-transfer-crt/source/transfer-crt/TransferHandle.cpp b/src/aws-cpp-sdk-transfer-crt/source/transfer-crt/TransferHandle.cpp new file mode 100644 index 00000000000..24147e5b74f --- /dev/null +++ b/src/aws-cpp-sdk-transfer-crt/source/transfer-crt/TransferHandle.cpp @@ -0,0 +1,110 @@ +/** + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * SPDX-License-Identifier: Apache-2.0. + */ + +#include +#include +#include +#include + +namespace Aws { +namespace TransferCrt { + +namespace { +using KeyValue = std::pair; + +Aws::String encode_query_string(const Aws::Map &key_values) { + return std::accumulate(key_values.begin(), + key_values.end(), + Aws::String{}, + [](const Aws::String &prev, const KeyValue &cur) { + return (prev.empty() ? "" : prev + "&") + cur.first + + (cur.second.empty() ? "" : "=") + cur.second; + }); +} +} // namespace + +TransferHandle::TransferHandle(const WriteMetadata &md, + const std::shared_ptr &ctx) + : m_context{ctx}, tagging_{encode_query_string(md.tags)}, rmd_{} { + rmd_.uri = md.uri; + SetContentType(md.content_type); + SetContentEncoding(md.content_encoding); + SetMetadata(md.metadata); +} + +TransferHandle::TransferHandle(const Aws::String &srcUri, + const std::shared_ptr &ctx) + : m_context{ctx}, rmd_{} { + rmd_.uri = srcUri; +} + +Aws::String TransferHandle::GetBucket() const { + const size_t start = sizeof("s3://") - 1; + const size_t end = rmd_.uri.find('/', start); + return rmd_.uri.substr(start, end - start); +} + +Aws::String TransferHandle::GetKey() const { + const size_t bucket_start = sizeof("s3://") - 1; + const size_t bucket_end = rmd_.uri.find('/', bucket_start); + if (bucket_end == Aws::String::npos) { + return ""; + } + return rmd_.uri.substr(bucket_end + 1); +} + +void TransferHandle::UpdateStatus(TransferStatus value) { + // Release any pending per-chunk requests on failure. + if (value == TransferStatus::FAILING || value == TransferStatus::FAILED) { + Cancel(); + } + + std::unique_lock lock(m_statusLock); + assert(m_status != TransferStatus::FAILING || value == TransferStatus::FAILED); + + // The following ensures exactly one transition from "not finished" to "finished": + if (!IsFinishedStatus(m_status) && value != m_status) { + m_status = value; + if (IsFinishedStatus(value)) { + m_finishedSignal.set_value(); + } + } +} + +TransferStatus TransferHandle::GetStatus() const { + std::lock_guard lock(m_statusLock); + return m_status; +} + +void TransferHandle::WaitUntilFinished() const { + if (!IsFinishedStatus(GetStatus())) { + m_finishedSignal.get_future().wait(); + } +} + +Aws::String StatusToString(TransferStatus status) { + switch (status) { + case TransferStatus::NOT_STARTED: + return "NOT_STARTED"; + case TransferStatus::IN_PROGRESS: + return "IN_PROGRESS"; + case TransferStatus::FAILING: + return "FAILING"; + case TransferStatus::CANCELED: + return "CANCELED"; + case TransferStatus::FAILED: + return "FAILED"; + case TransferStatus::COMPLETED: + return "COMPLETED"; + } + return "UNKNOWN"; +} + +Aws::OStream &operator<<(Aws::OStream &s, TransferStatus status) { + return s << StatusToString(status); +} + +} // namespace TransferCrt +} // namespace Aws From 1c89a8ee13ae2511e7722908d17cfa057494c9dc Mon Sep 17 00:00:00 2001 From: Gerrit Renker Date: Sun, 5 Mar 2023 16:02:27 -0500 Subject: [PATCH 2/9] Add TransferManager sources (leaving metadata encoding/decoding out for now) --- .../include/aws/transfer-crt/TransferHandle.h | 3 +- .../aws/transfer-crt/TransferManager.h | 125 ++++++ .../source/transfer-crt/TransferManager.cpp | 392 ++++++++++++++++++ 3 files changed, 519 insertions(+), 1 deletion(-) create mode 100644 src/aws-cpp-sdk-transfer-crt/include/aws/transfer-crt/TransferManager.h create mode 100644 src/aws-cpp-sdk-transfer-crt/source/transfer-crt/TransferManager.cpp diff --git a/src/aws-cpp-sdk-transfer-crt/include/aws/transfer-crt/TransferHandle.h b/src/aws-cpp-sdk-transfer-crt/include/aws/transfer-crt/TransferHandle.h index 547fc7c537d..af3695e68c6 100644 --- a/src/aws-cpp-sdk-transfer-crt/include/aws/transfer-crt/TransferHandle.h +++ b/src/aws-cpp-sdk-transfer-crt/include/aws/transfer-crt/TransferHandle.h @@ -7,11 +7,12 @@ #include #include +#include #include #include #include -#include #include +#include #include #include diff --git a/src/aws-cpp-sdk-transfer-crt/include/aws/transfer-crt/TransferManager.h b/src/aws-cpp-sdk-transfer-crt/include/aws/transfer-crt/TransferManager.h new file mode 100644 index 00000000000..d31cf7ebca4 --- /dev/null +++ b/src/aws-cpp-sdk-transfer-crt/include/aws/transfer-crt/TransferManager.h @@ -0,0 +1,125 @@ +/** + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * SPDX-License-Identifier: Apache-2.0. + */ + +#pragma once + +#include +#include +#include +#include +#include +#include +#include +#include + +namespace Aws { +namespace TransferCrt { + +// Callback type used for updates. +using HandleUpdatedCallback = std::function &)>; + +// Helper structs to carry around a TransferHandle as part of the caller context. +struct UploadContext final : Aws::Client::AsyncCallerContext { + explicit UploadContext(std::shared_ptr th) : handle{std::move(th)} {} + std::shared_ptr handle; +}; + +struct DownloadContext final : Aws::Client::AsyncCallerContext { + explicit DownloadContext(std::shared_ptr th) : handle{std::move(th)} {} + + ~DownloadContext() { + if (!dstStreamOwnershipHasBeenTransferred) { + delete dstStream; + } + } + + std::shared_ptr handle; + bool dstStreamOwnershipHasBeenTransferred = false; + std::iostream *dstStream = nullptr; + std::streambuf *dstStreamBuf = nullptr; +}; + +// Minimal re-implementation of the S3 TransferManager for the S3CrtClient. +// +// All public methods are non-blocking and return a pointer to an asynchronous TransferHandle. +class TransferManager final : public std::enable_shared_from_this { + public: + // Callbacks are invoked under the following conditions: + // - uploadProgressCallback: when the number of bytes-sent changes, + // - downloadProgressCallback: when the number of bytes-received changes, + // - statusChangedCallback: when the handle changes status. + // NOTE: code maintains the invariant that @statusChangedCallback + // is called at most once for a "finished" TransferStatus. + static std::shared_ptr Create( + std::shared_ptr s3Client, + const HandleUpdatedCallback &uploadProgressCallback, + const HandleUpdatedCallback &downloadProgressCallback, + const HandleUpdatedCallback &statusChangedCallback); + + // Upload from @srcPath or @srcStream to @md.uri via PutObjectAsync. + // If @srcStream is not set, open the input file at @srcPath. + std::shared_ptr UploadFile( + const Aws::String &srcPath, + const std::shared_ptr &srcStream, + const WriteMetadata &md, + const std::shared_ptr &context = nullptr); + + // Download from @srcUri to local @dstPath or @dstStreamBuf, via GetObjectAsync. + // If both @dstPath and @dstStreamBuf are specified, @dstStreamBuf is used. + std::shared_ptr DownloadFile( + const Aws::String &srcUri, + const Aws::String &dstPath, + std::streambuf *dstStreamBuf = nullptr, + const std::shared_ptr &context = nullptr); + + private: + // The constructor supports a "cancel all transfers when the first failure is encountered" + // optional policy, which is not exposed to the outside. It enforces the invariant that a + // bulk transfer succeeds only after all of its managed transfers have succeeded. + TransferManager(std::shared_ptr s3Client, + const HandleUpdatedCallback &uploadProgressCallback, + const HandleUpdatedCallback &downloadProgressCallback, + const HandleUpdatedCallback &statusChangedCallback, + bool cancel_on_first_failure = true); + + void PutObject(const std::shared_ptr &streamToPut, + const std::shared_ptr &handle); + + void GetObject(const std::shared_ptr &context, + const std::shared_ptr &handle); + + void HandlePutObjectResponse(const Aws::S3Crt::S3CrtClient *, + const Aws::S3Crt::Model::PutObjectRequest &, + const Aws::S3Crt::Model::PutObjectOutcome &, + const std::shared_ptr &); + + void HandleGetObjectResponse(const Aws::S3Crt::S3CrtClient *, + const Aws::S3Crt::Model::GetObjectRequest &, + Aws::S3Crt::Model::GetObjectOutcome, + const std::shared_ptr &); + + // Record that an error has occurred. + // Set FAILED state if no API call has been made yet, otherwise transition to FAILING state. + void Fail(std::shared_ptr handle, + std::string errorMsg, + std::string exceptionMsg = "FATAL ERROR"); + + void OnUploadProgress(const std::shared_ptr &handle); + void OnDownloadProgress(const std::shared_ptr &handle); + void OnStatusChanged(const std::shared_ptr &handle); + + private: + std::shared_ptr s3Client_; + // Cancel all new/pending transfers after the first failure (optional policy): + const bool cancel_on_first_failure_; + std::atomic failure_occurred_{false}; + + HandleUpdatedCallback uploadProgressCallback; + HandleUpdatedCallback downloadProgressCallback; + HandleUpdatedCallback statusChangedCallback; +}; + +} // namespace TransferCrt +} // namespace Aws diff --git a/src/aws-cpp-sdk-transfer-crt/source/transfer-crt/TransferManager.cpp b/src/aws-cpp-sdk-transfer-crt/source/transfer-crt/TransferManager.cpp new file mode 100644 index 00000000000..c039bd32fe8 --- /dev/null +++ b/src/aws-cpp-sdk-transfer-crt/source/transfer-crt/TransferManager.cpp @@ -0,0 +1,392 @@ +/** + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * SPDX-License-Identifier: Apache-2.0. + */ + +#include +#include +#include +#include +#include +#include + +namespace Aws { +namespace TransferCrt { +constexpr char CLASS_TAG[] = "TransferManager"; + +std::shared_ptr TransferManager::Create( + std::shared_ptr s3Client, + const HandleUpdatedCallback &up, + const HandleUpdatedCallback &down, + const HandleUpdatedCallback &statusChanged) { + return std::shared_ptr(new TransferManager{s3Client, up, down, statusChanged}); +} + +TransferManager::TransferManager(std::shared_ptr s3Client, + const HandleUpdatedCallback &uploadProgressCallback, + const HandleUpdatedCallback &downloadProgressCallback, + const HandleUpdatedCallback &statusChangedCallback, + bool cancel_on_first_failure) + : s3Client_{s3Client}, + cancel_on_first_failure_{cancel_on_first_failure}, + uploadProgressCallback{uploadProgressCallback}, + downloadProgressCallback{downloadProgressCallback}, + statusChangedCallback{statusChangedCallback} {} + +std::shared_ptr TransferManager::UploadFile( + const Aws::String &srcPath, + const std::shared_ptr &inputStream, + const WriteMetadata &md, + const std::shared_ptr &ctx) { + auto handle = Aws::MakeShared(CLASS_TAG, md, ctx); + Aws::StringStream ss; + + if (cancel_on_first_failure_ && failure_occurred_) { + handle->UpdateStatus(TransferStatus::CANCELED); + OnStatusChanged(handle); + return handle; + } + + std::shared_ptr srcStream{inputStream}; + if (!srcStream) { + srcStream = + Aws::MakeShared(CLASS_TAG, + srcPath, std::ios_base::in | std::ios_base::binary); + } + + if (!srcStream->good()) { + ss << "Failed to open stream '" << srcPath << "': " + << (errno ? ::strerror(errno) : "not in a good state"); + Fail(handle, ss.str()); + return handle; + } + + // Determine length by seeking to the end. + std::streampos cur = srcStream->tellg(); + std::streampos end = srcStream->rdbuf()->pubseekoff(0, std::ios_base::end); + srcStream->seekg(cur, std::ios_base::beg); + if (!srcStream->good() || cur < 0 || end < 0) { + ss << "Failed to determine size of '" << srcPath << "': " + << (errno ? ::strerror(errno) : "stream error"); + Fail(handle, ss.str()); + return handle; + } + handle->SetBytesTotalSize(end - cur); + + PutObject(srcStream, handle); + return handle; +} + +void TransferManager::PutObject(const std::shared_ptr &streamToPut, + const std::shared_ptr &handle) { + const auto &blobMetadata = handle->GetReadMetadata(); + auto putObjectRequest = Aws::S3Crt::Model::PutObjectRequest() + .WithBucket(handle->GetBucket()) + .WithKey(handle->GetKey()) + .WithContentLength(blobMetadata.size); + + // Grant the bucket owner full control (see AV-48995). + putObjectRequest.SetACL(Aws::S3Crt::Model::ObjectCannedACL::bucket_owner_full_control); + putObjectRequest.SetContentType(blobMetadata.content_type); + if (!blobMetadata.content_encoding.empty()) { + putObjectRequest.SetContentEncoding(blobMetadata.content_encoding); + } + if (!handle->GetTagging().empty()) { + putObjectRequest.SetTagging(handle->GetTagging()); + } + + putObjectRequest.SetBody(streamToPut); + + // AmazonWebServiceRequest methods: + putObjectRequest.SetContinueRequestHandler( + [handle](const Aws::Http::HttpRequest *) { return handle->ShouldContinue(); }); + + // Keep transfer manager alive until all callbacks are finished: + auto self = shared_from_this(); + + putObjectRequest.SetDataSentEventHandler( + [self, handle](const Aws::Http::HttpRequest *, long long /*NOLINT*/ progress) { + handle->UpdateBytesTransferred(progress); + self->OnUploadProgress(handle); + }); + + auto callback = Aws::S3Crt::PutObjectResponseReceivedHandler{ + [self](const Aws::S3Crt::S3CrtClient *client, + const Aws::S3Crt::Model::PutObjectRequest &request, + const Aws::S3Crt::Model::PutObjectOutcome &outcome, + const std::shared_ptr &context) { + self->HandlePutObjectResponse(client, request, outcome, context); + }}; + + auto asyncContext = Aws::MakeShared(CLASS_TAG, handle); + + // Transition to IN_PROGRESS right before the API call is made. + // This is necessary since the body of PutObjectAsync() may call HandlePutObjectResponse(). + handle->UpdateStatus(TransferStatus::IN_PROGRESS); + OnStatusChanged(handle); + + s3Client_->PutObjectAsync(putObjectRequest, callback, asyncContext); +} + +void TransferManager::HandlePutObjectResponse( + const Aws::S3Crt::S3CrtClient *, + const Aws::S3Crt::Model::PutObjectRequest &, + const Aws::S3Crt::Model::PutObjectOutcome &outcome, + const std::shared_ptr &context) { + const auto &handle = std::dynamic_pointer_cast(context)->handle; + + switch (handle->GetStatus()) { + case TransferStatus::IN_PROGRESS: + if (outcome.IsSuccess()) { + handle->SetETag(outcome.GetResult().GetETag()); + handle->UpdateStatus(TransferStatus::COMPLETED); + } else { + handle->UpdateStatus(handle->ShouldContinue() ? TransferStatus::FAILED + : TransferStatus::CANCELED); + handle->SetError(outcome.GetError()); + } + break; + case TransferStatus::FAILING: + break; + default: + Fail(handle, "Invalid pre-final state " + StatusToString(handle->GetStatus())); + } + + if (handle->GetStatus() == TransferStatus::FAILING) { + handle->UpdateStatus(TransferStatus::FAILED); + } + OnStatusChanged(handle); +} + +std::shared_ptr TransferManager::DownloadFile( + const Aws::String &srcUri, + const Aws::String &dstPath, + std::streambuf *dstStreamBuf, + const std::shared_ptr &ctx) { + auto handle = Aws::MakeShared(CLASS_TAG, srcUri, ctx); + auto context = Aws::MakeShared(CLASS_TAG, handle); + + if (cancel_on_first_failure_ && failure_occurred_) { + handle->UpdateStatus(TransferStatus::CANCELED); + OnStatusChanged(handle); + return handle; + } + + if (dstStreamBuf) { + context->dstStream = new std::iostream(dstStreamBuf); + context->dstStreamBuf = dstStreamBuf; + } else { + // Note that the OnStatusChanged callback may be used before this function returns: + context->dstStream = new DownloadStream(dstPath, [handle, this](std::string msg) { + Fail(handle, std::move(msg), "DownloadStream Failure"); + }); + } + + // Creation of the DownloadStream may have failed, transitioning from NOT_STARTED => FAILED. + if (handle->GetStatus() != TransferStatus::FAILED) { + GetObject(context, handle); + } + return handle; +} + +// Perform the actual download. +void TransferManager::GetObject(const std::shared_ptr &context, + const std::shared_ptr &handle) { + auto getRequest = Aws::S3Crt::Model::GetObjectRequest() + .WithBucket(handle->GetBucket()) + .WithKey(handle->GetKey()); + + // Invoking the factory function below passes the @dstStream to a StandardHttpResponse object, + // whose ResponseStream owns and manages the pointer once the factory function is called. + // + // Unlike the Aws::Transfer::TransferManager, we are not allocating a new object here; instead, + // a pointer to the already allocated object is passed. It is ok to do this, since the lambda + // is only invoked once during the lifetime of the request - in InitCommonCrtRequestOption, when + // populating the response. In contrast to the Aws::Transfer::TransferManager, the CRT code does + // not use the factory function for retries (these are internally handled by aws-c-s3). + getRequest.SetResponseStreamFactory(Aws::IOStreamFactory([context]() { + context->dstStreamOwnershipHasBeenTransferred = true; + return context->dstStream; + })); + + // Keep transfer manager alive until all callbacks are finished: + auto self = shared_from_this(); + + getRequest.SetContinueRequestHandler( + [handle](const Aws::Http::HttpRequest *) { return handle->ShouldContinue(); }); + + getRequest.SetDataReceivedEventHandler([self, context, handle](const Aws::Http::HttpRequest *, + Aws::Http::HttpResponse *res, + long long /*NOLINT*/ amount) { + // + // Set the total size after the first chunk has been processed (AV-120158, AV-171186). + // + // The aws-c-s3 code populates the Content-Length header of the HttpResponse after the + // first part of the ranged-Get has completed. Extract total-size information from this. + if (!handle->BytesTotalSizeHasBeenSet() && handle->ShouldContinue() && + handle->GetStatus() == TransferStatus::IN_PROGRESS) { + std::string length_str; + + for (const auto &hdr : res->GetHeaders()) { + if (Aws::Utils::StringUtils::ToLower(hdr.first.c_str()) == "content-length") { + length_str = hdr.second; + } + } + if (length_str.empty()) { + self->Fail(handle, "Response lacks Content-Length header"); + } else { + char *end = nullptr; + const uint64_t content_length = std::strtoull(length_str.c_str(), &end, 0); + + if (end != nullptr && *end != '\0') { + self->Fail(handle, "Invalid Content-Length: " + length_str); + } else { + handle->SetBytesTotalSize(content_length); + } + } + } + + handle->UpdateBytesTransferred(amount); + self->OnDownloadProgress(handle); + }); + + auto callback = Aws::S3Crt::GetObjectResponseReceivedHandler{ + [self](const Aws::S3Crt::S3CrtClient *client, + const Aws::S3Crt::Model::GetObjectRequest &request, + Aws::S3Crt::Model::GetObjectOutcome outcome, + const std::shared_ptr &context) { + self->HandleGetObjectResponse(client, request, std::move(outcome), context); + }}; + + // Transition to IN_PROGRESS right before the API call is made. + // This is necessary since the body of GetObjectAsync() may call HandleGetObjectResponse(). + handle->UpdateStatus(TransferStatus::IN_PROGRESS); + OnStatusChanged(handle); + + s3Client_->GetObjectAsync(getRequest, callback, context); +} + +void TransferManager::HandleGetObjectResponse( + const Aws::S3Crt::S3CrtClient *, + const Aws::S3Crt::Model::GetObjectRequest &, + Aws::S3Crt::Model::GetObjectOutcome outcome, + const std::shared_ptr &context) { + auto ctx = std::dynamic_pointer_cast(context); + const auto &handle = ctx->handle; + + switch (handle->GetStatus()) { + case TransferStatus::IN_PROGRESS: + if (outcome.IsSuccess()) { + // At this stage, the total size should have been filled in. This is done by the + // aws-c-s3 code, passing the Content-Length header via the headers_callback to + // the HttpResponse (see GetObject() above and AV-120158). + if (!handle->BytesTotalSizeHasBeenSet()) { + // In the special case of an empty file, the aws-c-s3 code does not use the + // body_callback and so the DataReceivedEventHandler is also not called. + if (outcome.GetResult().GetContentLength() == 0) { + handle->SetBytesTotalSize(0); + } else { + Fail(handle, "Total size has not been filled in during transfer"); + break; + } + } else if (handle->GetReadMetadata().size != + static_cast(outcome.GetResult().GetContentLength())) { + Fail(handle, "Total size differs from Content-Length", "DATA CORRUPTED"); + break; + } + + // If the user specified a DownloadStream, close and rename the partial file here. + // In the error case, the DownloadStream destructor will remove the partial file. + DownloadStream *d = dynamic_cast(ctx->dstStream); + if (d != nullptr) { + // On error, the call to Fail() within d invokes UpdateStatus(FAILING), which + // completes before d->close() returns. This causes a transition to FAILING. + d->close(); + } else { + // Explicitly close the file buffer here in order to catch local write errors. + std::filebuf *fb = dynamic_cast(ctx->dstStreamBuf); + if (fb != nullptr && fb->close() == nullptr) { + Fail(handle, "Failed to close download stream - output likely corrupt"); + } + } + + // State may have changed due to calling Fail(). + if (handle->GetStatus() != TransferStatus::FAILING) { + handle->UpdateStatus(TransferStatus::COMPLETED); + handle->SetLastModified(outcome.GetResult().GetLastModified()); + handle->SetContentType(outcome.GetResult().GetContentType()); + handle->SetContentEncoding(outcome.GetResult().GetContentEncoding()); + handle->SetETag(outcome.GetResult().GetETag()); + } + } else { + handle->UpdateStatus(handle->ShouldContinue() ? TransferStatus::FAILED + : TransferStatus::CANCELED); + handle->SetError(outcome.GetError()); + } + break; + case TransferStatus::FAILING: + break; + default: + Fail(handle, "Invalid pre-final state " + StatusToString(handle->GetStatus())); + } + + if (handle->GetStatus() == TransferStatus::FAILING) { + handle->UpdateStatus(TransferStatus::FAILED); + } + OnStatusChanged(handle); +} + +void TransferManager::Fail(std::shared_ptr handle, + std::string msg, + std::string exceptionMsg) { + switch (handle->GetStatus()) { + case TransferStatus::NOT_STARTED: + handle->UpdateStatus(TransferStatus::FAILED); + break; + case TransferStatus::IN_PROGRESS: + handle->UpdateStatus(TransferStatus::FAILING); + break; + case TransferStatus::FAILING: + msg = handle->GetLastError().GetMessage() + "\n" + msg; + break; + case TransferStatus::FAILED: + case TransferStatus::COMPLETED: + case TransferStatus::CANCELED: + return; + } + handle->SetError({S3Crt::S3CrtErrors::UNKNOWN, std::move(exceptionMsg), std::move(msg), false}); + OnStatusChanged(handle); +} + +void TransferManager::OnStatusChanged(const std::shared_ptr &handle) { + if (cancel_on_first_failure_ && (handle->GetStatus() == TransferStatus::FAILING || + handle->GetStatus() == TransferStatus::FAILED)) { + failure_occurred_ = true; + } + if (statusChangedCallback) { + statusChangedCallback(handle); + } +} + +void TransferManager::OnUploadProgress(const std::shared_ptr &handle) { + // The progress callbacks are only called while the transfer is in progress. + // If another transfer has failed in the meantime, cancel this and all other ones (AV-171186). + if (cancel_on_first_failure_ && failure_occurred_) { + handle->Cancel(); + } + if (uploadProgressCallback) { + uploadProgressCallback(handle); + } +} + +void TransferManager::OnDownloadProgress(const std::shared_ptr &handle) { + if (cancel_on_first_failure_ && failure_occurred_) { + handle->Cancel(); + } + if (downloadProgressCallback) { + downloadProgressCallback(handle); + } +} + +} // namespace TransferCrt +} // namespace Aws From 04cd1f770370080a804851fadb31e071f906099b Mon Sep 17 00:00:00 2001 From: Gerrit Renker Date: Sun, 5 Mar 2023 18:03:51 -0500 Subject: [PATCH 3/9] Add DownloadStream test --- cmake/sdksCommon.cmake | 1 + src/aws-cpp-sdk-transfer-crt/CMakeLists.txt | 53 +++ .../source/transfer-crt/DownloadStream.cpp | 13 +- .../source/transfer-crt/TransferHandle.cpp | 2 +- .../CMakeLists.txt | 30 ++ .../DownloadStreamTests.cpp | 320 ++++++++++++++++++ .../RunTests.cpp | 29 ++ 7 files changed, 441 insertions(+), 7 deletions(-) create mode 100644 src/aws-cpp-sdk-transfer-crt/CMakeLists.txt create mode 100644 tests/aws-cpp-sdk-transfer-crt-tests/CMakeLists.txt create mode 100644 tests/aws-cpp-sdk-transfer-crt-tests/DownloadStreamTests.cpp create mode 100644 tests/aws-cpp-sdk-transfer-crt-tests/RunTests.cpp diff --git a/cmake/sdksCommon.cmake b/cmake/sdksCommon.cmake index f622543374d..e127f73b7c8 100644 --- a/cmake/sdksCommon.cmake +++ b/cmake/sdksCommon.cmake @@ -171,6 +171,7 @@ list(APPEND SDK_TEST_PROJECT_LIST "s3-encryption:tests/aws-cpp-sdk-s3-encryption list(APPEND SDK_TEST_PROJECT_LIST "s3control:tests/aws-cpp-sdk-s3control-integration-tests") list(APPEND SDK_TEST_PROJECT_LIST "sqs:tests/aws-cpp-sdk-sqs-integration-tests") list(APPEND SDK_TEST_PROJECT_LIST "transfer:tests/aws-cpp-sdk-transfer-tests") +list(APPEND SDK_TEST_PROJECT_LIST "transfer-crt:tests/aws-cpp-sdk-transfer-crt-tests") list(APPEND SDK_TEST_PROJECT_LIST "text-to-speech:tests/aws-cpp-sdk-text-to-speech-tests,tests/aws-cpp-sdk-polly-sample") list(APPEND SDK_TEST_PROJECT_LIST "transcribestreaming:tests/aws-cpp-sdk-transcribestreaming-integration-tests") list(APPEND SDK_TEST_PROJECT_LIST "eventbridge:tests/aws-cpp-sdk-eventbridge-tests") diff --git a/src/aws-cpp-sdk-transfer-crt/CMakeLists.txt b/src/aws-cpp-sdk-transfer-crt/CMakeLists.txt new file mode 100644 index 00000000000..3a1c7802fd2 --- /dev/null +++ b/src/aws-cpp-sdk-transfer-crt/CMakeLists.txt @@ -0,0 +1,53 @@ +add_project(aws-cpp-sdk-transfer-crt + "High-level C++ SDK for file transfer to/from AWS S3 (CRT variant)" + aws-cpp-sdk-s3-crt + aws-cpp-sdk-core) + +file( GLOB TRANSFER_HEADERS "include/aws/transfer-crt/*.h" ) + +file( GLOB TRANSFER_SOURCE "source/transfer-crt/*.cpp" ) + +if(MSVC) + source_group("Header Files\\aws\\transfer-crt" FILES ${TRANSFER_HEADERS}) + source_group("Source Files\\transfer-crt" FILES ${TRANSFER_SOURCE}) +endif() + +file(GLOB ALL_TRANSFER_HEADERS + ${TRANSFER_HEADERS} +) + +file(GLOB ALL_TRANSFER_SOURCE + ${TRANSFER_SOURCE} +) + +file(GLOB ALL_TRANSFER + ${ALL_TRANSFER_HEADERS} + ${ALL_TRANSFER_SOURCE} +) + +set(TRANSFER_INCLUDES + "${CMAKE_CURRENT_SOURCE_DIR}/include/" + ) + +include_directories(${TRANSFER_INCLUDES}) + +if(USE_WINDOWS_DLL_SEMANTICS AND BUILD_SHARED_LIBS) + add_definitions("-DAWS_TRANSFER_EXPORTS") +endif() + +add_library(${PROJECT_NAME} ${ALL_TRANSFER}) +add_library(AWS::${PROJECT_NAME} ALIAS ${PROJECT_NAME}) + +target_include_directories(${PROJECT_NAME} PUBLIC + $ + $) +target_link_libraries(${PROJECT_NAME} PRIVATE ${PLATFORM_DEP_LIBS} ${PROJECT_LIBS}) + +set_compiler_flags(${PROJECT_NAME}) +set_compiler_warnings(${PROJECT_NAME}) + +setup_install() + +install (FILES ${ALL_TRANSFER_HEADERS} DESTINATION ${INCLUDE_DIRECTORY}/aws/transfer) + +do_packaging() diff --git a/src/aws-cpp-sdk-transfer-crt/source/transfer-crt/DownloadStream.cpp b/src/aws-cpp-sdk-transfer-crt/source/transfer-crt/DownloadStream.cpp index 5aa25c3b878..cc245d23610 100644 --- a/src/aws-cpp-sdk-transfer-crt/source/transfer-crt/DownloadStream.cpp +++ b/src/aws-cpp-sdk-transfer-crt/source/transfer-crt/DownloadStream.cpp @@ -24,12 +24,12 @@ namespace TransferCrt { namespace { // Return the parent directory of @path, or an empty string if not possible. -Aws::String parent_directory(const Aws::String &path) { +Aws::String ParentPath(const Aws::String &path) { const size_t n = path.find_last_of(Aws::FileSystem::PATH_DELIM); return n == Aws::String::npos ? "" : path.substr(0, n); -} +} } // namespace - + /* * FileDescriptorBuf methods. */ @@ -71,14 +71,14 @@ DownloadStream::DownloadStream(const Aws::String &dstPath, ErrorCallback ec, boo dstPath_{dstPath}, dstTempPath_{dstPath + ".partial.XXXXXX"}, errorCallback_{ec} { - const Aws::String parent_path = parent_directory(dstPath_); + const Aws::String parent_path = ParentPath(dstPath_); Aws::StringStream ss; assert(!dstPath_.empty()); assert(errorCallback_); // Generate any missing directory components. - if (!parent_path.empty() && Aws::FileSystem::CreateDirectoryIfNotExists(parent_path.c_str(), true)) { + if (!parent_path.empty() && !Aws::FileSystem::CreateDirectoryIfNotExists(parent_path.c_str(), true)) { ss << "Failed to create " << dstPath << " parent directories."; _error(ss.str()); return; @@ -86,6 +86,7 @@ DownloadStream::DownloadStream(const Aws::String &dstPath, ErrorCallback ec, boo // Produce unique temporary-file suffix. Use O_SYNC to ensure data gets written out to disk. fd_ = ::mkostemp(&dstTempPath_[0], sync_always ? O_SYNC : 0); + std::cerr << "fd " << fd_ << " " << dstTempPath_ <<" " << strerror(errno)<< "\n"; if (fd_ < 0) { ss << "Failed to create " << dstTempPath_ << ": " << ::strerror(errno); _error(ss.str()); @@ -101,7 +102,7 @@ DownloadStream::DownloadStream(const Aws::String &dstPath, ErrorCallback ec, boo buf_ = Aws::MakeUnique("FdBuf", fd_, [this](Aws::String writeError) { Aws::StringStream ss; - ss << "Failed to write " << dstTempPath_ << ": " << std::move(writeError); + ss << "Failed to write " << dstTempPath_ << ": " << std::move(writeError); _error(ss.str()); return; }); diff --git a/src/aws-cpp-sdk-transfer-crt/source/transfer-crt/TransferHandle.cpp b/src/aws-cpp-sdk-transfer-crt/source/transfer-crt/TransferHandle.cpp index 24147e5b74f..72aede39ac4 100644 --- a/src/aws-cpp-sdk-transfer-crt/source/transfer-crt/TransferHandle.cpp +++ b/src/aws-cpp-sdk-transfer-crt/source/transfer-crt/TransferHandle.cpp @@ -11,7 +11,7 @@ namespace Aws { namespace TransferCrt { -namespace { +namespace { using KeyValue = std::pair; Aws::String encode_query_string(const Aws::Map &key_values) { diff --git a/tests/aws-cpp-sdk-transfer-crt-tests/CMakeLists.txt b/tests/aws-cpp-sdk-transfer-crt-tests/CMakeLists.txt new file mode 100644 index 00000000000..403b6d6b865 --- /dev/null +++ b/tests/aws-cpp-sdk-transfer-crt-tests/CMakeLists.txt @@ -0,0 +1,30 @@ +add_project(aws-cpp-sdk-transfer-crt-tests + "Tests for the AWS TransferManager (CRT version) of the C++ SDK" + aws-cpp-sdk-transfer-crt + aws-cpp-sdk-s3-crt + testing-resources + aws-cpp-sdk-core) + +# Headers are included in the source so that they show up in Visual Studio. +# They are included elsewhere for consistency. + +file(GLOB TRANSFER_TEST_SRC + "${CMAKE_CURRENT_SOURCE_DIR}/*.cpp" +) + +if(MSVC AND BUILD_SHARED_LIBS) + add_definitions(-DGTEST_LINKED_AS_SHARED_LIBRARY=1) +endif() + +enable_testing() + +if(PLATFORM_ANDROID AND BUILD_SHARED_LIBS) + add_library(${PROJECT_NAME} ${TRANSFER_TEST_SRC}) +else() + add_executable(${PROJECT_NAME} ${TRANSFER_TEST_SRC}) +endif() + +set_compiler_flags(${PROJECT_NAME}) +set_compiler_warnings(${PROJECT_NAME}) + +target_link_libraries(${PROJECT_NAME} ${PROJECT_LIBS}) diff --git a/tests/aws-cpp-sdk-transfer-crt-tests/DownloadStreamTests.cpp b/tests/aws-cpp-sdk-transfer-crt-tests/DownloadStreamTests.cpp new file mode 100644 index 00000000000..ebcc2be0a71 --- /dev/null +++ b/tests/aws-cpp-sdk-transfer-crt-tests/DownloadStreamTests.cpp @@ -0,0 +1,320 @@ +/** + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * SPDX-License-Identifier: Apache-2.0. + */ + +#include +#include +#include + +#include +#include +#include +#include +#include +#include +#include + +#include + +namespace Aws { +namespace TransferCrt { +namespace { + +TEST(Construction, FileDescriptorBuf) { + static_assert(!std::is_default_constructible{}, "Should not permit default construction"); + static_assert(!std::is_trivially_destructible{}, "Non-trivial destructor"); + static_assert(std::is_nothrow_destructible{}, "Should not throw during destruction"); + static_assert(!std::is_copy_constructible{}, "Should not have copy constructor"); + static_assert(!std::is_copy_assignable{}, "Should not permit copy assignment"); + static_assert(std::is_move_constructible{}, "Should be move-constructible"); + static_assert(std::is_move_assignable{}, "Should support move assignment"); + static_assert(!std::is_trivially_move_constructible{}, "Non-trivial move constructor"); + static_assert(!std::is_trivially_move_assignable{}, "Non-trivial move assignment"); +} + +TEST(Construction, DownloadStream) { + static_assert(!std::is_default_constructible{}, "Should not permit default construction"); + static_assert(!std::is_trivially_destructible{}, "Non-trivial destructor"); + static_assert(std::is_nothrow_destructible{}, "Should not throw during destruction"); + static_assert(!std::is_copy_constructible{}, "Should not have copy constructor"); + static_assert(!std::is_copy_assignable{}, "Should not permit copy assignment"); + static_assert(!std::is_move_constructible{}, "Should not be move-constructible"); + static_assert(!std::is_move_assignable{}, "Should not support move assignment"); +} + +namespace { +// Return the parent directory of @path, or an empty string if not possible. +Aws::String ParentPath(const Aws::String &path) { + const size_t n = path.find_last_of(Aws::FileSystem::PATH_DELIM); + return n == Aws::String::npos ? "" : path.substr(0, n); +} + +// Stolen from endpoint/BuiltInParameters.cpp. +bool StringEndsWith(const Aws::String& str, const Aws::String& suffix) { + if (suffix.size() > str.size()) + return false; + return std::equal(suffix.rbegin(), suffix.rend(), str.rbegin()); +} +} // namespace + +// Test fixture to help set up / tear down DownloadStream test cases. +class DownloadStreamtest : public ::testing::Test { + public: + DownloadStreamtest() : dst_{GetTestFilesDirectory() + "/test.file"} {} + + int TestFile() { + const Aws::String parent_path = ParentPath(dst_); + Aws::FileSystem::CreateDirectoryIfNotExists(parent_path.c_str(), true); + std::cerr << dst_ << " " << parent_path << "\n"; // XXX + return ::open(dst_.c_str(), O_WRONLY | O_CREAT | O_TRUNC, 0600); + } + + Aws::String TestFileContents() { + Aws::StringStream ss; + std::ifstream is{dst_}; + + is >> std::noskipws >> ss.rdbuf(); + return ss.str(); + } + + int UnlinkTestFile() { return ::unlink(dst_.c_str()); } + + static Aws::String GetTestFilesDirectory() { + Aws::String directory; +#ifdef __ANDROID__ + directory = Aws::FileSystem::Join(Aws::Platform::GetCacheDirectory(), "TransferCrtTests"); +#else + directory = "TransferCrtTests"; +#endif // __ANDROID__ + + Aws::FileSystem::CreateDirectoryIfNotExists(directory.c_str()); + return directory; + } + + static void SetUpTestCase() { + Aws::FileSystem::DeepDeleteDirectory(GetTestFilesDirectory().c_str()); + } + + static void TearDownTestCase() { + Aws::FileSystem::DeepDeleteDirectory(GetTestFilesDirectory().c_str()); + } + + protected: + Aws::String dst_; + Aws::String test_data_{"the quick brown fox jumps over the lazy lackadaisical lapdog"}; +}; + +TEST_F(DownloadStreamtest, FdEnsureCallbackAndExceptionWork) { + Aws::String errMsg; + FileDescriptorBuf fdb(-1, [&errMsg](Aws::String e) { errMsg = std::move(e); }); + // ACTION + ASSERT_EQ(fdb.sputn(test_data_.c_str(), test_data_.size()), EOF); + // VERIFICATION + EXPECT_EQ(errMsg, "write error: Bad file descriptor"); +} + +TEST_F(DownloadStreamtest, FdSupportedMethods) { + // Test assumptions as to which methods are supported. Mostly VERIFICATION in this test. + Aws::String errMsg; + int fd = TestFile(); + FileDescriptorBuf fdb(fd, [&errMsg](Aws::String e) { errMsg = std::move(e); }); + char alternateBuf[128] = {0}; + + std::cerr << "ERROR: "<< errMsg << "\n";; // XXX + ASSERT_GT(fd, 2); + ASSERT_EQ(fdb.pubsync(), 0); + ASSERT_EQ(fdb.sputc('a'), 'a'); + ASSERT_EQ(fdb.sputn(test_data_.c_str(), test_data_.size()), (std::streamsize)test_data_.size()); + ASSERT_EQ(errMsg, ""); + + // Show that pubsetbuf has no effect on this class. + ASSERT_EQ(::lseek(fd, 0, SEEK_SET), 0); + ASSERT_EQ(::ftruncate(fd, 0), 0); + + ASSERT_EQ(fdb.pubsetbuf(alternateBuf, sizeof(alternateBuf)), &fdb); + ASSERT_EQ(fdb.sputn(test_data_.c_str(), test_data_.size()), (std::streamsize)test_data_.size()); + ASSERT_EQ(::close(fd), 0); + ASSERT_STREQ(alternateBuf, ""); // Nothing got transferred. + ASSERT_EQ(TestFileContents(), test_data_); // Wrote to fd, as intended. + + ASSERT_EQ(errMsg, ""); +} + +TEST_F(DownloadStreamtest, FdUnsupportedMethods) { + // Document which methods are not supported. Mostly VERIFICATION in this test. + Aws::String errMsg; + int fd = TestFile(); + FileDescriptorBuf fdb(fd, [&errMsg](Aws::String e) { errMsg = std::move(e); }); + char buf[3] = {0}; + + ASSERT_GT(fd, 2); + + ASSERT_EQ(fdb.pubseekoff(0, std::ios_base::beg), -1); + ASSERT_EQ(fdb.pubseekoff(0, std::ios_base::cur), -1); + ASSERT_EQ(fdb.pubseekoff(0, std::ios_base::end), -1); + + ASSERT_EQ(fdb.sputn(test_data_.c_str(), test_data_.size()), (std::streamsize)test_data_.size()); + ASSERT_EQ(fdb.pubseekpos(0), -1); + ASSERT_EQ(fdb.pubseekpos(test_data_.size()), -1); + + ASSERT_EQ(fdb.in_avail(), 0); + ASSERT_EQ(fdb.snextc(), -1); + ASSERT_EQ(fdb.sbumpc(), -1); + ASSERT_EQ(fdb.sgetc(), -1); + ASSERT_EQ(fdb.sgetn(buf, sizeof(buf)), 0); + ASSERT_EQ(fdb.sputbackc('a'), -1); + ASSERT_EQ(fdb.sungetc(), -1); + + ASSERT_EQ(errMsg, ""); +} + +TEST_F(DownloadStreamtest, DownloadStreamHappyPath) { + // Document expected use case in ACTION/VERIFICATION blocks. + Aws::String errMsg; + DownloadStream d{dst_, [&errMsg](Aws::String e) { errMsg = std::move(e); }}; + + d << test_data_; + ASSERT_TRUE(d.good()); + ASSERT_FALSE(d.eof()); + + d.close(); + ASSERT_FALSE(d.bad()); + ASSERT_FALSE(d.fail()); + ASSERT_TRUE(d.eof()); + + ASSERT_EQ(TestFileContents(), test_data_); + ASSERT_EQ(errMsg, ""); +} + +TEST_F(DownloadStreamtest, SupportedMethodsShouldSucceed) { + // Document expected use-cases for supported methods in ACTION/VERIFICATION blocks. + Aws::String errMsg; + DownloadStream d{dst_, [&errMsg](Aws::String e) { errMsg = std::move(e); }}; + + // These characters use the internal buffer (testing flush), for strings xsputn is called. + for (const char &c : test_data_) { + d.put(c); + } + ASSERT_TRUE(d.good()); + + d << test_data_; + ASSERT_TRUE(d.good()); + + d.write(test_data_.c_str(), test_data_.size()); + ASSERT_TRUE(d.good()); + + d.close(); + ASSERT_FALSE(d.fail()); + + // Call flush() after the file has been closed; to require that close() flushed the buffer. + d.flush(); + ASSERT_FALSE(d.fail()); + ASSERT_TRUE(d.eof()); + + ASSERT_EQ(TestFileContents(), test_data_ + test_data_ + test_data_); + ASSERT_EQ(errMsg, ""); +} + +TEST_F(DownloadStreamtest, DownloadStreamUnsupportedMethods) { + // Document unsupported methods via VERIFICATION (ASSERT) statements. + Aws::String errMsg; + DownloadStream d{dst_, [&errMsg](Aws::String e) { errMsg = std::move(e); }}; + + d << test_data_; + ASSERT_TRUE(d.good()); + + ASSERT_EQ(d.get(), -1); + ASSERT_TRUE(d.fail() && !d.bad() && d.eof()); + + d.clear(); + ASSERT_EQ(d.peek(), -1); + ASSERT_FALSE(d.good()); + ASSERT_TRUE(d.eof()); + + d.clear(); + ASSERT_EQ(d.tellg(), -1); + ASSERT_TRUE(d.good()); + + d.seekg(0); + ASSERT_TRUE(d.fail() && !d.bad() && !d.eof()); + + d.clear(); + ASSERT_EQ(d.sync(), 0); + ASSERT_TRUE(d.good()); + ASSERT_TRUE(d.flush().good()); + + // Ensure the data is written out despite the failures in between: + d.close(); + ASSERT_EQ(TestFileContents(), test_data_); + + // It is now too late to flush any data: + ASSERT_TRUE(d.eof()); + ASSERT_EQ(d.sync(), -1); + ASSERT_TRUE(d.fail() && !d.bad() && d.eof()); + + // Flush proceeds without error, since the put buffer was empty: + d.clear(); + ASSERT_TRUE(d.flush().good()); + + ASSERT_EQ(errMsg, ""); + ASSERT_EQ(UnlinkTestFile(), 0); + + // Try flush again with non-empty put buffer: + DownloadStream d2{dst_, [&errMsg](Aws::String e) { errMsg = std::move(e); }}; + for (const char &c : test_data_) { + d2.put(c); + } + ASSERT_TRUE(d2.good()); + + d2.close(); + ASSERT_TRUE(!d2.fail() && !d2.bad() && d2.eof()); + + // The following is only false at eof since we empty the put-buffer in close(): + ASSERT_FALSE(d2.flush().bad()); + + ASSERT_EQ(TestFileContents(), test_data_); + ASSERT_EQ(errMsg, ""); +} + +TEST_F(DownloadStreamtest, EnsureOutputDoesNotExistIfStreamIsCorrupted) { + // When a stream is corrupted (badbit set), ensure that no output file is generated. + Aws::String errMsg; + DownloadStream d{dst_, [&errMsg](Aws::String e) { errMsg = std::move(e); }}; + + d << test_data_; + ASSERT_TRUE(d.good()); + ASSERT_FALSE(d.eof()); + + // ACTION + d.setstate(std::ios::badbit); + d.close(); + + // VERIFICATION + ASSERT_EQ(d.rdstate(), std::ios::badbit | std::ios::eofbit | std::ios::failbit); + ASSERT_EQ(errMsg, "Stream is corrupt on close"); + ASSERT_EQ(::access(dst_.c_str(), F_OK), -1); +} + +TEST_F(DownloadStreamtest, PermissionsError) { + Aws::String errMsg; + DownloadStream d{dst_, [&errMsg](Aws::String e) { errMsg = std::move(e); }}; + + // ACTION + + // Change the directory permissions so that renaming the file will fail: + ASSERT_EQ(::chmod(ParentPath(dst_).c_str(), 0), 0); + + d << test_data_; + ASSERT_TRUE(d.good()); + + d.close(); + ASSERT_TRUE(d.fail() && d.bad() && d.eof()); + + // VERIFICATION + EXPECT_TRUE(StringEndsWith(errMsg, "Permission denied")); +} + +} // namespace +} // namespace TransferCrt +} // namespace Aws diff --git a/tests/aws-cpp-sdk-transfer-crt-tests/RunTests.cpp b/tests/aws-cpp-sdk-transfer-crt-tests/RunTests.cpp new file mode 100644 index 00000000000..77734e8b295 --- /dev/null +++ b/tests/aws-cpp-sdk-transfer-crt-tests/RunTests.cpp @@ -0,0 +1,29 @@ +/** + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * SPDX-License-Identifier: Apache-2.0. + */ + +#include +#include +#include +#include +#include + +int main(int argc, char** argv) +{ + Aws::SDKOptions options; + options.loggingOptions.logLevel = Aws::Utils::Logging::LogLevel::Trace; + + AWS_BEGIN_MEMORY_TEST_EX(options, 1024, 128); + Aws::Testing::InitPlatformTest(options); + Aws::Testing::ParseArgs(argc, argv); + + Aws::InitAPI(options); + ::testing::InitGoogleTest(&argc, argv); + int exitCode = RUN_ALL_TESTS(); + Aws::ShutdownAPI(options); + + AWS_END_MEMORY_TEST_EX; + Aws::Testing::ShutdownPlatformTest(options); + return exitCode; +} From 83e6488d3cadddbbcb853858b93eef0ab3833107 Mon Sep 17 00:00:00 2001 From: Gerrit Renker Date: Mon, 6 Mar 2023 13:15:16 -0500 Subject: [PATCH 4/9] Add DownloadStream tests --- .../source/transfer-crt/DownloadStream.cpp | 1 - .../DownloadStreamTests.cpp | 29 ++++++++++--------- 2 files changed, 16 insertions(+), 14 deletions(-) diff --git a/src/aws-cpp-sdk-transfer-crt/source/transfer-crt/DownloadStream.cpp b/src/aws-cpp-sdk-transfer-crt/source/transfer-crt/DownloadStream.cpp index cc245d23610..0a56c4c459b 100644 --- a/src/aws-cpp-sdk-transfer-crt/source/transfer-crt/DownloadStream.cpp +++ b/src/aws-cpp-sdk-transfer-crt/source/transfer-crt/DownloadStream.cpp @@ -86,7 +86,6 @@ DownloadStream::DownloadStream(const Aws::String &dstPath, ErrorCallback ec, boo // Produce unique temporary-file suffix. Use O_SYNC to ensure data gets written out to disk. fd_ = ::mkostemp(&dstTempPath_[0], sync_always ? O_SYNC : 0); - std::cerr << "fd " << fd_ << " " << dstTempPath_ <<" " << strerror(errno)<< "\n"; if (fd_ < 0) { ss << "Failed to create " << dstTempPath_ << ": " << ::strerror(errno); _error(ss.str()); diff --git a/tests/aws-cpp-sdk-transfer-crt-tests/DownloadStreamTests.cpp b/tests/aws-cpp-sdk-transfer-crt-tests/DownloadStreamTests.cpp index ebcc2be0a71..d9dab67d7a3 100644 --- a/tests/aws-cpp-sdk-transfer-crt-tests/DownloadStreamTests.cpp +++ b/tests/aws-cpp-sdk-transfer-crt-tests/DownloadStreamTests.cpp @@ -61,15 +61,27 @@ bool StringEndsWith(const Aws::String& str, const Aws::String& suffix) { // Test fixture to help set up / tear down DownloadStream test cases. class DownloadStreamtest : public ::testing::Test { public: + static void SetUpTestCase() { + Aws::FileSystem::DeepDeleteDirectory(GetTestFilesDirectory().c_str()); + } + static void TearDownTestCase() { + Aws::FileSystem::DeepDeleteDirectory(GetTestFilesDirectory().c_str()); + } + DownloadStreamtest() : dst_{GetTestFilesDirectory() + "/test.file"} {} + ~DownloadStreamtest() { (void)UnlinkTestFile(); } + // Open up a file descriptor to @dst_, creating any missing directory components. int TestFile() { const Aws::String parent_path = ParentPath(dst_); Aws::FileSystem::CreateDirectoryIfNotExists(parent_path.c_str(), true); - std::cerr << dst_ << " " << parent_path << "\n"; // XXX return ::open(dst_.c_str(), O_WRONLY | O_CREAT | O_TRUNC, 0600); } + // Remove @dst_ from the filesystem. + int UnlinkTestFile() { return ::unlink(dst_.c_str()); } + + // Return the contents of @dst_. Aws::String TestFileContents() { Aws::StringStream ss; std::ifstream is{dst_}; @@ -78,8 +90,6 @@ class DownloadStreamtest : public ::testing::Test { return ss.str(); } - int UnlinkTestFile() { return ::unlink(dst_.c_str()); } - static Aws::String GetTestFilesDirectory() { Aws::String directory; #ifdef __ANDROID__ @@ -92,14 +102,6 @@ class DownloadStreamtest : public ::testing::Test { return directory; } - static void SetUpTestCase() { - Aws::FileSystem::DeepDeleteDirectory(GetTestFilesDirectory().c_str()); - } - - static void TearDownTestCase() { - Aws::FileSystem::DeepDeleteDirectory(GetTestFilesDirectory().c_str()); - } - protected: Aws::String dst_; Aws::String test_data_{"the quick brown fox jumps over the lazy lackadaisical lapdog"}; @@ -121,7 +123,6 @@ TEST_F(DownloadStreamtest, FdSupportedMethods) { FileDescriptorBuf fdb(fd, [&errMsg](Aws::String e) { errMsg = std::move(e); }); char alternateBuf[128] = {0}; - std::cerr << "ERROR: "<< errMsg << "\n";; // XXX ASSERT_GT(fd, 2); ASSERT_EQ(fdb.pubsync(), 0); ASSERT_EQ(fdb.sputc('a'), 'a'); @@ -301,7 +302,6 @@ TEST_F(DownloadStreamtest, PermissionsError) { DownloadStream d{dst_, [&errMsg](Aws::String e) { errMsg = std::move(e); }}; // ACTION - // Change the directory permissions so that renaming the file will fail: ASSERT_EQ(::chmod(ParentPath(dst_).c_str(), 0), 0); @@ -313,6 +313,9 @@ TEST_F(DownloadStreamtest, PermissionsError) { // VERIFICATION EXPECT_TRUE(StringEndsWith(errMsg, "Permission denied")); + + // CLEAN-UP (need to restore directory permissions to enable deletion). + ASSERT_EQ(::chmod(ParentPath(dst_).c_str(), 755), 0); } } // namespace From 61920089664534c99f2d41bf1d0c2935a3a6319b Mon Sep 17 00:00:00 2001 From: Gerrit Renker Date: Mon, 6 Mar 2023 16:48:06 -0500 Subject: [PATCH 5/9] Add TransferHandleTests --- .../TransferHandleTests.cpp | 295 ++++++++++++++++++ 1 file changed, 295 insertions(+) create mode 100644 tests/aws-cpp-sdk-transfer-crt-tests/TransferHandleTests.cpp diff --git a/tests/aws-cpp-sdk-transfer-crt-tests/TransferHandleTests.cpp b/tests/aws-cpp-sdk-transfer-crt-tests/TransferHandleTests.cpp new file mode 100644 index 00000000000..fb268dfaf4b --- /dev/null +++ b/tests/aws-cpp-sdk-transfer-crt-tests/TransferHandleTests.cpp @@ -0,0 +1,295 @@ +/** + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * SPDX-License-Identifier: Apache-2.0. + */ + +#include +#include +#include +#include + +#include + +namespace Aws { +namespace TransferCrt { +namespace { +/* + * TransferHandle Tests. + */ +TEST(TransferHandleTest, DefaultValues) { + TransferHandle th{"s3://some.bucket/some.path", {}}; + + ASSERT_TRUE(th.ShouldContinue()); + ASSERT_FALSE(th.BytesTotalSizeHasBeenSet()); + ASSERT_EQ(th.GetBytesTransferred(), 0u); + ASSERT_EQ(th.GetStatus(), TransferStatus::NOT_STARTED); + ASSERT_EQ(th.GetLastError().GetErrorType(), S3Crt::S3CrtErrors::UNKNOWN); + ASSERT_EQ(th.GetLastError().GetMessage(), ""); + ASSERT_EQ(th.GetLastError().GetExceptionName(), ""); + ASSERT_TRUE(th.GetLastError().ShouldRetry()); +} + +TEST(TransferHandleTest, ReadMetadata) { + TransferHandle th{"s3://some.bucket/some.path", {}}; + + th.SetBytesTotalSize(42); + + // VERIFICATION + EXPECT_EQ(th.GetReadMetadata().size, 42u); + + // Validate default values: + EXPECT_NE(th.GetReadMetadata().content_encoding, "gzip"); + EXPECT_EQ(th.GetReadMetadata().content_type, ""); + EXPECT_EQ(th.GetReadMetadata().metadata.size(), 0u); +} + +TEST(TransferHandleTest, StateMachine) { + TransferHandle th{"s3://some.bucket/some.path", {}}; + + // Initial state. + ASSERT_EQ(th.GetStatus(), TransferStatus::NOT_STARTED); + + // NOT_STARTED => IN_PROGRESS + th.UpdateStatus(TransferStatus::IN_PROGRESS); + ASSERT_EQ(th.GetStatus(), TransferStatus::IN_PROGRESS); + + // IN_PROGRESS => NOT_STARTED + th.UpdateStatus(TransferStatus::NOT_STARTED); + ASSERT_EQ(th.GetStatus(), TransferStatus::NOT_STARTED); + + // NOT_STARTED => COMPLETED + th.UpdateStatus(TransferStatus::COMPLETED); + ASSERT_EQ(th.GetStatus(), TransferStatus::COMPLETED); +} + +TEST(TransferHandleTest, FailingATransfer) { + // Mimic a TransferHandle on which FailWithError is called shortly after construction. + TransferHandle th{"s3://some.bucket/some.path", {}}; + + // PRECONDITION + ASSERT_TRUE(th.ShouldContinue()); + + // ACTION + th.UpdateStatus(TransferStatus::FAILED); + th.SetError({S3Crt::S3CrtErrors::UNKNOWN, "FATAL ERROR", "Something went wrong", false}); + + // VERIFICATION + EXPECT_FALSE(th.ShouldContinue()); + EXPECT_EQ(th.GetStatus(), TransferStatus::FAILED); + EXPECT_EQ(th.GetLastError().GetErrorType(), S3Crt::S3CrtErrors::UNKNOWN); + EXPECT_EQ(th.GetLastError().GetMessage(), "Something went wrong"); + EXPECT_EQ(th.GetLastError().GetExceptionName(), "FATAL ERROR"); +} + +TEST(TransferHandleTest, UpdateStatusIsIdempotent) { + // NOT_STARTED is the initial state. Can not set it again. + { + TransferHandle th{"s3://some.bucket/some.path", {}}; + + ASSERT_EQ(th.GetStatus(), TransferStatus::NOT_STARTED); + th.UpdateStatus(TransferStatus::NOT_STARTED); + EXPECT_EQ(th.GetStatus(), TransferStatus::NOT_STARTED); + } + + // Can update to IN_PROGRESS at most once. + { + TransferHandle th{"s3://some.bucket/some.path", {}}; + th.UpdateStatus(TransferStatus::NOT_STARTED); + th.UpdateStatus(TransferStatus::IN_PROGRESS); + th.UpdateStatus(TransferStatus::IN_PROGRESS); + EXPECT_EQ(th.GetStatus(), TransferStatus::IN_PROGRESS); + } + + // May transition from IN_PROGRESS back to NOT_STARTED (not currently used by the code). + { + TransferHandle th{"s3://some.bucket/some.path", {}}; + + ASSERT_EQ(th.GetStatus(), TransferStatus::NOT_STARTED); + th.UpdateStatus(TransferStatus::IN_PROGRESS); + EXPECT_EQ(th.GetStatus(), TransferStatus::IN_PROGRESS); + + th.UpdateStatus(TransferStatus::NOT_STARTED); + th.UpdateStatus(TransferStatus::NOT_STARTED); + EXPECT_EQ(th.GetStatus(), TransferStatus::NOT_STARTED); + + th.UpdateStatus(TransferStatus::IN_PROGRESS); + th.UpdateStatus(TransferStatus::IN_PROGRESS); + EXPECT_EQ(th.GetStatus(), TransferStatus::IN_PROGRESS); + } + + // Once a final state is reached, no more state transitions are possible. + { + for (auto &&finalState : { + TransferStatus::CANCELED, + TransferStatus::FAILED, + TransferStatus::COMPLETED, + }) { + TransferHandle th{"s3://some.bucket/some.path", {}}; + + th.UpdateStatus(finalState); + + for (auto &&testState : { + TransferStatus::NOT_STARTED, + TransferStatus::IN_PROGRESS, + TransferStatus::FAILING, + TransferStatus::CANCELED, + TransferStatus::FAILED, + TransferStatus::COMPLETED, + }) { + th.UpdateStatus(testState); + EXPECT_EQ(th.GetStatus(), finalState); + } + } + } +} + +TEST(TransferHandleTest, Upload) { + // Validate handling of upload parameters. + WriteMetadata wmd{"s3://some.bucket/some.path"}; + TransferHandle th{wmd, {}}; + + // Need to initialize the ReadMetadata (size must be set in order to call GetReadMetadata()). + th.SetBytesTotalSize(42); + + // VERIFICATION + EXPECT_EQ(th.GetBucket(), "some.bucket"); + EXPECT_EQ(th.GetKey(), "some.path"); + EXPECT_EQ(th.GetReadMetadata().content_type, ""); + EXPECT_EQ(th.GetReadMetadata().content_encoding, ""); +} + +TEST(TransferHandleTest, UploadMetadata) { + // Validate encoding of non-ASCII metadata. + WriteMetadata wmd{"s3://some.bucket/some.path"}; + wmd.metadata = {{"src", "Âûröræ"}, {"dst", "ÄMÄZÕÑ S3"}, {"purpose", "upload"}}; + TransferHandle th{wmd, {}}; + + // Need to initialize the ReadMetadata (see above). + th.SetBytesTotalSize(0); + + // VERIFICATION + ASSERT_EQ(th.GetReadMetadata().metadata.size(), wmd.metadata.size()); + EXPECT_EQ(th.GetReadMetadata().metadata, wmd.metadata); +} + +TEST(TransferHandleTest, UploadGzipped) { + // Ensure compressed (gzip) upload is handled as expected. + WriteMetadata wmd{"s3://", "text/plain", "gzip"}; + TransferHandle th{wmd, {}}; + + th.SetBytesTotalSize(0); // Needed to initialize ReadMetadata. + + ASSERT_EQ(th.GetReadMetadata().content_type, "text/plain"); + ASSERT_EQ(th.GetReadMetadata().content_encoding, "gzip"); +} + +/* + * Parameterized test to check combinations of (terminal) TransferStatus states. + */ +class DownloadHandleFixture : public ::testing::TestWithParam { + protected: + TransferHandle th{"s3://some.bucket/some.path", {}}; +}; + +TEST_P(DownloadHandleFixture, TerminalStatesMustNotBeChanged) { + const TransferStatus cs = GetParam(); + + th.UpdateStatus(cs); + ASSERT_EQ(th.GetStatus(), cs); + + for (const auto &ts : {TransferStatus::NOT_STARTED, + TransferStatus::IN_PROGRESS, + TransferStatus::CANCELED, + TransferStatus::FAILED, + TransferStatus::COMPLETED}) { + th.UpdateStatus(ts); + ASSERT_EQ(th.GetStatus(), cs); + if (ts != cs) { + ASSERT_NE(th.GetStatus(), ts); + } + } +} + +// Test the combinations of terminal states. +INSTANTIATE_TEST_SUITE_P(TerminalStatesTests, + DownloadHandleFixture, + ::testing::Values(TransferStatus::CANCELED, + TransferStatus::FAILED, + TransferStatus::COMPLETED)); + +/* + * Limited TransferManager tests (full tests require network). + */ +class TransferManagerTest : public ::testing::Test {}; + +TEST_F(TransferManagerTest, AsyncCallerContext) { + auto ctx = std::make_shared(); + // Pre-condition + EXPECT_EQ(ctx.use_count(), 1); + { + // Context is passed as const std:shared_ptr<>&, but copy-constructed internally. + // Verify that the use-count behaves as expected. + TransferHandle th{"s3://some.bucket/some.path", ctx}; + EXPECT_EQ(ctx.use_count(), 2); + EXPECT_EQ(th.GetContext().use_count(), 3); + EXPECT_EQ(ctx.use_count(), 2); + } + EXPECT_EQ(ctx.use_count(), 1); +} + +TEST_F(TransferManagerTest, DownloadPaths) { + // Ensure that an initialization failure is caught and handled. + auto mgr = TransferManager::Create(nullptr, {}, {}, {}); + + // Attempt to create a file below an invalid path. + { + auto th = mgr->DownloadFile("s3://", "/a.path **that does not exist**!"); + + EXPECT_EQ(th->GetStatus(), TransferStatus::FAILED); + EXPECT_FALSE(th->ShouldContinue()); + EXPECT_EQ(th->GetLastError().GetExceptionName(), "DownloadStream Failure"); + } +} + +TEST_F(TransferManagerTest, UploadPaths) { + WriteMetadata s3_uri{"s3://some.bucket/some.path"}; + + // Open a non-existing file. + { + auto mgr = TransferManager::Create(nullptr, {}, {}, {}); + auto th = mgr->UploadFile("/.no::such^path!", {}, s3_uri); + + EXPECT_EQ(th->GetStatus(), TransferStatus::FAILED); + EXPECT_FALSE(th->ShouldContinue()); + EXPECT_EQ(th->GetLastError().GetExceptionName(), "FATAL ERROR"); + } + + // Pass a bad (closed) input stream. + { + auto mgr = TransferManager::Create(nullptr, {}, {}, {}); + auto is = std::make_shared("/bin/ls"); + is->close(); + + auto th = mgr->UploadFile("", is, s3_uri); + EXPECT_EQ(th->GetStatus(), TransferStatus::FAILED); + EXPECT_FALSE(th->ShouldContinue()); + EXPECT_EQ(th->GetLastError().GetMessage(), "Failed to open stream '': Permission denied"); + EXPECT_EQ(th->GetLastError().GetExceptionName(), "FATAL ERROR"); + } + + // Pass a non-seekable input stream. + { + auto mgr = TransferManager::Create(nullptr, {}, {}, {}); + auto is = std::make_shared(std::cout.rdbuf()); + auto th = mgr->UploadFile("", is, s3_uri); + + EXPECT_EQ(th->GetStatus(), TransferStatus::FAILED); + EXPECT_FALSE(th->ShouldContinue()); + EXPECT_EQ(th->GetLastError().GetMessage(), "Failed to determine size of '': Illegal seek"); + EXPECT_EQ(th->GetLastError().GetExceptionName(), "FATAL ERROR"); + } +} + +} // namespace +} // namespace TransferCrt +} // namespace Aws From dac5964726e56b88da7497c1cb470690692013c8 Mon Sep 17 00:00:00 2001 From: Gerrit Renker Date: Mon, 6 Mar 2023 16:55:59 -0500 Subject: [PATCH 6/9] Update comments --- .../source/transfer-crt/TransferManager.cpp | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/src/aws-cpp-sdk-transfer-crt/source/transfer-crt/TransferManager.cpp b/src/aws-cpp-sdk-transfer-crt/source/transfer-crt/TransferManager.cpp index c039bd32fe8..db9141be9f6 100644 --- a/src/aws-cpp-sdk-transfer-crt/source/transfer-crt/TransferManager.cpp +++ b/src/aws-cpp-sdk-transfer-crt/source/transfer-crt/TransferManager.cpp @@ -85,7 +85,7 @@ void TransferManager::PutObject(const std::shared_ptr &streamToPu .WithKey(handle->GetKey()) .WithContentLength(blobMetadata.size); - // Grant the bucket owner full control (see AV-48995). + // Grant the bucket owner full control. putObjectRequest.SetACL(Aws::S3Crt::Model::ObjectCannedACL::bucket_owner_full_control); putObjectRequest.SetContentType(blobMetadata.content_type); if (!blobMetadata.content_encoding.empty()) { @@ -219,7 +219,7 @@ void TransferManager::GetObject(const std::shared_ptr &context, Aws::Http::HttpResponse *res, long long /*NOLINT*/ amount) { // - // Set the total size after the first chunk has been processed (AV-120158, AV-171186). + // Set the total size after the first chunk has been processed. // // The aws-c-s3 code populates the Content-Length header of the HttpResponse after the // first part of the ranged-Get has completed. Extract total-size information from this. @@ -279,7 +279,7 @@ void TransferManager::HandleGetObjectResponse( if (outcome.IsSuccess()) { // At this stage, the total size should have been filled in. This is done by the // aws-c-s3 code, passing the Content-Length header via the headers_callback to - // the HttpResponse (see GetObject() above and AV-120158). + // the HttpResponse (see GetObject() above). if (!handle->BytesTotalSizeHasBeenSet()) { // In the special case of an empty file, the aws-c-s3 code does not use the // body_callback and so the DataReceivedEventHandler is also not called. @@ -370,7 +370,7 @@ void TransferManager::OnStatusChanged(const std::shared_ptr &han void TransferManager::OnUploadProgress(const std::shared_ptr &handle) { // The progress callbacks are only called while the transfer is in progress. - // If another transfer has failed in the meantime, cancel this and all other ones (AV-171186). + // If another transfer has failed in the meantime, cancel this and all other ones. if (cancel_on_first_failure_ && failure_occurred_) { handle->Cancel(); } From 966cddb41b67e8d1a19b5e2a76b4cfee66b7c9fa Mon Sep 17 00:00:00 2001 From: Gerrit Renker Date: Mon, 6 Mar 2023 16:59:25 -0500 Subject: [PATCH 7/9] Use const-ref --- .../include/aws/transfer-crt/Metadata.h | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/src/aws-cpp-sdk-transfer-crt/include/aws/transfer-crt/Metadata.h b/src/aws-cpp-sdk-transfer-crt/include/aws/transfer-crt/Metadata.h index 468f31ec35c..9aa2851c3f7 100644 --- a/src/aws-cpp-sdk-transfer-crt/include/aws/transfer-crt/Metadata.h +++ b/src/aws-cpp-sdk-transfer-crt/include/aws/transfer-crt/Metadata.h @@ -20,12 +20,12 @@ namespace TransferCrt { // @metadata: metadata key/value pairs. struct WriteMetadata { // Constructor for the default case - just create a blob at @uri. - explicit WriteMetadata(Aws::String uri) : WriteMetadata(uri, "", "") {} + explicit WriteMetadata(const Aws::String &uri) : WriteMetadata(uri, "", "") {} - WriteMetadata(Aws::String uri, - Aws::String content_type, - Aws::String content_encoding, - Aws::Map metadata = {}) + WriteMetadata(const Aws::String &uri, + const Aws::String &content_type, + const Aws::String &content_encoding, + const Aws::Map &metadata = {}) : uri{uri}, content_type{content_type}, content_encoding{content_encoding}, From 49e6a1ea17029b2cdee2e94385b9d913e67d5aac Mon Sep 17 00:00:00 2001 From: Gerrit Renker Date: Mon, 6 Mar 2023 17:03:04 -0500 Subject: [PATCH 8/9] Trim whitespace --- tests/aws-cpp-sdk-transfer-crt-tests/RunTests.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/aws-cpp-sdk-transfer-crt-tests/RunTests.cpp b/tests/aws-cpp-sdk-transfer-crt-tests/RunTests.cpp index 77734e8b295..d8c76cc8bcc 100644 --- a/tests/aws-cpp-sdk-transfer-crt-tests/RunTests.cpp +++ b/tests/aws-cpp-sdk-transfer-crt-tests/RunTests.cpp @@ -20,7 +20,7 @@ int main(int argc, char** argv) Aws::InitAPI(options); ::testing::InitGoogleTest(&argc, argv); - int exitCode = RUN_ALL_TESTS(); + int exitCode = RUN_ALL_TESTS(); Aws::ShutdownAPI(options); AWS_END_MEMORY_TEST_EX; From 28eaf89ca8ede0768625b9c9e5c5f9606bb44c91 Mon Sep 17 00:00:00 2001 From: Gerrit Renker Date: Tue, 7 Mar 2023 10:45:15 -0500 Subject: [PATCH 9/9] Final touches --- src/aws-cpp-sdk-transfer-crt/CMakeLists.txt | 2 +- .../include/aws/transfer-crt/Metadata.h | 1 + .../include/aws/transfer-crt/TransferManager.h | 2 +- .../source/transfer-crt/DownloadStream.cpp | 2 +- .../source/transfer-crt/TransferHandle.cpp | 4 ++-- tests/aws-cpp-sdk-transfer-crt-tests/DownloadStreamTests.cpp | 2 +- 6 files changed, 7 insertions(+), 6 deletions(-) diff --git a/src/aws-cpp-sdk-transfer-crt/CMakeLists.txt b/src/aws-cpp-sdk-transfer-crt/CMakeLists.txt index 3a1c7802fd2..25fa9313aa1 100644 --- a/src/aws-cpp-sdk-transfer-crt/CMakeLists.txt +++ b/src/aws-cpp-sdk-transfer-crt/CMakeLists.txt @@ -48,6 +48,6 @@ set_compiler_warnings(${PROJECT_NAME}) setup_install() -install (FILES ${ALL_TRANSFER_HEADERS} DESTINATION ${INCLUDE_DIRECTORY}/aws/transfer) +install (FILES ${ALL_TRANSFER_HEADERS} DESTINATION ${INCLUDE_DIRECTORY}/aws/transfer-crt) do_packaging() diff --git a/src/aws-cpp-sdk-transfer-crt/include/aws/transfer-crt/Metadata.h b/src/aws-cpp-sdk-transfer-crt/include/aws/transfer-crt/Metadata.h index 9aa2851c3f7..c7821a75047 100644 --- a/src/aws-cpp-sdk-transfer-crt/include/aws/transfer-crt/Metadata.h +++ b/src/aws-cpp-sdk-transfer-crt/include/aws/transfer-crt/Metadata.h @@ -18,6 +18,7 @@ namespace TransferCrt { // @content_type: MIME type of the @uri content. // @content_encoding: content encoding that was applied. // @metadata: metadata key/value pairs. +// @tags: S3 object storage tagging key/value pairs. struct WriteMetadata { // Constructor for the default case - just create a blob at @uri. explicit WriteMetadata(const Aws::String &uri) : WriteMetadata(uri, "", "") {} diff --git a/src/aws-cpp-sdk-transfer-crt/include/aws/transfer-crt/TransferManager.h b/src/aws-cpp-sdk-transfer-crt/include/aws/transfer-crt/TransferManager.h index d31cf7ebca4..520810d970d 100644 --- a/src/aws-cpp-sdk-transfer-crt/include/aws/transfer-crt/TransferManager.h +++ b/src/aws-cpp-sdk-transfer-crt/include/aws/transfer-crt/TransferManager.h @@ -41,7 +41,7 @@ struct DownloadContext final : Aws::Client::AsyncCallerContext { std::streambuf *dstStreamBuf = nullptr; }; -// Minimal re-implementation of the S3 TransferManager for the S3CrtClient. +// S3 TransferManager for the S3CrtClient. // // All public methods are non-blocking and return a pointer to an asynchronous TransferHandle. class TransferManager final : public std::enable_shared_from_this { diff --git a/src/aws-cpp-sdk-transfer-crt/source/transfer-crt/DownloadStream.cpp b/src/aws-cpp-sdk-transfer-crt/source/transfer-crt/DownloadStream.cpp index 0a56c4c459b..cc066ee7008 100644 --- a/src/aws-cpp-sdk-transfer-crt/source/transfer-crt/DownloadStream.cpp +++ b/src/aws-cpp-sdk-transfer-crt/source/transfer-crt/DownloadStream.cpp @@ -79,7 +79,7 @@ DownloadStream::DownloadStream(const Aws::String &dstPath, ErrorCallback ec, boo // Generate any missing directory components. if (!parent_path.empty() && !Aws::FileSystem::CreateDirectoryIfNotExists(parent_path.c_str(), true)) { - ss << "Failed to create " << dstPath << " parent directories."; + ss << "Failed to create " << dstPath << " parent directories"; _error(ss.str()); return; } diff --git a/src/aws-cpp-sdk-transfer-crt/source/transfer-crt/TransferHandle.cpp b/src/aws-cpp-sdk-transfer-crt/source/transfer-crt/TransferHandle.cpp index 72aede39ac4..d18d62e0328 100644 --- a/src/aws-cpp-sdk-transfer-crt/source/transfer-crt/TransferHandle.cpp +++ b/src/aws-cpp-sdk-transfer-crt/source/transfer-crt/TransferHandle.cpp @@ -13,7 +13,7 @@ namespace TransferCrt { namespace { using KeyValue = std::pair; - +// Encode @key_values as query-parameter string. Aws::String encode_query_string(const Aws::Map &key_values) { return std::accumulate(key_values.begin(), key_values.end(), @@ -37,7 +37,7 @@ TransferHandle::TransferHandle(const WriteMetadata &md, TransferHandle::TransferHandle(const Aws::String &srcUri, const std::shared_ptr &ctx) : m_context{ctx}, rmd_{} { - rmd_.uri = srcUri; + rmd_.uri = srcUri; } Aws::String TransferHandle::GetBucket() const { diff --git a/tests/aws-cpp-sdk-transfer-crt-tests/DownloadStreamTests.cpp b/tests/aws-cpp-sdk-transfer-crt-tests/DownloadStreamTests.cpp index d9dab67d7a3..3e0ac411e11 100644 --- a/tests/aws-cpp-sdk-transfer-crt-tests/DownloadStreamTests.cpp +++ b/tests/aws-cpp-sdk-transfer-crt-tests/DownloadStreamTests.cpp @@ -171,7 +171,7 @@ TEST_F(DownloadStreamtest, FdUnsupportedMethods) { } TEST_F(DownloadStreamtest, DownloadStreamHappyPath) { - // Document expected use case in ACTION/VERIFICATION blocks. + // Document expected use cases in ACTION/VERIFICATION blocks. Aws::String errMsg; DownloadStream d{dst_, [&errMsg](Aws::String e) { errMsg = std::move(e); }};