From 6d47766a8d7596f9bd360e5e48358715ee3c7455 Mon Sep 17 00:00:00 2001 From: Samuel Williams Date: Sat, 19 Sep 2020 11:26:02 +1200 Subject: [PATCH] Initial hack for ractor (incomplete). --- lib/async/container/best.rb | 6 +- lib/async/container/ractor.rb | 197 ++++++++++++++++++++++++++++++++ lib/async/container/ractored.rb | 43 +++++++ 3 files changed, 245 insertions(+), 1 deletion(-) create mode 100644 lib/async/container/ractor.rb create mode 100644 lib/async/container/ractored.rb diff --git a/lib/async/container/best.rb b/lib/async/container/best.rb index 2505362..0a87010 100644 --- a/lib/async/container/best.rb +++ b/lib/async/container/best.rb @@ -32,13 +32,17 @@ def self.fork? ::Process.respond_to?(:fork) && ::Process.respond_to?(:setpgid) end + def self.ractor? + defined?(::Ractor) + end + # Determins the best container class based on the underlying Ruby implementation. # Some platforms, including JRuby, don't support fork. Applications which just want a reasonable default can use this method. # @returns [Class] def self.best_container_class if fork? return Forked - else + els return Threaded end end diff --git a/lib/async/container/ractor.rb b/lib/async/container/ractor.rb new file mode 100644 index 0000000..9a04beb --- /dev/null +++ b/lib/async/container/ractor.rb @@ -0,0 +1,197 @@ +# frozen_string_literal: true + +# Copyright, 2020, by Samuel G. D. Williams. +# +# Permission is hereby granted, free of charge, to any person obtaining a copy +# of this software and associated documentation files (the "Software"), to deal +# in the Software without restriction, including without limitation the rights +# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +# copies of the Software, and to permit persons to whom the Software is +# furnished to do so, subject to the following conditions: +# +# The above copyright notice and this permission notice shall be included in +# all copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +# THE SOFTWARE. + +require_relative 'channel' +require_relative 'error' +require_relative 'notify/pipe' + +require 'async/logger' + +module Async + module Container + # Represents a running child thread from the point of view of the parent container. + class Ractor < Channel + # Represents a running child thread from the point of view of the child thread. + class Instance < Notify::Pipe + # Wrap an instance around the {Thread} instance from within the threaded child. + # @parameter thread [Thread] The thread intance to wrap. + def self.for(thread) + instance = self.new(thread.out) + + return instance + end + + def initialize(io) + @name = nil + @thread = ::Thread.current + + super + end + + # Set the name of the thread. + # @parameter value [String] The name to set. + def name= value + @thread.name = value + end + + # Get the name of the thread. + # @returns [String] + def name + @thread.name + end + + # Execute a child process using {::Process.spawn}. In order to simulate {::Process.exec}, an {Exit} instance is raised to propagage exit status. + # This creates the illusion that this method does not return (normally). + def exec(*arguments, ready: true, **options) + if ready + self.ready!(status: "(spawn)") if ready + else + self.before_spawn(arguments, options) + end + + begin + # TODO prefer **options... but it doesn't support redirections on < 2.7 + pid = ::Process.spawn(*arguments, options) + ensure + _, status = ::Process.wait2(pid) + + raise Exit, status + end + end + end + + def self.fork(**options) + self.new(**options) do |thread| + ::Thread.new do + yield Instance.for(thread) + end + end + end + + # Initialize the thread. + # @parameter name [String] The name to use for the child thread. + def initialize(name: nil) + super() + + @status = nil + + @thread = yield(self) + @thread.report_on_exception = false + @thread.name = name + + @waiter = ::Thread.new do + begin + @thread.join + rescue Exit => exit + finished(exit.error) + rescue Interrupt + # Graceful shutdown. + finished + rescue Exception => error + finished(error) + else + finished + end + end + end + + # Set the name of the thread. + # @parameter value [String] The name to set. + def name= value + @thread.name = value + end + + # Get the name of the thread. + # @returns [String] + def name + @thread.name + end + + # A human readable representation of the thread. + # @returns [String] + def to_s + "\#<#{self.class} #{@thread.name}>" + end + + # Invoke {#terminate!} and then {#wait} for the child thread to exit. + def close + self.terminate! + self.wait + ensure + super + end + + # Raise {Interrupt} in the child thread. + def interrupt! + @thread.raise(Interrupt) + end + + # Raise {Terminate} in the child thread. + def terminate! + @thread.raise(Terminate) + end + + # Wait for the thread to exit and return he exit status. + # @returns [Status] + def wait + if @waiter + @waiter.join + @waiter = nil + end + + return @status + end + + # A pseudo exit-status wrapper. + class Status + # Initialise the status. + # @parameter error [::Process::Status] The exit status of the child thread. + def initialize(error = nil) + @error = error + end + + # Whether the status represents a successful outcome. + # @returns [Boolean] + def success? + @error.nil? + end + + # A human readable representation of the status. + def to_s + "\#<#{self.class} #{success? ? "success" : "failure"}>" + end + end + + protected + + # Invoked by the @waiter thread to indicate the outcome of the child thread. + def finished(error = nil) + if error + Async.logger.error(self) {error} + end + + @status = Status.new(error) + self.close_write + end + end + end +end diff --git a/lib/async/container/ractored.rb b/lib/async/container/ractored.rb new file mode 100644 index 0000000..69d2ce4 --- /dev/null +++ b/lib/async/container/ractored.rb @@ -0,0 +1,43 @@ +# frozen_string_literal: true + +# Copyright, 2017, by Samuel G. D. Williams. +# +# Permission is hereby granted, free of charge, to any person obtaining a copy +# of this software and associated documentation files (the "Software"), to deal +# in the Software without restriction, including without limitation the rights +# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +# copies of the Software, and to permit persons to whom the Software is +# furnished to do so, subject to the following conditions: +# +# The above copyright notice and this permission notice shall be included in +# all copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +# THE SOFTWARE. + +require_relative 'generic' +require_relative 'thread' + +module Async + module Container + # A multi-thread container which uses {Ractor}. + class Ractored < Generic + # Indicates that this is not a multi-process container. + def self.multiprocess? + false + end + + # Start a named ractor and execute the provided block in it. + # @parameter name [String] The name (title) of the ractor. + # @parameter block [Proc] The block to execute in the ractor. + def start(name, &block) + Ractor.new(name: name, &block) + end + end + end +end