diff --git a/lib/async/internal/blocking_operation_wait_inline.rb b/lib/async/internal/blocking_operation_wait_inline.rb new file mode 100644 index 00000000..7ba34797 --- /dev/null +++ b/lib/async/internal/blocking_operation_wait_inline.rb @@ -0,0 +1,14 @@ +# frozen_string_literal: true + +# Released under the MIT License. +# Copyright, 2025, by Samuel Williams. + +module Async + module Internal + module BlockingOperationWaitInline + def blocking_operation_wait(work) + Fiber.blocking{work.call} + end + end + end +end diff --git a/lib/async/internal/blocking_operation_wait_log.rb b/lib/async/internal/blocking_operation_wait_log.rb new file mode 100644 index 00000000..7eb6c78b --- /dev/null +++ b/lib/async/internal/blocking_operation_wait_log.rb @@ -0,0 +1,76 @@ +# frozen_string_literal: true + +# Released under the MIT License. +# Copyright, 2025, by Samuel Williams. + +module Async + module Internal + class Sample + def initialize(name) + @name = name + + @count = 0 + @total = 0 + end + + def measure + start = Process.clock_gettime(Process::CLOCK_MONOTONIC) + + yield + ensure + finish = Process.clock_gettime(Process::CLOCK_MONOTONIC) + + @count += 1 + @total += finish - start + end + + def average + @total / @count + end + + def to_s + "#{@name}: #{@count} samples, average: #{format_time(average)}" + end + + private + + def format_time(time) + if time < 1e-6 + "%.0fns" % (time * 1e9) + elsif time < 1e-3 + "%.0fµs" % (time * 1e6) + elsif time < 1 + "%.0fms" % (time * 1e3) + else + "%.3fs" % time + end + end + end + + module BlockingOperationWaitLog + def run(...) + $stderr.puts "Blocking operation wait log enabled." + + @blocking_operations = {} + + super + ensure + @blocking_operations.each do |name, sample| + $stderr.puts sample + end + + @blocking_operations = nil + end + + def blocking_operation_wait(work) + from = caller(1, 1).first + + sample = (@blocking_operations[from] ||= Sample.new(from)) + + sample.measure do + return super + end + end + end + end +end diff --git a/lib/async/internal/worker_pool.rb b/lib/async/internal/worker_pool.rb new file mode 100644 index 00000000..b80e7ec8 --- /dev/null +++ b/lib/async/internal/worker_pool.rb @@ -0,0 +1,184 @@ +# frozen_string_literal: true + +# Released under the MIT License. +# Copyright, 2024, by Samuel Williams. + +require "etc" + +module Async + module Internal + # A simple work pool that offloads work to a background thread. + # + # @private + class WorkerPool + # Used to augment the scheduler to add support for blocking operations. + module BlockingOperationWait + # Wait for the given work to be executed. + # + # @public Since *Async v2.19* and *Ruby v3.4*. + # @asynchronous May be non-blocking. + # + # @parameter work [Proc] The work to execute on a background thread. + # @returns [Object] The result of the work. + def blocking_operation_wait(work) + @worker_pool.call(work) + end + end + + # Execute the given work in a background thread. + class Promise + # Create a new promise. + # + # @parameter work [Proc] The work to be done. + def initialize(work) + @work = work + @state = :pending + @value = nil + @guard = ::Mutex.new + @condition = ::ConditionVariable.new + @thread = nil + end + + # Execute the work and resolve the promise. + def call + work = nil + + @guard.synchronize do + @thread = ::Thread.current + + return unless work = @work + end + + resolve(work.call) + rescue Exception => error + reject(error) + end + + private def resolve(value) + @guard.synchronize do + @work = nil + @thread = nil + @value = value + @state = :resolved + @condition.broadcast + end + end + + private def reject(error) + @guard.synchronize do + @work = nil + @thread = nil + @value = error + @state = :failed + @condition.broadcast + end + end + + # Cancel the work and raise an exception in the background thread. + def cancel + return unless @work + + @guard.synchronize do + @work = nil + @state = :cancelled + @thread&.raise(Interrupt) + end + end + + # Wait for the work to be done. + # + # @returns [Object] The result of the work. + def wait + @guard.synchronize do + while @state == :pending + @condition.wait(@guard) + end + + if @state == :failed + raise @value + else + return @value + end + end + end + end + + # A background worker thread. + class Worker + # Create a new worker. + def initialize + @work = ::Thread::Queue.new + @thread = ::Thread.new(&method(:run)) + end + + # Execute work until the queue is closed. + def run + while work = @work.pop + work.call + end + end + + # Close the worker thread. + def close + if thread = @thread + @thread = nil + thread.kill + end + end + + # Call the work and notify the scheduler when it is done. + def call(work) + promise = Promise.new(work) + + @work.push(promise) + + begin + return promise.wait + ensure + promise.cancel + end + end + end + + # Create a new work pool. + # + # @parameter size [Integer] The number of threads to use. + def initialize(size: Etc.nprocessors) + @ready = ::Thread::Queue.new + + size.times do + @ready.push(Worker.new) + end + end + + # Close the work pool. Kills all outstanding work. + def close + if ready = @ready + @ready = nil + ready.close + + while worker = ready.pop + worker.close + end + end + end + + # Offload work to a thread. + # + # @parameter work [Proc] The work to be done. + def call(work) + if ready = @ready + worker = ready.pop + + begin + worker.call(work) + ensure + ready.push(worker) + end + else + raise RuntimeError, "No worker available!" + end + end + end + end +end diff --git a/lib/async/scheduler.rb b/lib/async/scheduler.rb index 56c9620b..c0b0b20b 100644 --- a/lib/async/scheduler.rb +++ b/lib/async/scheduler.rb @@ -7,7 +7,10 @@ require_relative "clock" require_relative "task" -require_relative "worker_pool" + +require_relative "internal/worker_pool" +require_relative "internal/blocking_operation_wait_log" +require_relative "internal/blocking_operation_wait_inline" require "io/event" @@ -21,6 +24,10 @@ class Scheduler < Node value == "true" ? true : nil end + LOG_BLOCKING_OPERATIONS = ENV.fetch("ASYNC_SCHEDULER_LOG_BLOCKING_OPERATIONS", nil).then do |value| + value == "true" + end + # Raised when an operation is attempted on a closed scheduler. class ClosedError < RuntimeError # Create a new error. @@ -55,13 +62,21 @@ def initialize(parent = nil, selector: nil, worker_pool: DEFAULT_WORKER_POOL) @timers = ::IO::Event::Timers.new if worker_pool == true - @worker_pool = WorkerPool.new + @worker_pool = Internal::WorkerPool.new else @worker_pool = worker_pool end if @worker_pool - self.singleton_class.prepend(WorkerPool::BlockingOperationWait) + self.singleton_class.prepend(Internal::WorkerPool::BlockingOperationWait) + end + + if LOG_BLOCKING_OPERATIONS + unless @worker_pool + self.singleton_class.prepend(Internal::BlockingOperationWaitInline) + end + + self.singleton_class.prepend(Internal::BlockingOperationWaitLog) end end diff --git a/lib/async/worker_pool.rb b/lib/async/worker_pool.rb deleted file mode 100644 index 36df724c..00000000 --- a/lib/async/worker_pool.rb +++ /dev/null @@ -1,182 +0,0 @@ -# frozen_string_literal: true - -# Released under the MIT License. -# Copyright, 2024, by Samuel Williams. - -require "etc" - -module Async - # A simple work pool that offloads work to a background thread. - # - # @private - class WorkerPool - # Used to augment the scheduler to add support for blocking operations. - module BlockingOperationWait - # Wait for the given work to be executed. - # - # @public Since *Async v2.19* and *Ruby v3.4*. - # @asynchronous May be non-blocking. - # - # @parameter work [Proc] The work to execute on a background thread. - # @returns [Object] The result of the work. - def blocking_operation_wait(work) - @worker_pool.call(work) - end - end - - # Execute the given work in a background thread. - class Promise - # Create a new promise. - # - # @parameter work [Proc] The work to be done. - def initialize(work) - @work = work - @state = :pending - @value = nil - @guard = ::Mutex.new - @condition = ::ConditionVariable.new - @thread = nil - end - - # Execute the work and resolve the promise. - def call - work = nil - - @guard.synchronize do - @thread = ::Thread.current - - return unless work = @work - end - - resolve(work.call) - rescue Exception => error - reject(error) - end - - private def resolve(value) - @guard.synchronize do - @work = nil - @thread = nil - @value = value - @state = :resolved - @condition.broadcast - end - end - - private def reject(error) - @guard.synchronize do - @work = nil - @thread = nil - @value = error - @state = :failed - @condition.broadcast - end - end - - # Cancel the work and raise an exception in the background thread. - def cancel - return unless @work - - @guard.synchronize do - @work = nil - @state = :cancelled - @thread&.raise(Interrupt) - end - end - - # Wait for the work to be done. - # - # @returns [Object] The result of the work. - def wait - @guard.synchronize do - while @state == :pending - @condition.wait(@guard) - end - - if @state == :failed - raise @value - else - return @value - end - end - end - end - - # A background worker thread. - class Worker - # Create a new worker. - def initialize - @work = ::Thread::Queue.new - @thread = ::Thread.new(&method(:run)) - end - - # Execute work until the queue is closed. - def run - while work = @work.pop - work.call - end - end - - # Close the worker thread. - def close - if thread = @thread - @thread = nil - thread.kill - end - end - - # Call the work and notify the scheduler when it is done. - def call(work) - promise = Promise.new(work) - - @work.push(promise) - - begin - return promise.wait - ensure - promise.cancel - end - end - end - - # Create a new work pool. - # - # @parameter size [Integer] The number of threads to use. - def initialize(size: Etc.nprocessors) - @ready = ::Thread::Queue.new - - size.times do - @ready.push(Worker.new) - end - end - - # Close the work pool. Kills all outstanding work. - def close - if ready = @ready - @ready = nil - ready.close - - while worker = ready.pop - worker.close - end - end - end - - # Offload work to a thread. - # - # @parameter work [Proc] The work to be done. - def call(work) - if ready = @ready - worker = ready.pop - - begin - worker.call(work) - ensure - ready.push(worker) - end - else - raise RuntimeError, "No worker available!" - end - end - end -end diff --git a/test/async/worker_pool.rb b/test/async/internal/worker_pool.rb similarity index 93% rename from test/async/worker_pool.rb rename to test/async/internal/worker_pool.rb index 2cad9dcc..c5f1210c 100644 --- a/test/async/worker_pool.rb +++ b/test/async/internal/worker_pool.rb @@ -4,10 +4,10 @@ # Copyright, 2022-2024, by Samuel Williams. # Copyright, 2024, by Patrik Wenger. -require "async/worker_pool" +require "async/internal/worker_pool" require "sus/fixtures/async" -describe Async::WorkerPool do +describe Async::Internal::WorkerPool do let(:worker_pool) {subject.new(size: 1)} it "offloads work to a thread" do