Skip to content

Commit a152f26

Browse files
committed
Reimplement Interruptible using Thread#queue
* Replaces a little Unix cleverness with a standard Ruby class. This pushes the responsibiity for meeting the SQ requirements from SQ to stock Ruby * Delivers equivelent performance, identical API, and API behaviors with the original implementation (see note below on Futures) * Mostly fixes a *platform / version dependent* issue with MySQL (see below) * Meets 100% of SQ's functional requirements: * interruptible_sleep: a potentially blocking operation interruptible via either a "wake_event" (possibly requested prior to entering interruptible_sleep) or blocking until a timeout. * wake_up / interrupt: a Signal#trap and thread-safe method that does not require user-level synchronization (with the risk of not fully understanding all of the complexities required) code that either interrupts an inflight-interruptible_sleep or enqueues the event to processed in the invocation of interruptible_sleep * Interruptible's API is trivially reproduceable via Thread::Queue * interruptible_sleep => Queue.pop(timeout:) where pushing anything into the queue acts as the interrupt event and timeout is reliable without any extra code or exception handling. * wake_up / interrupt => Queue.push(Anything) is thread, fiber, and Signal.trap safe (can be called from anywhere) and captures all wake_up events whenever requested, automaticall caching any "event" not processed by a currently executing interruptible_sleep matching existing functionality exactly. Why the Future in #interruptible_sleep? While Thread::Queue micro benchmarks as having the same performance on the main thread Vs. any form of a sub-thread (or Fiber) and self-pipe, when running the SQ test suite we see a 35% slow down Vs. the original self-pipe implenentation. One assumes this slowdown would manifest in production. By moving the just the Queue#pop into a separate thread via Concurrent::Promises.future we get +/- identical performance to the original self-pipe implementation. I'm assuming this root causes to Ruby main-thread only housekeeping and/or possibly triggering a fast/slow path issue. Why a Future Vs. Thread#new for each interruptible_sleep call? Every other threaded operation in SQ is implemented using Concurrent Ruby. Using a Future is for code and architectual consistency. There is no difference in performance or functionality between the two. MySQL *only* issues: There seems to be a *platform specific* or *version specific* problem with MySQL database connectivity and/or broken self-pipes leading to randomly failing tests and a stream of distracting backtraces *even with successful* tests. Adding to the complexity sometimes, the lost database connection can self-heal -- HOWEVER -- this takes time and given how much of the test suite has time based assertions, leads to additional random test failures. These, or similar, issues have been observed in the past when changes to the MySQL client library forced changes in the mysql2 gem. With the Thread::Queue based implementation of the Interruptible concern, the random failures and amount of spurious output are dramatically improved (but not eliminated).
1 parent 6cc550e commit a152f26

File tree

2 files changed

+15
-21
lines changed

2 files changed

+15
-21
lines changed

Gemfile.lock

+3-3
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,7 @@ GEM
5454
concurrent-ruby (1.3.4)
5555
connection_pool (2.4.1)
5656
crass (1.0.6)
57+
date (3.4.1)
5758
debug (1.7.1)
5859
irb (>= 1.5.0)
5960
reline (>= 0.3.1)
@@ -98,7 +99,8 @@ GEM
9899
ast (~> 2.4.1)
99100
racc
100101
pg (1.5.4)
101-
psych (5.2.0)
102+
psych (5.2.1)
103+
date
102104
stringio
103105
puma (6.4.3)
104106
nio4r (~> 2.0)
@@ -130,8 +132,6 @@ GEM
130132
rake (13.2.1)
131133
rdoc (6.8.1)
132134
psych (>= 4.0.0)
133-
rdoc (6.6.3.1)
134-
psych (>= 4.0.0)
135135
regexp_parser (2.9.2)
136136
reline (0.5.12)
137137
io-console (~> 0.5)

lib/solid_queue/processes/interruptible.rb

+12-18
Original file line numberDiff line numberDiff line change
@@ -7,31 +7,25 @@ def wake_up
77
end
88

99
private
10-
SELF_PIPE_BLOCK_SIZE = 11
1110

1211
def interrupt
13-
self_pipe[:writer].write_nonblock(".")
14-
rescue Errno::EAGAIN, Errno::EINTR
15-
# Ignore writes that would block and retry
16-
# if another signal arrived while writing
17-
retry
12+
queue << true
1813
end
1914

2015
def interruptible_sleep(time)
21-
if time > 0 && self_pipe[:reader].wait_readable(time)
22-
loop { self_pipe[:reader].read_nonblock(SELF_PIPE_BLOCK_SIZE) }
23-
end
24-
rescue Errno::EAGAIN, Errno::EINTR
16+
# Since this is invoked on the main thread, using some form of Async
17+
# avoids a 35% slowdown (at least when running the test suite).
18+
#
19+
# Using Futures for architectural consistency with all the other Async in SolidQueue.
20+
Concurrent::Promises.future(time) do |timeout|
21+
if timeout > 0 && queue.pop(timeout:)
22+
queue.clear # exiting the poll wait guarantees testing for SHUTDOWN before next poll
23+
end
24+
end.value
2525
end
2626

27-
# Self-pipe for signal-handling (http://cr.yp.to/docs/selfpipe.html)
28-
def self_pipe
29-
@self_pipe ||= create_self_pipe
30-
end
31-
32-
def create_self_pipe
33-
reader, writer = IO.pipe
34-
{ reader: reader, writer: writer }
27+
def queue
28+
@queue ||= Queue.new
3529
end
3630
end
3731
end

0 commit comments

Comments
 (0)