diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index 2a4625d3c..edbc1e7b8 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -23,16 +23,16 @@ jobs: - { os: ubuntu-20.04, toolchain: wasm-5.9.1-RELEASE, wasi-backend: MicroWASI } - { os: ubuntu-20.04, toolchain: wasm-5.10.0-RELEASE, wasi-backend: MicroWASI } - os: ubuntu-22.04 - toolchain: DEVELOPMENT-SNAPSHOT-2024-05-01-a + toolchain: DEVELOPMENT-SNAPSHOT-2024-06-13-a swift-sdk: - id: DEVELOPMENT-SNAPSHOT-2024-05-25-a-wasm32-unknown-wasi - download-url: "https://github.com/swiftwasm/swift/releases/download/swift-wasm-DEVELOPMENT-SNAPSHOT-2024-05-25-a/swift-wasm-DEVELOPMENT-SNAPSHOT-2024-05-25-a-wasm32-unknown-wasi.artifactbundle.zip" + id: DEVELOPMENT-SNAPSHOT-2024-06-14-a-wasm32-unknown-wasi + download-url: "https://github.com/swiftwasm/swift/releases/download/swift-wasm-DEVELOPMENT-SNAPSHOT-2024-06-14-a/swift-wasm-DEVELOPMENT-SNAPSHOT-2024-06-14-a-wasm32-unknown-wasi.artifactbundle.zip" wasi-backend: Node - os: ubuntu-22.04 - toolchain: DEVELOPMENT-SNAPSHOT-2024-05-01-a + toolchain: DEVELOPMENT-SNAPSHOT-2024-06-13-a swift-sdk: - id: DEVELOPMENT-SNAPSHOT-2024-05-25-a-wasm32-unknown-wasip1-threads - download-url: "https://github.com/swiftwasm/swift/releases/download/swift-wasm-DEVELOPMENT-SNAPSHOT-2024-05-25-a/swift-wasm-DEVELOPMENT-SNAPSHOT-2024-05-25-a-wasm32-unknown-wasip1-threads.artifactbundle.zip" + id: DEVELOPMENT-SNAPSHOT-2024-06-14-a-wasm32-unknown-wasip1-threads + download-url: "https://github.com/swiftwasm/swift/releases/download/swift-wasm-DEVELOPMENT-SNAPSHOT-2024-06-14-a/swift-wasm-DEVELOPMENT-SNAPSHOT-2024-06-14-a-wasm32-unknown-wasip1-threads.artifactbundle.zip" wasi-backend: Node runs-on: ${{ matrix.entry.os }} diff --git a/IntegrationTests/lib.js b/IntegrationTests/lib.js index fe25cf679..ed66c7e86 100644 --- a/IntegrationTests/lib.js +++ b/IntegrationTests/lib.js @@ -3,26 +3,30 @@ import { WASI as NodeWASI } from "wasi" import { WASI as MicroWASI, useAll } from "uwasi" import * as fs from "fs/promises" import path from "path"; +import { Worker, parentPort } from "node:worker_threads"; const WASI = { - MicroWASI: ({ programName }) => { + MicroWASI: ({ args }) => { const wasi = new MicroWASI({ - args: [path.basename(programName)], + args: args, env: {}, features: [useAll()], }) return { wasiImport: wasi.wasiImport, + setInstance(instance) { + wasi.instance = instance; + }, start(instance, swift) { wasi.initialize(instance); swift.main(); } } }, - Node: ({ programName }) => { + Node: ({ args }) => { const wasi = new NodeWASI({ - args: [path.basename(programName)], + args: args, env: {}, preopens: { "/": "./", @@ -44,12 +48,9 @@ const WASI = { const selectWASIBackend = () => { const value = process.env["JAVASCRIPTKIT_WASI_BACKEND"] if (value) { - const backend = WASI[value]; - if (backend) { - return backend; - } + return value; } - return WASI.Node; + return "Node" }; function isUsingSharedMemory(module) { @@ -62,33 +63,125 @@ function isUsingSharedMemory(module) { return false; } -export const startWasiTask = async (wasmPath, wasiConstructor = selectWASIBackend()) => { - const swift = new SwiftRuntime(); - // Fetch our Wasm File - const wasmBinary = await fs.readFile(wasmPath); - const wasi = wasiConstructor({ programName: wasmPath }); - - const module = await WebAssembly.compile(wasmBinary); - - const importObject = { +function constructBaseImportObject(wasi, swift) { + return { wasi_snapshot_preview1: wasi.wasiImport, - javascript_kit: swift.importObjects(), + javascript_kit: swift.wasmImports, benchmark_helper: { noop: () => {}, noop_with_int: (_) => {}, + }, + } +} + +export async function startWasiChildThread(event) { + const { module, programName, memory, tid, startArg } = event; + const swift = new SwiftRuntime({ + sharedMemory: true, + threadChannel: { + postMessageToMainThread: parentPort.postMessage.bind(parentPort), + listenMessageFromMainThread: (listener) => { + parentPort.on("message", listener) + } + } + }); + // Use uwasi for child threads because Node.js WASI cannot be used without calling + // `WASI.start` or `WASI.initialize`, which is already called in the main thread and + // will cause an error if called again. + const wasi = WASI.MicroWASI({ programName }); + + const importObject = constructBaseImportObject(wasi, swift); + + importObject["wasi"] = { + "thread-spawn": () => { + throw new Error("Cannot spawn a new thread from a worker thread") } }; + importObject["env"] = { memory }; + importObject["JavaScriptEventLoopTestSupportTests"] = { + "isMainThread": () => false, + } + + const instance = await WebAssembly.instantiate(module, importObject); + swift.setInstance(instance); + wasi.setInstance(instance); + swift.startThread(tid, startArg); +} + +class ThreadRegistry { + workers = new Map(); + nextTid = 1; + + spawnThread(module, programName, memory, startArg) { + const tid = this.nextTid++; + const selfFilePath = new URL(import.meta.url).pathname; + const worker = new Worker(` + const { parentPort } = require('node:worker_threads'); + + Error.stackTraceLimit = 100; + parentPort.once("message", async (event) => { + const { selfFilePath } = event; + const { startWasiChildThread } = await import(selfFilePath); + await startWasiChildThread(event); + }) + `, { type: "module", eval: true }) + + worker.on("error", (error) => { + console.error(`Worker thread ${tid} error:`, error); + }); + this.workers.set(tid, worker); + worker.postMessage({ selfFilePath, module, programName, memory, tid, startArg }); + return tid; + } + + worker(tid) { + return this.workers.get(tid); + } + + wakeUpWorkerThread(tid, message) { + const worker = this.workers.get(tid); + worker.postMessage(message); + } +} + +export const startWasiTask = async (wasmPath, wasiConstructorKey = selectWASIBackend()) => { + // Fetch our Wasm File + const wasmBinary = await fs.readFile(wasmPath); + const programName = wasmPath; + const args = [path.basename(programName)]; + args.push(...process.argv.slice(3)); + const wasi = WASI[wasiConstructorKey]({ args }); + + const module = await WebAssembly.compile(wasmBinary); + + const sharedMemory = isUsingSharedMemory(module); + const threadRegistry = new ThreadRegistry(); + const swift = new SwiftRuntime({ + sharedMemory, + threadChannel: { + postMessageToWorkerThread: threadRegistry.wakeUpWorkerThread.bind(threadRegistry), + listenMessageFromWorkerThread: (tid, listener) => { + const worker = threadRegistry.worker(tid); + worker.on("message", listener); + } + } + }); + + const importObject = constructBaseImportObject(wasi, swift); + + importObject["JavaScriptEventLoopTestSupportTests"] = { + "isMainThread": () => true, + } - if (isUsingSharedMemory(module)) { - importObject["env"] = { - // We don't have JS API to get memory descriptor of imported memory - // at this moment, so we assume 256 pages (16MB) memory is enough - // large for initial memory size. - memory: new WebAssembly.Memory({ initial: 256, maximum: 16384, shared: true }), - }; + if (sharedMemory) { + // We don't have JS API to get memory descriptor of imported memory + // at this moment, so we assume 256 pages (16MB) memory is enough + // large for initial memory size. + const memory = new WebAssembly.Memory({ initial: 256, maximum: 16384, shared: true }) + importObject["env"] = { memory }; importObject["wasi"] = { - "thread-spawn": () => { - throw new Error("thread-spawn not implemented"); + "thread-spawn": (startArg) => { + return threadRegistry.spawnThread(module, programName, memory, startArg); } } } diff --git a/Package.swift b/Package.swift index d9f33839e..aa529c772 100644 --- a/Package.swift +++ b/Package.swift @@ -26,6 +26,14 @@ let package = Package( name: "JavaScriptEventLoop", dependencies: ["JavaScriptKit", "_CJavaScriptEventLoop"] ), + .testTarget( + name: "JavaScriptEventLoopTests", + dependencies: [ + "JavaScriptEventLoop", + "JavaScriptKit", + "JavaScriptEventLoopTestSupport", + ] + ), .target(name: "_CJavaScriptEventLoop"), .target( name: "JavaScriptEventLoopTestSupport", diff --git a/Runtime/src/index.ts b/Runtime/src/index.ts index 605ce2d06..4cf0ee65a 100644 --- a/Runtime/src/index.ts +++ b/Runtime/src/index.ts @@ -10,8 +10,103 @@ import { import * as JSValue from "./js-value.js"; import { Memory } from "./memory.js"; -type SwiftRuntimeOptions = { +type MainToWorkerMessage = { + type: "wake"; +}; + +type WorkerToMainMessage = { + type: "job"; + data: number; +}; + +/** + * A thread channel is a set of functions that are used to communicate between + * the main thread and the worker thread. The main thread and the worker thread + * can send messages to each other using these functions. + * + * @example + * ```javascript + * // worker.js + * const runtime = new SwiftRuntime({ + * threadChannel: { + * postMessageToMainThread: postMessage, + * listenMessageFromMainThread: (listener) => { + * self.onmessage = (event) => { + * listener(event.data); + * }; + * } + * } + * }); + * + * // main.js + * const worker = new Worker("worker.js"); + * const runtime = new SwiftRuntime({ + * threadChannel: { + * postMessageToWorkerThread: (tid, data) => { + * worker.postMessage(data); + * }, + * listenMessageFromWorkerThread: (tid, listener) => { + * worker.onmessage = (event) => { + listener(event.data); + * }; + * } + * } + * }); + * ``` + */ +export type SwiftRuntimeThreadChannel = + | { + /** + * This function is used to send messages from the worker thread to the main thread. + * The message submitted by this function is expected to be listened by `listenMessageFromWorkerThread`. + * @param message The message to be sent to the main thread. + */ + postMessageToMainThread: (message: WorkerToMainMessage) => void; + /** + * This function is expected to be set in the worker thread and should listen + * to messages from the main thread sent by `postMessageToWorkerThread`. + * @param listener The listener function to be called when a message is received from the main thread. + */ + listenMessageFromMainThread: (listener: (message: MainToWorkerMessage) => void) => void; + } + | { + /** + * This function is expected to be set in the main thread. + * The message submitted by this function is expected to be listened by `listenMessageFromMainThread`. + * @param tid The thread ID of the worker thread. + * @param message The message to be sent to the worker thread. + */ + postMessageToWorkerThread: (tid: number, message: MainToWorkerMessage) => void; + /** + * This function is expected to be set in the main thread and shuold listen + * to messsages sent by `postMessageToMainThread` from the worker thread. + * @param tid The thread ID of the worker thread. + * @param listener The listener function to be called when a message is received from the worker thread. + */ + listenMessageFromWorkerThread: ( + tid: number, + listener: (message: WorkerToMainMessage) => void + ) => void; + + /** + * This function is expected to be set in the main thread and called + * when the worker thread is terminated. + * @param tid The thread ID of the worker thread. + */ + terminateWorkerThread?: (tid: number) => void; + }; + +export type SwiftRuntimeOptions = { + /** + * If `true`, the memory space of the WebAssembly instance can be shared + * between the main thread and the worker thread. + */ sharedMemory?: boolean; + /** + * The thread channel is a set of functions that are used to communicate + * between the main thread and the worker thread. + */ + threadChannel?: SwiftRuntimeThreadChannel; }; export class SwiftRuntime { @@ -23,11 +118,14 @@ export class SwiftRuntime { private textDecoder = new TextDecoder("utf-8"); private textEncoder = new TextEncoder(); // Only support utf-8 + /** The thread ID of the current thread. */ + private tid: number | null; constructor(options?: SwiftRuntimeOptions) { this._instance = null; this._memory = null; this._closureDeallocator = null; + this.tid = null; this.options = options || {}; } @@ -72,6 +170,32 @@ export class SwiftRuntime { } } + /** + * Start a new thread with the given `tid` and `startArg`, which + * is forwarded to the `wasi_thread_start` function. + * This function is expected to be called from the spawned Web Worker thread. + */ + startThread(tid: number, startArg: number) { + this.tid = tid; + const instance = this.instance; + try { + if (typeof instance.exports.wasi_thread_start === "function") { + instance.exports.wasi_thread_start(tid, startArg); + } else { + throw new Error( + `The WebAssembly module is not built for wasm32-unknown-wasip1-threads target.` + ); + } + } catch (error) { + if (error instanceof UnsafeEventLoopYield) { + // Ignore the error + return; + } + // Rethrow other errors + throw error; + } + } + private get instance() { if (!this._instance) throw new Error("WebAssembly instance is not set yet"); @@ -462,8 +586,82 @@ export class SwiftRuntime { swjs_unsafe_event_loop_yield: () => { throw new UnsafeEventLoopYield(); }, + swjs_send_job_to_main_thread: (unowned_job) => { + this.postMessageToMainThread({ type: "job", data: unowned_job }); + }, + swjs_listen_message_from_main_thread: () => { + const threadChannel = this.options.threadChannel; + if (!(threadChannel && "listenMessageFromMainThread" in threadChannel)) { + throw new Error( + "listenMessageFromMainThread is not set in options given to SwiftRuntime. Please set it to listen to wake events from the main thread." + ); + } + threadChannel.listenMessageFromMainThread((message) => { + switch (message.type) { + case "wake": + this.exports.swjs_wake_worker_thread(); + break; + default: + const unknownMessage: never = message.type; + throw new Error(`Unknown message type: ${unknownMessage}`); + } + }); + }, + swjs_wake_up_worker_thread: (tid) => { + this.postMessageToWorkerThread(tid, { type: "wake" }); + }, + swjs_listen_message_from_worker_thread: (tid) => { + const threadChannel = this.options.threadChannel; + if (!(threadChannel && "listenMessageFromWorkerThread" in threadChannel)) { + throw new Error( + "listenMessageFromWorkerThread is not set in options given to SwiftRuntime. Please set it to listen to jobs from worker threads." + ); + } + threadChannel.listenMessageFromWorkerThread( + tid, (message) => { + switch (message.type) { + case "job": + this.exports.swjs_enqueue_main_job_from_worker(message.data); + break; + default: + const unknownMessage: never = message.type; + throw new Error(`Unknown message type: ${unknownMessage}`); + } + }, + ); + }, + swjs_terminate_worker_thread: (tid) => { + const threadChannel = this.options.threadChannel; + if (threadChannel && "terminateWorkerThread" in threadChannel) { + threadChannel.terminateWorkerThread?.(tid); + } // Otherwise, just ignore the termination request + }, + swjs_get_worker_thread_id: () => { + // Main thread's tid is always -1 + return this.tid || -1; + }, }; } + + private postMessageToMainThread(message: WorkerToMainMessage) { + const threadChannel = this.options.threadChannel; + if (!(threadChannel && "postMessageToMainThread" in threadChannel)) { + throw new Error( + "postMessageToMainThread is not set in options given to SwiftRuntime. Please set it to send messages to the main thread." + ); + } + threadChannel.postMessageToMainThread(message); + } + + private postMessageToWorkerThread(tid: number, message: MainToWorkerMessage) { + const threadChannel = this.options.threadChannel; + if (!(threadChannel && "postMessageToWorkerThread" in threadChannel)) { + throw new Error( + "postMessageToWorkerThread is not set in options given to SwiftRuntime. Please set it to send messages to worker threads." + ); + } + threadChannel.postMessageToWorkerThread(tid, message); + } } /// This error is thrown when yielding event loop control from `swift_task_asyncMainDrainQueue` diff --git a/Runtime/src/types.ts b/Runtime/src/types.ts index 55f945b64..dd638acc5 100644 --- a/Runtime/src/types.ts +++ b/Runtime/src/types.ts @@ -19,6 +19,9 @@ export interface ExportedFunctions { ): bool; swjs_free_host_function(host_func_id: number): void; + + swjs_enqueue_main_job_from_worker(unowned_job: number): void; + swjs_wake_worker_thread(): void; } export interface ImportedFunctions { @@ -103,6 +106,12 @@ export interface ImportedFunctions { swjs_bigint_to_i64(ref: ref, signed: bool): bigint; swjs_i64_to_bigint_slow(lower: number, upper: number, signed: bool): ref; swjs_unsafe_event_loop_yield: () => void; + swjs_send_job_to_main_thread: (unowned_job: number) => void; + swjs_listen_message_from_main_thread: () => void; + swjs_wake_up_worker_thread: (tid: number) => void; + swjs_listen_message_from_worker_thread: (tid: number) => void; + swjs_terminate_worker_thread: (tid: number) => void; + swjs_get_worker_thread_id: () => number; } export const enum LibraryFeatures { diff --git a/Sources/JavaScriptEventLoop/JavaScriptEventLoop.swift b/Sources/JavaScriptEventLoop/JavaScriptEventLoop.swift index 7a0364a5c..e1e023e7f 100644 --- a/Sources/JavaScriptEventLoop/JavaScriptEventLoop.swift +++ b/Sources/JavaScriptEventLoop/JavaScriptEventLoop.swift @@ -56,7 +56,7 @@ public final class JavaScriptEventLoop: SerialExecutor, @unchecked Sendable { self.setTimeout = setTimeout } - /// A singleton instance of the Executor + /// A per-thread singleton instance of the Executor public static var shared: JavaScriptEventLoop { return _shared } diff --git a/Sources/JavaScriptEventLoop/WebWorkerTaskExecutor.swift b/Sources/JavaScriptEventLoop/WebWorkerTaskExecutor.swift new file mode 100644 index 000000000..d1f7f64e2 --- /dev/null +++ b/Sources/JavaScriptEventLoop/WebWorkerTaskExecutor.swift @@ -0,0 +1,471 @@ +#if compiler(>=6.0) && _runtime(_multithreaded) // @_expose and @_extern are only available in Swift 6.0+ + +import JavaScriptKit +import _CJavaScriptKit +import _CJavaScriptEventLoop + +import Synchronization +#if canImport(wasi_pthread) + import wasi_pthread + import WASILibc +#endif + +// MARK: - Web Worker Task Executor + +/// A task executor that runs tasks on Web Worker threads. +/// +/// ## Prerequisites +/// +/// This task executor is designed to work with [wasi-threads](https://github.com/WebAssembly/wasi-threads) +/// but it requires the following single extension: +/// The wasi-threads implementation should listen to the `message` event +/// from spawned Web Workers, and forward the message to the main thread +/// by calling `_swjs_enqueue_main_job_from_worker`. +/// +/// ## Usage +/// +/// ```swift +/// let executor = WebWorkerTaskExecutor(numberOfThreads: 4) +/// defer { executor.terminate() } +/// +/// await withTaskExecutorPreference(executor) { +/// // This block runs on the Web Worker thread. +/// await withTaskGroup(of: Int.self) { group in +/// for i in 0..<10 { +/// // Structured child works are executed on the Web Worker thread. +/// group.addTask { fibonacci(of: i) } +/// } +/// } +/// } +/// ```` +/// +/// ## Known limitations +/// +/// Currently, the Cooperative Global Executor of Swift runtime has a bug around +/// main executor detection. The issue leads to ignoring the `@MainActor` +/// attribute, which is supposed to run tasks on the main thread, when this web +/// worker executor is preferred. +/// +/// ```swift +/// func run(executor: WebWorkerTaskExecutor) async { +/// await withTaskExecutorPreference(executor) { +/// // This block runs on the Web Worker thread. +/// await MainActor.run { +/// // This block should run on the main thread, but it runs on +/// // the Web Worker thread. +/// } +/// } +/// // Back to the main thread. +/// } +/// ```` +/// +public final class WebWorkerTaskExecutor: TaskExecutor { + + /// A job worker dedicated to a single Web Worker thread. + /// + /// ## Lifetime + /// The worker instance in Swift world lives as long as the + /// `WebWorkerTaskExecutor` instance that spawned it lives. Thus, the worker + /// instance may outlive the underlying Web Worker thread. + fileprivate final class Worker: Sendable { + + /// The state of the worker. + /// + /// State transition: + /// + /// +---------+ +------------+ + /// +----->| Idle |--[terminate]-->| Terminated | + /// | +---+-----+ +------------+ + /// | | + /// | [enqueue] + /// | | + /// [no more job] | + /// | v + /// | +---------+ + /// +------| Running | + /// +---------+ + /// + enum State: UInt32, AtomicRepresentable { + /// The worker is idle and waiting for a new job. + case idle = 0 + /// The worker is processing a job. + case running = 1 + /// The worker is terminated. + case terminated = 2 + } + let state: Atomic = Atomic(.idle) + /// TODO: Rewrite it to use real queue :-) + let jobQueue: Mutex<[UnownedJob]> = Mutex([]) + /// The TaskExecutor that spawned this worker. + /// This variable must be set only once when the worker is started. + nonisolated(unsafe) weak var parentTaskExecutor: WebWorkerTaskExecutor.Executor? + /// The thread ID of this worker. + let tid: Atomic = Atomic(0) + + /// A trace statistics + struct TraceStats: CustomStringConvertible { + var enqueuedJobs: Int = 0 + var dequeuedJobs: Int = 0 + var processedJobs: Int = 0 + + var description: String { + "TraceStats(E: \(enqueuedJobs), D: \(dequeuedJobs), P: \(processedJobs))" + } + } + #if JAVASCRIPTKIT_STATS + private let traceStats = Mutex(TraceStats()) + private func statsIncrement(_ keyPath: WritableKeyPath) { + traceStats.withLock { stats in + stats[keyPath: keyPath] += 1 + } + } + #else + private func statsIncrement(_ keyPath: WritableKeyPath) {} + #endif + + /// The worker bound to the current thread. + /// Returns `nil` if the current thread is not a worker thread. + static var currentThread: Worker? { + guard let ptr = swjs_thread_local_task_executor_worker else { + return nil + } + return Unmanaged.fromOpaque(ptr).takeUnretainedValue() + } + + init() {} + + /// Enqueue a job to the worker. + func enqueue(_ job: UnownedJob) { + statsIncrement(\.enqueuedJobs) + jobQueue.withLock { queue in + queue.append(job) + + // Wake up the worker to process a job. + switch state.exchange(.running, ordering: .sequentiallyConsistent) { + case .idle: + if Self.currentThread === self { + // Enqueueing a new job to the current worker thread, but it's idle now. + // This is usually the case when a continuation is resumed by JS events + // like `setTimeout` or `addEventListener`. + // We can run the job and subsequently spawned jobs immediately. + // JSPromise.resolve(JSValue.undefined).then { _ in + _ = JSObject.global.queueMicrotask!(JSOneshotClosure { _ in + self.run() + return JSValue.undefined + }) + } else { + let tid = self.tid.load(ordering: .sequentiallyConsistent) + swjs_wake_up_worker_thread(tid) + } + case .running: + // The worker is already running, no need to wake up. + break + case .terminated: + // Will not wake up the worker because it's already terminated. + break + } + } + } + + func scheduleNextRun() { + _ = JSObject.global.queueMicrotask!(JSOneshotClosure { _ in + self.run() + return JSValue.undefined + }) + } + + /// Run the worker + /// + /// NOTE: This function must be called from the worker thread. + /// It will return when the worker is terminated. + func start(executor: WebWorkerTaskExecutor.Executor) { + // Get the thread ID of the current worker thread from the JS side. + // NOTE: Unfortunately even though `pthread_self` internally holds the thread ID, + // there is no public API to get it because it's a part of implementation details + // of wasi-libc. So we need to get it from the JS side. + let tid = swjs_get_worker_thread_id() + // Set the thread-local variable to the current worker. + // `self` outlives the worker thread because `Executor` retains the worker. + // Thus it's safe to store the reference without extra retain. + swjs_thread_local_task_executor_worker = Unmanaged.passUnretained(self).toOpaque() + // Start listening events from the main thread. + // This must be called after setting the swjs_thread_local_task_executor_worker + // because the event listener enqueues jobs to the TLS worker. + swjs_listen_message_from_main_thread() + // Set the parent executor. + parentTaskExecutor = executor + // Store the thread ID to the worker. This notifies the main thread that the worker is started. + self.tid.store(tid, ordering: .sequentiallyConsistent) + } + + /// Process jobs in the queue. + /// + /// Return when the worker has no more jobs to run or terminated. + /// This method must be called from the worker thread after the worker + /// is started by `start(executor:)`. + func run() { + trace("Worker.run") + guard let executor = parentTaskExecutor else { + preconditionFailure("The worker must be started with a parent executor.") + } + assert(state.load(ordering: .sequentiallyConsistent) == .running, "Invalid state: not running") + while true { + // Pop a job from the queue. + let job = jobQueue.withLock { queue -> UnownedJob? in + if let job = queue.first { + queue.removeFirst() + return job + } + // No more jobs to run now. Wait for a new job to be enqueued. + let (exchanged, original) = state.compareExchange(expected: .running, desired: .idle, ordering: .sequentiallyConsistent) + + switch (exchanged, original) { + case (true, _): + trace("Worker.run exited \(original) -> idle") + return nil // Regular case + case (false, .idle): + preconditionFailure("unreachable: Worker/run running in multiple threads!?") + case (false, .running): + preconditionFailure("unreachable: running -> idle should return exchanged=true") + case (false, .terminated): + return nil // The worker is terminated, exit the loop. + } + } + guard let job else { return } + statsIncrement(\.dequeuedJobs) + job.runSynchronously( + on: executor.asUnownedTaskExecutor() + ) + statsIncrement(\.processedJobs) + // The job is done. Continue to the next job. + } + } + + /// Terminate the worker. + func terminate() { + trace("Worker.terminate") + state.store(.terminated, ordering: .sequentiallyConsistent) + let tid = self.tid.load(ordering: .sequentiallyConsistent) + guard tid != 0 else { + // The worker is not started yet. + return + } + swjs_terminate_worker_thread(tid) + } + } + + fileprivate final class Executor: TaskExecutor { + let numberOfThreads: Int + let workers: [Worker] + let roundRobinIndex: Mutex = Mutex(0) + + init(numberOfThreads: Int) { + self.numberOfThreads = numberOfThreads + var workers = [Worker]() + for _ in 0...fromOpaque(ptr!).takeRetainedValue() + context.worker.start(executor: context.executor) + // The worker is started. Throw JS exception to unwind the call stack without + // reaching the `pthread_exit`, which is called immediately after this block. + swjs_unsafe_event_loop_yield() + return nil + }, ptr) + precondition(ret == 0, "Failed to create a thread") + } + // Wait until all worker threads are started and wire up messaging channels + // between the main thread and workers to notify job enqueuing events each other. + let clock = ContinuousClock() + let workerInitStarted = clock.now + for worker in workers { + var tid: pid_t + repeat { + if workerInitStarted.duration(to: .now) > timeout { + fatalError("Worker thread initialization timeout exceeded (\(timeout))") + } + tid = worker.tid.load(ordering: .sequentiallyConsistent) + try await clock.sleep(for: checkInterval) + } while tid == 0 + swjs_listen_message_from_worker_thread(tid) + } + } + + func terminate() { + for worker in workers { + worker.terminate() + } + } + + func enqueue(_ job: UnownedJob) { + precondition(!workers.isEmpty, "No worker threads are available") + + // If the current thread is a worker thread, enqueue the job to the current worker. + if let worker = Worker.currentThread { + worker.enqueue(job) + return + } + // Otherwise (main thread), enqueue the job to the worker with round-robin scheduling. + // TODO: Use a more sophisticated scheduling algorithm with priority. + roundRobinIndex.withLock { index in + let worker = workers[index] + worker.enqueue(job) + index = (index + 1) % numberOfThreads + } + } + } + + private let executor: Executor + + /// Create a new Web Worker task executor. + /// + /// - Parameters: + /// - numberOfThreads: The number of Web Worker threads to spawn. + /// - timeout: The timeout to wait for all worker threads to be started. + /// - checkInterval: The interval to check if all worker threads are started. + public init(numberOfThreads: Int, timeout: Duration = .seconds(3), checkInterval: Duration = .microseconds(5)) async throws { + self.executor = Executor(numberOfThreads: numberOfThreads) + try await self.executor.start(timeout: timeout, checkInterval: checkInterval) + } + + /// Terminate child Web Worker threads. + /// Jobs enqueued to the executor after calling this method will be ignored. + /// + /// NOTE: This method must be called after all tasks that prefer this executor are done. + /// Otherwise, the tasks may stuck forever. + public func terminate() { + executor.terminate() + } + + /// The number of Web Worker threads. + public var numberOfThreads: Int { + executor.numberOfThreads + } + + // MARK: TaskExecutor conformance + + /// Enqueue a job to the executor. + /// + /// NOTE: Called from the Swift Concurrency runtime. + public func enqueue(_ job: UnownedJob) { + Self.traceStatsIncrement(\.enqueueExecutor) + executor.enqueue(job) + } + + // MARK: Statistics + + /// Executor global statistics + internal struct ExecutorStats: CustomStringConvertible { + var sendJobToMainThread: Int = 0 + var recieveJobFromWorkerThread: Int = 0 + var enqueueGlobal: Int = 0 + var enqueueExecutor: Int = 0 + + var description: String { + "ExecutorStats(sendWtoM: \(sendJobToMainThread), recvWfromM: \(recieveJobFromWorkerThread)), enqueueGlobal: \(enqueueGlobal), enqueueExecutor: \(enqueueExecutor)" + } + } + #if JAVASCRIPTKIT_STATS + private static let stats = Mutex(ExecutorStats()) + fileprivate static func traceStatsIncrement(_ keyPath: WritableKeyPath) { + stats.withLock { stats in + stats[keyPath: keyPath] += 1 + } + } + internal func dumpStats() { + Self.stats.withLock { stats in + print("WebWorkerTaskExecutor stats: \(stats)") + } + } + #else + fileprivate static func traceStatsIncrement(_ keyPath: WritableKeyPath) {} + internal func dumpStats() {} + #endif + + // MARK: Global Executor hack + + private static var _mainThread: pthread_t? + private static var _swift_task_enqueueGlobal_hook_original: UnsafeMutableRawPointer? + private static var _swift_task_enqueueGlobalWithDelay_hook_original: UnsafeMutableRawPointer? + private static var _swift_task_enqueueGlobalWithDeadline_hook_original: UnsafeMutableRawPointer? + + /// Install a global executor that forwards jobs from Web Worker threads to the main thread. + /// + /// This function must be called once before using the Web Worker task executor. + public static func installGlobalExecutor() { + // Ensure this function is called only once. + guard _mainThread == nil else { return } + + _mainThread = pthread_self() + assert(swjs_get_worker_thread_id() == -1, "\(#function) must be called on the main thread") + + _swift_task_enqueueGlobal_hook_original = swift_task_enqueueGlobal_hook + + typealias swift_task_enqueueGlobal_hook_Fn = @convention(thin) (UnownedJob, swift_task_enqueueGlobal_original) -> Void + let swift_task_enqueueGlobal_hook_impl: swift_task_enqueueGlobal_hook_Fn = { job, base in + WebWorkerTaskExecutor.traceStatsIncrement(\.enqueueGlobal) + // Enter this block only if the current Task has no executor preference. + if pthread_equal(pthread_self(), WebWorkerTaskExecutor._mainThread) != 0 { + // If the current thread is the main thread, delegate the job + // execution to the original hook of JavaScriptEventLoop. + let original = unsafeBitCast(WebWorkerTaskExecutor._swift_task_enqueueGlobal_hook_original, to: swift_task_enqueueGlobal_hook_Fn.self) + original(job, base) + } else { + // Notify the main thread to execute the job when a job is + // enqueued from a Web Worker thread but without an executor preference. + // This is usually the case when hopping back to the main thread + // at the end of a task. + WebWorkerTaskExecutor.traceStatsIncrement(\.sendJobToMainThread) + let jobBitPattern = unsafeBitCast(job, to: UInt.self) + swjs_send_job_to_main_thread(jobBitPattern) + } + } + swift_task_enqueueGlobal_hook = unsafeBitCast(swift_task_enqueueGlobal_hook_impl, to: UnsafeMutableRawPointer?.self) + } +} + +/// Enqueue a job scheduled from a Web Worker thread to the main thread. +/// This function is called when a job is enqueued from a Web Worker thread. +@_expose(wasm, "swjs_enqueue_main_job_from_worker") +func _swjs_enqueue_main_job_from_worker(_ job: UnownedJob) { + WebWorkerTaskExecutor.traceStatsIncrement(\.recieveJobFromWorkerThread) + JavaScriptEventLoop.shared.enqueue(ExecutorJob(job)) +} + +/// Wake up the worker thread. +/// This function is called when a job is enqueued from the main thread to a worker thread. +@_expose(wasm, "swjs_wake_worker_thread") +func _swjs_wake_worker_thread() { + WebWorkerTaskExecutor.Worker.currentThread!.run() +} + +#endif + +fileprivate func trace(_ message: String) { +#if JAVASCRIPTKIT_TRACE + JSObject.global.process.stdout.write("[trace tid=\(swjs_get_worker_thread_id())] \(message)\n") +#endif +} diff --git a/Sources/JavaScriptKit/FundamentalObjects/JSClosure.swift b/Sources/JavaScriptKit/FundamentalObjects/JSClosure.swift index 6decbc814..75a8398fa 100644 --- a/Sources/JavaScriptKit/FundamentalObjects/JSClosure.swift +++ b/Sources/JavaScriptKit/FundamentalObjects/JSClosure.swift @@ -63,8 +63,26 @@ public class JSOneshotClosure: JSObject, JSClosureProtocol { /// public class JSClosure: JSFunction, JSClosureProtocol { + class SharedJSClosure { + private var storage: [JavaScriptHostFuncRef: (object: JSObject, body: ([JSValue]) -> JSValue)] = [:] + init() {} + + subscript(_ key: JavaScriptHostFuncRef) -> (object: JSObject, body: ([JSValue]) -> JSValue)? { + get { storage[key] } + set { storage[key] = newValue } + } + } + // Note: Retain the closure object itself also to avoid funcRef conflicts - fileprivate static var sharedClosures: [JavaScriptHostFuncRef: (object: JSObject, body: ([JSValue]) -> JSValue)] = [:] + fileprivate static var sharedClosures: SharedJSClosure { + if let swjs_thread_local_closures { + return Unmanaged.fromOpaque(swjs_thread_local_closures).takeUnretainedValue() + } else { + let shared = SharedJSClosure() + swjs_thread_local_closures = Unmanaged.passRetained(shared).toOpaque() + return shared + } + } private var hostFuncRef: JavaScriptHostFuncRef = 0 diff --git a/Sources/JavaScriptKit/Runtime/index.js b/Sources/JavaScriptKit/Runtime/index.js index 2aaabce65..9d29b4329 100644 --- a/Sources/JavaScriptKit/Runtime/index.js +++ b/Sources/JavaScriptKit/Runtime/index.js @@ -205,6 +205,7 @@ this._instance = null; this._memory = null; this._closureDeallocator = null; + this.tid = null; this.options = options || {}; } setInstance(instance) { @@ -240,6 +241,31 @@ throw error; } } + /** + * Start a new thread with the given `tid` and `startArg`, which + * is forwarded to the `wasi_thread_start` function. + * This function is expected to be called from the spawned Web Worker thread. + */ + startThread(tid, startArg) { + this.tid = tid; + const instance = this.instance; + try { + if (typeof instance.exports.wasi_thread_start === "function") { + instance.exports.wasi_thread_start(tid, startArg); + } + else { + throw new Error(`The WebAssembly module is not built for wasm32-unknown-wasip1-threads target.`); + } + } + catch (error) { + if (error instanceof UnsafeEventLoopYield) { + // Ignore the error + return; + } + // Rethrow other errors + throw error; + } + } get instance() { if (!this._instance) throw new Error("WebAssembly instance is not set yet"); @@ -464,8 +490,71 @@ swjs_unsafe_event_loop_yield: () => { throw new UnsafeEventLoopYield(); }, + swjs_send_job_to_main_thread: (unowned_job) => { + this.postMessageToMainThread({ type: "job", data: unowned_job }); + }, + swjs_listen_message_from_main_thread: () => { + const threadChannel = this.options.threadChannel; + if (!(threadChannel && "listenMessageFromMainThread" in threadChannel)) { + throw new Error("listenMessageFromMainThread is not set in options given to SwiftRuntime. Please set it to listen to wake events from the main thread."); + } + threadChannel.listenMessageFromMainThread((message) => { + switch (message.type) { + case "wake": + this.exports.swjs_wake_worker_thread(); + break; + default: + const unknownMessage = message.type; + throw new Error(`Unknown message type: ${unknownMessage}`); + } + }); + }, + swjs_wake_up_worker_thread: (tid) => { + this.postMessageToWorkerThread(tid, { type: "wake" }); + }, + swjs_listen_message_from_worker_thread: (tid) => { + const threadChannel = this.options.threadChannel; + if (!(threadChannel && "listenMessageFromWorkerThread" in threadChannel)) { + throw new Error("listenMessageFromWorkerThread is not set in options given to SwiftRuntime. Please set it to listen to jobs from worker threads."); + } + threadChannel.listenMessageFromWorkerThread(tid, (message) => { + switch (message.type) { + case "job": + this.exports.swjs_enqueue_main_job_from_worker(message.data); + break; + default: + const unknownMessage = message.type; + throw new Error(`Unknown message type: ${unknownMessage}`); + } + }); + }, + swjs_terminate_worker_thread: (tid) => { + var _a; + const threadChannel = this.options.threadChannel; + if (threadChannel && "terminateWorkerThread" in threadChannel) { + (_a = threadChannel.terminateWorkerThread) === null || _a === void 0 ? void 0 : _a.call(threadChannel, tid); + } // Otherwise, just ignore the termination request + }, + swjs_get_worker_thread_id: () => { + // Main thread's tid is always -1 + return this.tid || -1; + }, }; } + postMessageToMainThread(message) { + const threadChannel = this.options.threadChannel; + if (!(threadChannel && "postMessageToMainThread" in threadChannel)) { + throw new Error("postMessageToMainThread is not set in options given to SwiftRuntime. Please set it to send messages to the main thread."); + } + threadChannel.postMessageToMainThread(message); + } + postMessageToWorkerThread(tid, message) { + const threadChannel = this.options.threadChannel; + if (!(threadChannel && "postMessageToWorkerThread" in threadChannel)) { + throw new Error("postMessageToWorkerThread is not set in options given to SwiftRuntime. Please set it to send messages to worker threads."); + } + threadChannel.postMessageToWorkerThread(tid, message); + } } /// This error is thrown when yielding event loop control from `swift_task_asyncMainDrainQueue` /// to JavaScript. This is usually thrown when: diff --git a/Sources/JavaScriptKit/Runtime/index.mjs b/Sources/JavaScriptKit/Runtime/index.mjs index 52de118b5..9201b7712 100644 --- a/Sources/JavaScriptKit/Runtime/index.mjs +++ b/Sources/JavaScriptKit/Runtime/index.mjs @@ -199,6 +199,7 @@ class SwiftRuntime { this._instance = null; this._memory = null; this._closureDeallocator = null; + this.tid = null; this.options = options || {}; } setInstance(instance) { @@ -234,6 +235,31 @@ class SwiftRuntime { throw error; } } + /** + * Start a new thread with the given `tid` and `startArg`, which + * is forwarded to the `wasi_thread_start` function. + * This function is expected to be called from the spawned Web Worker thread. + */ + startThread(tid, startArg) { + this.tid = tid; + const instance = this.instance; + try { + if (typeof instance.exports.wasi_thread_start === "function") { + instance.exports.wasi_thread_start(tid, startArg); + } + else { + throw new Error(`The WebAssembly module is not built for wasm32-unknown-wasip1-threads target.`); + } + } + catch (error) { + if (error instanceof UnsafeEventLoopYield) { + // Ignore the error + return; + } + // Rethrow other errors + throw error; + } + } get instance() { if (!this._instance) throw new Error("WebAssembly instance is not set yet"); @@ -458,8 +484,71 @@ class SwiftRuntime { swjs_unsafe_event_loop_yield: () => { throw new UnsafeEventLoopYield(); }, + swjs_send_job_to_main_thread: (unowned_job) => { + this.postMessageToMainThread({ type: "job", data: unowned_job }); + }, + swjs_listen_message_from_main_thread: () => { + const threadChannel = this.options.threadChannel; + if (!(threadChannel && "listenMessageFromMainThread" in threadChannel)) { + throw new Error("listenMessageFromMainThread is not set in options given to SwiftRuntime. Please set it to listen to wake events from the main thread."); + } + threadChannel.listenMessageFromMainThread((message) => { + switch (message.type) { + case "wake": + this.exports.swjs_wake_worker_thread(); + break; + default: + const unknownMessage = message.type; + throw new Error(`Unknown message type: ${unknownMessage}`); + } + }); + }, + swjs_wake_up_worker_thread: (tid) => { + this.postMessageToWorkerThread(tid, { type: "wake" }); + }, + swjs_listen_message_from_worker_thread: (tid) => { + const threadChannel = this.options.threadChannel; + if (!(threadChannel && "listenMessageFromWorkerThread" in threadChannel)) { + throw new Error("listenMessageFromWorkerThread is not set in options given to SwiftRuntime. Please set it to listen to jobs from worker threads."); + } + threadChannel.listenMessageFromWorkerThread(tid, (message) => { + switch (message.type) { + case "job": + this.exports.swjs_enqueue_main_job_from_worker(message.data); + break; + default: + const unknownMessage = message.type; + throw new Error(`Unknown message type: ${unknownMessage}`); + } + }); + }, + swjs_terminate_worker_thread: (tid) => { + var _a; + const threadChannel = this.options.threadChannel; + if (threadChannel && "terminateWorkerThread" in threadChannel) { + (_a = threadChannel.terminateWorkerThread) === null || _a === void 0 ? void 0 : _a.call(threadChannel, tid); + } // Otherwise, just ignore the termination request + }, + swjs_get_worker_thread_id: () => { + // Main thread's tid is always -1 + return this.tid || -1; + }, }; } + postMessageToMainThread(message) { + const threadChannel = this.options.threadChannel; + if (!(threadChannel && "postMessageToMainThread" in threadChannel)) { + throw new Error("postMessageToMainThread is not set in options given to SwiftRuntime. Please set it to send messages to the main thread."); + } + threadChannel.postMessageToMainThread(message); + } + postMessageToWorkerThread(tid, message) { + const threadChannel = this.options.threadChannel; + if (!(threadChannel && "postMessageToWorkerThread" in threadChannel)) { + throw new Error("postMessageToWorkerThread is not set in options given to SwiftRuntime. Please set it to send messages to worker threads."); + } + threadChannel.postMessageToWorkerThread(tid, message); + } } /// This error is thrown when yielding event loop control from `swift_task_asyncMainDrainQueue` /// to JavaScript. This is usually thrown when: diff --git a/Sources/_CJavaScriptEventLoop/_CJavaScriptEventLoop.c b/Sources/_CJavaScriptEventLoop/_CJavaScriptEventLoop.c index 009672933..ebb05e1db 100644 --- a/Sources/_CJavaScriptEventLoop/_CJavaScriptEventLoop.c +++ b/Sources/_CJavaScriptEventLoop/_CJavaScriptEventLoop.c @@ -1,3 +1,5 @@ #include "_CJavaScriptEventLoop.h" _Thread_local void *swjs_thread_local_event_loop; + +_Thread_local void *swjs_thread_local_task_executor_worker; diff --git a/Sources/_CJavaScriptEventLoop/include/_CJavaScriptEventLoop.h b/Sources/_CJavaScriptEventLoop/include/_CJavaScriptEventLoop.h index 890e26a01..4f1b9470c 100644 --- a/Sources/_CJavaScriptEventLoop/include/_CJavaScriptEventLoop.h +++ b/Sources/_CJavaScriptEventLoop/include/_CJavaScriptEventLoop.h @@ -66,4 +66,6 @@ extern void *_Nullable swift_task_asyncMainDrainQueue_hook; extern _Thread_local void * _Nullable swjs_thread_local_event_loop; +extern _Thread_local void * _Nullable swjs_thread_local_task_executor_worker; + #endif diff --git a/Sources/_CJavaScriptKit/_CJavaScriptKit.c b/Sources/_CJavaScriptKit/_CJavaScriptKit.c index 0bcc5eaca..3cc06af1c 100644 --- a/Sources/_CJavaScriptKit/_CJavaScriptKit.c +++ b/Sources/_CJavaScriptKit/_CJavaScriptKit.c @@ -47,4 +47,6 @@ int swjs_library_features(void) { return _library_features(); } +_Thread_local void *swjs_thread_local_closures; + #endif diff --git a/Sources/_CJavaScriptKit/include/_CJavaScriptKit.h b/Sources/_CJavaScriptKit/include/_CJavaScriptKit.h index 431b83615..1e539fde1 100644 --- a/Sources/_CJavaScriptKit/include/_CJavaScriptKit.h +++ b/Sources/_CJavaScriptKit/include/_CJavaScriptKit.h @@ -295,4 +295,21 @@ IMPORT_JS_FUNCTION(swjs_release, void, (const JavaScriptObjectRef ref)) /// @note This function never returns IMPORT_JS_FUNCTION(swjs_unsafe_event_loop_yield, void, (void)) +IMPORT_JS_FUNCTION(swjs_send_job_to_main_thread, void, (uintptr_t job)) + +IMPORT_JS_FUNCTION(swjs_listen_message_from_main_thread, void, (void)) + +IMPORT_JS_FUNCTION(swjs_wake_up_worker_thread, void, (int tid)) + +IMPORT_JS_FUNCTION(swjs_listen_message_from_worker_thread, void, (int tid)) + +IMPORT_JS_FUNCTION(swjs_terminate_worker_thread, void, (int tid)) + +IMPORT_JS_FUNCTION(swjs_get_worker_thread_id, int, (void)) + +/// MARK: - thread local storage + +// TODO: Rewrite closure system without global storage +extern _Thread_local void * _Nullable swjs_thread_local_closures; + #endif /* _CJavaScriptKit_h */ diff --git a/Tests/JavaScriptEventLoopTests/WebWorkerTaskExecutorTests.swift b/Tests/JavaScriptEventLoopTests/WebWorkerTaskExecutorTests.swift new file mode 100644 index 000000000..94e7635e4 --- /dev/null +++ b/Tests/JavaScriptEventLoopTests/WebWorkerTaskExecutorTests.swift @@ -0,0 +1,154 @@ +#if compiler(>=6.0) && _runtime(_multithreaded) +import XCTest +import JavaScriptKit +import _CJavaScriptKit // For swjs_get_worker_thread_id +@testable import JavaScriptEventLoop + +@_extern(wasm, module: "JavaScriptEventLoopTestSupportTests", name: "isMainThread") +func isMainThread() -> Bool + +final class WebWorkerTaskExecutorTests: XCTestCase { + override func setUp() { + WebWorkerTaskExecutor.installGlobalExecutor() + } + + func testTaskRunOnMainThread() async throws { + let executor = try await WebWorkerTaskExecutor(numberOfThreads: 1) + + XCTAssertTrue(isMainThread()) + + let task = Task(executorPreference: executor) { + return isMainThread() + } + let taskRunOnMainThread = await task.value + // The task should run on the worker thread + XCTAssertFalse(taskRunOnMainThread) + // After the task is done, back to the main thread + XCTAssertTrue(isMainThread()) + + executor.terminate() + } + + func testWithPreferenceBlock() async throws { + let executor = try await WebWorkerTaskExecutor(numberOfThreads: 1) + await withTaskExecutorPreference(executor) { + XCTAssertFalse(isMainThread()) + } + } + + func testAwaitInsideTask() async throws { + let executor = try await WebWorkerTaskExecutor(numberOfThreads: 1) + + let task = Task(executorPreference: executor) { + await Task.yield() + _ = try await JSPromise.resolve(1).value + return isMainThread() + } + let taskRunOnMainThread = try await task.value + XCTAssertFalse(taskRunOnMainThread) + + executor.terminate() + } + + func testSleepInsideTask() async throws { + let executor = try await WebWorkerTaskExecutor(numberOfThreads: 1) + + let task = Task(executorPreference: executor) { + XCTAssertFalse(isMainThread()) + try await Task.sleep(nanoseconds: 10) + XCTAssertFalse(isMainThread()) + try await Task.sleep(nanoseconds: 100) + XCTAssertFalse(isMainThread()) + let clock = ContinuousClock() + try await clock.sleep(for: .milliseconds(10)) + return isMainThread() + } + let taskRunOnMainThread = try await task.value + XCTAssertFalse(taskRunOnMainThread) + + executor.terminate() + } + + func testMainActorRun() async throws { + let executor = try await WebWorkerTaskExecutor(numberOfThreads: 1) + + let task = Task(executorPreference: executor) { + await MainActor.run { + return isMainThread() + } + } + let taskRunOnMainThread = await task.value + // FIXME: The block passed to `MainActor.run` should run on the main thread + // XCTAssertTrue(taskRunOnMainThread) + XCTAssertFalse(taskRunOnMainThread) + // After the task is done, back to the main thread + XCTAssertTrue(isMainThread()) + + executor.terminate() + } + + func testTaskGroupRunOnSameThread() async throws { + let executor = try await WebWorkerTaskExecutor(numberOfThreads: 3) + + let mainTid = swjs_get_worker_thread_id() + await withTaskExecutorPreference(executor) { + let tid = swjs_get_worker_thread_id() + await withTaskGroup(of: Int32.self) { group in + group.addTask { + return swjs_get_worker_thread_id() + } + + group.addTask { + return swjs_get_worker_thread_id() + } + + for await id in group { + XCTAssertEqual(id, tid) + XCTAssertNotEqual(id, mainTid) + } + } + } + + executor.terminate() + } + + func testTaskGroupRunOnDifferentThreads() async throws { + let executor = try await WebWorkerTaskExecutor(numberOfThreads: 2) + + struct Item: Hashable { + let type: String + let tid: Int32 + let value: Int + init(_ type: String, _ tid: Int32, _ value: Int) { + self.type = type + self.tid = tid + self.value = value + } + } + + await withTaskGroup(of: Item.self) { group in + group.addTask { + let tid = swjs_get_worker_thread_id() + return Item("main", tid, 0) + } + + let numberOffloadedTasks = 10 + for i in 0.. { process.exit(1); } +Error.stackTraceLimit = Infinity; + startWasiTask(process.argv[2]).catch(handleExitOrError);