Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added timeout to kvikio requests #643

Open
wants to merge 1 commit into
base: branch-25.04
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 18 additions & 0 deletions cpp/include/kvikio/defaults.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ class defaults {
std::size_t _gds_threshold;
std::size_t _bounce_buffer_size;
std::size_t _http_max_attempts;
long _http_timeout;
std::vector<int> _http_status_codes;

static unsigned int get_num_threads_from_env();
Expand Down Expand Up @@ -255,6 +256,23 @@ class defaults {
*/
static void http_max_attempts_reset(std::size_t attempts);

/**
* @brief The maximum time, in seconds, that an HTTP transfer is allowed to take.
*
* Set the value using `kvikio::defaults::http_timeout_reset()` or by setting the
* `KVIKIO_HTTP_TIMEOUT` environment variable. If not set, the value is 60.
*
* @return The maximum time, in seconds, that an HTTP transfer is allowed to take.
*/
[[nodiscard]] static long http_timeout();

/**
* @brief Reset the http timeout.
*
* @param timeout_seconds The maximum time, in seconds, that an HTTP transfer is allowed to take. Must be positive.
*/
static void http_timeout_reset(long timeout_seconds);

/**
* @brief The list of HTTP status codes to retry.
*
Expand Down
20 changes: 19 additions & 1 deletion cpp/src/defaults.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,16 @@ defaults::defaults()
}
_http_max_attempts = env;
}

// Determine the default value of `http_timeout`
{
const long env = getenv_or("KVIKIO_HTTP_TIMEOUT", 60);
if (env <= 0) {
throw std::invalid_argument("KVIKIO_HTTP_TIMEOUT has to be a positive integer");
}
_http_timeout = env;
}

// Determine the default value of `http_status_codes`
{
_http_status_codes =
Expand Down Expand Up @@ -211,10 +221,18 @@ void defaults::http_max_attempts_reset(std::size_t attempts)
}

// Accessor for the list of HTTP status codes to retry, as stored on the
// `defaults` singleton. Hands back a const reference to the stored vector
// rather than a copy.
std::vector<int> const& defaults::http_status_codes()
{
  return instance()->_http_status_codes;
}

// Replace the stored list of HTTP status codes to retry.
//
// `status_codes` is taken by value and then moved into the singleton's
// `_http_status_codes` member.
void defaults::http_status_codes_reset(std::vector<int> status_codes)
{
  auto& stored_codes = instance()->_http_status_codes;
  stored_codes       = std::move(status_codes);
}

// Accessor for the HTTP timeout, in seconds, currently stored on the
// `defaults` singleton.
long defaults::http_timeout()
{
  long const current_timeout = instance()->_http_timeout;
  return current_timeout;
}
// Overwrite the HTTP timeout stored on the `defaults` singleton.
//
// `timeout_seconds` must be strictly positive; zero and negative values are
// rejected with `std::invalid_argument` and leave the stored value untouched.
void defaults::http_timeout_reset(long timeout_seconds)
{
  bool const is_positive = timeout_seconds > 0;
  if (!is_positive) {
    throw std::invalid_argument("timeout_seconds must be a positive integer");
  }

  instance()->_http_timeout = timeout_seconds;
}

} // namespace kvikio
28 changes: 20 additions & 8 deletions cpp/src/shim/libcurl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,9 @@ CurlHandle::CurlHandle(LibCurl::UniqueHandlePtr handle,

// Make curl_easy_perform() fail when receiving HTTP code errors.
setopt(CURLOPT_FAILONERROR, 1L);

// Make requests time out after `kvikio::defaults::http_timeout()` seconds.
setopt(CURLOPT_TIMEOUT, kvikio::defaults::http_timeout());
}

// Destructor: moves the wrapped handle into `LibCurl::instance().retain_handle()`
// instead of disposing of it here.
CurlHandle::~CurlHandle() noexcept
{
  auto&& libcurl = LibCurl::instance();
  libcurl.retain_handle(std::move(_handle));
}
Expand All @@ -125,9 +128,10 @@ void CurlHandle::perform()
auto max_delay = 4000; // milliseconds
auto http_max_attempts = kvikio::defaults::http_max_attempts();
auto& http_status_codes = kvikio::defaults::http_status_codes();
CURLcode err;

while (attempt_count++ < http_max_attempts) {
auto err = curl_easy_perform(handle());
err = curl_easy_perform(handle());

if (err == CURLE_OK) {
// We set CURLE_HTTP_RETURNED_ERROR, so >= 400 status codes are considered
Expand All @@ -141,7 +145,7 @@ void CurlHandle::perform()
(std::find(http_status_codes.begin(), http_status_codes.end(), http_code) !=
http_status_codes.end());

if (is_retryable_response) {
if ((err == CURLE_OPERATION_TIMEDOUT) || is_retryable_response) {
// backoff and retry again. With a base value of 500ms, we retry after
// 500ms, 1s, 2s, 4s, ...
auto const backoff_delay = base_delay * (1 << std::min(attempt_count - 1, 4));
Expand All @@ -150,9 +154,14 @@ void CurlHandle::perform()

// Only print this message out and sleep if we're actually going to retry again.
if (attempt_count < http_max_attempts) {
std::cout << "KvikIO: Got HTTP code " << http_code << ". Retrying after " << delay
<< "ms (attempt " << attempt_count << " of " << http_max_attempts << ")."
<< std::endl;
if (err == CURLE_OPERATION_TIMEDOUT) {
std::cout << "KvikIO: Timeout error. Retrying after " << delay << "ms (attempt "
<< attempt_count << " of " << http_max_attempts << ")." << std::endl;
} else {
std::cout << "KvikIO: Got HTTP code " << http_code << ". Retrying after " << delay
<< "ms (attempt " << attempt_count << " of " << http_max_attempts << ")."
<< std::endl;
}
std::this_thread::sleep_for(std::chrono::milliseconds(delay));
}
} else {
Expand All @@ -170,11 +179,14 @@ void CurlHandle::perform()
}
}

// We've exceeded the maximum number of requests. Fail with a good error
// message.
std::stringstream ss;
ss << "KvikIO: HTTP request reached maximum number of attempts (" << http_max_attempts
<< "). Got HTTP code " << http_code << ".";
<< "). Reason: ";
if (err == CURLE_OPERATION_TIMEDOUT) {
ss << "Operation timed out.";
} else {
ss << "Got HTTP code " << http_code << ".";
}
throw std::runtime_error(ss.str());
}
} // namespace kvikio
4 changes: 3 additions & 1 deletion docs/source/runtime_settings.rst
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,10 @@ This setting can also be controlled by :py:func:`kvikio.defaults.bounce_buffer_s
#### HTTP Retries
-----------------

The behavior when a remote IO read returns an error can be controlled through the `KVIKIO_HTTP_STATUS_CODES` and `KVIKIO_HTTP_MAX_ATTEMPTS` environment variables.
The behavior when a remote IO read returns an error can be controlled through the `KVIKIO_HTTP_STATUS_CODES`, `KVIKIO_HTTP_MAX_ATTEMPTS`, and `KVIKIO_HTTP_TIMEOUT` environment variables.

`KVIKIO_HTTP_STATUS_CODES` controls the status codes to retry and can be controlled by :py:func:`kvikio.defaults.http_status_codes`, :py:func:`kvikio.defaults.http_status_codes_reset`, and :py:func:`kvikio.defaults.set_http_status_codes`.

`KVIKIO_HTTP_MAX_ATTEMPTS` controls the maximum number of attempts to make before throwing an exception and can be controlled by :py:func:`kvikio.defaults.http_max_attempts`, :py:func:`kvikio.defaults.http_max_attempts_reset`, and :py:func:`kvikio.defaults.set_http_max_attempts`.

`KVIKIO_HTTP_TIMEOUT` controls the maximum duration of the HTTP request and can be controlled by :py:func:`kvikio.defaults.http_timeout`, :py:func:`kvikio.defaults.http_timeout_reset`, and :py:func:`kvikio.defaults.set_http_timeout`.
14 changes: 14 additions & 0 deletions python/kvikio/kvikio/_lib/defaults.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,15 @@ cdef extern from "<kvikio/defaults.hpp>" namespace "kvikio" nogil:
size_t cpp_bounce_buffer_size "kvikio::defaults::bounce_buffer_size"() except +
void cpp_bounce_buffer_size_reset \
"kvikio::defaults::bounce_buffer_size_reset"(size_t nbytes) except +

size_t cpp_http_max_attempts "kvikio::defaults::http_max_attempts"() except +
void cpp_http_max_attempts_reset \
"kvikio::defaults::http_max_attempts_reset"(size_t attempts) except +

long cpp_http_timeout "kvikio::defaults::http_timeout"() except +
void cpp_http_timeout_reset \
"kvikio::defaults::http_timeout_reset"(long timeout_seconds) except +

vector[int] cpp_http_status_codes "kvikio::defaults::http_status_codes"() except +
void cpp_http_status_codes_reset \
"kvikio::defaults::http_status_codes_reset"(vector[int] status_codes) except +
Expand Down Expand Up @@ -85,6 +91,14 @@ def http_max_attempts_reset(attempts: int) -> None:
cpp_http_max_attempts_reset(attempts)


def http_timeout() -> int:
    """Return the value reported by the C++ ``kvikio::defaults::http_timeout``."""
    timeout = cpp_http_timeout()
    return timeout


def http_timeout_reset(timeout_seconds: int) -> None:
    """Forward ``timeout_seconds`` to the C++ ``kvikio::defaults::http_timeout_reset``."""
    cpp_http_timeout_reset(timeout_seconds)


def http_status_codes() -> list[int]:
return cpp_http_status_codes()

Expand Down
43 changes: 43 additions & 0 deletions python/kvikio/kvikio/defaults.py
Original file line number Diff line number Diff line change
Expand Up @@ -297,6 +297,49 @@ def set_http_max_attempts(attempts: int):
http_max_attempts_reset(old_value)


def http_timeout() -> int:
    """Return the maximum duration, in seconds, an HTTP request may take.

    The value can be changed with :py:func:`kvikio.defaults.http_timeout_reset`,
    overridden temporarily with :py:func:`kvikio.defaults.set_http_timeout`, or
    configured through the ``KVIKIO_HTTP_TIMEOUT`` environment variable. If not
    set, the default value is 60.

    Returns
    -------
    timeout : int
        The maximum duration, in seconds, HTTP requests are allowed to take.
    """
    timeout = kvikio._lib.defaults.http_timeout()
    return timeout


def http_timeout_reset(timeout_seconds: int) -> None:
    """Set the maximum duration, in seconds, an HTTP request may take.

    Parameters
    ----------
    timeout_seconds : int
        The new maximum duration, in seconds, HTTP requests are allowed to
        take. The underlying C++ implementation rejects values that are not
        strictly positive.
    """
    lib_defaults = kvikio._lib.defaults
    lib_defaults.http_timeout_reset(timeout_seconds)


@contextlib.contextmanager
def set_http_timeout(timeout_seconds: int):
    """Context manager that temporarily overrides the HTTP request timeout.

    The timeout that was in effect on entry is restored on exit, including
    when the body of the ``with`` block raises.

    Parameters
    ----------
    timeout_seconds : int
        The maximum duration, in seconds, HTTP requests are allowed to take
        while the context is active.
    """
    previous_timeout = http_timeout()
    try:
        http_timeout_reset(timeout_seconds)
        yield
    finally:
        # Always put the earlier value back, even if the reset above or the
        # caller's block raised.
        http_timeout_reset(previous_timeout)


def http_status_codes() -> list[int]:
"""Get the list of HTTP status codes to retry.

Expand Down
Loading
Loading