diff --git a/Examples/Multithreading/README.md b/Examples/Multithreading/README.md index c95df2a8b..346f8cc8b 100644 --- a/Examples/Multithreading/README.md +++ b/Examples/Multithreading/README.md @@ -1,9 +1,21 @@ # Multithreading example -Install Development Snapshot toolchain `DEVELOPMENT-SNAPSHOT-2024-07-08-a` from [swift.org/install](https://www.swift.org/install/) and run the following commands: +Install Development Snapshot toolchain `DEVELOPMENT-SNAPSHOT-2024-07-08-a` or later from [swift.org/install](https://www.swift.org/install/) and run the following commands: ```sh -$ swift sdk install https://github.com/swiftwasm/swift/releases/download/swift-wasm-DEVELOPMENT-SNAPSHOT-2024-07-09-a/swift-wasm-DEVELOPMENT-SNAPSHOT-2024-07-09-a-wasm32-unknown-wasip1-threads.artifactbundle.zip +$ ( + set -eo pipefail; \ + V="$(swiftc --version | head -n1)"; \ + TAG="$(curl -sL "https://raw.githubusercontent.com/swiftwasm/swift-sdk-index/refs/heads/main/v1/tag-by-version.json" | jq -e -r --arg v "$V" '.[$v] | .[-1]')"; \ + curl -sL "https://raw.githubusercontent.com/swiftwasm/swift-sdk-index/refs/heads/main/v1/builds/$TAG.json" | \ + jq -r '.["swift-sdks"]["wasm32-unknown-wasip1-threads"] | "swift sdk install \"\(.url)\" --checksum \"\(.checksum)\""' | sh -x +) +$ export SWIFT_SDK_ID=$( + V="$(swiftc --version | head -n1)"; \ + TAG="$(curl -sL "https://raw.githubusercontent.com/swiftwasm/swift-sdk-index/refs/heads/main/v1/tag-by-version.json" | jq -e -r --arg v "$V" '.[$v] | .[-1]')"; \ + curl -sL "https://raw.githubusercontent.com/swiftwasm/swift-sdk-index/refs/heads/main/v1/builds/$TAG.json" | \ + jq -r '.["swift-sdks"]["wasm32-unknown-wasip1-threads"]["id"]' +) $ ./build.sh $ npx serve ``` diff --git a/Examples/Multithreading/Sources/JavaScript/index.js b/Examples/Multithreading/Sources/JavaScript/index.js index cc0c7e4e4..3cfc01a43 100644 --- a/Examples/Multithreading/Sources/JavaScript/index.js +++ b/Examples/Multithreading/Sources/JavaScript/index.js @@ -27,9 +27,9 @@ class ThreadRegistry { }; } - postMessageToWorkerThread(tid, data) { + postMessageToWorkerThread(tid, data, transfer) { const worker = this.workers.get(tid); - worker.postMessage(data); + worker.postMessage(data, transfer); } terminateWorkerThread(tid) { diff --git a/Examples/Multithreading/Sources/JavaScript/worker.js b/Examples/Multithreading/Sources/JavaScript/worker.js index eadd42bef..703df4407 100644 --- a/Examples/Multithreading/Sources/JavaScript/worker.js +++ b/Examples/Multithreading/Sources/JavaScript/worker.js @@ -5,9 +5,9 @@ self.onmessage = async (event) => { const { instance, wasi, swiftRuntime } = await instantiate({ module, threadChannel: { - postMessageToMainThread: (message) => { + postMessageToMainThread: (message, transfer) => { // Send the job to the main thread - postMessage(message); + postMessage(message, transfer); }, listenMessageFromMainThread: (listener) => { self.onmessage = (event) => listener(event.data); diff --git a/Examples/Multithreading/build.sh b/Examples/Multithreading/build.sh index 7d903b1f4..0f8670db1 100755 --- a/Examples/Multithreading/build.sh +++ b/Examples/Multithreading/build.sh @@ -1 +1 @@ -swift build --swift-sdk DEVELOPMENT-SNAPSHOT-2024-07-09-a-wasm32-unknown-wasip1-threads -Xswiftc -Xclang-linker -Xswiftc -mexec-model=reactor -Xlinker --export=__main_argc_argv -c release -Xswiftc -g +swift build --swift-sdk "${SWIFT_SDK_ID:-wasm32-unknown-wasip1-threads}" -Xswiftc -Xclang-linker -Xswiftc -mexec-model=reactor -Xlinker --export=__main_argc_argv -c release -Xswiftc -g diff --git a/Examples/OffscrenCanvas/.gitignore b/Examples/OffscrenCanvas/.gitignore new file mode 100644 index 000000000..0023a5340 --- /dev/null +++ b/Examples/OffscrenCanvas/.gitignore @@ -0,0 +1,8 @@ +.DS_Store +/.build +/Packages +xcuserdata/ +DerivedData/ +.swiftpm/configuration/registries.json +.swiftpm/xcode/package.xcworkspace/contents.xcworkspacedata +.netrc diff --git a/Examples/OffscrenCanvas/Package.swift b/Examples/OffscrenCanvas/Package.swift new file mode 100644 index 000000000..7fc45ad1b --- /dev/null +++ b/Examples/OffscrenCanvas/Package.swift @@ -0,0 +1,20 @@ +// swift-tools-version: 5.10 + +import PackageDescription + +let package = Package( + name: "Example", + platforms: [.macOS("15"), .iOS("18"), .watchOS("11"), .tvOS("18"), .visionOS("2")], + dependencies: [ + .package(path: "../../"), + ], + targets: [ + .executableTarget( + name: "MyApp", + dependencies: [ + .product(name: "JavaScriptKit", package: "JavaScriptKit"), + .product(name: "JavaScriptEventLoop", package: "JavaScriptKit"), + ] + ), + ] +) diff --git a/Examples/OffscrenCanvas/README.md b/Examples/OffscrenCanvas/README.md new file mode 100644 index 000000000..395b0c295 --- /dev/null +++ b/Examples/OffscrenCanvas/README.md @@ -0,0 +1,21 @@ +# OffscreenCanvas example + +Install Development Snapshot toolchain `DEVELOPMENT-SNAPSHOT-2024-07-08-a` or later from [swift.org/install](https://www.swift.org/install/) and run the following commands: + +```sh +$ ( + set -eo pipefail; \ + V="$(swiftc --version | head -n1)"; \ + TAG="$(curl -sL "https://raw.githubusercontent.com/swiftwasm/swift-sdk-index/refs/heads/main/v1/tag-by-version.json" | jq -e -r --arg v "$V" '.[$v] | .[-1]')"; \ + curl -sL "https://raw.githubusercontent.com/swiftwasm/swift-sdk-index/refs/heads/main/v1/builds/$TAG.json" | \ + jq -r '.["swift-sdks"]["wasm32-unknown-wasip1-threads"] | "swift sdk install \"\(.url)\" --checksum \"\(.checksum)\""' | sh -x +) +$ export SWIFT_SDK_ID=$( + V="$(swiftc --version | head -n1)"; \ + TAG="$(curl -sL "https://raw.githubusercontent.com/swiftwasm/swift-sdk-index/refs/heads/main/v1/tag-by-version.json" | jq -e -r --arg v "$V" '.[$v] | .[-1]')"; \ + curl -sL "https://raw.githubusercontent.com/swiftwasm/swift-sdk-index/refs/heads/main/v1/builds/$TAG.json" | \ + jq -r '.["swift-sdks"]["wasm32-unknown-wasip1-threads"]["id"]' +) +$ ./build.sh +$ npx serve +``` diff --git a/Examples/OffscrenCanvas/Sources/JavaScript b/Examples/OffscrenCanvas/Sources/JavaScript new file mode 120000 index 000000000..b24c2256e --- /dev/null +++ b/Examples/OffscrenCanvas/Sources/JavaScript @@ -0,0 +1 @@ +../../Multithreading/Sources/JavaScript \ No newline at end of file diff --git a/Examples/OffscrenCanvas/Sources/MyApp/main.swift b/Examples/OffscrenCanvas/Sources/MyApp/main.swift new file mode 100644 index 000000000..67e087122 --- /dev/null +++ b/Examples/OffscrenCanvas/Sources/MyApp/main.swift @@ -0,0 +1,139 @@ +import JavaScriptEventLoop +import JavaScriptKit + +JavaScriptEventLoop.installGlobalExecutor() +WebWorkerTaskExecutor.installGlobalExecutor() + +protocol CanvasRenderer { + func render(canvas: JSObject, size: Int) async throws +} + +struct BackgroundRenderer: CanvasRenderer { + func render(canvas: JSObject, size: Int) async throws { + let executor = try await WebWorkerTaskExecutor(numberOfThreads: 1) + let transfer = JSSending.transfer(canvas) + let renderingTask = Task(executorPreference: executor) { + let canvas = try await transfer.receive() + try await renderAnimation(canvas: canvas, size: size) + } + await withTaskCancellationHandler { + try? await renderingTask.value + } onCancel: { + renderingTask.cancel() + } + executor.terminate() + } +} + +struct MainThreadRenderer: CanvasRenderer { + func render(canvas: JSObject, size: Int) async throws { + try await renderAnimation(canvas: canvas, size: size) + } +} + +// FPS Counter for CSS animation +func startFPSMonitor() { + let fpsCounterElement = JSObject.global.document.getElementById("fps-counter").object! + + var lastTime = JSObject.global.performance.now().number! + var frames = 0 + + // Create a frame counter function + func countFrame() { + frames += 1 + let currentTime = JSObject.global.performance.now().number! + let elapsed = currentTime - lastTime + + if elapsed >= 1000 { + let fps = Int(Double(frames) * 1000 / elapsed) + fpsCounterElement.textContent = .string("FPS: \(fps)") + frames = 0 + lastTime = currentTime + } + + // Request next frame + _ = JSObject.global.requestAnimationFrame!( + JSClosure { _ in + countFrame() + return .undefined + }) + } + + // Start counting + countFrame() +} + +@MainActor +func onClick(renderer: CanvasRenderer) async throws { + let document = JSObject.global.document + + let canvasContainerElement = document.getElementById("canvas-container").object! + + // Remove all child elements from the canvas container + for i in 0..<Int(canvasContainerElement.children.length.number!) { + let child = canvasContainerElement.children[i] + _ = canvasContainerElement.removeChild!(child) + } + + let canvasElement = document.createElement("canvas").object! + _ = canvasContainerElement.appendChild!(canvasElement) + + let size = 800 + canvasElement.width = .number(Double(size)) + canvasElement.height = .number(Double(size)) + + let offscreenCanvas = canvasElement.transferControlToOffscreen!().object! + try await renderer.render(canvas: offscreenCanvas, size: size) +} + +func main() async throws { + let renderButtonElement = JSObject.global.document.getElementById("render-button").object! + let cancelButtonElement = JSObject.global.document.getElementById("cancel-button").object! + let rendererSelectElement = JSObject.global.document.getElementById("renderer-select").object! + + var renderingTask: Task<Void, Error>? = nil + + // Start the FPS monitor for CSS animations + startFPSMonitor() + + _ = renderButtonElement.addEventListener!( + "click", + JSClosure { _ in + renderingTask?.cancel() + renderingTask = Task { + let selectedValue = rendererSelectElement.value.string! + let renderer: CanvasRenderer = + selectedValue == "main" ? MainThreadRenderer() : BackgroundRenderer() + try await onClick(renderer: renderer) + } + return JSValue.undefined + }) + + _ = cancelButtonElement.addEventListener!( + "click", + JSClosure { _ in + renderingTask?.cancel() + return JSValue.undefined + }) +} + +Task { + try await main() +} + +#if canImport(wasi_pthread) + import wasi_pthread + import WASILibc + + /// Trick to avoid blocking the main thread. pthread_mutex_lock function is used by + /// the Swift concurrency runtime. + @_cdecl("pthread_mutex_lock") + func pthread_mutex_lock(_ mutex: UnsafeMutablePointer<pthread_mutex_t>) -> Int32 { + // DO NOT BLOCK MAIN THREAD + var ret: Int32 + repeat { + ret = pthread_mutex_trylock(mutex) + } while ret == EBUSY + return ret + } +#endif diff --git a/Examples/OffscrenCanvas/Sources/MyApp/render.swift b/Examples/OffscrenCanvas/Sources/MyApp/render.swift new file mode 100644 index 000000000..714cac184 --- /dev/null +++ b/Examples/OffscrenCanvas/Sources/MyApp/render.swift @@ -0,0 +1,174 @@ +import Foundation +import JavaScriptKit + +func sleepOnThread(milliseconds: Int, isolation: isolated (any Actor)? = #isolation) async { + // Use the JavaScript setTimeout function to avoid hopping back to the main thread + await withCheckedContinuation(isolation: isolation) { continuation in + _ = JSObject.global.setTimeout!( + JSOneshotClosure { _ in + continuation.resume() + return JSValue.undefined + }, milliseconds + ) + } +} + +func renderAnimation(canvas: JSObject, size: Int, isolation: isolated (any Actor)? = #isolation) + async throws +{ + let ctx = canvas.getContext!("2d").object! + + // Animation state variables + var time: Double = 0 + + // Create a large number of particles + let particleCount = 5000 + var particles: [[Double]] = [] + + // Initialize particles with random positions and velocities + for _ in 0..<particleCount { + // [x, y, vx, vy, size, hue, lifespan, maxLife] + let x = Double.random(in: 0..<Double(size)) + let y = Double.random(in: 0..<Double(size)) + let speed = Double.random(in: 0.2...2.0) + let angle = Double.random(in: 0..<(2 * Double.pi)) + let vx = cos(angle) * speed + let vy = sin(angle) * speed + let particleSize = Double.random(in: 1.0...3.0) + let hue = Double.random(in: 0..<360) + let maxLife = Double.random(in: 100...300) + particles.append([x, y, vx, vy, particleSize, hue, maxLife, maxLife]) + } + + // Create emitter positions that will generate new particles + let emitters = 5 + var emitterPositions: [[Double]] = [] + for i in 0..<emitters { + let angle = Double(i) * 2 * Double.pi / Double(emitters) + let distance = Double(size) * 0.3 + let x = Double(size) / 2 + cos(angle) * distance + let y = Double(size) / 2 + sin(angle) * distance + emitterPositions.append([x, y]) + } + + while !Task.isCancelled { + // Semi-transparent background for trail effect + _ = ctx.fillStyle = .string("rgba(0, 0, 0, 0.05)") + _ = ctx.fillRect!(0, 0, size, size) + + // Intentionally add a computationally expensive calculation for main thread demonstration + var expensiveCalculation = 0.0 + for _ in 0..<500 { + expensiveCalculation += sin(time) * cos(time) + } + + // Update and render all particles + for i in 0..<particles.count { + // Update position + particles[i][0] += particles[i][2] + particles[i][1] += particles[i][3] + + // Apply slight gravity + particles[i][3] += 0.02 + + // Decrease lifespan + particles[i][6] -= 1 + + // If particle is dead, respawn it from an emitter + if particles[i][6] <= 0 { + let emitterIndex = Int.random(in: 0..<emitterPositions.count) + particles[i][0] = emitterPositions[emitterIndex][0] + particles[i][1] = emitterPositions[emitterIndex][1] + + let speed = Double.random(in: 0.5...3.0) + let angle = Double.random(in: 0..<(2 * Double.pi)) + particles[i][2] = cos(angle) * speed + particles[i][3] = sin(angle) * speed + + particles[i][4] = Double.random(in: 1.0...3.0) // Size + particles[i][5] = Double.random(in: 0..<360) // Hue + particles[i][6] = particles[i][7] // Reset lifespan + } + + // Bounce off edges + if particles[i][0] < 0 || particles[i][0] > Double(size) { + particles[i][2] *= -0.8 + } + if particles[i][1] < 0 || particles[i][1] > Double(size) { + particles[i][3] *= -0.8 + } + + // Calculate opacity based on lifespan + let opacity = particles[i][6] / particles[i][7] + + // Get coordinates and properties + let x = particles[i][0] + let y = particles[i][1] + let size = particles[i][4] + let hue = (particles[i][5] + time * 10).truncatingRemainder(dividingBy: 360) + + // Draw particle + _ = ctx.beginPath!() + ctx.fillStyle = .string("hsla(\(hue), 100%, 60%, \(opacity))") + _ = ctx.arc!(x, y, size, 0, 2 * Double.pi) + _ = ctx.fill!() + + // Connect nearby particles with lines (only check some to save CPU) + if i % 20 == 0 { + for j in (i + 1)..<min(i + 20, particles.count) { + let dx = particles[j][0] - x + let dy = particles[j][1] - y + let dist = sqrt(dx * dx + dy * dy) + + if dist < 30 { + _ = ctx.beginPath!() + ctx.strokeStyle = .string("rgba(255, 255, 255, \(0.1 * opacity))") + ctx.lineWidth = .number(0.3) + _ = ctx.moveTo!(x, y) + _ = ctx.lineTo!(particles[j][0], particles[j][1]) + _ = ctx.stroke!() + } + } + } + } + + // Draw emitters as glowing circles + for i in 0..<emitterPositions.count { + let x = emitterPositions[i][0] + let y = emitterPositions[i][1] + + // Emitter pulse effect + let pulseSize = 10 + 5 * sin(time * 2 + Double(i)) + let hue = (time * 50 + Double(i) * 72).truncatingRemainder(dividingBy: 360) + + // Draw glow + let gradient = ctx.createRadialGradient!(x, y, 0, x, y, pulseSize * 2).object! + _ = gradient.addColorStop!(0, "hsla(\(hue), 100%, 70%, 0.8)") + _ = gradient.addColorStop!(1, "hsla(\(hue), 100%, 50%, 0)") + + _ = ctx.beginPath!() + ctx.fillStyle = .object(gradient) + _ = ctx.arc!(x, y, pulseSize * 2, 0, 2 * Double.pi) + _ = ctx.fill!() + + // Center of emitter + _ = ctx.beginPath!() + ctx.fillStyle = .string("hsla(\(hue), 100%, 70%, 0.8)") + _ = ctx.arc!(x, y, pulseSize * 0.5, 0, 2 * Double.pi) + _ = ctx.fill!() + } + + // Update time and emitter positions + time += 0.03 + + // Move emitters in circular patterns + for i in 0..<emitterPositions.count { + let angle = time * 0.2 + Double(i) * 2 * Double.pi / Double(emitters) + let distance = Double(size) * 0.3 + sin(time * 0.5) * Double(size) * 0.05 + emitterPositions[i][0] = Double(size) / 2 + cos(angle) * distance + emitterPositions[i][1] = Double(size) / 2 + sin(angle) * distance + } + + await sleepOnThread(milliseconds: 16, isolation: isolation) + } +} diff --git a/Examples/OffscrenCanvas/build.sh b/Examples/OffscrenCanvas/build.sh new file mode 100755 index 000000000..0f8670db1 --- /dev/null +++ b/Examples/OffscrenCanvas/build.sh @@ -0,0 +1 @@ +swift build --swift-sdk "${SWIFT_SDK_ID:-wasm32-unknown-wasip1-threads}" -Xswiftc -Xclang-linker -Xswiftc -mexec-model=reactor -Xlinker --export=__main_argc_argv -c release -Xswiftc -g diff --git a/Examples/OffscrenCanvas/index.html b/Examples/OffscrenCanvas/index.html new file mode 100644 index 000000000..5887c66cc --- /dev/null +++ b/Examples/OffscrenCanvas/index.html @@ -0,0 +1,98 @@ +<html> + +<head> + <meta charset="utf-8"> + <title>OffscreenCanvas Example</title> + <style> + /* CSS animation container */ + .animation-container { + display: flex; + margin-top: 20px; + justify-content: center; + overflow: hidden; + height: 80px; + } + + /* Animated element */ + .animation-box { + width: 50px; + height: 50px; + margin: 0 10px; + background-color: #3498db; + border-radius: 5px; + animation: moveBox 1s infinite alternate linear; + box-shadow: 0 0 10px rgba(0, 0, 150, 0.5); + } + + /* Different colors for each box */ + .animation-box:nth-child(2) { + background-color: #e74c3c; + animation-delay: 0.1s; + } + + .animation-box:nth-child(3) { + background-color: #2ecc71; + animation-delay: 0.2s; + } + + .animation-box:nth-child(4) { + background-color: #f39c12; + animation-delay: 0.3s; + } + + .animation-box:nth-child(5) { + background-color: #9b59b6; + animation-delay: 0.4s; + } + + /* Animation keyframes */ + @keyframes moveBox { + 0% { + transform: translateY(0) rotate(0deg); + } + + 100% { + transform: translateY(30px) rotate(360deg); + } + } + + /* Counter animation to show main thread performance */ + .counter { + width: 100%; + text-align: center; + font-size: 24px; + margin-top: 20px; + font-family: monospace; + } + </style> +</head> + +<body> + <script type="module" src="Sources/JavaScript/index.js"></script> + <h1>OffscreenCanvas Example</h1> + <p> + <div> + <select id="renderer-select"> + <option value="background">Background Renderer (Web Worker)</option> + <option value="main">Main Thread Renderer</option> + </select> + <button id="render-button">Render</button> + <button id="cancel-button">Cancel</button> + </div> + </p> + + <h3>CSS Animation (Main Thread Performance Indicator)</h3> + <div class="animation-container"> + <div class="animation-box"></div> + <div class="animation-box"></div> + <div class="animation-box"></div> + <div class="animation-box"></div> + <div class="animation-box"></div> + </div> + + <div class="counter" id="fps-counter">FPS: 0</div> + + <div id="canvas-container"></div> +</body> + +</html> diff --git a/Examples/OffscrenCanvas/serve.json b/Examples/OffscrenCanvas/serve.json new file mode 120000 index 000000000..326719cd4 --- /dev/null +++ b/Examples/OffscrenCanvas/serve.json @@ -0,0 +1 @@ +../Multithreading/serve.json \ No newline at end of file diff --git a/IntegrationTests/lib.js b/IntegrationTests/lib.js index 0172250d4..a2f10e565 100644 --- a/IntegrationTests/lib.js +++ b/IntegrationTests/lib.js @@ -79,7 +79,9 @@ export async function startWasiChildThread(event) { const swift = new SwiftRuntime({ sharedMemory: true, threadChannel: { - postMessageToMainThread: parentPort.postMessage.bind(parentPort), + postMessageToMainThread: (message, transfer) => { + parentPort.postMessage(message, transfer); + }, listenMessageFromMainThread: (listener) => { parentPort.on("message", listener) } @@ -139,9 +141,9 @@ class ThreadRegistry { return this.workers.get(tid); } - wakeUpWorkerThread(tid, message) { + wakeUpWorkerThread(tid, message, transfer) { const worker = this.workers.get(tid); - worker.postMessage(message); + worker.postMessage(message, transfer); } } diff --git a/Runtime/src/index.ts b/Runtime/src/index.ts index 73f56411a..3f23ed753 100644 --- a/Runtime/src/index.ts +++ b/Runtime/src/index.ts @@ -6,95 +6,12 @@ import { pointer, TypedArray, ImportedFunctions, + MAIN_THREAD_TID, } from "./types.js"; import * as JSValue from "./js-value.js"; import { Memory } from "./memory.js"; - -type MainToWorkerMessage = { - type: "wake"; -}; - -type WorkerToMainMessage = { - type: "job"; - data: number; -}; - -/** - * A thread channel is a set of functions that are used to communicate between - * the main thread and the worker thread. The main thread and the worker thread - * can send messages to each other using these functions. - * - * @example - * ```javascript - * // worker.js - * const runtime = new SwiftRuntime({ - * threadChannel: { - * postMessageToMainThread: postMessage, - * listenMessageFromMainThread: (listener) => { - * self.onmessage = (event) => { - * listener(event.data); - * }; - * } - * } - * }); - * - * // main.js - * const worker = new Worker("worker.js"); - * const runtime = new SwiftRuntime({ - * threadChannel: { - * postMessageToWorkerThread: (tid, data) => { - * worker.postMessage(data); - * }, - * listenMessageFromWorkerThread: (tid, listener) => { - * worker.onmessage = (event) => { - listener(event.data); - * }; - * } - * } - * }); - * ``` - */ -export type SwiftRuntimeThreadChannel = - | { - /** - * This function is used to send messages from the worker thread to the main thread. - * The message submitted by this function is expected to be listened by `listenMessageFromWorkerThread`. - * @param message The message to be sent to the main thread. - */ - postMessageToMainThread: (message: WorkerToMainMessage) => void; - /** - * This function is expected to be set in the worker thread and should listen - * to messages from the main thread sent by `postMessageToWorkerThread`. - * @param listener The listener function to be called when a message is received from the main thread. - */ - listenMessageFromMainThread: (listener: (message: MainToWorkerMessage) => void) => void; - } - | { - /** - * This function is expected to be set in the main thread. - * The message submitted by this function is expected to be listened by `listenMessageFromMainThread`. - * @param tid The thread ID of the worker thread. - * @param message The message to be sent to the worker thread. - */ - postMessageToWorkerThread: (tid: number, message: MainToWorkerMessage) => void; - /** - * This function is expected to be set in the main thread and should listen - * to messages sent by `postMessageToMainThread` from the worker thread. - * @param tid The thread ID of the worker thread. - * @param listener The listener function to be called when a message is received from the worker thread. - */ - listenMessageFromWorkerThread: ( - tid: number, - listener: (message: WorkerToMainMessage) => void - ) => void; - - /** - * This function is expected to be set in the main thread and called - * when the worker thread is terminated. - * @param tid The thread ID of the worker thread. - */ - terminateWorkerThread?: (tid: number) => void; - }; +import { deserializeError, MainToWorkerMessage, MessageBroker, ResponseMessage, ITCInterface, serializeError, SwiftRuntimeThreadChannel, WorkerToMainMessage } from "./itc.js"; +import { decodeObjectRefs } from "./js-value.js"; export type SwiftRuntimeOptions = { /** @@ -265,6 +182,52 @@ export class SwiftRuntime { importObjects = () => this.wasmImports; get wasmImports(): ImportedFunctions { + let broker: MessageBroker | null = null; + const getMessageBroker = (threadChannel: SwiftRuntimeThreadChannel) => { + if (broker) return broker; + const itcInterface = new ITCInterface(this.memory); + const newBroker = new MessageBroker(this.tid ?? -1, threadChannel, { + onRequest: (message) => { + let returnValue: ResponseMessage["data"]["response"]; + try { + // @ts-ignore + const result = itcInterface[message.data.request.method](...message.data.request.parameters); + returnValue = { ok: true, value: result }; + } catch (error) { + returnValue = { ok: false, error: serializeError(error) }; + } + const responseMessage: ResponseMessage = { + type: "response", + data: { + sourceTid: message.data.sourceTid, + context: message.data.context, + response: returnValue, + }, + } + try { + newBroker.reply(responseMessage); + } catch (error) { + responseMessage.data.response = { + ok: false, + error: serializeError(new TypeError(`Failed to serialize message: ${error}`)) + }; + newBroker.reply(responseMessage); + } + }, + onResponse: (message) => { + if (message.data.response.ok) { + const object = this.memory.retain(message.data.response.value.object); + this.exports.swjs_receive_response(object, message.data.context); + } else { + const error = deserializeError(message.data.response.error); + const errorObject = this.memory.retain(error); + this.exports.swjs_receive_error(errorObject, message.data.context); + } + } + }) + broker = newBroker; + return newBroker; + } return { swjs_set_prop: ( ref: ref, @@ -565,6 +528,25 @@ export class SwiftRuntime { this.memory.release(ref); }, + swjs_release_remote: (tid: number, ref: ref) => { + if (!this.options.threadChannel) { + throw new Error("threadChannel is not set in options given to SwiftRuntime. Please set it to release objects on remote threads."); + } + const broker = getMessageBroker(this.options.threadChannel); + broker.request({ + type: "request", + data: { + sourceTid: this.tid ?? MAIN_THREAD_TID, + targetTid: tid, + context: 0, + request: { + method: "release", + parameters: [ref], + } + } + }) + }, + swjs_i64_to_bigint: (value: bigint, signed: number) => { return this.memory.retain( signed ? value : BigInt.asUintN(64, value) @@ -605,13 +587,22 @@ export class SwiftRuntime { "listenMessageFromMainThread is not set in options given to SwiftRuntime. Please set it to listen to wake events from the main thread." ); } + const broker = getMessageBroker(threadChannel); threadChannel.listenMessageFromMainThread((message) => { switch (message.type) { case "wake": this.exports.swjs_wake_worker_thread(); break; + case "request": { + broker.onReceivingRequest(message); + break; + } + case "response": { + broker.onReceivingResponse(message); + break; + } default: - const unknownMessage: never = message.type; + const unknownMessage: never = message; throw new Error(`Unknown message type: ${unknownMessage}`); } }); @@ -626,14 +617,23 @@ export class SwiftRuntime { "listenMessageFromWorkerThread is not set in options given to SwiftRuntime. Please set it to listen to jobs from worker threads." ); } + const broker = getMessageBroker(threadChannel); threadChannel.listenMessageFromWorkerThread( tid, (message) => { switch (message.type) { case "job": this.exports.swjs_enqueue_main_job_from_worker(message.data); break; + case "request": { + broker.onReceivingRequest(message); + break; + } + case "response": { + broker.onReceivingResponse(message); + break; + } default: - const unknownMessage: never = message.type; + const unknownMessage: never = message; throw new Error(`Unknown message type: ${unknownMessage}`); } }, @@ -649,27 +649,81 @@ export class SwiftRuntime { // Main thread's tid is always -1 return this.tid || -1; }, + swjs_request_sending_object: ( + sending_object: ref, + transferring_objects: pointer, + transferring_objects_count: number, + object_source_tid: number, + sending_context: pointer, + ) => { + if (!this.options.threadChannel) { + throw new Error("threadChannel is not set in options given to SwiftRuntime. Please set it to request transferring objects."); + } + const broker = getMessageBroker(this.options.threadChannel); + const memory = this.memory; + const transferringObjects = decodeObjectRefs(transferring_objects, transferring_objects_count, memory); + broker.request({ + type: "request", + data: { + sourceTid: this.tid ?? MAIN_THREAD_TID, + targetTid: object_source_tid, + context: sending_context, + request: { + method: "send", + parameters: [sending_object, transferringObjects, sending_context], + } + } + }) + }, + swjs_request_sending_objects: ( + sending_objects: pointer, + sending_objects_count: number, + transferring_objects: pointer, + transferring_objects_count: number, + object_source_tid: number, + sending_context: pointer, + ) => { + if (!this.options.threadChannel) { + throw new Error("threadChannel is not set in options given to SwiftRuntime. Please set it to request transferring objects."); + } + const broker = getMessageBroker(this.options.threadChannel); + const memory = this.memory; + const sendingObjects = decodeObjectRefs(sending_objects, sending_objects_count, memory); + const transferringObjects = decodeObjectRefs(transferring_objects, transferring_objects_count, memory); + broker.request({ + type: "request", + data: { + sourceTid: this.tid ?? MAIN_THREAD_TID, + targetTid: object_source_tid, + context: sending_context, + request: { + method: "sendObjects", + parameters: [sendingObjects, transferringObjects, sending_context], + } + } + }) + }, }; } - private postMessageToMainThread(message: WorkerToMainMessage) { + private postMessageToMainThread(message: WorkerToMainMessage, transfer: any[] = []) { const threadChannel = this.options.threadChannel; if (!(threadChannel && "postMessageToMainThread" in threadChannel)) { throw new Error( "postMessageToMainThread is not set in options given to SwiftRuntime. Please set it to send messages to the main thread." ); } - threadChannel.postMessageToMainThread(message); + threadChannel.postMessageToMainThread(message, transfer); } - private postMessageToWorkerThread(tid: number, message: MainToWorkerMessage) { + private postMessageToWorkerThread(tid: number, message: MainToWorkerMessage, transfer: any[] = []) { const threadChannel = this.options.threadChannel; if (!(threadChannel && "postMessageToWorkerThread" in threadChannel)) { throw new Error( "postMessageToWorkerThread is not set in options given to SwiftRuntime. Please set it to send messages to worker threads." ); } - threadChannel.postMessageToWorkerThread(tid, message); + threadChannel.postMessageToWorkerThread(tid, message, transfer); } } diff --git a/Runtime/src/itc.ts b/Runtime/src/itc.ts new file mode 100644 index 000000000..e2c93622a --- /dev/null +++ b/Runtime/src/itc.ts @@ -0,0 +1,247 @@ +// This file defines the interface for the inter-thread communication. +import type { ref, pointer } from "./types.js"; +import { Memory } from "./memory.js"; + +/** + * A thread channel is a set of functions that are used to communicate between + * the main thread and the worker thread. The main thread and the worker thread + * can send messages to each other using these functions. + * + * @example + * ```javascript + * // worker.js + * const runtime = new SwiftRuntime({ + * threadChannel: { + * postMessageToMainThread: postMessage, + * listenMessageFromMainThread: (listener) => { + * self.onmessage = (event) => { + * listener(event.data); + * }; + * } + * } + * }); + * + * // main.js + * const worker = new Worker("worker.js"); + * const runtime = new SwiftRuntime({ + * threadChannel: { + * postMessageToWorkerThread: (tid, data) => { + * worker.postMessage(data); + * }, + * listenMessageFromWorkerThread: (tid, listener) => { + * worker.onmessage = (event) => { + listener(event.data); + * }; + * } + * } + * }); + * ``` + */ +export type SwiftRuntimeThreadChannel = + | { + /** + * This function is used to send messages from the worker thread to the main thread. + * The message submitted by this function is expected to be listened by `listenMessageFromWorkerThread`. + * @param message The message to be sent to the main thread. + * @param transfer The array of objects to be transferred to the main thread. + */ + postMessageToMainThread: (message: WorkerToMainMessage, transfer: any[]) => void; + /** + * This function is expected to be set in the worker thread and should listen + * to messages from the main thread sent by `postMessageToWorkerThread`. + * @param listener The listener function to be called when a message is received from the main thread. + */ + listenMessageFromMainThread: (listener: (message: MainToWorkerMessage) => void) => void; + } + | { + /** + * This function is expected to be set in the main thread. + * The message submitted by this function is expected to be listened by `listenMessageFromMainThread`. + * @param tid The thread ID of the worker thread. + * @param message The message to be sent to the worker thread. + * @param transfer The array of objects to be transferred to the worker thread. + */ + postMessageToWorkerThread: (tid: number, message: MainToWorkerMessage, transfer: any[]) => void; + /** + * This function is expected to be set in the main thread and should listen + * to messages sent by `postMessageToMainThread` from the worker thread. + * @param tid The thread ID of the worker thread. + * @param listener The listener function to be called when a message is received from the worker thread. + */ + listenMessageFromWorkerThread: ( + tid: number, + listener: (message: WorkerToMainMessage) => void + ) => void; + + /** + * This function is expected to be set in the main thread and called + * when the worker thread is terminated. + * @param tid The thread ID of the worker thread. + */ + terminateWorkerThread?: (tid: number) => void; + }; + + +export class ITCInterface { + constructor(private memory: Memory) {} + + send(sendingObject: ref, transferringObjects: ref[], sendingContext: pointer): { object: any, sendingContext: pointer, transfer: Transferable[] } { + const object = this.memory.getObject(sendingObject); + const transfer = transferringObjects.map(ref => this.memory.getObject(ref)); + return { object, sendingContext, transfer }; + } + + sendObjects(sendingObjects: ref[], transferringObjects: ref[], sendingContext: pointer): { object: any[], sendingContext: pointer, transfer: Transferable[] } { + const objects = sendingObjects.map(ref => this.memory.getObject(ref)); + const transfer = transferringObjects.map(ref => this.memory.getObject(ref)); + return { object: objects, sendingContext, transfer }; + } + + release(objectRef: ref): { object: undefined, transfer: Transferable[] } { + this.memory.release(objectRef); + return { object: undefined, transfer: [] }; + } +} + +type AllRequests<Interface extends Record<string, any>> = { + [K in keyof Interface]: { + method: K, + parameters: Parameters<Interface[K]>, + } +} + +type ITCRequest<Interface extends Record<string, any>> = AllRequests<Interface>[keyof AllRequests<Interface>]; +type AllResponses<Interface extends Record<string, any>> = { + [K in keyof Interface]: ReturnType<Interface[K]> +} +type ITCResponse<Interface extends Record<string, any>> = AllResponses<Interface>[keyof AllResponses<Interface>]; + +export type RequestMessage = { + type: "request"; + data: { + /** The TID of the thread that sent the request */ + sourceTid: number; + /** The TID of the thread that should respond to the request */ + targetTid: number; + /** The context pointer of the request */ + context: pointer; + /** The request content */ + request: ITCRequest<ITCInterface>; + } +} + +type SerializedError = { isError: true; value: Error } | { isError: false; value: unknown } + +export type ResponseMessage = { + type: "response"; + data: { + /** The TID of the thread that sent the response */ + sourceTid: number; + /** The context pointer of the request */ + context: pointer; + /** The response content */ + response: { + ok: true, + value: ITCResponse<ITCInterface>; + } | { + ok: false, + error: SerializedError; + }; + } +} + +export type MainToWorkerMessage = { + type: "wake"; +} | RequestMessage | ResponseMessage; + +export type WorkerToMainMessage = { + type: "job"; + data: number; +} | RequestMessage | ResponseMessage; + + +export class MessageBroker { + constructor( + private selfTid: number, + private threadChannel: SwiftRuntimeThreadChannel, + private handlers: { + onRequest: (message: RequestMessage) => void, + onResponse: (message: ResponseMessage) => void, + } + ) { + } + + request(message: RequestMessage) { + if (message.data.targetTid == this.selfTid) { + // The request is for the current thread + this.handlers.onRequest(message); + } else if ("postMessageToWorkerThread" in this.threadChannel) { + // The request is for another worker thread sent from the main thread + this.threadChannel.postMessageToWorkerThread(message.data.targetTid, message, []); + } else if ("postMessageToMainThread" in this.threadChannel) { + // The request is for other worker threads or the main thread sent from a worker thread + this.threadChannel.postMessageToMainThread(message, []); + } else { + throw new Error("unreachable"); + } + } + + reply(message: ResponseMessage) { + if (message.data.sourceTid == this.selfTid) { + // The response is for the current thread + this.handlers.onResponse(message); + return; + } + const transfer = message.data.response.ok ? message.data.response.value.transfer : []; + if ("postMessageToWorkerThread" in this.threadChannel) { + // The response is for another worker thread sent from the main thread + this.threadChannel.postMessageToWorkerThread(message.data.sourceTid, message, transfer); + } else if ("postMessageToMainThread" in this.threadChannel) { + // The response is for other worker threads or the main thread sent from a worker thread + this.threadChannel.postMessageToMainThread(message, transfer); + } else { + throw new Error("unreachable"); + } + } + + onReceivingRequest(message: RequestMessage) { + if (message.data.targetTid == this.selfTid) { + this.handlers.onRequest(message); + } else if ("postMessageToWorkerThread" in this.threadChannel) { + // Receive a request from a worker thread to other worker on main thread. + // Proxy the request to the target worker thread. + this.threadChannel.postMessageToWorkerThread(message.data.targetTid, message, []); + } else if ("postMessageToMainThread" in this.threadChannel) { + // A worker thread won't receive a request for other worker threads + throw new Error("unreachable"); + } + } + + onReceivingResponse(message: ResponseMessage) { + if (message.data.sourceTid == this.selfTid) { + this.handlers.onResponse(message); + } else if ("postMessageToWorkerThread" in this.threadChannel) { + // Receive a response from a worker thread to other worker on main thread. + // Proxy the response to the target worker thread. + const transfer = message.data.response.ok ? message.data.response.value.transfer : []; + this.threadChannel.postMessageToWorkerThread(message.data.sourceTid, message, transfer); + } else if ("postMessageToMainThread" in this.threadChannel) { + // A worker thread won't receive a response for other worker threads + throw new Error("unreachable"); + } + } +} + +export function serializeError(error: unknown): SerializedError { + if (error instanceof Error) { + return { isError: true, value: { message: error.message, name: error.name, stack: error.stack } }; + } + return { isError: false, value: error }; +} + +export function deserializeError(error: SerializedError): unknown { + if (error.isError) { + return Object.assign(new Error(error.value.message), error.value); + } + return error.value; +} diff --git a/Runtime/src/js-value.ts b/Runtime/src/js-value.ts index 1b142de05..dcc378f61 100644 --- a/Runtime/src/js-value.ts +++ b/Runtime/src/js-value.ts @@ -1,5 +1,5 @@ import { Memory } from "./memory.js"; -import { assertNever, JavaScriptValueKindAndFlags, pointer } from "./types.js"; +import { assertNever, JavaScriptValueKindAndFlags, pointer, ref } from "./types.js"; export const enum Kind { Boolean = 0, @@ -142,3 +142,11 @@ export const writeAndReturnKindBits = ( } throw new Error("Unreachable"); }; + +export function decodeObjectRefs(ptr: pointer, length: number, memory: Memory): ref[] { + const result: ref[] = new Array(length); + for (let i = 0; i < length; i++) { + result[i] = memory.readUint32(ptr + 4 * i); + } + return result; +} diff --git a/Runtime/src/types.ts b/Runtime/src/types.ts index dd638acc5..587b60770 100644 --- a/Runtime/src/types.ts +++ b/Runtime/src/types.ts @@ -22,6 +22,8 @@ export interface ExportedFunctions { swjs_enqueue_main_job_from_worker(unowned_job: number): void; swjs_wake_worker_thread(): void; + swjs_receive_response(object: ref, transferring: pointer): void; + swjs_receive_error(error: ref, context: number): void; } export interface ImportedFunctions { @@ -102,6 +104,7 @@ export interface ImportedFunctions { ): number; swjs_load_typed_array(ref: ref, buffer: pointer): void; swjs_release(ref: number): void; + swjs_release_remote(tid: number, ref: number): void; swjs_i64_to_bigint(value: bigint, signed: bool): ref; swjs_bigint_to_i64(ref: ref, signed: bool): bigint; swjs_i64_to_bigint_slow(lower: number, upper: number, signed: bool): ref; @@ -112,6 +115,21 @@ export interface ImportedFunctions { swjs_listen_message_from_worker_thread: (tid: number) => void; swjs_terminate_worker_thread: (tid: number) => void; swjs_get_worker_thread_id: () => number; + swjs_request_sending_object: ( + sending_object: ref, + transferring_objects: pointer, + transferring_objects_count: number, + object_source_tid: number, + sending_context: pointer, + ) => void; + swjs_request_sending_objects: ( + sending_objects: pointer, + sending_objects_count: number, + transferring_objects: pointer, + transferring_objects_count: number, + object_source_tid: number, + sending_context: pointer, + ) => void; } export const enum LibraryFeatures { @@ -133,3 +151,5 @@ export type TypedArray = export function assertNever(x: never, message: string) { throw new Error(message); } + +export const MAIN_THREAD_TID = -1; diff --git a/Sources/JavaScriptEventLoop/JSSending.swift b/Sources/JavaScriptEventLoop/JSSending.swift new file mode 100644 index 000000000..615dadce6 --- /dev/null +++ b/Sources/JavaScriptEventLoop/JSSending.swift @@ -0,0 +1,377 @@ +@_spi(JSObject_id) import JavaScriptKit +import _CJavaScriptKit + +#if canImport(Synchronization) + import Synchronization +#endif + +/// A temporary object intended to send a JavaScript object from one thread to another. +/// +/// `JSSending` provides a way to safely transfer or clone JavaScript objects between threads +/// in a multi-threaded WebAssembly environment. +/// +/// There are two primary ways to use `JSSending`: +/// 1. Transfer an object (`JSSending.transfer`) - The original object becomes unusable +/// 2. Clone an object (`JSSending.init`) - Creates a copy, original remains usable +/// +/// To receive a sent object on the destination thread, call the `receive()` method. +/// +/// - Note: `JSSending` is `Sendable` and can be safely shared across thread boundaries. +/// +/// ## Example +/// +/// ```swift +/// // Transfer an object to another thread +/// let buffer = JSObject.global.Uint8Array.function!.new(100).buffer.object! +/// let transferring = JSSending.transfer(buffer) +/// +/// // Receive the object on a worker thread +/// let executor = try await WebWorkerTaskExecutor(numberOfThreads: 1) +/// Task(executorPreference: executor) { +/// let receivedBuffer = try await transferring.receive() +/// // Use the received buffer +/// } +/// +/// // Clone an object for use in another thread +/// let object = JSObject.global.Object.function!.new() +/// object["test"] = "Hello, World!" +/// let cloning = JSSending(object) +/// +/// Task(executorPreference: executor) { +/// let receivedObject = try await cloning.receive() +/// // Use the received object +/// } +/// ``` +@available(macOS 10.15, iOS 13.0, watchOS 6.0, tvOS 13.0, *) +public struct JSSending<T>: @unchecked Sendable { + fileprivate struct Storage { + /// The original object that is sent. + /// + /// Retain it here to prevent it from being released before the sending is complete. + let sourceObject: JSObject + /// A function that constructs an object from a JavaScript object reference. + let construct: (_ object: JSObject) -> T + /// The JavaScript object reference of the original object. + let idInSource: JavaScriptObjectRef + /// The TID of the thread that owns the original object. + let sourceTid: Int32 + /// Whether the object should be "transferred" or "cloned". + let transferring: Bool + } + + private let storage: Storage + + fileprivate init( + sourceObject: T, + construct: @escaping (_ object: JSObject) -> T, + deconstruct: @escaping (_ object: T) -> JSObject, + getSourceTid: @escaping (_ object: T) -> Int32, + transferring: Bool + ) { + let object = deconstruct(sourceObject) + self.storage = Storage( + sourceObject: object, + construct: construct, + idInSource: object.id, + sourceTid: getSourceTid(sourceObject), + transferring: transferring + ) + } +} + +@available(macOS 10.15, iOS 13.0, watchOS 6.0, tvOS 13.0, *) +extension JSSending where T == JSObject { + private init(_ object: JSObject, transferring: Bool) { + self.init( + sourceObject: object, + construct: { $0 }, + deconstruct: { $0 }, + getSourceTid: { + #if compiler(>=6.1) && _runtime(_multithreaded) + return $0.ownerTid + #else + _ = $0 + // On single-threaded runtime, source and destination threads are always the main thread (TID = -1). + return -1 + #endif + }, + transferring: transferring + ) + } + + /// Transfers a `JSObject` to another thread. + /// + /// The original `JSObject` is ["transferred"](https://developer.mozilla.org/en-US/docs/Web/API/Web_Workers_API/Transferable_objects) + /// to the receiving thread, which means its ownership is completely moved. After transferring, + /// the original object becomes neutered (unusable) in the source thread. + /// + /// This is more efficient than cloning for large objects like `ArrayBuffer` because no copying + /// is involved, but the original object can no longer be accessed. + /// + /// Only objects that implement the JavaScript [Transferable](https://developer.mozilla.org/en-US/docs/Web/API/Web_Workers_API/Transferable_objects) + /// interface can be transferred. Common transferable objects include: + /// - `ArrayBuffer` + /// - `MessagePort` + /// - `ImageBitmap` + /// - `OffscreenCanvas` + /// + /// ## Example + /// + /// ```swift + /// let buffer = JSObject.global.Uint8Array.function!.new(100).buffer.object! + /// let transferring = JSSending.transfer(buffer) + /// + /// // After transfer, the original buffer is neutered + /// // buffer.byteLength.number! will be 0 + /// ``` + /// + /// - Precondition: The thread calling this method should have the ownership of the `JSObject`. + /// - Postcondition: The original `JSObject` is no longer owned by the thread, further access to it + /// on the thread that called this method is invalid and will result in undefined behavior. + /// + /// - Parameter object: The `JSObject` to be transferred. + /// - Returns: A `JSSending` instance that can be shared across threads. + @available(macOS 10.15, iOS 13.0, watchOS 6.0, tvOS 13.0, *) + public static func transfer(_ object: JSObject) -> JSSending { + JSSending(object, transferring: true) + } + + /// Clones a `JSObject` to another thread. + /// + /// Creates a copy of the object that can be sent to another thread. The original object + /// remains usable in the source thread. This is safer than transferring when you need + /// to continue using the original object, but has higher memory overhead since it creates + /// a complete copy. + /// + /// Most JavaScript objects can be cloned, but some complex objects including closures may + /// not be clonable. + /// + /// ## Example + /// + /// ```swift + /// let object = JSObject.global.Object.function!.new() + /// object["test"] = "Hello, World!" + /// let cloning = JSSending(object) + /// + /// // Original object is still valid and usable + /// // object["test"].string! is still "Hello, World!" + /// ``` + /// + /// - Precondition: The thread calling this method should have the ownership of the `JSObject`. + /// - Parameter object: The `JSObject` to be cloned. + /// - Returns: A `JSSending` instance that can be shared across threads. + @available(macOS 10.15, iOS 13.0, watchOS 6.0, tvOS 13.0, *) + public init(_ object: JSObject) { + self.init(object, transferring: false) + } +} + +@available(macOS 10.15, iOS 13.0, watchOS 6.0, tvOS 13.0, *) +extension JSSending { + + /// Receives a sent `JSObject` from a thread. + /// + /// This method completes the transfer or clone operation, making the object available + /// in the receiving thread. It must be called on the destination thread where you want + /// to use the object. + /// + /// - Important: This method should be called only once for each `JSSending` instance. + /// Attempting to receive the same object multiple times will result in an error. + /// + /// ## Example - Transferring + /// + /// ```swift + /// let canvas = JSObject.global.document.createElement("canvas").object! + /// let transferring = JSSending.transfer(canvas.transferControlToOffscreen().object!) + /// + /// let executor = try await WebWorkerTaskExecutor(numberOfThreads: 1) + /// Task(executorPreference: executor) { + /// let canvas = try await transferring.receive() + /// // Use the canvas in the worker thread + /// } + /// ``` + /// + /// ## Example - Cloning + /// + /// ```swift + /// let data = JSObject.global.Object.function!.new() + /// data["value"] = 42 + /// let cloning = JSSending(data) + /// + /// let executor = try await WebWorkerTaskExecutor(numberOfThreads: 1) + /// Task(executorPreference: executor) { + /// let data = try await cloning.receive() + /// print(data["value"].number!) // 42 + /// } + /// ``` + /// + /// - Parameter isolation: The actor isolation context for this call, used in Swift concurrency. + /// - Returns: The received object of type `T`. + /// - Throws: `JSSendingError` if the sending operation fails, or `JSException` if a JavaScript error occurs. + @available(macOS 10.15, iOS 13.0, watchOS 6.0, tvOS 13.0, *) + public func receive(isolation: isolated (any Actor)? = #isolation, file: StaticString = #file, line: UInt = #line) async throws -> T { + #if compiler(>=6.1) && _runtime(_multithreaded) + let idInDestination = try await withCheckedThrowingContinuation { continuation in + let context = _JSSendingContext(continuation: continuation) + let idInSource = self.storage.idInSource + let transferring = self.storage.transferring ? [idInSource] : [] + swjs_request_sending_object( + idInSource, + transferring, + Int32(transferring.count), + self.storage.sourceTid, + Unmanaged.passRetained(context).toOpaque() + ) + } + return storage.construct(JSObject(id: idInDestination)) + #else + return storage.construct(storage.sourceObject) + #endif + } + + // 6.0 and below can't compile the following without a compiler crash. + #if compiler(>=6.1) + /// Receives multiple `JSSending` instances from a thread in a single operation. + /// + /// This method is more efficient than receiving multiple objects individually, as it + /// batches the receive operations. It's especially useful when transferring or cloning + /// multiple related objects that need to be received together. + /// + /// - Important: All objects being received must come from the same source thread. + /// + /// ## Example + /// + /// ```swift + /// // Create and transfer multiple objects + /// let buffer1 = Uint8Array.new(10).buffer.object! + /// let buffer2 = Uint8Array.new(20).buffer.object! + /// let transferring1 = JSSending.transfer(buffer1) + /// let transferring2 = JSSending.transfer(buffer2) + /// + /// // Receive both objects in a single operation + /// let executor = try await WebWorkerTaskExecutor(numberOfThreads: 1) + /// Task(executorPreference: executor) { + /// let (receivedBuffer1, receivedBuffer2) = try await JSSending.receive(transferring1, transferring2) + /// // Use both buffers in the worker thread + /// } + /// ``` + /// + /// - Parameters: + /// - sendings: The `JSSending` instances to receive. + /// - isolation: The actor isolation context for this call, used in Swift concurrency. + /// - Returns: A tuple containing the received objects. + /// - Throws: `JSSendingError` if any sending operation fails, or `JSException` if a JavaScript error occurs. + @available(macOS 10.15, iOS 13.0, watchOS 6.0, tvOS 13.0, *) + public static func receive<each U>( + _ sendings: repeat JSSending<each U>, + isolation: isolated (any Actor)? = #isolation, file: StaticString = #file, line: UInt = #line + ) async throws -> (repeat each U) where T == (repeat each U) { + #if compiler(>=6.1) && _runtime(_multithreaded) + var sendingObjects: [JavaScriptObjectRef] = [] + var transferringObjects: [JavaScriptObjectRef] = [] + var sourceTid: Int32? + for object in repeat each sendings { + sendingObjects.append(object.storage.idInSource) + if object.storage.transferring { + transferringObjects.append(object.storage.idInSource) + } + if sourceTid == nil { + sourceTid = object.storage.sourceTid + } else { + guard sourceTid == object.storage.sourceTid else { + throw JSSendingError("All objects sent at once must be from the same thread") + } + } + } + let objects = try await withCheckedThrowingContinuation { continuation in + let context = _JSSendingContext(continuation: continuation) + sendingObjects.withUnsafeBufferPointer { sendingObjects in + transferringObjects.withUnsafeBufferPointer { transferringObjects in + swjs_request_sending_objects( + sendingObjects.baseAddress!, + Int32(sendingObjects.count), + transferringObjects.baseAddress!, + Int32(transferringObjects.count), + sourceTid!, + Unmanaged.passRetained(context).toOpaque() + ) + } + } + } + guard let objectsArray = JSArray(JSObject(id: objects)) else { + fatalError("Non-array object received!?") + } + var index = 0 + func extract<R>(_ sending: JSSending<R>) -> R { + let result = objectsArray[index] + index += 1 + return sending.storage.construct(result.object!) + } + return (repeat extract(each sendings)) + #else + return try await (repeat (each sendings).receive()) + #endif + } + #endif // compiler(>=6.1) +} + +@available(macOS 10.15, iOS 13.0, watchOS 6.0, tvOS 13.0, *) +fileprivate final class _JSSendingContext: Sendable { + let continuation: CheckedContinuation<JavaScriptObjectRef, Error> + + init(continuation: CheckedContinuation<JavaScriptObjectRef, Error>) { + self.continuation = continuation + } +} + +/// Error type representing failures during JavaScript object sending operations. +/// +/// This error is thrown when a problem occurs during object transfer or cloning +/// between threads, such as attempting to send objects from different threads +/// in a batch operation or other sending-related failures. +public struct JSSendingError: Error, CustomStringConvertible { + /// A description of the error that occurred. + public let description: String + + init(_ message: String) { + self.description = message + } +} + +/// A function that should be called when an object source thread sends an object to a +/// destination thread. +/// +/// - Parameters: +/// - object: The `JSObject` to be received. +/// - transferring: A pointer to the `Transferring.Storage` instance. +#if compiler(>=6.1) // @_expose and @_extern are only available in Swift 6.1+ +@_expose(wasm, "swjs_receive_response") +@_cdecl("swjs_receive_response") +#endif +@available(macOS 10.15, iOS 13.0, watchOS 6.0, tvOS 13.0, *) +func _swjs_receive_response(_ object: JavaScriptObjectRef, _ contextPtr: UnsafeRawPointer?) { + #if compiler(>=6.1) && _runtime(_multithreaded) + guard let contextPtr = contextPtr else { return } + let context = Unmanaged<_JSSendingContext>.fromOpaque(contextPtr).takeRetainedValue() + context.continuation.resume(returning: object) + #endif +} + +/// A function that should be called when an object source thread sends an error to a +/// destination thread. +/// +/// - Parameters: +/// - error: The error to be received. +/// - transferring: A pointer to the `Transferring.Storage` instance. +#if compiler(>=6.1) // @_expose and @_extern are only available in Swift 6.1+ +@_expose(wasm, "swjs_receive_error") +@_cdecl("swjs_receive_error") +#endif +@available(macOS 10.15, iOS 13.0, watchOS 6.0, tvOS 13.0, *) +func _swjs_receive_error(_ error: JavaScriptObjectRef, _ contextPtr: UnsafeRawPointer?) { + #if compiler(>=6.1) && _runtime(_multithreaded) + guard let contextPtr = contextPtr else { return } + let context = Unmanaged<_JSSendingContext>.fromOpaque(contextPtr).takeRetainedValue() + context.continuation.resume(throwing: JSException(JSObject(id: error).jsValue)) + #endif +} diff --git a/Sources/JavaScriptEventLoop/WebWorkerTaskExecutor.swift b/Sources/JavaScriptEventLoop/WebWorkerTaskExecutor.swift index 14b13eee9..7373b9604 100644 --- a/Sources/JavaScriptEventLoop/WebWorkerTaskExecutor.swift +++ b/Sources/JavaScriptEventLoop/WebWorkerTaskExecutor.swift @@ -16,6 +16,34 @@ import _CJavaScriptEventLoop /// A task executor that runs tasks on Web Worker threads. /// +/// The `WebWorkerTaskExecutor` provides a way to execute Swift tasks in parallel across multiple +/// Web Worker threads, enabling true multi-threaded execution in WebAssembly environments. +/// This allows CPU-intensive tasks to be offloaded from the main thread, keeping the user +/// interface responsive. +/// +/// ## Multithreading Model +/// +/// Each task submitted to the executor runs on one of the available worker threads. By default, +/// child tasks created within a worker thread continue to run on the same worker thread, +/// maintaining thread locality and avoiding excessive context switching. +/// +/// ## Object Sharing Between Threads +/// +/// When working with JavaScript objects across threads, you must use the `JSSending` API to +/// explicitly transfer or clone objects: +/// +/// ```swift +/// // Create and transfer an object to a worker thread +/// let buffer = JSObject.global.ArrayBuffer.function!.new(1024).object! +/// let transferring = JSSending.transfer(buffer) +/// +/// let task = Task(executorPreference: executor) { +/// // Receive the transferred buffer in the worker +/// let workerBuffer = try await transferring.receive() +/// // Use the buffer in the worker thread +/// } +/// ``` +/// /// ## Prerequisites /// /// This task executor is designed to work with [wasi-threads](https://github.com/WebAssembly/wasi-threads) @@ -24,22 +52,40 @@ import _CJavaScriptEventLoop /// from spawned Web Workers, and forward the message to the main thread /// by calling `_swjs_enqueue_main_job_from_worker`. /// -/// ## Usage +/// ## Basic Usage /// /// ```swift -/// let executor = WebWorkerTaskExecutor(numberOfThreads: 4) +/// // Create an executor with 4 worker threads +/// let executor = try await WebWorkerTaskExecutor(numberOfThreads: 4) /// defer { executor.terminate() } /// +/// // Execute a task on a worker thread +/// let task = Task(executorPreference: executor) { +/// // This runs on a worker thread +/// return performHeavyComputation() +/// } +/// let result = await task.value +/// +/// // Run a block on a worker thread /// await withTaskExecutorPreference(executor) { -/// // This block runs on the Web Worker thread. -/// await withTaskGroup(of: Int.self) { group in +/// // This entire block runs on a worker thread +/// performHeavyComputation() +/// } +/// +/// // Execute multiple tasks in parallel +/// await withTaskGroup(of: Int.self) { group in /// for i in 0..<10 { -/// // Structured child works are executed on the Web Worker thread. -/// group.addTask { fibonacci(of: i) } +/// group.addTask(executorPreference: executor) { +/// // Each task runs on a worker thread +/// return fibonacci(i) +/// } +/// } +/// +/// for await result in group { +/// // Process results as they complete /// } -/// } /// } -/// ```` +/// ``` /// /// ## Known limitations /// @@ -359,36 +405,89 @@ public final class WebWorkerTaskExecutor: TaskExecutor { private let executor: Executor - /// Create a new Web Worker task executor. + /// Creates a new Web Worker task executor with the specified number of worker threads. + /// + /// This initializer creates a pool of Web Worker threads that can execute Swift tasks + /// in parallel. The initialization is asynchronous because it waits for all worker + /// threads to be properly initialized before returning. + /// + /// The number of threads should typically match the number of available CPU cores + /// for CPU-bound workloads. For I/O-bound workloads, you might benefit from more + /// threads than CPU cores. + /// + /// ## Example + /// + /// ```swift + /// // Create an executor with 4 worker threads + /// let executor = try await WebWorkerTaskExecutor(numberOfThreads: 4) + /// + /// // Always terminate the executor when you're done with it + /// defer { executor.terminate() } + /// + /// // Use the executor... + /// ``` /// /// - Parameters: /// - numberOfThreads: The number of Web Worker threads to spawn. - /// - timeout: The timeout to wait for all worker threads to be started. - /// - checkInterval: The interval to check if all worker threads are started. + /// - timeout: The maximum time to wait for all worker threads to be started. Default is 3 seconds. + /// - checkInterval: The interval to check if all worker threads are started. Default is 5 microseconds. + /// - Throws: An error if any worker thread fails to initialize within the timeout period. public init(numberOfThreads: Int, timeout: Duration = .seconds(3), checkInterval: Duration = .microseconds(5)) async throws { self.executor = Executor(numberOfThreads: numberOfThreads) try await self.executor.start(timeout: timeout, checkInterval: checkInterval) } - /// Terminate child Web Worker threads. - /// Jobs enqueued to the executor after calling this method will be ignored. + /// Terminates all worker threads managed by this executor. + /// + /// This method should be called when the executor is no longer needed to free up + /// resources. After calling this method, any tasks enqueued to this executor will + /// be ignored and may never complete. + /// + /// It's recommended to use a `defer` statement immediately after creating the executor + /// to ensure it's properly terminated when it goes out of scope. + /// + /// ## Example + /// + /// ```swift + /// do { + /// let executor = try await WebWorkerTaskExecutor(numberOfThreads: 4) + /// defer { executor.terminate() } + /// + /// // Use the executor... + /// } + /// // Executor is automatically terminated when exiting the scope + /// ``` /// - /// NOTE: This method must be called after all tasks that prefer this executor are done. - /// Otherwise, the tasks may stuck forever. + /// - Important: This method must be called after all tasks that prefer this executor are done. + /// Otherwise, the tasks may stuck forever. public func terminate() { executor.terminate() } - /// The number of Web Worker threads. + /// Returns the number of worker threads managed by this executor. + /// + /// This property reflects the value provided during initialization and doesn't change + /// during the lifetime of the executor. + /// + /// ## Example + /// + /// ```swift + /// let executor = try await WebWorkerTaskExecutor(numberOfThreads: 4) + /// print("Executor is running with \(executor.numberOfThreads) threads") + /// // Prints: "Executor is running with 4 threads" + /// ``` public var numberOfThreads: Int { executor.numberOfThreads } // MARK: TaskExecutor conformance - /// Enqueue a job to the executor. + /// Enqueues a job to be executed by one of the worker threads. + /// + /// This method is part of the `TaskExecutor` protocol and is called by the Swift + /// Concurrency runtime. You typically don't need to call this method directly. /// - /// NOTE: Called from the Swift Concurrency runtime. + /// - Parameter job: The job to enqueue. public func enqueue(_ job: UnownedJob) { Self.traceStatsIncrement(\.enqueueExecutor) executor.enqueue(job) @@ -431,9 +530,23 @@ public final class WebWorkerTaskExecutor: TaskExecutor { @MainActor private static var _swift_task_enqueueGlobalWithDelay_hook_original: UnsafeMutableRawPointer? @MainActor private static var _swift_task_enqueueGlobalWithDeadline_hook_original: UnsafeMutableRawPointer? - /// Install a global executor that forwards jobs from Web Worker threads to the main thread. + /// Installs a global executor that forwards jobs from Web Worker threads to the main thread. + /// + /// This method sets up the necessary hooks to ensure proper task scheduling between + /// the main thread and worker threads. It must be called once (typically at application + /// startup) before using any `WebWorkerTaskExecutor` instances. + /// + /// ## Example + /// + /// ```swift + /// // At application startup + /// WebWorkerTaskExecutor.installGlobalExecutor() + /// + /// // Later, create and use executor instances + /// let executor = try await WebWorkerTaskExecutor(numberOfThreads: 4) + /// ``` /// - /// This function must be called once before using the Web Worker task executor. + /// - Important: This method must be called from the main thread. public static func installGlobalExecutor() { MainActor.assumeIsolated { installGlobalExecutorIsolated() diff --git a/Sources/JavaScriptKit/FundamentalObjects/JSObject.swift b/Sources/JavaScriptKit/FundamentalObjects/JSObject.swift index f74b337d8..0958b33f4 100644 --- a/Sources/JavaScriptKit/FundamentalObjects/JSObject.swift +++ b/Sources/JavaScriptKit/FundamentalObjects/JSObject.swift @@ -1,13 +1,5 @@ import _CJavaScriptKit -#if arch(wasm32) - #if canImport(wasi_pthread) - import wasi_pthread - #endif -#else - import Foundation // for pthread_t on non-wasi platforms -#endif - /// `JSObject` represents an object in JavaScript and supports dynamic member lookup. /// Any member access like `object.foo` will dynamically request the JavaScript and Swift /// runtime bridge library for a member with the specified name in this object. @@ -31,14 +23,14 @@ public class JSObject: Equatable { public var id: JavaScriptObjectRef #if compiler(>=6.1) && _runtime(_multithreaded) - private let ownerThread: pthread_t + package let ownerTid: Int32 #endif @_spi(JSObject_id) public init(id: JavaScriptObjectRef) { self.id = id #if compiler(>=6.1) && _runtime(_multithreaded) - self.ownerThread = pthread_self() + self.ownerTid = swjs_get_worker_thread_id_cached() #endif } @@ -51,14 +43,14 @@ public class JSObject: Equatable { /// object spaces are not shared across threads backed by Web Workers. private func assertOnOwnerThread(hint: @autoclosure () -> String) { #if compiler(>=6.1) && _runtime(_multithreaded) - precondition(pthread_equal(ownerThread, pthread_self()) != 0, "JSObject is being accessed from a thread other than the owner thread: \(hint())") + precondition(ownerTid == swjs_get_worker_thread_id_cached(), "JSObject is being accessed from a thread other than the owner thread: \(hint())") #endif } /// Asserts that the two objects being compared are owned by the same thread. private static func assertSameOwnerThread(lhs: JSObject, rhs: JSObject, hint: @autoclosure () -> String) { #if compiler(>=6.1) && _runtime(_multithreaded) - precondition(pthread_equal(lhs.ownerThread, rhs.ownerThread) != 0, "JSObject is being accessed from a thread other than the owner thread: \(hint())") + precondition(lhs.ownerTid == rhs.ownerTid, "JSObject is being accessed from a thread other than the owner thread: \(hint())") #endif } @@ -211,7 +203,13 @@ public class JSObject: Equatable { }) deinit { - assertOnOwnerThread(hint: "deinitializing") + #if compiler(>=6.1) && _runtime(_multithreaded) + if ownerTid != swjs_get_worker_thread_id_cached() { + // If the object is not owned by the current thread + swjs_release_remote(ownerTid, id) + return + } + #endif swjs_release(id) } diff --git a/Sources/JavaScriptKit/Runtime/index.js b/Sources/JavaScriptKit/Runtime/index.js index 223fed3e1..25b6af3c9 100644 --- a/Sources/JavaScriptKit/Runtime/index.js +++ b/Sources/JavaScriptKit/Runtime/index.js @@ -25,6 +25,7 @@ function assertNever(x, message) { throw new Error(message); } + const MAIN_THREAD_TID = -1; const decode = (kind, payload1, payload2, memory) => { switch (kind) { @@ -121,6 +122,13 @@ } throw new Error("Unreachable"); }; + function decodeObjectRefs(ptr, length, memory) { + const result = new Array(length); + for (let i = 0; i < length; i++) { + result[i] = memory.readUint32(ptr + 4 * i); + } + return result; + } let globalVariable; if (typeof globalThis !== "undefined") { @@ -195,6 +203,110 @@ } } + class ITCInterface { + constructor(memory) { + this.memory = memory; + } + send(sendingObject, transferringObjects, sendingContext) { + const object = this.memory.getObject(sendingObject); + const transfer = transferringObjects.map(ref => this.memory.getObject(ref)); + return { object, sendingContext, transfer }; + } + sendObjects(sendingObjects, transferringObjects, sendingContext) { + const objects = sendingObjects.map(ref => this.memory.getObject(ref)); + const transfer = transferringObjects.map(ref => this.memory.getObject(ref)); + return { object: objects, sendingContext, transfer }; + } + release(objectRef) { + this.memory.release(objectRef); + return { object: undefined, transfer: [] }; + } + } + class MessageBroker { + constructor(selfTid, threadChannel, handlers) { + this.selfTid = selfTid; + this.threadChannel = threadChannel; + this.handlers = handlers; + } + request(message) { + if (message.data.targetTid == this.selfTid) { + // The request is for the current thread + this.handlers.onRequest(message); + } + else if ("postMessageToWorkerThread" in this.threadChannel) { + // The request is for another worker thread sent from the main thread + this.threadChannel.postMessageToWorkerThread(message.data.targetTid, message, []); + } + else if ("postMessageToMainThread" in this.threadChannel) { + // The request is for other worker threads or the main thread sent from a worker thread + this.threadChannel.postMessageToMainThread(message, []); + } + else { + throw new Error("unreachable"); + } + } + reply(message) { + if (message.data.sourceTid == this.selfTid) { + // The response is for the current thread + this.handlers.onResponse(message); + return; + } + const transfer = message.data.response.ok ? message.data.response.value.transfer : []; + if ("postMessageToWorkerThread" in this.threadChannel) { + // The response is for another worker thread sent from the main thread + this.threadChannel.postMessageToWorkerThread(message.data.sourceTid, message, transfer); + } + else if ("postMessageToMainThread" in this.threadChannel) { + // The response is for other worker threads or the main thread sent from a worker thread + this.threadChannel.postMessageToMainThread(message, transfer); + } + else { + throw new Error("unreachable"); + } + } + onReceivingRequest(message) { + if (message.data.targetTid == this.selfTid) { + this.handlers.onRequest(message); + } + else if ("postMessageToWorkerThread" in this.threadChannel) { + // Receive a request from a worker thread to other worker on main thread. + // Proxy the request to the target worker thread. + this.threadChannel.postMessageToWorkerThread(message.data.targetTid, message, []); + } + else if ("postMessageToMainThread" in this.threadChannel) { + // A worker thread won't receive a request for other worker threads + throw new Error("unreachable"); + } + } + onReceivingResponse(message) { + if (message.data.sourceTid == this.selfTid) { + this.handlers.onResponse(message); + } + else if ("postMessageToWorkerThread" in this.threadChannel) { + // Receive a response from a worker thread to other worker on main thread. + // Proxy the response to the target worker thread. + const transfer = message.data.response.ok ? message.data.response.value.transfer : []; + this.threadChannel.postMessageToWorkerThread(message.data.sourceTid, message, transfer); + } + else if ("postMessageToMainThread" in this.threadChannel) { + // A worker thread won't receive a response for other worker threads + throw new Error("unreachable"); + } + } + } + function serializeError(error) { + if (error instanceof Error) { + return { isError: true, value: { message: error.message, name: error.name, stack: error.stack } }; + } + return { isError: false, value: error }; + } + function deserializeError(error) { + if (error.isError) { + return Object.assign(new Error(error.value.message), error.value); + } + return error.value; + } + class SwiftRuntime { constructor(options) { this.version = 708; @@ -312,6 +424,57 @@ return output; } get wasmImports() { + let broker = null; + const getMessageBroker = (threadChannel) => { + var _a; + if (broker) + return broker; + const itcInterface = new ITCInterface(this.memory); + const newBroker = new MessageBroker((_a = this.tid) !== null && _a !== void 0 ? _a : -1, threadChannel, { + onRequest: (message) => { + let returnValue; + try { + // @ts-ignore + const result = itcInterface[message.data.request.method](...message.data.request.parameters); + returnValue = { ok: true, value: result }; + } + catch (error) { + returnValue = { ok: false, error: serializeError(error) }; + } + const responseMessage = { + type: "response", + data: { + sourceTid: message.data.sourceTid, + context: message.data.context, + response: returnValue, + }, + }; + try { + newBroker.reply(responseMessage); + } + catch (error) { + responseMessage.data.response = { + ok: false, + error: serializeError(new TypeError(`Failed to serialize message: ${error}`)) + }; + newBroker.reply(responseMessage); + } + }, + onResponse: (message) => { + if (message.data.response.ok) { + const object = this.memory.retain(message.data.response.value.object); + this.exports.swjs_receive_response(object, message.data.context); + } + else { + const error = deserializeError(message.data.response.error); + const errorObject = this.memory.retain(error); + this.exports.swjs_receive_error(errorObject, message.data.context); + } + } + }); + broker = newBroker; + return newBroker; + }; return { swjs_set_prop: (ref, name, kind, payload1, payload2) => { const memory = this.memory; @@ -473,6 +636,25 @@ swjs_release: (ref) => { this.memory.release(ref); }, + swjs_release_remote: (tid, ref) => { + var _a; + if (!this.options.threadChannel) { + throw new Error("threadChannel is not set in options given to SwiftRuntime. Please set it to release objects on remote threads."); + } + const broker = getMessageBroker(this.options.threadChannel); + broker.request({ + type: "request", + data: { + sourceTid: (_a = this.tid) !== null && _a !== void 0 ? _a : MAIN_THREAD_TID, + targetTid: tid, + context: 0, + request: { + method: "release", + parameters: [ref], + } + } + }); + }, swjs_i64_to_bigint: (value, signed) => { return this.memory.retain(signed ? value : BigInt.asUintN(64, value)); }, @@ -507,13 +689,22 @@ if (!(threadChannel && "listenMessageFromMainThread" in threadChannel)) { throw new Error("listenMessageFromMainThread is not set in options given to SwiftRuntime. Please set it to listen to wake events from the main thread."); } + const broker = getMessageBroker(threadChannel); threadChannel.listenMessageFromMainThread((message) => { switch (message.type) { case "wake": this.exports.swjs_wake_worker_thread(); break; + case "request": { + broker.onReceivingRequest(message); + break; + } + case "response": { + broker.onReceivingResponse(message); + break; + } default: - const unknownMessage = message.type; + const unknownMessage = message; throw new Error(`Unknown message type: ${unknownMessage}`); } }); @@ -526,13 +717,22 @@ if (!(threadChannel && "listenMessageFromWorkerThread" in threadChannel)) { throw new Error("listenMessageFromWorkerThread is not set in options given to SwiftRuntime. Please set it to listen to jobs from worker threads."); } + const broker = getMessageBroker(threadChannel); threadChannel.listenMessageFromWorkerThread(tid, (message) => { switch (message.type) { case "job": this.exports.swjs_enqueue_main_job_from_worker(message.data); break; + case "request": { + broker.onReceivingRequest(message); + break; + } + case "response": { + broker.onReceivingResponse(message); + break; + } default: - const unknownMessage = message.type; + const unknownMessage = message; throw new Error(`Unknown message type: ${unknownMessage}`); } }); @@ -548,21 +748,64 @@ // Main thread's tid is always -1 return this.tid || -1; }, + swjs_request_sending_object: (sending_object, transferring_objects, transferring_objects_count, object_source_tid, sending_context) => { + var _a; + if (!this.options.threadChannel) { + throw new Error("threadChannel is not set in options given to SwiftRuntime. Please set it to request transferring objects."); + } + const broker = getMessageBroker(this.options.threadChannel); + const memory = this.memory; + const transferringObjects = decodeObjectRefs(transferring_objects, transferring_objects_count, memory); + broker.request({ + type: "request", + data: { + sourceTid: (_a = this.tid) !== null && _a !== void 0 ? _a : MAIN_THREAD_TID, + targetTid: object_source_tid, + context: sending_context, + request: { + method: "send", + parameters: [sending_object, transferringObjects, sending_context], + } + } + }); + }, + swjs_request_sending_objects: (sending_objects, sending_objects_count, transferring_objects, transferring_objects_count, object_source_tid, sending_context) => { + var _a; + if (!this.options.threadChannel) { + throw new Error("threadChannel is not set in options given to SwiftRuntime. Please set it to request transferring objects."); + } + const broker = getMessageBroker(this.options.threadChannel); + const memory = this.memory; + const sendingObjects = decodeObjectRefs(sending_objects, sending_objects_count, memory); + const transferringObjects = decodeObjectRefs(transferring_objects, transferring_objects_count, memory); + broker.request({ + type: "request", + data: { + sourceTid: (_a = this.tid) !== null && _a !== void 0 ? _a : MAIN_THREAD_TID, + targetTid: object_source_tid, + context: sending_context, + request: { + method: "sendObjects", + parameters: [sendingObjects, transferringObjects, sending_context], + } + } + }); + }, }; } - postMessageToMainThread(message) { + postMessageToMainThread(message, transfer = []) { const threadChannel = this.options.threadChannel; if (!(threadChannel && "postMessageToMainThread" in threadChannel)) { throw new Error("postMessageToMainThread is not set in options given to SwiftRuntime. Please set it to send messages to the main thread."); } - threadChannel.postMessageToMainThread(message); + threadChannel.postMessageToMainThread(message, transfer); } - postMessageToWorkerThread(tid, message) { + postMessageToWorkerThread(tid, message, transfer = []) { const threadChannel = this.options.threadChannel; if (!(threadChannel && "postMessageToWorkerThread" in threadChannel)) { throw new Error("postMessageToWorkerThread is not set in options given to SwiftRuntime. Please set it to send messages to worker threads."); } - threadChannel.postMessageToWorkerThread(tid, message); + threadChannel.postMessageToWorkerThread(tid, message, transfer); } } /// This error is thrown when yielding event loop control from `swift_task_asyncMainDrainQueue` diff --git a/Sources/JavaScriptKit/Runtime/index.mjs b/Sources/JavaScriptKit/Runtime/index.mjs index 34e4dd13f..668368203 100644 --- a/Sources/JavaScriptKit/Runtime/index.mjs +++ b/Sources/JavaScriptKit/Runtime/index.mjs @@ -19,6 +19,7 @@ class SwiftClosureDeallocator { function assertNever(x, message) { throw new Error(message); } +const MAIN_THREAD_TID = -1; const decode = (kind, payload1, payload2, memory) => { switch (kind) { @@ -115,6 +116,13 @@ const writeAndReturnKindBits = (value, payload1_ptr, payload2_ptr, is_exception, } throw new Error("Unreachable"); }; +function decodeObjectRefs(ptr, length, memory) { + const result = new Array(length); + for (let i = 0; i < length; i++) { + result[i] = memory.readUint32(ptr + 4 * i); + } + return result; +} let globalVariable; if (typeof globalThis !== "undefined") { @@ -189,6 +197,110 @@ class Memory { } } +class ITCInterface { + constructor(memory) { + this.memory = memory; + } + send(sendingObject, transferringObjects, sendingContext) { + const object = this.memory.getObject(sendingObject); + const transfer = transferringObjects.map(ref => this.memory.getObject(ref)); + return { object, sendingContext, transfer }; + } + sendObjects(sendingObjects, transferringObjects, sendingContext) { + const objects = sendingObjects.map(ref => this.memory.getObject(ref)); + const transfer = transferringObjects.map(ref => this.memory.getObject(ref)); + return { object: objects, sendingContext, transfer }; + } + release(objectRef) { + this.memory.release(objectRef); + return { object: undefined, transfer: [] }; + } +} +class MessageBroker { + constructor(selfTid, threadChannel, handlers) { + this.selfTid = selfTid; + this.threadChannel = threadChannel; + this.handlers = handlers; + } + request(message) { + if (message.data.targetTid == this.selfTid) { + // The request is for the current thread + this.handlers.onRequest(message); + } + else if ("postMessageToWorkerThread" in this.threadChannel) { + // The request is for another worker thread sent from the main thread + this.threadChannel.postMessageToWorkerThread(message.data.targetTid, message, []); + } + else if ("postMessageToMainThread" in this.threadChannel) { + // The request is for other worker threads or the main thread sent from a worker thread + this.threadChannel.postMessageToMainThread(message, []); + } + else { + throw new Error("unreachable"); + } + } + reply(message) { + if (message.data.sourceTid == this.selfTid) { + // The response is for the current thread + this.handlers.onResponse(message); + return; + } + const transfer = message.data.response.ok ? message.data.response.value.transfer : []; + if ("postMessageToWorkerThread" in this.threadChannel) { + // The response is for another worker thread sent from the main thread + this.threadChannel.postMessageToWorkerThread(message.data.sourceTid, message, transfer); + } + else if ("postMessageToMainThread" in this.threadChannel) { + // The response is for other worker threads or the main thread sent from a worker thread + this.threadChannel.postMessageToMainThread(message, transfer); + } + else { + throw new Error("unreachable"); + } + } + onReceivingRequest(message) { + if (message.data.targetTid == this.selfTid) { + this.handlers.onRequest(message); + } + else if ("postMessageToWorkerThread" in this.threadChannel) { + // Receive a request from a worker thread to other worker on main thread. + // Proxy the request to the target worker thread. + this.threadChannel.postMessageToWorkerThread(message.data.targetTid, message, []); + } + else if ("postMessageToMainThread" in this.threadChannel) { + // A worker thread won't receive a request for other worker threads + throw new Error("unreachable"); + } + } + onReceivingResponse(message) { + if (message.data.sourceTid == this.selfTid) { + this.handlers.onResponse(message); + } + else if ("postMessageToWorkerThread" in this.threadChannel) { + // Receive a response from a worker thread to other worker on main thread. + // Proxy the response to the target worker thread. + const transfer = message.data.response.ok ? message.data.response.value.transfer : []; + this.threadChannel.postMessageToWorkerThread(message.data.sourceTid, message, transfer); + } + else if ("postMessageToMainThread" in this.threadChannel) { + // A worker thread won't receive a response for other worker threads + throw new Error("unreachable"); + } + } +} +function serializeError(error) { + if (error instanceof Error) { + return { isError: true, value: { message: error.message, name: error.name, stack: error.stack } }; + } + return { isError: false, value: error }; +} +function deserializeError(error) { + if (error.isError) { + return Object.assign(new Error(error.value.message), error.value); + } + return error.value; +} + class SwiftRuntime { constructor(options) { this.version = 708; @@ -306,6 +418,57 @@ class SwiftRuntime { return output; } get wasmImports() { + let broker = null; + const getMessageBroker = (threadChannel) => { + var _a; + if (broker) + return broker; + const itcInterface = new ITCInterface(this.memory); + const newBroker = new MessageBroker((_a = this.tid) !== null && _a !== void 0 ? _a : -1, threadChannel, { + onRequest: (message) => { + let returnValue; + try { + // @ts-ignore + const result = itcInterface[message.data.request.method](...message.data.request.parameters); + returnValue = { ok: true, value: result }; + } + catch (error) { + returnValue = { ok: false, error: serializeError(error) }; + } + const responseMessage = { + type: "response", + data: { + sourceTid: message.data.sourceTid, + context: message.data.context, + response: returnValue, + }, + }; + try { + newBroker.reply(responseMessage); + } + catch (error) { + responseMessage.data.response = { + ok: false, + error: serializeError(new TypeError(`Failed to serialize message: ${error}`)) + }; + newBroker.reply(responseMessage); + } + }, + onResponse: (message) => { + if (message.data.response.ok) { + const object = this.memory.retain(message.data.response.value.object); + this.exports.swjs_receive_response(object, message.data.context); + } + else { + const error = deserializeError(message.data.response.error); + const errorObject = this.memory.retain(error); + this.exports.swjs_receive_error(errorObject, message.data.context); + } + } + }); + broker = newBroker; + return newBroker; + }; return { swjs_set_prop: (ref, name, kind, payload1, payload2) => { const memory = this.memory; @@ -467,6 +630,25 @@ class SwiftRuntime { swjs_release: (ref) => { this.memory.release(ref); }, + swjs_release_remote: (tid, ref) => { + var _a; + if (!this.options.threadChannel) { + throw new Error("threadChannel is not set in options given to SwiftRuntime. Please set it to release objects on remote threads."); + } + const broker = getMessageBroker(this.options.threadChannel); + broker.request({ + type: "request", + data: { + sourceTid: (_a = this.tid) !== null && _a !== void 0 ? _a : MAIN_THREAD_TID, + targetTid: tid, + context: 0, + request: { + method: "release", + parameters: [ref], + } + } + }); + }, swjs_i64_to_bigint: (value, signed) => { return this.memory.retain(signed ? value : BigInt.asUintN(64, value)); }, @@ -501,13 +683,22 @@ class SwiftRuntime { if (!(threadChannel && "listenMessageFromMainThread" in threadChannel)) { throw new Error("listenMessageFromMainThread is not set in options given to SwiftRuntime. Please set it to listen to wake events from the main thread."); } + const broker = getMessageBroker(threadChannel); threadChannel.listenMessageFromMainThread((message) => { switch (message.type) { case "wake": this.exports.swjs_wake_worker_thread(); break; + case "request": { + broker.onReceivingRequest(message); + break; + } + case "response": { + broker.onReceivingResponse(message); + break; + } default: - const unknownMessage = message.type; + const unknownMessage = message; throw new Error(`Unknown message type: ${unknownMessage}`); } }); @@ -520,13 +711,22 @@ class SwiftRuntime { if (!(threadChannel && "listenMessageFromWorkerThread" in threadChannel)) { throw new Error("listenMessageFromWorkerThread is not set in options given to SwiftRuntime. Please set it to listen to jobs from worker threads."); } + const broker = getMessageBroker(threadChannel); threadChannel.listenMessageFromWorkerThread(tid, (message) => { switch (message.type) { case "job": this.exports.swjs_enqueue_main_job_from_worker(message.data); break; + case "request": { + broker.onReceivingRequest(message); + break; + } + case "response": { + broker.onReceivingResponse(message); + break; + } default: - const unknownMessage = message.type; + const unknownMessage = message; throw new Error(`Unknown message type: ${unknownMessage}`); } }); @@ -542,21 +742,64 @@ class SwiftRuntime { // Main thread's tid is always -1 return this.tid || -1; }, + swjs_request_sending_object: (sending_object, transferring_objects, transferring_objects_count, object_source_tid, sending_context) => { + var _a; + if (!this.options.threadChannel) { + throw new Error("threadChannel is not set in options given to SwiftRuntime. Please set it to request transferring objects."); + } + const broker = getMessageBroker(this.options.threadChannel); + const memory = this.memory; + const transferringObjects = decodeObjectRefs(transferring_objects, transferring_objects_count, memory); + broker.request({ + type: "request", + data: { + sourceTid: (_a = this.tid) !== null && _a !== void 0 ? _a : MAIN_THREAD_TID, + targetTid: object_source_tid, + context: sending_context, + request: { + method: "send", + parameters: [sending_object, transferringObjects, sending_context], + } + } + }); + }, + swjs_request_sending_objects: (sending_objects, sending_objects_count, transferring_objects, transferring_objects_count, object_source_tid, sending_context) => { + var _a; + if (!this.options.threadChannel) { + throw new Error("threadChannel is not set in options given to SwiftRuntime. Please set it to request transferring objects."); + } + const broker = getMessageBroker(this.options.threadChannel); + const memory = this.memory; + const sendingObjects = decodeObjectRefs(sending_objects, sending_objects_count, memory); + const transferringObjects = decodeObjectRefs(transferring_objects, transferring_objects_count, memory); + broker.request({ + type: "request", + data: { + sourceTid: (_a = this.tid) !== null && _a !== void 0 ? _a : MAIN_THREAD_TID, + targetTid: object_source_tid, + context: sending_context, + request: { + method: "sendObjects", + parameters: [sendingObjects, transferringObjects, sending_context], + } + } + }); + }, }; } - postMessageToMainThread(message) { + postMessageToMainThread(message, transfer = []) { const threadChannel = this.options.threadChannel; if (!(threadChannel && "postMessageToMainThread" in threadChannel)) { throw new Error("postMessageToMainThread is not set in options given to SwiftRuntime. Please set it to send messages to the main thread."); } - threadChannel.postMessageToMainThread(message); + threadChannel.postMessageToMainThread(message, transfer); } - postMessageToWorkerThread(tid, message) { + postMessageToWorkerThread(tid, message, transfer = []) { const threadChannel = this.options.threadChannel; if (!(threadChannel && "postMessageToWorkerThread" in threadChannel)) { throw new Error("postMessageToWorkerThread is not set in options given to SwiftRuntime. Please set it to send messages to worker threads."); } - threadChannel.postMessageToWorkerThread(tid, message); + threadChannel.postMessageToWorkerThread(tid, message, transfer); } } /// This error is thrown when yielding event loop control from `swift_task_asyncMainDrainQueue` diff --git a/Sources/_CJavaScriptKit/_CJavaScriptKit.c b/Sources/_CJavaScriptKit/_CJavaScriptKit.c index ea8b5b43d..ed8240ca1 100644 --- a/Sources/_CJavaScriptKit/_CJavaScriptKit.c +++ b/Sources/_CJavaScriptKit/_CJavaScriptKit.c @@ -59,5 +59,13 @@ __attribute__((export_name("swjs_library_features"))) int swjs_library_features(void) { return _library_features(); } + +int swjs_get_worker_thread_id_cached(void) { + _Thread_local static int tid = 0; + if (tid == 0) { + tid = swjs_get_worker_thread_id(); + } + return tid; +} #endif #endif diff --git a/Sources/_CJavaScriptKit/include/_CJavaScriptKit.h b/Sources/_CJavaScriptKit/include/_CJavaScriptKit.h index 5cb6e6037..2b96a81ea 100644 --- a/Sources/_CJavaScriptKit/include/_CJavaScriptKit.h +++ b/Sources/_CJavaScriptKit/include/_CJavaScriptKit.h @@ -290,6 +290,12 @@ IMPORT_JS_FUNCTION(swjs_load_typed_array, void, (const JavaScriptObjectRef ref, /// @param ref The target JavaScript object. IMPORT_JS_FUNCTION(swjs_release, void, (const JavaScriptObjectRef ref)) +/// Decrements reference count of `ref` retained by `SwiftRuntimeHeap` in `object_tid` thread. +/// +/// @param object_tid The TID of the thread that owns the target object. +/// @param ref The target JavaScript object. +IMPORT_JS_FUNCTION(swjs_release_remote, void, (int object_tid, const JavaScriptObjectRef ref)) + /// Yields current program control by throwing `UnsafeEventLoopYield` JavaScript exception. /// See note on `UnsafeEventLoopYield` for more details /// @@ -308,4 +314,22 @@ IMPORT_JS_FUNCTION(swjs_terminate_worker_thread, void, (int tid)) IMPORT_JS_FUNCTION(swjs_get_worker_thread_id, int, (void)) +int swjs_get_worker_thread_id_cached(void); + +/// Requests sending a JavaScript object to another worker thread. +/// +/// This must be called from the destination thread of the transfer. +IMPORT_JS_FUNCTION(swjs_request_sending_object, void, (JavaScriptObjectRef sending_object, + const JavaScriptObjectRef * _Nonnull transferring_objects, + int transferring_objects_count, + int object_source_tid, + void * _Nonnull sending_context)) + +IMPORT_JS_FUNCTION(swjs_request_sending_objects, void, (const JavaScriptObjectRef * _Nonnull sending_objects, + int sending_objects_count, + const JavaScriptObjectRef * _Nonnull transferring_objects, + int transferring_objects_count, + int object_source_tid, + void * _Nonnull sending_context)) + #endif /* _CJavaScriptKit_h */ diff --git a/Tests/JavaScriptEventLoopTests/WebWorkerTaskExecutorTests.swift b/Tests/JavaScriptEventLoopTests/WebWorkerTaskExecutorTests.swift index 3848ba4cc..16cfd6374 100644 --- a/Tests/JavaScriptEventLoopTests/WebWorkerTaskExecutorTests.swift +++ b/Tests/JavaScriptEventLoopTests/WebWorkerTaskExecutorTests.swift @@ -9,7 +9,7 @@ func isMainThread() -> Bool final class WebWorkerTaskExecutorTests: XCTestCase { override func setUp() async { - await WebWorkerTaskExecutor.installGlobalExecutor() + WebWorkerTaskExecutor.installGlobalExecutor() } func testTaskRunOnMainThread() async throws { @@ -264,6 +264,175 @@ final class WebWorkerTaskExecutorTests: XCTestCase { executor.terminate() } + func testSendingWithoutReceiving() async throws { + let object = JSObject.global.Object.function!.new() + _ = JSSending.transfer(object) + _ = JSSending(object) + } + + func testTransferMainToWorker() async throws { + let Uint8Array = JSObject.global.Uint8Array.function! + let buffer = Uint8Array.new(100).buffer.object! + let transferring = JSSending.transfer(buffer) + let executor = try await WebWorkerTaskExecutor(numberOfThreads: 1) + let task = Task(executorPreference: executor) { + let buffer = try await transferring.receive() + return buffer.byteLength.number! + } + let byteLength = try await task.value + XCTAssertEqual(byteLength, 100) + + // Transferred Uint8Array should have 0 byteLength + XCTAssertEqual(buffer.byteLength.number!, 0) + } + + func testTransferWorkerToMain() async throws { + let executor = try await WebWorkerTaskExecutor(numberOfThreads: 1) + let task = Task(executorPreference: executor) { + let Uint8Array = JSObject.global.Uint8Array.function! + let buffer = Uint8Array.new(100).buffer.object! + let transferring = JSSending.transfer(buffer) + return transferring + } + let transferring = await task.value + let buffer = try await transferring.receive() + XCTAssertEqual(buffer.byteLength.number!, 100) + } + + func testTransferNonTransferable() async throws { + let object = JSObject.global.Object.function!.new() + let transferring = JSSending.transfer(object) + let executor = try await WebWorkerTaskExecutor(numberOfThreads: 1) + let task = Task<String?, Error>(executorPreference: executor) { + do { + _ = try await transferring.receive() + return nil + } catch let error as JSException { + return error.thrownValue.description + } + } + guard let jsErrorMessage = try await task.value else { + XCTFail("Should throw an error") + return + } + XCTAssertTrue(jsErrorMessage.contains("Failed to serialize message"), jsErrorMessage) + } + + /* + // Node.js 20 and below doesn't throw exception when transferring the same ArrayBuffer + // multiple times. + // See https://github.com/nodejs/node/commit/38dee8a1c04237bd231a01410f42e9d172f4c162 + func testTransferMultipleTimes() async throws { + let executor = try await WebWorkerTaskExecutor(numberOfThreads: 1) + let Uint8Array = JSObject.global.Uint8Array.function! + let buffer = Uint8Array.new(100).buffer.object! + let transferring = JSSending.transfer(buffer) + let task1 = Task(executorPreference: executor) { + let buffer = try await transferring.receive() + return buffer.byteLength.number! + } + let byteLength1 = try await task1.value + XCTAssertEqual(byteLength1, 100) + + let task2 = Task<String?, Never>(executorPreference: executor) { + do { + _ = try await transferring.receive() + return nil + } catch { + return String(describing: error) + } + } + guard let jsErrorMessage = await task2.value else { + XCTFail("Should throw an error") + return + } + XCTAssertTrue(jsErrorMessage.contains("Failed to serialize message")) + } + */ + + func testCloneMultipleTimes() async throws { + let executor = try await WebWorkerTaskExecutor(numberOfThreads: 1) + let object = JSObject.global.Object.function!.new() + object["test"] = "Hello, World!" + + for _ in 0..<2 { + let cloning = JSSending(object) + let task = Task(executorPreference: executor) { + let object = try await cloning.receive() + return object["test"].string! + } + let result = try await task.value + XCTAssertEqual(result, "Hello, World!") + } + } + + func testTransferBetweenWorkers() async throws { + let executor1 = try await WebWorkerTaskExecutor(numberOfThreads: 1) + let executor2 = try await WebWorkerTaskExecutor(numberOfThreads: 1) + let task = Task(executorPreference: executor1) { + let Uint8Array = JSObject.global.Uint8Array.function! + let buffer = Uint8Array.new(100).buffer.object! + let transferring = JSSending.transfer(buffer) + return transferring + } + let transferring = await task.value + let task2 = Task(executorPreference: executor2) { + let buffer = try await transferring.receive() + return buffer.byteLength.number! + } + let byteLength = try await task2.value + XCTAssertEqual(byteLength, 100) + } + + func testTransferMultipleItems() async throws { + let executor = try await WebWorkerTaskExecutor(numberOfThreads: 1) + let Uint8Array = JSObject.global.Uint8Array.function! + let buffer1 = Uint8Array.new(10).buffer.object! + let buffer2 = Uint8Array.new(11).buffer.object! + let transferring1 = JSSending.transfer(buffer1) + let transferring2 = JSSending.transfer(buffer2) + let task = Task(executorPreference: executor) { + let (buffer1, buffer2) = try await JSSending.receive(transferring1, transferring2) + return (buffer1.byteLength.number!, buffer2.byteLength.number!) + } + let (byteLength1, byteLength2) = try await task.value + XCTAssertEqual(byteLength1, 10) + XCTAssertEqual(byteLength2, 11) + XCTAssertEqual(buffer1.byteLength.number!, 0) + XCTAssertEqual(buffer2.byteLength.number!, 0) + + // Mix transferring and cloning + let buffer3 = Uint8Array.new(12).buffer.object! + let buffer4 = Uint8Array.new(13).buffer.object! + let transferring3 = JSSending.transfer(buffer3) + let cloning4 = JSSending(buffer4) + let task2 = Task(executorPreference: executor) { + let (buffer3, buffer4) = try await JSSending.receive(transferring3, cloning4) + return (buffer3.byteLength.number!, buffer4.byteLength.number!) + } + let (byteLength3, byteLength4) = try await task2.value + XCTAssertEqual(byteLength3, 12) + XCTAssertEqual(byteLength4, 13) + XCTAssertEqual(buffer3.byteLength.number!, 0) + XCTAssertEqual(buffer4.byteLength.number!, 13) + } + + func testCloneObjectToWorker() async throws { + let executor = try await WebWorkerTaskExecutor(numberOfThreads: 1) + let object = JSObject.global.Object.function!.new() + object["test"] = "Hello, World!" + let cloning = JSSending(object) + let task = Task(executorPreference: executor) { + let object = try await cloning.receive() + return object["test"].string! + } + let result = try await task.value + XCTAssertEqual(result, "Hello, World!") + + // Further access to the original object is valid + XCTAssertEqual(object["test"].string!, "Hello, World!") + } + /* func testDeinitJSObjectOnDifferentThread() async throws { let executor = try await WebWorkerTaskExecutor(numberOfThreads: 1)