From cc30a7b5f52c6d6d7ff94d93a9509fc3f6becc83 Mon Sep 17 00:00:00 2001 From: Yunze Xu Date: Fri, 7 Mar 2025 12:34:50 +0800 Subject: [PATCH] Adapt to latest Asio APIs (Asio 1.32 or Boost.Asio 1.87) --- .github/workflows/ci-pr-validation.yaml | 5 + CMakeLists.txt | 7 +- lib/AckGroupingTrackerEnabled.cc | 5 +- lib/ClientConnection.cc | 305 +++++++++++------------- lib/ClientConnection.h | 17 +- lib/ConsumerImpl.cc | 9 +- lib/ConsumerImplBase.cc | 2 +- lib/ExecutorService.cc | 39 +-- lib/ExecutorService.h | 12 +- lib/HandlerBase.cc | 15 +- lib/MultiTopicsConsumerImpl.cc | 5 +- lib/NegativeAcksTracker.cc | 5 +- lib/PartitionedProducerImpl.cc | 5 +- lib/PatternMultiTopicsConsumerImpl.cc | 9 +- lib/PeriodicTask.cc | 7 +- lib/ProducerImpl.cc | 9 +- lib/RetryableOperation.h | 7 +- lib/RetryableOperationCache.h | 1 - lib/SharedBuffer.h | 6 +- lib/UnAckedMessageTrackerEnabled.cc | 5 +- lib/stats/ConsumerStatsImpl.cc | 2 +- lib/stats/ConsumerStatsImpl.h | 5 +- lib/stats/ProducerStatsImpl.cc | 2 +- tests/AuthPluginTest.cc | 7 +- tests/ConsumerTest.h | 4 +- vcpkg | 2 +- vcpkg.json | 18 +- 27 files changed, 237 insertions(+), 278 deletions(-) diff --git a/.github/workflows/ci-pr-validation.yaml b/.github/workflows/ci-pr-validation.yaml index 5cb18df9..8d165d68 100644 --- a/.github/workflows/ci-pr-validation.yaml +++ b/.github/workflows/ci-pr-validation.yaml @@ -115,6 +115,11 @@ jobs: - name: Run unit tests run: RETRY_FAILED=3 ./run-unit-tests.sh + - name: Build with Boost.Asio + run: | + cmake -B build-boost-asio -DINTEGRATE_VCPKG=ON -DBUILD_TESTS=ON + cmake --build build-boost-asio -j8 + - name: Build perf tools run: | cmake . -DINTEGRATE_VCPKG=ON -DBUILD_TESTS=ON -DBUILD_PERF_TOOLS=ON diff --git a/CMakeLists.txt b/CMakeLists.txt index b0046534..2efeec89 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -19,15 +19,16 @@ cmake_minimum_required(VERSION 3.13) -option(USE_ASIO "Use Asio instead of Boost.Asio" OFF) - option(INTEGRATE_VCPKG "Integrate with Vcpkg" OFF) if (INTEGRATE_VCPKG) - set(USE_ASIO ON) + option(USE_ASIO "Use Asio instead of Boost.Asio" ON) if (NOT CMAKE_TOOLCHAIN_FILE) set(CMAKE_TOOLCHAIN_FILE "${CMAKE_SOURCE_DIR}/vcpkg/scripts/buildsystems/vcpkg.cmake") endif () +else () + option(USE_ASIO "Use Asio instead of Boost.Asio" OFF) endif () +message(STATUS "USE_ASIO: ${USE_ASIO}") option(BUILD_TESTS "Build tests" ON) message(STATUS "BUILD_TESTS: " ${BUILD_TESTS}) diff --git a/lib/AckGroupingTrackerEnabled.cc b/lib/AckGroupingTrackerEnabled.cc index 7233b2c9..bc8da970 100644 --- a/lib/AckGroupingTrackerEnabled.cc +++ b/lib/AckGroupingTrackerEnabled.cc @@ -117,8 +117,7 @@ void AckGroupingTrackerEnabled::close() { this->flush(); std::lock_guard lock(this->mutexTimer_); if (this->timer_) { - ASIO_ERROR ec; - this->timer_->cancel(ec); + this->timer_->cancel(); } } @@ -168,7 +167,7 @@ void AckGroupingTrackerEnabled::scheduleTimer() { std::lock_guard lock(this->mutexTimer_); this->timer_ = this->executor_->createDeadlineTimer(); - this->timer_->expires_from_now(std::chrono::milliseconds(std::max(1L, this->ackGroupingTimeMs_))); + this->timer_->expires_after(std::chrono::milliseconds(std::max(1L, this->ackGroupingTimeMs_))); auto self = shared_from_this(); this->timer_->async_wait([this, self](const ASIO_ERROR& ec) -> void { if (!ec) { diff --git a/lib/ClientConnection.cc b/lib/ClientConnection.cc index 2037722f..de226a85 100644 --- a/lib/ClientConnection.cc +++ b/lib/ClientConnection.cc @@ -266,7 +266,7 @@ ClientConnection::ClientConnection(const std::string& logicalAddress, const std: if (!clientConfiguration.isTlsAllowInsecureConnection() && clientConfiguration.isValidateHostName()) { LOG_DEBUG("Validating hostname for " << serviceUrl.host() << ":" << serviceUrl.port()); std::string urlHost = isSniProxy_ ? proxyUrl.host() : serviceUrl.host(); - tlsSocket_->set_verify_callback(ASIO::ssl::rfc2818_verification(urlHost)); + tlsSocket_->set_verify_callback(ASIO::ssl::host_name_verification(urlHost)); } LOG_DEBUG("TLS SNI Host: " << serviceUrl.host()); @@ -309,7 +309,7 @@ void ClientConnection::handlePulsarConnected(const proto::CommandConnected& cmdC // Only send keep-alive probes if the broker supports it keepAliveTimer_ = executor_->createDeadlineTimer(); if (keepAliveTimer_) { - keepAliveTimer_->expires_from_now(std::chrono::seconds(keepAliveIntervalInSeconds_)); + keepAliveTimer_->expires_after(std::chrono::seconds(keepAliveIntervalInSeconds_)); auto weakSelf = weak_from_this(); keepAliveTimer_->async_wait([weakSelf](const ASIO_ERROR&) { auto self = weakSelf.lock(); @@ -354,7 +354,7 @@ void ClientConnection::startConsumerStatsTimer(std::vector consumerSta // If the close operation has reset the consumerStatsRequestTimer_ then the use_count will be zero // Check if we have a timer still before we set the request timer to pop again. if (consumerStatsRequestTimer_) { - consumerStatsRequestTimer_->expires_from_now(operationsTimeout_); + consumerStatsRequestTimer_->expires_after(operationsTimeout_); auto weakSelf = weak_from_this(); consumerStatsRequestTimer_->async_wait([weakSelf, consumerStatsRequests](const ASIO_ERROR& err) { auto self = weakSelf.lock(); @@ -388,129 +388,87 @@ typedef ASIO::detail::socket_option::integer tcp_kee typedef ASIO::detail::socket_option::integer tcp_keep_alive_idle; #endif -/* - * TCP Connect handler - * - * if async_connect without any error, connected_ would be set to true - * at this point the connection is deemed valid to be used by clients of this class - */ -void ClientConnection::handleTcpConnected(const ASIO_ERROR& err, tcp::resolver::iterator endpointIterator) { - if (!err) { - std::stringstream cnxStringStream; - try { - cnxStringStream << "[" << socket_->local_endpoint() << " -> " << socket_->remote_endpoint() - << "] "; - cnxString_ = cnxStringStream.str(); - } catch (const ASIO_SYSTEM_ERROR& e) { - LOG_ERROR("Failed to get endpoint: " << e.what()); - close(ResultRetryable); - return; - } - if (logicalAddress_ == physicalAddress_) { - LOG_INFO(cnxString_ << "Connected to broker"); - } else { - LOG_INFO(cnxString_ << "Connected to broker through proxy. Logical broker: " << logicalAddress_ - << ", proxy: " << proxyServiceUrl_ - << ", physical address:" << physicalAddress_); - } +void ClientConnection::completeConnect(ASIO::ip::tcp::endpoint endpoint) { + std::stringstream cnxStringStream; + try { + cnxStringStream << "[" << socket_->local_endpoint() << " -> " << socket_->remote_endpoint() << "] "; + cnxString_ = cnxStringStream.str(); + } catch (const ASIO_SYSTEM_ERROR& e) { + LOG_ERROR("Failed to get endpoint: " << e.what()); + close(ResultRetryable); + return; + } + if (logicalAddress_ == physicalAddress_) { + LOG_INFO(cnxString_ << "Connected to broker"); + } else { + LOG_INFO(cnxString_ << "Connected to broker through proxy. Logical broker: " << logicalAddress_ + << ", proxy: " << proxyServiceUrl_ << ", physical address:" << physicalAddress_); + } - Lock lock(mutex_); - if (isClosed()) { - LOG_INFO(cnxString_ << "Connection already closed"); - return; - } - state_ = TcpConnected; - lock.unlock(); + Lock lock(mutex_); + if (isClosed()) { + LOG_INFO(cnxString_ << "Connection already closed"); + return; + } + state_ = TcpConnected; + lock.unlock(); - ASIO_ERROR error; - socket_->set_option(tcp::no_delay(true), error); - if (error) { - LOG_WARN(cnxString_ << "Socket failed to set tcp::no_delay: " << error.message()); - } + ASIO_ERROR error; + socket_->set_option(tcp::no_delay(true), error); + if (error) { + LOG_WARN(cnxString_ << "Socket failed to set tcp::no_delay: " << error.message()); + } - socket_->set_option(tcp::socket::keep_alive(true), error); - if (error) { - LOG_WARN(cnxString_ << "Socket failed to set tcp::socket::keep_alive: " << error.message()); - } + socket_->set_option(tcp::socket::keep_alive(true), error); + if (error) { + LOG_WARN(cnxString_ << "Socket failed to set tcp::socket::keep_alive: " << error.message()); + } - // Start TCP keep-alive probes after connection has been idle after 1 minute. Ideally this - // should never happen, given that we're sending our own keep-alive probes (within the TCP - // connection) every 30 seconds - socket_->set_option(tcp_keep_alive_idle(1 * 60), error); - if (error) { - LOG_DEBUG(cnxString_ << "Socket failed to set tcp_keep_alive_idle: " << error.message()); - } + // Start TCP keep-alive probes after connection has been idle after 1 minute. Ideally this + // should never happen, given that we're sending our own keep-alive probes (within the TCP + // connection) every 30 seconds + socket_->set_option(tcp_keep_alive_idle(1 * 60), error); + if (error) { + LOG_DEBUG(cnxString_ << "Socket failed to set tcp_keep_alive_idle: " << error.message()); + } - // Send up to 10 probes before declaring the connection broken - socket_->set_option(tcp_keep_alive_count(10), error); - if (error) { - LOG_DEBUG(cnxString_ << "Socket failed to set tcp_keep_alive_count: " << error.message()); - } + // Send up to 10 probes before declaring the connection broken + socket_->set_option(tcp_keep_alive_count(10), error); + if (error) { + LOG_DEBUG(cnxString_ << "Socket failed to set tcp_keep_alive_count: " << error.message()); + } - // Interval between probes: 6 seconds - socket_->set_option(tcp_keep_alive_interval(6), error); - if (error) { - LOG_DEBUG(cnxString_ << "Socket failed to set tcp_keep_alive_interval: " << error.message()); - } + // Interval between probes: 6 seconds + socket_->set_option(tcp_keep_alive_interval(6), error); + if (error) { + LOG_DEBUG(cnxString_ << "Socket failed to set tcp_keep_alive_interval: " << error.message()); + } - if (tlsSocket_) { - if (!isTlsAllowInsecureConnection_) { - ASIO_ERROR err; - Url service_url; - if (!Url::parse(physicalAddress_, service_url)) { - LOG_ERROR(cnxString_ << "Invalid Url, unable to parse: " << err << " " << err.message()); - close(); - return; - } - } - auto weakSelf = weak_from_this(); - auto socket = socket_; - auto tlsSocket = tlsSocket_; - // socket and ssl::stream objects must exist until async_handshake is done, otherwise segmentation - // fault might happen - auto callback = [weakSelf, socket, tlsSocket](const ASIO_ERROR& err) { - auto self = weakSelf.lock(); - if (self) { - self->handleHandshake(err); - } - }; - tlsSocket_->async_handshake(ASIO::ssl::stream::client, - ASIO::bind_executor(strand_, callback)); - } else { - handleHandshake(ASIO_SUCCESS); - } - } else if (endpointIterator != tcp::resolver::iterator()) { - LOG_WARN(cnxString_ << "Failed to establish connection: " << err.message()); - // The connection failed. Try the next endpoint in the list. - ASIO_ERROR closeError; - socket_->close(closeError); // ignore the error of close - if (closeError) { - LOG_WARN(cnxString_ << "Failed to close socket: " << err.message()); - } - connectTimeoutTask_->stop(); - ++endpointIterator; - if (endpointIterator != tcp::resolver::iterator()) { - LOG_DEBUG(cnxString_ << "Connecting to " << endpointIterator->endpoint() << "..."); - connectTimeoutTask_->start(); - tcp::endpoint endpoint = *endpointIterator; - auto weakSelf = weak_from_this(); - socket_->async_connect(endpoint, [weakSelf, endpointIterator](const ASIO_ERROR& err) { - auto self = weakSelf.lock(); - if (self) { - self->handleTcpConnected(err, endpointIterator); - } - }); - } else { - if (err == ASIO::error::operation_aborted) { - // TCP connect timeout, which is not retryable + if (tlsSocket_) { + if (!isTlsAllowInsecureConnection_) { + ASIO_ERROR err; + Url service_url; + if (!Url::parse(physicalAddress_, service_url)) { + LOG_ERROR(cnxString_ << "Invalid Url, unable to parse: " << err << " " << err.message()); close(); - } else { - close(ResultRetryable); + return; } } + auto weakSelf = weak_from_this(); + auto socket = socket_; + auto tlsSocket = tlsSocket_; + // socket and ssl::stream objects must exist until async_handshake is done, otherwise segmentation + // fault might happen + auto callback = [weakSelf, socket, tlsSocket](const ASIO_ERROR& err) { + auto self = weakSelf.lock(); + if (self) { + self->handleHandshake(err); + } + }; + tlsSocket_->async_handshake(ASIO::ssl::stream::client, + ASIO::bind_executor(strand_, callback)); } else { - LOG_ERROR(cnxString_ << "Failed to establish connection: " << err.message()); - close(ResultRetryable); + handleHandshake(ASIO_SUCCESS); } } @@ -603,60 +561,71 @@ void ClientConnection::tcpConnectAsync() { } LOG_DEBUG(cnxString_ << "Resolving " << service_url.host() << ":" << service_url.port()); - tcp::resolver::query query(service_url.host(), std::to_string(service_url.port())); + tcp::resolver::endpoint_type endpoint(ASIO::ip::make_address(service_url.host()), service_url.port()); auto weakSelf = weak_from_this(); - resolver_->async_resolve(query, [weakSelf](const ASIO_ERROR& err, tcp::resolver::iterator iterator) { - auto self = weakSelf.lock(); - if (self) { - self->handleResolve(err, iterator); - } - }); + resolver_->async_resolve( + endpoint, [this, weakSelf](const ASIO_ERROR& err, tcp::resolver::results_type results) { + auto self = weakSelf.lock(); + if (!self) { + return; + } + if (err) { + std::string hostUrl = isSniProxy_ ? cnxString_ : proxyServiceUrl_; + LOG_ERROR(hostUrl << "Resolve error: " << err << " : " << err.message()); + close(); + return; + } + if (results.empty()) { + LOG_ERROR(cnxString_ << "No IP address found"); + close(); + return; + } + connectTimeoutTask_->setCallback([weakSelf](const PeriodicTask::ErrorCode& ec) { + ClientConnectionPtr ptr = weakSelf.lock(); + if (!ptr) { + // Connection was already destroyed + return; + } + + if (ptr->state_ != Ready) { + LOG_ERROR(ptr->cnxString_ << "Connection was not established in " + << ptr->connectTimeoutTask_->getPeriodMs() + << " ms, close the socket"); + PeriodicTask::ErrorCode err; + ptr->socket_->close(err); + if (err) { + LOG_WARN(ptr->cnxString_ << "Failed to close socket: " << err.message()); + } + } + ptr->connectTimeoutTask_->stop(); + }); + connectTimeoutTask_->start(); + std::vector endpoints; + for (const auto& result : results) { + endpoints.emplace_back(result.endpoint()); + } + asyncConnect(endpoints, 0); + }); } -void ClientConnection::handleResolve(const ASIO_ERROR& err, tcp::resolver::iterator endpointIterator) { - if (err) { - std::string hostUrl = isSniProxy_ ? cnxString_ : proxyServiceUrl_; - LOG_ERROR(hostUrl << "Resolve error: " << err << " : " << err.message()); - close(); +void ClientConnection::asyncConnect(const std::vector& endpoints, size_t index) { + if (index >= endpoints.size()) { + close(ResultRetryable); return; } - auto weakSelf = weak_from_this(); - connectTimeoutTask_->setCallback([weakSelf](const PeriodicTask::ErrorCode& ec) { - ClientConnectionPtr ptr = weakSelf.lock(); - if (!ptr) { - // Connection was already destroyed + socket_->async_connect(endpoints[index], [this, weakSelf, endpoints, index](const ASIO_ERROR& err) { + auto self = weakSelf.lock(); + if (!self) { return; } - - if (ptr->state_ != Ready) { - LOG_ERROR(ptr->cnxString_ << "Connection was not established in " - << ptr->connectTimeoutTask_->getPeriodMs() << " ms, close the socket"); - PeriodicTask::ErrorCode err; - ptr->socket_->close(err); - if (err) { - LOG_WARN(ptr->cnxString_ << "Failed to close socket: " << err.message()); - } + if (err) { + LOG_ERROR(cnxString_ << "Failed to establish connection: " << err.message()); + asyncConnect(endpoints, index + 1); + return; } - ptr->connectTimeoutTask_->stop(); + completeConnect(endpoints[index]); }); - - LOG_DEBUG(cnxString_ << "Connecting to " << endpointIterator->endpoint() << "..."); - connectTimeoutTask_->start(); - if (endpointIterator != tcp::resolver::iterator()) { - LOG_DEBUG(cnxString_ << "Resolved hostname " << endpointIterator->host_name() // - << " to " << endpointIterator->endpoint()); - socket_->async_connect(*endpointIterator, [weakSelf, endpointIterator](const ASIO_ERROR& err) { - auto self = weakSelf.lock(); - if (self) { - self->handleTcpConnected(err, endpointIterator); - } - }); - } else { - LOG_WARN(cnxString_ << "No IP address found"); - close(); - return; - } } void ClientConnection::readNextCommand() { @@ -1058,7 +1027,7 @@ void ClientConnection::newLookup(const SharedBuffer& cmd, const uint64_t request LookupRequestData requestData; requestData.promise = promise; requestData.timer = executor_->createDeadlineTimer(); - requestData.timer->expires_from_now(operationsTimeout_); + requestData.timer->expires_after(operationsTimeout_); auto weakSelf = weak_from_this(); requestData.timer->async_wait([weakSelf, requestData](const ASIO_ERROR& ec) { auto self = weakSelf.lock(); @@ -1174,8 +1143,9 @@ void ClientConnection::sendPendingCommands() { PairSharedBuffer buffer = Commands::newSend(outgoingBuffer_, outgoingCmd, getChecksumType(), *args); - // Capture the buffer because asio does not copy the buffer, if the buffer is destroyed before the - // callback is called, an invalid buffer range might be passed to the underlying socket send. + // Capture the buffer because asio does not copy the buffer, if the buffer is destroyed before + // the callback is called, an invalid buffer range might be passed to the underlying socket + // send. asyncWrite(buffer, customAllocWriteHandler([this, self, buffer](const ASIO_ERROR& err, size_t) { handleSendPair(err); })); @@ -1198,7 +1168,7 @@ Future ClientConnection::sendRequestWithId(SharedBuffer cm PendingRequestData requestData; requestData.timer = executor_->createDeadlineTimer(); - requestData.timer->expires_from_now(operationsTimeout_); + requestData.timer->expires_after(operationsTimeout_); auto weakSelf = weak_from_this(); requestData.timer->async_wait([weakSelf, requestData](const ASIO_ERROR& ec) { auto self = weakSelf.lock(); @@ -1251,7 +1221,7 @@ void ClientConnection::handleKeepAliveTimeout() { // be zero And we do not attempt to dereference the pointer. Lock lock(mutex_); if (keepAliveTimer_) { - keepAliveTimer_->expires_from_now(std::chrono::seconds(keepAliveIntervalInSeconds_)); + keepAliveTimer_->expires_after(std::chrono::seconds(keepAliveIntervalInSeconds_)); auto weakSelf = weak_from_this(); keepAliveTimer_->async_wait([weakSelf](const ASIO_ERROR&) { auto self = weakSelf.lock(); @@ -1430,7 +1400,7 @@ Future ClientConnection::newGetLastMessageId(u LastMessageIdRequestData requestData; requestData.promise = promise; requestData.timer = executor_->createDeadlineTimer(); - requestData.timer->expires_from_now(operationsTimeout_); + requestData.timer->expires_after(operationsTimeout_); auto weakSelf = weak_from_this(); requestData.timer->async_wait([weakSelf, requestData](const ASIO_ERROR& ec) { auto self = weakSelf.lock(); @@ -1478,7 +1448,7 @@ Future ClientConnection::newGetSchema(const std::string& top lock.unlock(); auto weakSelf = weak_from_this(); - timer->expires_from_now(operationsTimeout_); + timer->expires_after(operationsTimeout_); timer->async_wait([this, weakSelf, requestId](const ASIO_ERROR& ec) { auto self = weakSelf.lock(); if (!self) { @@ -2047,8 +2017,7 @@ void ClientConnection::unsafeRemovePendingRequest(long requestId) { auto it = pendingRequests_.find(requestId); if (it != pendingRequests_.end()) { it->second.promise.setFailed(ResultDisconnected); - ASIO_ERROR ec; - it->second.timer->cancel(ec); + it->second.timer->cancel(); pendingRequests_.erase(it); } } diff --git a/lib/ClientConnection.h b/lib/ClientConnection.h index 7646f85e..14e07652 100644 --- a/lib/ClientConnection.h +++ b/lib/ClientConnection.h @@ -25,13 +25,13 @@ #include #ifdef USE_ASIO #include -#include +#include #include #include #include #else #include -#include +#include #include #include #include @@ -231,13 +231,8 @@ class PULSAR_PUBLIC ClientConnection : public std::enable_shared_from_this& endpoints, size_t index); + void completeConnect(ASIO::ip::tcp::endpoint endpoint); void handleHandshake(const ASIO_ERROR& err); @@ -260,8 +255,6 @@ class PULSAR_PUBLIC ClientConnection : public std::enable_shared_from_this strand_; + ASIO::strand strand_; const std::string logicalAddress_; /* diff --git a/lib/ConsumerImpl.cc b/lib/ConsumerImpl.cc index 250845b3..cfdb0b2d 100644 --- a/lib/ConsumerImpl.cc +++ b/lib/ConsumerImpl.cc @@ -422,7 +422,7 @@ void ConsumerImpl::discardChunkMessages(std::string uuid, MessageId messageId, b } void ConsumerImpl::triggerCheckExpiredChunkedTimer() { - checkExpiredChunkedTimer_->expires_from_now(milliseconds(expireTimeOfIncompleteChunkedMessageMs_)); + checkExpiredChunkedTimer_->expires_after(milliseconds(expireTimeOfIncompleteChunkedMessageMs_)); std::weak_ptr weakSelf{shared_from_this()}; checkExpiredChunkedTimer_->async_wait([this, weakSelf](const ASIO_ERROR& ec) -> void { auto self = weakSelf.lock(); @@ -1668,7 +1668,7 @@ void ConsumerImpl::internalGetLastMessageIdAsync(const BackoffPtr& backoff, Time } remainTime -= next; - timer->expires_from_now(next); + timer->expires_after(next); auto self = shared_from_this(); timer->async_wait([this, backoff, remainTime, timer, next, callback, @@ -1791,9 +1791,8 @@ std::shared_ptr ConsumerImpl::get_shared_this_ptr() { } void ConsumerImpl::cancelTimers() noexcept { - ASIO_ERROR ec; - batchReceiveTimer_->cancel(ec); - checkExpiredChunkedTimer_->cancel(ec); + batchReceiveTimer_->cancel(); + checkExpiredChunkedTimer_->cancel(); unAckedMessageTrackerPtr_->stop(); consumerStatsBasePtr_->stop(); } diff --git a/lib/ConsumerImplBase.cc b/lib/ConsumerImplBase.cc index 098f2d5b..76d99370 100644 --- a/lib/ConsumerImplBase.cc +++ b/lib/ConsumerImplBase.cc @@ -51,7 +51,7 @@ ConsumerImplBase::ConsumerImplBase(ClientImplPtr client, const std::string& topi void ConsumerImplBase::triggerBatchReceiveTimerTask(long timeoutMs) { if (timeoutMs > 0) { - batchReceiveTimer_->expires_from_now(std::chrono::milliseconds(timeoutMs)); + batchReceiveTimer_->expires_after(std::chrono::milliseconds(timeoutMs)); std::weak_ptr weakSelf{shared_from_this()}; batchReceiveTimer_->async_wait([weakSelf](const ASIO_ERROR& ec) { auto self = weakSelf.lock(); diff --git a/lib/ExecutorService.cc b/lib/ExecutorService.cc index 794e3619..7f2a2c14 100644 --- a/lib/ExecutorService.cc +++ b/lib/ExecutorService.cc @@ -18,6 +18,12 @@ */ #include "ExecutorService.h" +#ifdef USE_ASIO +#include +#else +#include +#endif + #include "LogUtils.h" #include "TimeUtils.h" DECLARE_LOG_OBJECT() @@ -31,18 +37,13 @@ ExecutorService::~ExecutorService() { close(0); } void ExecutorService::start() { auto self = shared_from_this(); std::thread t{[this, self] { - LOG_DEBUG("Run io_service in a single thread"); - ASIO_ERROR ec; + LOG_DEBUG("Run io_context in a single thread"); while (!closed_) { - io_service_.restart(); - IOService::work work{getIOService()}; - io_service_.run(ec); - } - if (ec) { - LOG_ERROR("Failed to run io_service: " << ec.message()); - } else { - LOG_DEBUG("Event loop of ExecutorService exits successfully"); + io_context_.restart(); + auto work{ASIO::make_work_guard(io_context_)}; + io_context_.run(); } + LOG_DEBUG("Event loop of ExecutorService exits successfully"); { std::lock_guard lock{mutex_}; ioServiceDone_ = true; @@ -63,12 +64,12 @@ ExecutorServicePtr ExecutorService::create() { } /* - * factory method of ASIO::ip::tcp::socket associated with io_service_ instance + * factory method of ASIO::ip::tcp::socket associated with io_context_ instance * @ returns shared_ptr to this socket */ SocketPtr ExecutorService::createSocket() { try { - return SocketPtr(new ASIO::ip::tcp::socket(io_service_)); + return SocketPtr(new ASIO::ip::tcp::socket(io_context_)); } catch (const ASIO_SYSTEM_ERROR &e) { restart(); auto error = std::string("Failed to create socket: ") + e.what(); @@ -82,12 +83,12 @@ TlsSocketPtr ExecutorService::createTlsSocket(SocketPtr &socket, ASIO::ssl::cont } /* - * factory method of Resolver object associated with io_service_ instance + * factory method of Resolver object associated with io_context_ instance * @returns shraed_ptr to resolver object */ TcpResolverPtr ExecutorService::createTcpResolver() { try { - return TcpResolverPtr(new ASIO::ip::tcp::resolver(io_service_)); + return TcpResolverPtr(new ASIO::ip::tcp::resolver(io_context_)); } catch (const ASIO_SYSTEM_ERROR &e) { restart(); auto error = std::string("Failed to create resolver: ") + e.what(); @@ -97,7 +98,7 @@ TcpResolverPtr ExecutorService::createTcpResolver() { DeadlineTimerPtr ExecutorService::createDeadlineTimer() { try { - return DeadlineTimerPtr(new ASIO::steady_timer(io_service_)); + return DeadlineTimerPtr(new ASIO::steady_timer(io_context_)); } catch (const ASIO_SYSTEM_ERROR &e) { restart(); auto error = std::string("Failed to create steady_timer: ") + e.what(); @@ -105,7 +106,7 @@ DeadlineTimerPtr ExecutorService::createDeadlineTimer() { } } -void ExecutorService::restart() { io_service_.stop(); } +void ExecutorService::restart() { io_context_.stop(); } void ExecutorService::close(long timeoutMs) { bool expectedState = false; @@ -113,12 +114,12 @@ void ExecutorService::close(long timeoutMs) { return; } if (timeoutMs == 0) { // non-blocking - io_service_.stop(); + io_context_.stop(); return; } std::unique_lock lock{mutex_}; - io_service_.stop(); + io_context_.stop(); if (timeoutMs > 0) { cond_.wait_for(lock, std::chrono::milliseconds(timeoutMs), [this] { return ioServiceDone_; }); } else { // < 0 @@ -126,7 +127,7 @@ void ExecutorService::close(long timeoutMs) { } } -void ExecutorService::postWork(std::function task) { io_service_.post(task); } +void ExecutorService::postWork(std::function task) { ASIO::post(io_context_, task); } ///////////////////// diff --git a/lib/ExecutorService.h b/lib/ExecutorService.h index 89d06d30..626cb203 100644 --- a/lib/ExecutorService.h +++ b/lib/ExecutorService.h @@ -23,11 +23,11 @@ #include #ifdef USE_ASIO -#include +#include #include #include #else -#include +#include #include #include #endif @@ -46,7 +46,7 @@ typedef std::shared_ptr > TlsSocketPt typedef std::shared_ptr TcpResolverPtr; class PULSAR_PUBLIC ExecutorService : public std::enable_shared_from_this { public: - using IOService = ASIO::io_service; + using IOService = ASIO::io_context; using SharedPtr = std::shared_ptr; static SharedPtr create(); @@ -67,14 +67,14 @@ class PULSAR_PUBLIC ExecutorService : public std::enable_shared_from_thiscancel(ignored); - creationTimer_->cancel(ignored); + timer_->cancel(); + creationTimer_->cancel(); } void HandlerBase::start() { @@ -61,15 +60,14 @@ void HandlerBase::start() { if (state_.compare_exchange_strong(state, Pending)) { grabCnx(); } - creationTimer_->expires_from_now(operationTimeut_); + creationTimer_->expires_after(operationTimeut_); std::weak_ptr weakSelf{shared_from_this()}; creationTimer_->async_wait([this, weakSelf](const ASIO_ERROR& error) { auto self = weakSelf.lock(); if (self && !error) { LOG_WARN("Cancel the pending reconnection due to the start timeout"); connectionFailed(ResultTimeout); - ASIO_ERROR ignored; - timer_->cancel(ignored); + timer_->cancel(); } }); } @@ -133,8 +131,7 @@ void HandlerBase::grabCnx(const boost::optional& assignedBrokerUrl) connectionTimeMs_ = duration_cast(high_resolution_clock::now() - before).count(); // Prevent the creationTimer_ from cancelling the timer_ in future - ASIO_ERROR ignored; - creationTimer_->cancel(ignored); + creationTimer_->cancel(); LOG_INFO("Finished connecting to broker after " << connectionTimeMs_ << " ms") } else if (isResultRetryable(result)) { scheduleReconnection(); @@ -188,7 +185,7 @@ void HandlerBase::scheduleReconnection(const boost::optional& assig TimeDuration delay = assignedBrokerUrl ? std::chrono::milliseconds(0) : backoff_.next(); LOG_INFO(getName() << "Schedule reconnection in " << (toMillis(delay) / 1000.0) << " s"); - timer_->expires_from_now(delay); + timer_->expires_after(delay); // passing shared_ptr here since time_ will get destroyed, so tasks will be cancelled // so we will not run into the case where grabCnx is invoked on out of scope handler auto name = getName(); diff --git a/lib/MultiTopicsConsumerImpl.cc b/lib/MultiTopicsConsumerImpl.cc index dddade5c..61fbf7b8 100644 --- a/lib/MultiTopicsConsumerImpl.cc +++ b/lib/MultiTopicsConsumerImpl.cc @@ -962,7 +962,7 @@ uint64_t MultiTopicsConsumerImpl::getNumberOfConnectedConsumer() { return numberOfConnectedConsumer; } void MultiTopicsConsumerImpl::runPartitionUpdateTask() { - partitionsUpdateTimer_->expires_from_now(partitionsUpdateInterval_); + partitionsUpdateTimer_->expires_after(partitionsUpdateInterval_); auto weakSelf = weak_from_this(); partitionsUpdateTimer_->async_wait([weakSelf](const ASIO_ERROR& ec) { // If two requests call runPartitionUpdateTask at the same time, the timer will fail, and it @@ -1115,8 +1115,7 @@ void MultiTopicsConsumerImpl::beforeConnectionChange(ClientConnection& cnx) { void MultiTopicsConsumerImpl::cancelTimers() noexcept { if (partitionsUpdateTimer_) { - ASIO_ERROR ec; - partitionsUpdateTimer_->cancel(ec); + partitionsUpdateTimer_->cancel(); } } diff --git a/lib/NegativeAcksTracker.cc b/lib/NegativeAcksTracker.cc index e443496d..e50b4ca2 100644 --- a/lib/NegativeAcksTracker.cc +++ b/lib/NegativeAcksTracker.cc @@ -50,7 +50,7 @@ void NegativeAcksTracker::scheduleTimer() { return; } std::weak_ptr weakSelf{shared_from_this()}; - timer_->expires_from_now(timerInterval_); + timer_->expires_after(timerInterval_); timer_->async_wait([weakSelf](const ASIO_ERROR &ec) { if (auto self = weakSelf.lock()) { self->handleTimer(ec); @@ -107,8 +107,7 @@ void NegativeAcksTracker::add(const MessageId &m) { void NegativeAcksTracker::close() { closed_ = true; - ASIO_ERROR ec; - timer_->cancel(ec); + timer_->cancel(); std::lock_guard lock(mutex_); nackedMessages_.clear(); } diff --git a/lib/PartitionedProducerImpl.cc b/lib/PartitionedProducerImpl.cc index 4178096c..923c038b 100644 --- a/lib/PartitionedProducerImpl.cc +++ b/lib/PartitionedProducerImpl.cc @@ -421,7 +421,7 @@ void PartitionedProducerImpl::flushAsync(FlushCallback callback) { void PartitionedProducerImpl::runPartitionUpdateTask() { auto weakSelf = weak_from_this(); - partitionsUpdateTimer_->expires_from_now(partitionsUpdateInterval_); + partitionsUpdateTimer_->expires_after(partitionsUpdateInterval_); partitionsUpdateTimer_->async_wait([weakSelf](const ASIO_ERROR& ec) { auto self = weakSelf.lock(); if (self) { @@ -524,8 +524,7 @@ uint64_t PartitionedProducerImpl::getNumberOfConnectedProducer() { void PartitionedProducerImpl::cancelTimers() noexcept { if (partitionsUpdateTimer_) { - ASIO_ERROR ec; - partitionsUpdateTimer_->cancel(ec); + partitionsUpdateTimer_->cancel(); } } diff --git a/lib/PatternMultiTopicsConsumerImpl.cc b/lib/PatternMultiTopicsConsumerImpl.cc index 4fc7bb61..07d9a7bc 100644 --- a/lib/PatternMultiTopicsConsumerImpl.cc +++ b/lib/PatternMultiTopicsConsumerImpl.cc @@ -48,7 +48,7 @@ const PULSAR_REGEX_NAMESPACE::regex PatternMultiTopicsConsumerImpl::getPattern() void PatternMultiTopicsConsumerImpl::resetAutoDiscoveryTimer() { autoDiscoveryRunning_ = false; - autoDiscoveryTimer_->expires_from_now(seconds(conf_.getPatternAutoDiscoveryPeriod())); + autoDiscoveryTimer_->expires_after(seconds(conf_.getPatternAutoDiscoveryPeriod())); auto weakSelf = weak_from_this(); autoDiscoveryTimer_->async_wait([weakSelf](const ASIO_ERROR& err) { @@ -228,7 +228,7 @@ void PatternMultiTopicsConsumerImpl::start() { LOG_DEBUG("PatternMultiTopicsConsumerImpl start autoDiscoveryTimer_."); if (conf_.getPatternAutoDiscoveryPeriod() > 0) { - autoDiscoveryTimer_->expires_from_now(seconds(conf_.getPatternAutoDiscoveryPeriod())); + autoDiscoveryTimer_->expires_after(seconds(conf_.getPatternAutoDiscoveryPeriod())); auto weakSelf = weak_from_this(); autoDiscoveryTimer_->async_wait([weakSelf](const ASIO_ERROR& err) { if (auto self = weakSelf.lock()) { @@ -248,7 +248,4 @@ void PatternMultiTopicsConsumerImpl::closeAsync(ResultCallback callback) { MultiTopicsConsumerImpl::closeAsync(callback); } -void PatternMultiTopicsConsumerImpl::cancelTimers() noexcept { - ASIO_ERROR ec; - autoDiscoveryTimer_->cancel(ec); -} +void PatternMultiTopicsConsumerImpl::cancelTimers() noexcept { autoDiscoveryTimer_->cancel(); } diff --git a/lib/PeriodicTask.cc b/lib/PeriodicTask.cc index 9fde012a..4b5f9621 100644 --- a/lib/PeriodicTask.cc +++ b/lib/PeriodicTask.cc @@ -29,7 +29,7 @@ void PeriodicTask::start() { state_ = Ready; if (periodMs_ >= 0) { std::weak_ptr weakSelf{shared_from_this()}; - timer_->expires_from_now(std::chrono::milliseconds(periodMs_)); + timer_->expires_after(std::chrono::milliseconds(periodMs_)); timer_->async_wait([weakSelf](const ErrorCode& ec) { auto self = weakSelf.lock(); if (self) { @@ -44,8 +44,7 @@ void PeriodicTask::stop() noexcept { if (!state_.compare_exchange_strong(state, Closing)) { return; } - ErrorCode ec; - timer_->cancel(ec); + timer_->cancel(); state_ = Pending; } @@ -59,7 +58,7 @@ void PeriodicTask::handleTimeout(const ErrorCode& ec) { // state_ may be changed in handleTimeout, so we check state_ again if (state_ == Ready) { auto self = shared_from_this(); - timer_->expires_from_now(std::chrono::milliseconds(periodMs_)); + timer_->expires_after(std::chrono::milliseconds(periodMs_)); timer_->async_wait([this, self](const ErrorCode& ec) { handleTimeout(ec); }); } } diff --git a/lib/ProducerImpl.cc b/lib/ProducerImpl.cc index 4399ce5f..8b112bf1 100644 --- a/lib/ProducerImpl.cc +++ b/lib/ProducerImpl.cc @@ -570,7 +570,7 @@ void ProducerImpl::sendAsyncWithStatsUpdate(const Message& msg, SendCallback&& c bool isFirstMessage = batchMessageContainer_->isFirstMessageToAdd(msg); bool isFull = batchMessageContainer_->add(msg, callback); if (isFirstMessage) { - batchTimer_->expires_from_now(milliseconds(conf_.getBatchingMaxPublishDelayMs())); + batchTimer_->expires_after(milliseconds(conf_.getBatchingMaxPublishDelayMs())); auto weakSelf = weak_from_this(); batchTimer_->async_wait([this, weakSelf](const ASIO_ERROR& ec) { auto self = weakSelf.lock(); @@ -1007,9 +1007,8 @@ void ProducerImpl::shutdown() { void ProducerImpl::cancelTimers() noexcept { dataKeyRefreshTask_.stop(); - ASIO_ERROR ec; - batchTimer_->cancel(ec); - sendTimer_->cancel(ec); + batchTimer_->cancel(); + sendTimer_->cancel(); } bool ProducerImplCmp::operator()(const ProducerImplPtr& a, const ProducerImplPtr& b) const { @@ -1030,7 +1029,7 @@ void ProducerImpl::startSendTimeoutTimer() { } void ProducerImpl::asyncWaitSendTimeout(DurationType expiryTime) { - sendTimer_->expires_from_now(expiryTime); + sendTimer_->expires_after(expiryTime); auto weakSelf = weak_from_this(); sendTimer_->async_wait([weakSelf](const ASIO_ERROR& err) { diff --git a/lib/RetryableOperation.h b/lib/RetryableOperation.h index dba190f4..8a235d3a 100644 --- a/lib/RetryableOperation.h +++ b/lib/RetryableOperation.h @@ -26,8 +26,8 @@ #include #include +#include "AsioTimer.h" #include "Backoff.h" -#include "ExecutorService.h" #include "Future.h" #include "LogUtils.h" #include "ResultUtils.h" @@ -68,8 +68,7 @@ class RetryableOperation : public std::enable_shared_from_thiscancel(ec); + timer_->cancel(); } private: @@ -107,7 +106,7 @@ class RetryableOperation : public std::enable_shared_from_thisexpires_from_now(delay); + timer_->expires_after(delay); auto nextRemainingTime = remainingTime - delay; LOG_INFO("Reschedule " << name_ << " for " << toMillis(delay) diff --git a/lib/RetryableOperationCache.h b/lib/RetryableOperationCache.h index e42460dd..5030c94e 100644 --- a/lib/RetryableOperationCache.h +++ b/lib/RetryableOperationCache.h @@ -18,7 +18,6 @@ */ #pragma once -#include #include #include diff --git a/lib/SharedBuffer.h b/lib/SharedBuffer.h index 26fc59ed..a6ced186 100644 --- a/lib/SharedBuffer.h +++ b/lib/SharedBuffer.h @@ -151,11 +151,11 @@ class SharedBuffer { inline bool writable() const { return writableBytes() > 0; } - ASIO::const_buffers_1 const_asio_buffer() const { - return ASIO::const_buffers_1(ptr_ + readIdx_, readableBytes()); + ASIO::const_buffer const_asio_buffer() const { + return ASIO::const_buffer(ptr_ + readIdx_, readableBytes()); } - ASIO::mutable_buffers_1 asio_buffer() { + ASIO::mutable_buffer asio_buffer() { assert(data_); return ASIO::buffer(ptr_ + writeIdx_, writableBytes()); } diff --git a/lib/UnAckedMessageTrackerEnabled.cc b/lib/UnAckedMessageTrackerEnabled.cc index e371af99..3b959d8a 100644 --- a/lib/UnAckedMessageTrackerEnabled.cc +++ b/lib/UnAckedMessageTrackerEnabled.cc @@ -34,7 +34,7 @@ void UnAckedMessageTrackerEnabled::timeoutHandler() { timeoutHandlerHelper(); ExecutorServicePtr executorService = client_->getIOExecutorProvider()->get(); timer_ = executorService->createDeadlineTimer(); - timer_->expires_from_now(std::chrono::milliseconds(tickDurationInMs_)); + timer_->expires_after(std::chrono::milliseconds(tickDurationInMs_)); std::weak_ptr weakSelf{shared_from_this()}; timer_->async_wait([weakSelf](const ASIO_ERROR& ec) { auto self = weakSelf.lock(); @@ -173,9 +173,8 @@ void UnAckedMessageTrackerEnabled::clear() { } void UnAckedMessageTrackerEnabled::stop() { - ASIO_ERROR ec; if (timer_) { - timer_->cancel(ec); + timer_->cancel(); } } } /* namespace pulsar */ diff --git a/lib/stats/ConsumerStatsImpl.cc b/lib/stats/ConsumerStatsImpl.cc index 0eefabdc..e8bd919a 100644 --- a/lib/stats/ConsumerStatsImpl.cc +++ b/lib/stats/ConsumerStatsImpl.cc @@ -85,7 +85,7 @@ void ConsumerStatsImpl::messageAcknowledged(Result res, CommandAck_AckType ackTy } void ConsumerStatsImpl::scheduleTimer() { - timer_->expires_from_now(std::chrono::seconds(statsIntervalInSeconds_)); + timer_->expires_after(std::chrono::seconds(statsIntervalInSeconds_)); std::weak_ptr weakSelf{shared_from_this()}; timer_->async_wait([this, weakSelf](const ASIO_ERROR& ec) { auto self = weakSelf.lock(); diff --git a/lib/stats/ConsumerStatsImpl.h b/lib/stats/ConsumerStatsImpl.h index 3333ea85..35fda9b4 100644 --- a/lib/stats/ConsumerStatsImpl.h +++ b/lib/stats/ConsumerStatsImpl.h @@ -59,10 +59,7 @@ class ConsumerStatsImpl : public std::enable_shared_from_this ConsumerStatsImpl(const ConsumerStatsImpl& stats); void flushAndReset(const ASIO_ERROR&); void start() override; - void stop() override { - ASIO_ERROR error; - timer_->cancel(error); - } + void stop() override { timer_->cancel(); } void receivedMessage(Message&, Result) override; void messageAcknowledged(Result, CommandAck_AckType, uint32_t ackNums) override; virtual ~ConsumerStatsImpl(); diff --git a/lib/stats/ProducerStatsImpl.cc b/lib/stats/ProducerStatsImpl.cc index 15e9e67e..b5e00794 100644 --- a/lib/stats/ProducerStatsImpl.cc +++ b/lib/stats/ProducerStatsImpl.cc @@ -109,7 +109,7 @@ void ProducerStatsImpl::messageReceived(Result res, const ptime& publishTime) { ProducerStatsImpl::~ProducerStatsImpl() { timer_->cancel(); } void ProducerStatsImpl::scheduleTimer() { - timer_->expires_from_now(std::chrono::seconds(statsIntervalInSeconds_)); + timer_->expires_after(std::chrono::seconds(statsIntervalInSeconds_)); std::weak_ptr weakSelf{shared_from_this()}; timer_->async_wait([this, weakSelf](const ASIO_ERROR& ec) { auto self = weakSelf.lock(); diff --git a/tests/AuthPluginTest.cc b/tests/AuthPluginTest.cc index 24549d7f..ed0511ea 100644 --- a/tests/AuthPluginTest.cc +++ b/tests/AuthPluginTest.cc @@ -309,16 +309,17 @@ namespace testAthenz { std::string principalToken; void mockZTS(Latch& latch, int port) { LOG_INFO("-- MockZTS started"); - ASIO::io_service io; - ASIO::ip::tcp::iostream stream; + ASIO::io_context io; + ASIO::ip::tcp::socket socket(io); ASIO::ip::tcp::acceptor acceptor(io, ASIO::ip::tcp::endpoint(ASIO::ip::tcp::v4(), port)); LOG_INFO("-- MockZTS waiting for connnection"); latch.countdown(); - acceptor.accept(*stream.rdbuf()); + acceptor.accept(socket); LOG_INFO("-- MockZTS got connection"); std::string headerLine; + ASIO::ip::tcp::iostream stream(std::move(socket)); while (getline(stream, headerLine)) { std::vector kv; boost::algorithm::split(kv, headerLine, boost::is_any_of(" ")); diff --git a/tests/ConsumerTest.h b/tests/ConsumerTest.h index 82482875..9d190c10 100644 --- a/tests/ConsumerTest.h +++ b/tests/ConsumerTest.h @@ -46,8 +46,8 @@ class ConsumerTest { return nullptr; } auto timer = cnx->executor_->createDeadlineTimer(); - timer->expires_from_now(delaySinceStartGrabCnx - - std::chrono::milliseconds(impl->connectionTimeMs_ + 50)); + timer->expires_after(delaySinceStartGrabCnx - + std::chrono::milliseconds(impl->connectionTimeMs_ + 50)); timer->async_wait([cnx](const ASIO_ERROR&) { cnx->close(); }); return timer; } diff --git a/vcpkg b/vcpkg index 97dd2672..cd1099f4 160000 --- a/vcpkg +++ b/vcpkg @@ -1 +1 @@ -Subproject commit 97dd26728e3856ed1ab62ee74ee3a391d9c81d19 +Subproject commit cd1099f42a3c2ee28dc68e3db3f6f88658982736 diff --git a/vcpkg.json b/vcpkg.json index 5ff44100..cf1c7b94 100644 --- a/vcpkg.json +++ b/vcpkg.json @@ -2,22 +2,30 @@ "name": "pulsar-cpp", "version": "3.5.0", "description": "Pulsar C++ SDK", - "builtin-baseline": "b051745c68faa6f65c493371d564c4eb8af34dad", + "builtin-baseline": "cd1099f42a3c2ee28dc68e3db3f6f88658982736", "dependencies": [ { "name": "asio", "features": [ "openssl" ], - "version>=": "1.28.2" + "version>=": "1.32.0" }, { "name": "boost-accumulators", - "version>=": "1.83.0" + "version>=": "1.87.0" + }, + { + "name": "boost-asio", + "version>=": "1.87.0" + }, + { + "name": "boost-format", + "version>=": "1.87.0" }, { "name": "boost-property-tree", - "version>=": "1.83.0" + "version>=": "1.87.0" }, { "name": "curl", @@ -58,7 +66,7 @@ "dependencies": [ { "name": "boost-program-options", - "version>=": "1.83.0" + "version>=": "1.87.0" } ] },