Skip to content

Commit

Permalink
Separate more interface and definition. Add comments on std::future. …
Browse files Browse the repository at this point in the history
…Mark noexcept to compat mode-related functions (#588)

This PR makes the following three improvements:
- Separates interface and definition for `file_handle.hpp` that was missed in the previous PR #581.
- To help avoid UB (e.g. program crash) for downstream applications, adds the following qualifying remark to the returned future object of `pread/pwrite`:
  >The `std::future` object's `wait()` or `get()` should not be called after the lifetime of the FileHandle object ends. Otherwise, the behavior is undefined.
- Add `noexcept` specifier to compatibility mode-related functions.

Authors:
  - Tianyu Liu (https://github.com/kingcrimsontianyu)

Approvers:
  - Mads R. B. Kristensen (https://github.com/madsbk)
  - Lawrence Mitchell (https://github.com/wence-)

URL: #588
  • Loading branch information
kingcrimsontianyu authored Jan 22, 2025
1 parent e5888f8 commit 716a99f
Show file tree
Hide file tree
Showing 8 changed files with 335 additions and 259 deletions.
4 changes: 2 additions & 2 deletions cpp/include/kvikio/defaults.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ class defaults {
* (`ON`/`OFF`/`AUTO`) to two (`ON`/`OFF`) so as to determine the actual I/O path. This function
* is lightweight as the inferred result is cached.
*/
static CompatMode infer_compat_mode_if_auto(CompatMode compat_mode);
static CompatMode infer_compat_mode_if_auto(CompatMode compat_mode) noexcept;

/**
* @brief Given a requested compatibility mode, whether it is expected to reduce to `ON`.
Expand All @@ -156,7 +156,7 @@ class defaults {
* @param compat_mode Compatibility mode.
* @return Boolean answer.
*/
static bool is_compat_mode_preferred(CompatMode compat_mode);
static bool is_compat_mode_preferred(CompatMode compat_mode) noexcept;

/**
* @brief Whether the global compatibility mode from class defaults is expected to be `ON`.
Expand Down
258 changes: 25 additions & 233 deletions cpp/include/kvikio/file_handle.hpp

Large diffs are not rendered by default.

10 changes: 5 additions & 5 deletions cpp/include/kvikio/shim/cufile.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -102,9 +102,9 @@ class cuFileAPI {
* @return The boolean answer
*/
#ifdef KVIKIO_CUFILE_FOUND
bool is_cufile_library_available();
bool is_cufile_library_available() noexcept;
#else
constexpr bool is_cufile_library_available() { return false; }
constexpr bool is_cufile_library_available() noexcept { return false; }
#endif

/**
Expand All @@ -115,7 +115,7 @@ constexpr bool is_cufile_library_available() { return false; }
*
* @return The boolean answer
*/
bool is_cufile_available();
bool is_cufile_available() noexcept;

/**
* @brief Get cufile version (or zero if older than v1.8).
Expand All @@ -129,9 +129,9 @@ bool is_cufile_available();
* @return The version (1000*major + 10*minor) or zero if older than 1080.
*/
#ifdef KVIKIO_CUFILE_FOUND
int cufile_version();
int cufile_version() noexcept;
#else
constexpr int cufile_version() { return 0; }
constexpr int cufile_version() noexcept { return 0; }
#endif

/**
Expand Down
4 changes: 2 additions & 2 deletions cpp/include/kvikio/shim/utils.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ void get_symbol(T& handle, void* lib, const char* name)
*
* @return The boolean answer
*/
[[nodiscard]] bool is_running_in_wsl();
[[nodiscard]] bool is_running_in_wsl() noexcept;

/**
* @brief Check if `/run/udev` is readable
Expand All @@ -91,6 +91,6 @@ void get_symbol(T& handle, void* lib, const char* name)
*
* @return The boolean answer
*/
[[nodiscard]] bool run_udev_readable();
[[nodiscard]] bool run_udev_readable() noexcept;

} // namespace kvikio
4 changes: 2 additions & 2 deletions cpp/src/defaults.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ CompatMode defaults::compat_mode() { return instance()->_compat_mode; }

void defaults::compat_mode_reset(CompatMode compat_mode) { instance()->_compat_mode = compat_mode; }

CompatMode defaults::infer_compat_mode_if_auto(CompatMode compat_mode)
CompatMode defaults::infer_compat_mode_if_auto(CompatMode compat_mode) noexcept
{
if (compat_mode == CompatMode::AUTO) {
static auto inferred_compat_mode_for_auto = []() -> CompatMode {
Expand All @@ -154,7 +154,7 @@ CompatMode defaults::infer_compat_mode_if_auto(CompatMode compat_mode)
return compat_mode;
}

bool defaults::is_compat_mode_preferred(CompatMode compat_mode)
bool defaults::is_compat_mode_preferred(CompatMode compat_mode) noexcept
{
return compat_mode == CompatMode::ON ||
(compat_mode == CompatMode::AUTO &&
Expand Down
280 changes: 280 additions & 0 deletions cpp/src/file_handle.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,58 @@ FileHandle::FileHandle(const std::string& file_path,
}
}

FileHandle::FileHandle(FileHandle&& o) noexcept
: _fd_direct_on{std::exchange(o._fd_direct_on, -1)},
_fd_direct_off{std::exchange(o._fd_direct_off, -1)},
_initialized{std::exchange(o._initialized, false)},
_compat_mode{std::exchange(o._compat_mode, CompatMode::AUTO)},
_nbytes{std::exchange(o._nbytes, 0)},
_handle{std::exchange(o._handle, CUfileHandle_t{})}
{
}

/**
 * @brief Move assignment: releases what this handle currently owns, then takes over `o`.
 *
 * The resources currently held by `*this` are released through close() before the members are
 * overwritten; otherwise the previously opened file descriptors (and the cuFile registration,
 * if any) would be leaked. Self-assignment leaves the object untouched.
 *
 * After the call `o` holds file descriptors of -1, `_initialized == false`, compatibility mode
 * `AUTO`, a zero cached size and a value-initialized cuFile handle.
 *
 * @param o The handle to move from.
 * @return Reference to `*this`.
 */
FileHandle& FileHandle::operator=(FileHandle&& o) noexcept
{
  if (this != &o) {
    // Release our own descriptors/registration first; close() is a no-op if already closed.
    close();

    _fd_direct_on  = std::exchange(o._fd_direct_on, -1);
    _fd_direct_off = std::exchange(o._fd_direct_off, -1);
    _initialized   = std::exchange(o._initialized, false);
    _compat_mode   = std::exchange(o._compat_mode, CompatMode::AUTO);
    _nbytes        = std::exchange(o._nbytes, 0);
    _handle        = std::exchange(o._handle, CUfileHandle_t{});
  }
  return *this;
}

/**
 * @brief Destructor: releases the file by delegating to close().
 */
FileHandle::~FileHandle() noexcept
{
  close();
}

/**
 * @brief Whether the handle is closed, i.e. its `_initialized` flag is not set.
 *
 * @return `true` if the handle is not initialized, `false` otherwise.
 */
bool FileHandle::closed() const noexcept
{
  bool const is_open = _initialized;
  return !is_open;
}

/**
 * @brief Deregister the cuFile handle (when not in compatibility mode) and close the file
 * descriptors. Does nothing if the handle is already closed.
 *
 * This function never throws. Deregistration is attempted on a best-effort basis: if it raises,
 * the exception is swallowed and the file descriptors are still closed and the object is still
 * marked as closed, so that a failure in the cuFile layer cannot leak the descriptors.
 */
void FileHandle::close() noexcept
{
  if (closed()) { return; }

  // Only the cuFile call can throw; keep the try-block limited to it so that the descriptor
  // cleanup below always runs.
  try {
    if (!is_compat_mode_preferred()) { cuFileAPI::instance().HandleDeregister(_handle); }
  } catch (...) {
    // Intentionally ignored: close() is noexcept and is invoked from the destructor.
  }

  _compat_mode = CompatMode::AUTO;
  ::close(_fd_direct_off);
  // The O_DIRECT descriptor is optional; -1 means it was never opened.
  if (_fd_direct_on != -1) { ::close(_fd_direct_on); }
  _fd_direct_on  = -1;
  _fd_direct_off = -1;
  _initialized   = false;
}

/**
 * @brief Get the underlying cuFile handle.
 *
 * @return The cuFile handle stored in this object.
 * @throws CUfileException if the file handle is closed, or if compatibility mode is preferred
 * (in which case no cuFile handle is available).
 */
CUfileHandle_t FileHandle::handle()
{
  if (closed()) { throw CUfileException("File handle is closed"); }
  if (!is_compat_mode_preferred()) { return _handle; }
  throw CUfileException("The underlying cuFile handle isn't available in compatibility mode");
}

/**
 * @brief Get the file descriptor stored in `_fd_direct_off`.
 *
 * @return The file descriptor (-1 after close() or after being moved from).
 */
int FileHandle::fd() const noexcept
{
  return _fd_direct_off;
}

/**
 * @brief Get the flags of the file descriptor returned by fd(), as reported by `open_flags()`.
 *
 * @return The open flags.
 */
int FileHandle::fd_open_flags() const
{
  return open_flags(_fd_direct_off);
}

std::size_t FileHandle::nbytes() const
Expand All @@ -172,4 +224,232 @@ std::size_t FileHandle::nbytes() const
return _nbytes;
}

/**
 * @brief Read `size` bytes from the file into device memory.
 *
 * When compatibility mode is preferred the read is delegated to `detail::posix_device_read`
 * on `_fd_direct_off`. Otherwise the null stream is optionally synchronized and the read is
 * issued through cuFile.
 *
 * @param devPtr_base Base address of the destination buffer.
 * @param size Number of bytes to read.
 * @param file_offset Offset in the file to read from.
 * @param devPtr_offset Offset relative to `devPtr_base` to read into.
 * @param sync_default_stream Whether to synchronize the null stream before the cuFile read
 * (not used on the compatibility-mode path).
 * @return Number of bytes read.
 */
std::size_t FileHandle::read(void* devPtr_base,
                             std::size_t size,
                             std::size_t file_offset,
                             std::size_t devPtr_offset,
                             bool sync_default_stream)
{
  if (!is_compat_mode_preferred()) {
    if (sync_default_stream) { CUDA_DRIVER_TRY(cudaAPI::instance().StreamSynchronize(nullptr)); }

    KVIKIO_NVTX_SCOPED_RANGE("cufileRead()", size);
    ssize_t nbytes_done = cuFileAPI::instance().Read(
      _handle, devPtr_base, size, convert_size2off(file_offset), convert_size2off(devPtr_offset));
    CUFILE_CHECK_BYTES_DONE(nbytes_done);
    return nbytes_done;
  }

  // Compatibility mode: plain POSIX read into the device buffer.
  return detail::posix_device_read(_fd_direct_off, devPtr_base, size, file_offset, devPtr_offset);
}

/**
 * @brief Write `size` bytes from device memory to the file.
 *
 * The cached file size (`_nbytes`) is reset first. When compatibility mode is preferred the
 * write is delegated to `detail::posix_device_write` on `_fd_direct_off`. Otherwise the null
 * stream is optionally synchronized and the write is issued through cuFile.
 *
 * @param devPtr_base Base address of the source buffer.
 * @param size Number of bytes to write.
 * @param file_offset Offset in the file to write to.
 * @param devPtr_offset Offset relative to `devPtr_base` to write from.
 * @param sync_default_stream Whether to synchronize the null stream before the cuFile write
 * (not used on the compatibility-mode path).
 * @return Number of bytes written.
 * @throws std::system_error if cuFile reports -1 (the error code is taken from `errno`).
 * @throws CUfileException if cuFile reports a value below -1.
 */
std::size_t FileHandle::write(const void* devPtr_base,
                              std::size_t size,
                              std::size_t file_offset,
                              std::size_t devPtr_offset,
                              bool sync_default_stream)
{
  // The file may grow, so the previously computed size can no longer be trusted.
  _nbytes = 0;

  if (is_compat_mode_preferred()) {
    return detail::posix_device_write(
      _fd_direct_off, devPtr_base, size, file_offset, devPtr_offset);
  }

  if (sync_default_stream) { CUDA_DRIVER_TRY(cudaAPI::instance().StreamSynchronize(nullptr)); }

  KVIKIO_NVTX_SCOPED_RANGE("cufileWrite()", size);
  ssize_t nbytes_done = cuFileAPI::instance().Write(
    _handle, devPtr_base, size, convert_size2off(file_offset), convert_size2off(devPtr_offset));

  // Values below -1 carry a cuFile error code; -1 itself means the error is in errno.
  if (nbytes_done < -1) {
    throw CUfileException(std::string{"cuFile error at: "} + __FILE__ + ":" +
                          KVIKIO_STRINGIFY(__LINE__) + ": " + CUFILE_ERRSTR(nbytes_done));
  }
  if (nbytes_done == -1) {
    throw std::system_error(errno, std::generic_category(), "Unable to write file");
  }
  return nbytes_done;
}

/**
 * @brief Read `size` bytes from the file into host or device memory, possibly in parallel.
 *
 * Three paths exist:
 *  - host memory: chunks are read with `detail::posix_host_read` via `parallel_io`;
 *  - device memory with `size < gds_threshold`: a single deferred task reads through
 *    `detail::posix_device_read`;
 *  - device memory otherwise: chunks are read with read() via `parallel_io`.
 *
 * @note Every task captures `this`. The returned future's `wait()` or `get()` must not be called
 * after the lifetime of this FileHandle ends; otherwise the behavior is undefined.
 *
 * @param buf Destination buffer (host or device memory).
 * @param size Number of bytes to read.
 * @param file_offset Offset in the file to read from.
 * @param task_size Size of each task, forwarded to `parallel_io`.
 * @param gds_threshold Device reads smaller than this bypass `parallel_io`.
 * @param sync_default_stream Whether to synchronize the null stream once before submitting the
 * parallel device reads (skipped when compatibility mode is preferred).
 * @return Future that yields the number of bytes read.
 */
std::future<std::size_t> FileHandle::pread(void* buf,
                                           std::size_t size,
                                           std::size_t file_offset,
                                           std::size_t task_size,
                                           std::size_t gds_threshold,
                                           bool sync_default_stream)
{
  KVIKIO_NVTX_MARKER("FileHandle::pread()", size);

  // Host destination: plain POSIX reads, split into tasks by parallel_io.
  if (is_host_memory(buf)) {
    auto host_read = [this](void* hostPtr_base,
                            std::size_t chunk_size,
                            std::size_t chunk_file_offset,
                            std::size_t hostPtr_offset) -> std::size_t {
      char* dst = static_cast<char*>(hostPtr_base) + hostPtr_offset;
      return detail::posix_host_read<detail::PartialIO::NO>(
        _fd_direct_off, dst, chunk_size, chunk_file_offset);
    };

    return parallel_io(host_read, buf, size, file_offset, task_size, 0);
  }

  CUcontext ctx = get_context_from_pointer(buf);

  // Shortcut that circumvents the thread pool and uses the POSIX backend directly.
  if (size < gds_threshold) {
    auto small_read = [this, ctx, buf, size, file_offset]() -> std::size_t {
      PushAndPopContext c(ctx);
      return detail::posix_device_read(_fd_direct_off, buf, size, file_offset, 0);
    };
    return std::async(std::launch::deferred, small_read);
  }

  // Synchronize once here instead of once per task.
  if (sync_default_stream && !is_compat_mode_preferred()) {
    PushAndPopContext c(ctx);
    CUDA_DRIVER_TRY(cudaAPI::instance().StreamSynchronize(nullptr));
  }

  // Regular case: use the thread pool and run the tasks in parallel.
  auto device_read = [this, ctx](void* chunk_base,
                                 std::size_t chunk_size,
                                 std::size_t chunk_file_offset,
                                 std::size_t chunk_dev_offset) -> std::size_t {
    PushAndPopContext c(ctx);
    return read(chunk_base,
                chunk_size,
                chunk_file_offset,
                chunk_dev_offset,
                /* sync_default_stream = */ false);
  };
  auto [alloc_base, alloc_size, offset_in_alloc] = get_alloc_info(buf, &ctx);
  return parallel_io(device_read, alloc_base, size, file_offset, task_size, offset_in_alloc);
}

/**
 * @brief Write `size` bytes from host or device memory to the file, possibly in parallel.
 *
 * Three paths exist:
 *  - host memory: chunks are written with `detail::posix_host_write` via `parallel_io`;
 *  - device memory with `size < gds_threshold`: a single deferred task writes through
 *    `detail::posix_device_write`;
 *  - device memory otherwise: chunks are written with write() via `parallel_io`.
 *
 * @note Every task captures `this`. The returned future's `wait()` or `get()` must not be called
 * after the lifetime of this FileHandle ends; otherwise the behavior is undefined.
 *
 * NOTE(review): unlike write(), the host-memory and below-threshold paths do not reset the
 * cached `_nbytes`; confirm whether nbytes() can report a stale size after such writes.
 *
 * @param buf Source buffer (host or device memory).
 * @param size Number of bytes to write.
 * @param file_offset Offset in the file to write to.
 * @param task_size Size of each task, forwarded to `parallel_io`.
 * @param gds_threshold Device writes smaller than this bypass `parallel_io`.
 * @param sync_default_stream Whether to synchronize the null stream once before submitting the
 * parallel device writes (skipped when compatibility mode is preferred).
 * @return Future that yields the number of bytes written.
 */
std::future<std::size_t> FileHandle::pwrite(const void* buf,
                                            std::size_t size,
                                            std::size_t file_offset,
                                            std::size_t task_size,
                                            std::size_t gds_threshold,
                                            bool sync_default_stream)
{
  KVIKIO_NVTX_MARKER("FileHandle::pwrite()", size);

  // Host source: plain POSIX writes, split into tasks by parallel_io.
  if (is_host_memory(buf)) {
    auto host_write = [this](const void* hostPtr_base,
                             std::size_t chunk_size,
                             std::size_t chunk_file_offset,
                             std::size_t hostPtr_offset) -> std::size_t {
      const char* src = static_cast<const char*>(hostPtr_base) + hostPtr_offset;
      return detail::posix_host_write<detail::PartialIO::NO>(
        _fd_direct_off, src, chunk_size, chunk_file_offset);
    };

    return parallel_io(host_write, buf, size, file_offset, task_size, 0);
  }

  CUcontext ctx = get_context_from_pointer(buf);

  // Shortcut that circumvents the thread pool and uses the POSIX backend directly.
  if (size < gds_threshold) {
    auto small_write = [this, ctx, buf, size, file_offset]() -> std::size_t {
      PushAndPopContext c(ctx);
      return detail::posix_device_write(_fd_direct_off, buf, size, file_offset, 0);
    };
    return std::async(std::launch::deferred, small_write);
  }

  // Synchronize once here instead of once per task.
  if (sync_default_stream && !is_compat_mode_preferred()) {
    PushAndPopContext c(ctx);
    CUDA_DRIVER_TRY(cudaAPI::instance().StreamSynchronize(nullptr));
  }

  // Regular case: use the thread pool and run the tasks in parallel.
  auto device_write = [this, ctx](const void* chunk_base,
                                  std::size_t chunk_size,
                                  std::size_t chunk_file_offset,
                                  std::size_t chunk_dev_offset) -> std::size_t {
    PushAndPopContext c(ctx);
    return write(chunk_base,
                 chunk_size,
                 chunk_file_offset,
                 chunk_dev_offset,
                 /* sync_default_stream = */ false);
  };
  auto [alloc_base, alloc_size, offset_in_alloc] = get_alloc_info(buf, &ctx);
  return parallel_io(device_write, alloc_base, size, file_offset, task_size, offset_in_alloc);
}

/**
 * @brief Read from the file into device memory, asynchronously with respect to `stream`.
 *
 * If compatibility mode is preferred for asynchronous I/O, the stream is synchronized and a
 * blocking read() is performed, with its result stored in `*bytes_read_p`. Otherwise the
 * request is handed to cuFile's `ReadAsync`.
 *
 * @param devPtr_base Base address of the destination buffer.
 * @param size_p Pointer to the number of bytes to read.
 * @param file_offset_p Pointer to the offset in the file to read from.
 * @param devPtr_offset_p Pointer to the offset relative to `devPtr_base` to read into.
 * @param bytes_read_p Pointer that receives the number of bytes read.
 * @param stream CUDA stream the operation is ordered on.
 */
void FileHandle::read_async(void* devPtr_base,
                            std::size_t* size_p,
                            off_t* file_offset_p,
                            off_t* devPtr_offset_p,
                            ssize_t* bytes_read_p,
                            CUstream stream)
{
  if (!is_compat_mode_preferred_for_async(_compat_mode)) {
    CUFILE_TRY(cuFileAPI::instance().ReadAsync(
      _handle, devPtr_base, size_p, file_offset_p, devPtr_offset_p, bytes_read_p, stream));
    return;
  }

  // Fallback: wait for prior work on the stream, then read synchronously.
  CUDA_DRIVER_TRY(cudaAPI::instance().StreamSynchronize(stream));
  *bytes_read_p =
    static_cast<ssize_t>(read(devPtr_base, *size_p, *file_offset_p, *devPtr_offset_p));
}

/**
 * @brief Convenience overload of read_async() that owns its arguments in a `StreamFuture`.
 *
 * A `StreamFuture` is built from the given arguments; the pointers it exposes through
 * `get_args()` are then forwarded to the pointer-based read_async() overload.
 *
 * @param devPtr_base Base address of the destination buffer.
 * @param size Number of bytes to read.
 * @param file_offset Offset in the file to read from.
 * @param devPtr_offset Offset relative to `devPtr_base` to read into.
 * @param stream CUDA stream the operation is ordered on.
 * @return The `StreamFuture` holding the arguments of the operation.
 */
StreamFuture FileHandle::read_async(
  void* devPtr_base, std::size_t size, off_t file_offset, off_t devPtr_offset, CUstream stream)
{
  StreamFuture future(devPtr_base, size, file_offset, devPtr_offset, stream);
  auto [arg_base, arg_size_p, arg_file_offset_p, arg_dev_offset_p, arg_bytes_read_p, arg_stream] =
    future.get_args();
  read_async(
    arg_base, arg_size_p, arg_file_offset_p, arg_dev_offset_p, arg_bytes_read_p, arg_stream);
  return future;
}

/**
 * @brief Write from device memory to the file, asynchronously with respect to `stream`.
 *
 * If compatibility mode is preferred for asynchronous I/O, the stream is synchronized and a
 * blocking write() is performed, with its result stored in `*bytes_written_p`. Otherwise the
 * request is handed to cuFile's `WriteAsync`.
 *
 * @param devPtr_base Base address of the source buffer.
 * @param size_p Pointer to the number of bytes to write.
 * @param file_offset_p Pointer to the offset in the file to write to.
 * @param devPtr_offset_p Pointer to the offset relative to `devPtr_base` to write from.
 * @param bytes_written_p Pointer that receives the number of bytes written.
 * @param stream CUDA stream the operation is ordered on.
 */
void FileHandle::write_async(void* devPtr_base,
                             std::size_t* size_p,
                             off_t* file_offset_p,
                             off_t* devPtr_offset_p,
                             ssize_t* bytes_written_p,
                             CUstream stream)
{
  if (!is_compat_mode_preferred_for_async(_compat_mode)) {
    CUFILE_TRY(cuFileAPI::instance().WriteAsync(
      _handle, devPtr_base, size_p, file_offset_p, devPtr_offset_p, bytes_written_p, stream));
    return;
  }

  // Fallback: wait for prior work on the stream, then write synchronously.
  CUDA_DRIVER_TRY(cudaAPI::instance().StreamSynchronize(stream));
  *bytes_written_p =
    static_cast<ssize_t>(write(devPtr_base, *size_p, *file_offset_p, *devPtr_offset_p));
}

/**
 * @brief Convenience overload of write_async() that owns its arguments in a `StreamFuture`.
 *
 * A `StreamFuture` is built from the given arguments; the pointers it exposes through
 * `get_args()` are then forwarded to the pointer-based write_async() overload.
 *
 * @param devPtr_base Base address of the source buffer.
 * @param size Number of bytes to write.
 * @param file_offset Offset in the file to write to.
 * @param devPtr_offset Offset relative to `devPtr_base` to write from.
 * @param stream CUDA stream the operation is ordered on.
 * @return The `StreamFuture` holding the arguments of the operation.
 */
StreamFuture FileHandle::write_async(
  void* devPtr_base, std::size_t size, off_t file_offset, off_t devPtr_offset, CUstream stream)
{
  StreamFuture future(devPtr_base, size, file_offset, devPtr_offset, stream);
  auto [arg_base, arg_size_p, arg_file_offset_p, arg_dev_offset_p, arg_bytes_written_p, arg_stream] =
    future.get_args();
  write_async(
    arg_base, arg_size_p, arg_file_offset_p, arg_dev_offset_p, arg_bytes_written_p, arg_stream);
  return future;
}

/**
 * @brief Whether compatibility mode is preferred for this handle.
 *
 * Forwards this handle's `_compat_mode` to `defaults::is_compat_mode_preferred`.
 *
 * @return Boolean answer.
 */
bool FileHandle::is_compat_mode_preferred() const noexcept
{
  bool const preferred = defaults::is_compat_mode_preferred(_compat_mode);
  return preferred;
}

/**
 * @brief Whether compatibility mode is preferred for asynchronous I/O on this handle.
 *
 * The answer is `true` if compatibility mode is preferred for this handle, if the cuFile stream
 * API is not available, or if the cuFile configuration path is empty. The latter two checks
 * are evaluated once, on the first call, and cached in function-local statics.
 *
 * @return Boolean answer.
 */
bool FileHandle::is_compat_mode_preferred_for_async() const noexcept
{
  static bool const stream_api_present  = is_stream_api_available();
  static bool const config_file_missing = config_path().empty();

  if (is_compat_mode_preferred()) { return true; }
  if (!stream_api_present) { return true; }
  return config_file_missing;
}

/**
 * @brief Whether compatibility mode is preferred for asynchronous I/O, given a requested mode.
 *
 * Returns `true` right away if `defaults::is_compat_mode_preferred` says so. Otherwise the
 * cuFile stream API and the cuFile configuration file are checked, in that order. If either is
 * missing, `AUTO` falls back to compatibility mode (returns `true`), while any other requested
 * mode results in an exception.
 *
 * @param requested_compat_mode The requested compatibility mode.
 * @return `true` if compatibility mode should be used for asynchronous I/O, `false` otherwise.
 * @throws std::runtime_error if the requested mode is not `AUTO` and either the cuFile stream
 * API or the cuFile configuration file is missing.
 */
bool FileHandle::is_compat_mode_preferred_for_async(CompatMode requested_compat_mode)
{
  if (defaults::is_compat_mode_preferred(requested_compat_mode)) { return true; }

  // Only AUTO is allowed to silently fall back to compatibility mode.
  bool const may_fall_back = (requested_compat_mode == CompatMode::AUTO);

  if (!is_stream_api_available()) {
    if (!may_fall_back) { throw std::runtime_error("Missing the cuFile stream api."); }
    return true;
  }

  // Even when the stream API is available, it doesn't work if no cuFile config file exists,
  // so the availability check also covers the config file.
  if (config_path().empty()) {
    if (!may_fall_back) { throw std::runtime_error("Missing cuFile configuration file."); }
    return true;
  }

  return false;
}

} // namespace kvikio
Loading

0 comments on commit 716a99f

Please sign in to comment.