Skip to content

Commit

Permalink
backend/epoll: implement eventfd wakeup notification (#128)
Browse files Browse the repository at this point in the history
Tries to mimic what happens in backend/kqueue.

Closes #4
  • Loading branch information
mitchellh authored Feb 5, 2025
2 parents a2e809b + 378a396 commit c6e8719
Show file tree
Hide file tree
Showing 3 changed files with 94 additions and 7 deletions.
79 changes: 73 additions & 6 deletions src/backend/epoll.zig
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,12 @@ pub const Loop = struct {

fd: posix.fd_t,

/// The eventfd that this epoll queue always has a filter for. Writing
/// an empty message to this eventfd can be used to wake up the loop
/// at any time. Waking up the loop via this eventfd won't trigger any
/// particular completion, it just forces tick to cycle.
eventfd: xev.Async,

/// The number of active completions. This DOES NOT include completions that
/// are queued in the submissions queue.
active: usize = 0,
Expand Down Expand Up @@ -56,8 +62,12 @@ pub const Loop = struct {
} = .{},

pub fn init(options: xev.Options) !Loop {
var eventfd = try xev.Async.init();
errdefer eventfd.deinit();

var res: Loop = .{
.fd = try posix.epoll_create1(std.os.linux.EPOLL.CLOEXEC),
.eventfd = eventfd,
.thread_pool = options.thread_pool,
.thread_pool_completions = undefined,
.cached_now = undefined,
Expand All @@ -68,6 +78,7 @@ pub const Loop = struct {

pub fn deinit(self: *Loop) void {
posix.close(self.fd);
self.eventfd.deinit();
}

/// Run the event loop. See RunMode documentation for details on modes.
Expand Down Expand Up @@ -262,20 +273,51 @@ pub const Loop = struct {
// Initialize
if (!self.flags.init) {
self.flags.init = true;

if (self.thread_pool != null) {
self.thread_pool_completions.init();
}

var ev: linux.epoll_event = .{
.events = linux.EPOLL.IN | linux.EPOLL.RDHUP,
.data = .{ .fd = self.eventfd.fd },
};
posix.epoll_ctl(
self.fd,
linux.EPOLL.CTL_ADD,
self.eventfd.fd,
&ev,
) catch |err| {
// We reset initialization because we can't do anything
// safely unless we get this mach port registered!
self.flags.init = false;
return err;
};
}

// Submit all the submissions. We copy the submission queue so that
// any resubmits don't cause an infinite loop.
var wait_rem: usize = @intCast(wait);
var queued = self.submissions;
self.submissions = .{};
while (queued.pop()) |c| {
// We ignore any completions that aren't in the adding state.
// This usually means that we switched them to be deleted or
// something.
if (c.flags.state != .adding) continue;

// These operations happen synchronously. Ensure they are
// decremented from wait_rem.
switch (c.op) {
.cancel,
// should noop be counted?
// .noop,
.shutdown,
.timer,
=> wait_rem -|= 1,
else => {},
}

self.start(c);
}

Expand All @@ -294,7 +336,6 @@ pub const Loop = struct {

// Wait and process events. We only do this if we have any active.
var events: [1024]linux.epoll_event = undefined;
var wait_rem: usize = @intCast(wait);
while (self.active > 0 and (wait == 0 or wait_rem > 0)) {
self.update_now();
const now_timer: Operation.Timer = .{ .next = self.cached_now };
Expand Down Expand Up @@ -347,9 +388,7 @@ pub const Loop = struct {
const timeout: i32 = if (wait_rem == 0) 0 else timeout: {
// If we have a timer, we want to set the timeout to our next
// timer value. If we have no timer, we wait forever.
// TODO: do not wait 100ms here, use an eventfd for our
// thread pool to wake us up.
const t = self.timers.peek() orelse break :timeout 100;
const t = self.timers.peek() orelse break :timeout -1;

// Determine the time in milliseconds.
const ms_now = @as(u64, @intCast(self.cached_now.tv_sec)) * std.time.ms_per_s +
Expand All @@ -369,6 +408,13 @@ pub const Loop = struct {

// Process all our events and invoke their completion handlers
for (events[0..n]) |ev| {
// Handle wakeup eventfd
if (ev.data.fd == self.eventfd.fd) {
var buffer: u64 = undefined;
_ = posix.read(self.eventfd.fd, std.mem.asBytes(&buffer)) catch {};
continue;
}

const c: *Completion = @ptrFromInt(@as(usize, @intCast(ev.data.ptr)));

// We get the fd and mark this as in progress we can properly
Expand Down Expand Up @@ -415,6 +461,7 @@ pub const Loop = struct {
const pool = self.thread_pool orelse return error.ThreadPoolRequired;

// Setup our completion state so that thread_perform can do stuff
c.task_loop = self;
c.task_completions = &self.thread_pool_completions;
c.task = .{ .callback = Loop.thread_perform };

Expand All @@ -436,6 +483,14 @@ pub const Loop = struct {

// Add to our completion queue
c.task_completions.push(c);

// Wake up our main loop
c.task_loop.wakeup() catch {};
}

/// Sends an empty message to this loop's eventfd so that it wakes up.
fn wakeup(self: *Loop) !void {
try self.eventfd.notify();
}

fn start(self: *Loop, completion: *Completion) void {
Expand Down Expand Up @@ -658,6 +713,13 @@ pub const Loop = struct {
},

.close => |v| res: {
if (completion.flags.threadpool) {
if (self.thread_schedule(completion)) |_|
return
else |err|
break :res .{ .close = err };
}

posix.close(v.fd);
break :res .{ .close = {} };
},
Expand Down Expand Up @@ -800,6 +862,7 @@ pub const Completion = struct {
/// reliable way to get access to the loop and shouldn't be used
/// except internally.
task: ThreadPool.Task = undefined,
task_loop: *Loop = undefined,
task_completions: *Loop.TaskCompletionQueue = undefined,
task_result: Result = undefined,

Expand Down Expand Up @@ -866,7 +929,6 @@ pub const Completion = struct {
// This should never happen because we always do these synchronously
// or in another location.
.cancel,
.close,
.noop,
.shutdown,
.timer,
Expand Down Expand Up @@ -972,6 +1034,11 @@ pub const Completion = struct {
err,
};
},

.close => |*op| res: {
posix.close(op.fd);
break :res .{ .close = {} };
},
};
}

Expand Down Expand Up @@ -1234,7 +1301,7 @@ pub const AcceptError = posix.EpollCtlError || error{
Unknown,
};

pub const CloseError = posix.EpollCtlError || error{
pub const CloseError = posix.EpollCtlError || ThreadPoolError || error{
Unknown,
};

Expand Down
6 changes: 5 additions & 1 deletion src/backend/kqueue.zig
Original file line number Diff line number Diff line change
Expand Up @@ -1106,7 +1106,6 @@ pub const Completion = struct {
fn perform(self: *Completion, ev_: ?*const Kevent) Result {
return switch (self.op) {
.cancel,
.close,
.noop,
.timer,
.shutdown,
Expand Down Expand Up @@ -1236,6 +1235,11 @@ pub const Completion = struct {

break :res .{ .proc = 0 };
},

.close => |*op| res: {
posix.close(op.fd);
break :res .{ .close = {} };
},
};
}

Expand Down
16 changes: 16 additions & 0 deletions src/watcher/stream.zig
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,22 @@ pub fn Closeable(comptime xev: type, comptime T: type, comptime options: Options
}).callback,
};

// If we're dup-ing, then we ask the backend to manage the fd.
switch (xev.backend) {
.io_uring,
.wasi_poll,
.iocp,
=> {},

.epoll => {
c.flags.threadpool = true;
},

.kqueue => {
c.flags.threadpool = true;
},
}

loop.add(c);
}
};
Expand Down

0 comments on commit c6e8719

Please sign in to comment.