Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for logging blocking operations. #372

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions lib/async/internal/blocking_operation_wait_inline.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
# frozen_string_literal: true

# Released under the MIT License.
# Copyright, 2025, by Samuel Williams.

module Async
module Internal
module BlockingOperationWaitInline
def blocking_operation_wait(work)
Fiber.blocking{work.call}
end
end
end
end
76 changes: 76 additions & 0 deletions lib/async/internal/blocking_operation_wait_log.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
# frozen_string_literal: true

# Released under the MIT License.
# Copyright, 2025, by Samuel Williams.

module Async
module Internal
class Sample
def initialize(name)
@name = name

@count = 0
@total = 0
end

def measure
start = Process.clock_gettime(Process::CLOCK_MONOTONIC)

yield
ensure
finish = Process.clock_gettime(Process::CLOCK_MONOTONIC)

@count += 1
@total += finish - start
end

def average
@total / @count
end

def to_s
"#{@name}: #{@count} samples, average: #{format_time(average)}"
end

private

def format_time(time)
if time < 1e-6
"%.0fns" % (time * 1e9)
elsif time < 1e-3
"%.0fµs" % (time * 1e6)
elsif time < 1
"%.0fms" % (time * 1e3)
else
"%.3fs" % time
end
end
end

module BlockingOperationWaitLog
def run(...)
$stderr.puts "Blocking operation wait log enabled."

@blocking_operations = {}

super
ensure
@blocking_operations.each do |name, sample|
$stderr.puts sample
end

@blocking_operations = nil
end

def blocking_operation_wait(work)
from = caller(1, 1).first

sample = (@blocking_operations[from] ||= Sample.new(from))

sample.measure do
return super
end
end
end
end
end
184 changes: 184 additions & 0 deletions lib/async/internal/worker_pool.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,184 @@
# frozen_string_literal: true

# Released under the MIT License.
# Copyright, 2024, by Samuel Williams.

require "etc"

module Async
module Internal
# A simple work pool that offloads work to a background thread.
#
# @private
class WorkerPool
# Used to augment the scheduler to add support for blocking operations.
module BlockingOperationWait
# Wait for the given work to be executed.
#
# @public Since *Async v2.19* and *Ruby v3.4*.
# @asynchronous May be non-blocking.
#
# @parameter work [Proc] The work to execute on a background thread.
# @returns [Object] The result of the work.
def blocking_operation_wait(work)
@worker_pool.call(work)
end
end

# Execute the given work in a background thread.
class Promise
# Create a new promise.
#
# @parameter work [Proc] The work to be done.
def initialize(work)
@work = work
@state = :pending
@value = nil
@guard = ::Mutex.new
@condition = ::ConditionVariable.new
@thread = nil
end

# Execute the work and resolve the promise.
def call
work = nil

@guard.synchronize do
@thread = ::Thread.current

return unless work = @work
end

resolve(work.call)
rescue Exception => error
reject(error)
end

private def resolve(value)
@guard.synchronize do
@work = nil
@thread = nil
@value = value
@state = :resolved
@condition.broadcast
end
end

private def reject(error)
@guard.synchronize do
@work = nil
@thread = nil
@value = error
@state = :failed
@condition.broadcast
end
end

# Cancel the work and raise an exception in the background thread.
def cancel
return unless @work

@guard.synchronize do
@work = nil
@state = :cancelled
@thread&.raise(Interrupt)
end
end

# Wait for the work to be done.
#
# @returns [Object] The result of the work.
def wait
@guard.synchronize do
while @state == :pending
@condition.wait(@guard)
end

if @state == :failed
raise @value
else
return @value
end
end
end
end

# A background worker thread.
class Worker
# Create a new worker.
def initialize
@work = ::Thread::Queue.new
@thread = ::Thread.new(&method(:run))
end

# Execute work until the queue is closed.
def run
while work = @work.pop
work.call
end
end

# Close the worker thread.
def close
if thread = @thread
@thread = nil
thread.kill
end
end

# Call the work and notify the scheduler when it is done.
def call(work)
promise = Promise.new(work)

@work.push(promise)

begin
return promise.wait
ensure
promise.cancel
end
end
end

# Create a new work pool.
#
# @parameter size [Integer] The number of threads to use.
def initialize(size: Etc.nprocessors)
@ready = ::Thread::Queue.new

size.times do
@ready.push(Worker.new)
end
end

# Close the work pool. Kills all outstanding work.
def close
if ready = @ready
@ready = nil
ready.close

while worker = ready.pop
worker.close
end
end
end

# Offload work to a thread.
#
# @parameter work [Proc] The work to be done.
def call(work)
if ready = @ready
worker = ready.pop

begin
worker.call(work)
ensure
ready.push(worker)
end
else
raise RuntimeError, "No worker available!"
end
end
end
end
end
21 changes: 18 additions & 3 deletions lib/async/scheduler.rb
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,10 @@

require_relative "clock"
require_relative "task"
require_relative "worker_pool"

require_relative "internal/worker_pool"
require_relative "internal/blocking_operation_wait_log"
require_relative "internal/blocking_operation_wait_inline"

require "io/event"

Expand All @@ -21,6 +24,10 @@ class Scheduler < Node
value == "true" ? true : nil
end

LOG_BLOCKING_OPERATIONS = ENV.fetch("ASYNC_SCHEDULER_LOG_BLOCKING_OPERATIONS", nil).then do |value|
value == "true"
end

# Raised when an operation is attempted on a closed scheduler.
class ClosedError < RuntimeError
# Create a new error.
Expand Down Expand Up @@ -55,13 +62,21 @@ def initialize(parent = nil, selector: nil, worker_pool: DEFAULT_WORKER_POOL)

@timers = ::IO::Event::Timers.new
if worker_pool == true
@worker_pool = WorkerPool.new
@worker_pool = Internal::WorkerPool.new
else
@worker_pool = worker_pool
end

if @worker_pool
self.singleton_class.prepend(WorkerPool::BlockingOperationWait)
self.singleton_class.prepend(Internal::WorkerPool::BlockingOperationWait)
end

if LOG_BLOCKING_OPERATIONS
unless @worker_pool
self.singleton_class.prepend(Internal::BlockingOperationWaitInline)
end

self.singleton_class.prepend(Internal::BlockingOperationWaitLog)
end
end

Expand Down
Loading
Loading