Skip to content

Commit 0e88a8e

Browse files
committed
Reimplement Interruptible using Thread#queue
* Replaces a little Unix cleverness with a standard Ruby class. This pushes the responsibiity for meeting the SQ requirements from SQ to stock Ruby * Delivers equivelent performance, identical API, and API behaviors with the original implementation (see note below on Futures) * Mostly fixes a *platform / version dependent* issue with MySQL (see below) * Meets 100% of SQ's functional requirements: * interruptible_sleep: a potentially blocking operation interruptible via either a "wake_event" (possibly requested prior to entering interruptible_sleep) or blocking until a timeout. * wake_up / interrupt: a Signal#trap and thread-safe method that does not require user-level synchronization (with the risk of not fully understanding all of the complexities required) code that either interrupts an inflight-interruptible_sleep or enqueues the event to processed in the invocation of interruptible_sleep * Interruptible's API is trivially reproduceable via Thread::Queue * interruptible_sleep => Queue.pop(timeout:) where pushing anything into the queue acts as the interrupt event and timeout is reliable without any extra code or exception handling. * wake_up / interrupt => Queue.push(Anything) is thread, fiber, and Signal.trap safe (can be called from anywhere) and captures all wake_up events whenever requested, automaticall caching any "event" not processed by a currently executing interruptible_sleep matching existing functionality exactly. Why the Future in #interruptible_sleep? While Thread::Queue micro benchmarks as having the same performance on the main thread Vs. any form of a sub-thread (or Fiber) and self-pipe, when running the SQ test suite we see a 35% slow down Vs. the original self-pipe implenentation. One assumes this slowdown would manifest in production. By moving the just the Queue#pop into a separate thread via Concurrent::Promises.future we get +/- identical performance to the original self-pipe implementation. I'm assuming this root causes to Ruby main-thread only housekeeping and/or possibly triggering a fast/slow path issue. Why a Future Vs. Thread#new for each interruptible_sleep call? Every other threaded operation in SQ is implemented using Concurrent Ruby. Using a Future is for code and architectual consistency. There is no difference in performance or functionality between the two. MySQL *only* issues: There seems to be a *platform specific* or *version specific* problem with MySQL database connectivity and/or broken self-pipes leading to randomly failing tests and a stream of distracting backtraces *even with successful* tests. Adding to the complexity sometimes, the lost database connection can self-heal -- HOWEVER -- this takes time and given how much of the test suite has time based assertions, leads to additional random test failures. These, or similar, issues have been observed in the past when changes to the MySQL client library forced changes in the mysql2 gem. With the Thread::Queue based implementation of the Interruptible concern, the random failures and amount of spurious output are dramatically improved (but not eliminated).
1 parent da78382 commit 0e88a8e

File tree

1 file changed

+12
-18
lines changed

1 file changed

+12
-18
lines changed

lib/solid_queue/processes/interruptible.rb

+12-18
Original file line numberDiff line numberDiff line change
@@ -7,31 +7,25 @@ def wake_up
77
end
88

99
private
10-
SELF_PIPE_BLOCK_SIZE = 11
1110

1211
def interrupt
13-
self_pipe[:writer].write_nonblock(".")
14-
rescue Errno::EAGAIN, Errno::EINTR
15-
# Ignore writes that would block and retry
16-
# if another signal arrived while writing
17-
retry
12+
queue << true
1813
end
1914

2015
def interruptible_sleep(time)
21-
if time > 0 && self_pipe[:reader].wait_readable(time)
22-
loop { self_pipe[:reader].read_nonblock(SELF_PIPE_BLOCK_SIZE) }
23-
end
24-
rescue Errno::EAGAIN, Errno::EINTR
16+
# Since this is invoked on the main thread, using some form of Async
17+
# avoids a 35% slowdown (at least when running the test suite).
18+
#
19+
# Using Futures for architectural consistency with all the other Async in SolidQueue.
20+
Concurrent::Promises.future(time) do |timeout|
21+
if timeout > 0 && queue.pop(timeout:)
22+
queue.clear # exiting the poll wait guarantees testing for SHUTDOWN before next poll
23+
end
24+
end.value
2525
end
2626

27-
# Self-pipe for signal-handling (http://cr.yp.to/docs/selfpipe.html)
28-
def self_pipe
29-
@self_pipe ||= create_self_pipe
30-
end
31-
32-
def create_self_pipe
33-
reader, writer = IO.pipe
34-
{ reader: reader, writer: writer }
27+
def queue
28+
@queue ||= Queue.new
3529
end
3630
end
3731
end

0 commit comments

Comments
 (0)