Skip to content

Commit

Permalink
refactor message handler and add waiting timeout
Browse files Browse the repository at this point in the history
  • Loading branch information
toyobayashi committed May 10, 2024
1 parent 047aec3 commit a619356
Show file tree
Hide file tree
Showing 12 changed files with 120 additions and 95 deletions.
4 changes: 2 additions & 2 deletions packages/core/src/emnapi/index.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ export declare interface NapiModule {
executeAsyncWork (work: number): void
postMessage?: (msg: any) => any

waitThreadStart: boolean
waitThreadStart: boolean | number
/** @internal */
PThread: ThreadManager
}
Expand Down Expand Up @@ -73,7 +73,7 @@ export declare type BaseCreateOptions = {
nodeBinding?: NodeBinding
reuseWorker?: boolean
asyncWorkPoolSize?: number
waitThreadStart?: boolean
waitThreadStart?: boolean | number
onCreateWorker?: (info: CreateWorkerInfo) => any
print?: (str: string) => void
printErr?: (str: string) => void
Expand Down
6 changes: 4 additions & 2 deletions packages/core/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,12 @@ export type {
} from './load'

export type {
OnLoadData,
HandleOptions
InstantiatePayload,
MessageHandlerOptions
} from './worker'

export type {
InputType
} from './util'

export * from '@emnapi/wasi-threads'
39 changes: 28 additions & 11 deletions packages/core/src/worker.ts
Original file line number Diff line number Diff line change
@@ -1,22 +1,43 @@
import {
MessageHandler as WASIThreadsMessageHandler,
type HandleOptions,
type OnLoadData
ThreadMessageHandler,
type ThreadMessageHandlerOptions,
type InstantiatePayload
} from '@emnapi/wasi-threads'
import type { NapiModule } from './emnapi/index'
import type { InstantiatedSource } from './load'

export type { HandleOptions, OnLoadData }
export type { ThreadMessageHandlerOptions, InstantiatePayload }

/** @public */
export class MessageHandler extends WASIThreadsMessageHandler {
napiModule: NapiModule | undefined
export interface MessageHandlerOptions extends ThreadMessageHandlerOptions {
onLoad: (data: InstantiatePayload) => InstantiatedSource | PromiseLike<InstantiatedSource>
}

/** @public */
export class MessageHandler extends ThreadMessageHandler {
public napiModule: NapiModule | undefined

constructor (options: HandleOptions) {
public constructor (options: MessageHandlerOptions) {
if (typeof options.onLoad !== 'function') {
throw new TypeError('options.onLoad is not a function')
}
super(options)
this.napiModule = undefined
}

public override instantiate (data: InstantiatePayload): InstantiatedSource | PromiseLike<InstantiatedSource> {
const source = this.onLoad!(data) as InstantiatedSource | PromiseLike<InstantiatedSource>
const then = (source as PromiseLike<InstantiatedSource>).then
if (typeof then === 'function') {
return (source as PromiseLike<InstantiatedSource>).then((result) => {
this.napiModule = result.napiModule
return result
})
}
this.napiModule = (source as InstantiatedSource).napiModule
return source
}

public override handle (e: any): void {
super.handle(e)
if (e?.data?.__emnapi__) {
Expand All @@ -34,8 +55,4 @@ export class MessageHandler extends WASIThreadsMessageHandler {
}
}
}

protected override onLoadSuccess (source: InstantiatedSource): void {
this.napiModule = source.napiModule
}
}
4 changes: 2 additions & 2 deletions packages/emnapi/src/core/init.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ export interface INapiModule {
executeAsyncWork (work: number): void
postMessage?: (msg: any) => any

waitThreadStart: boolean
waitThreadStart: boolean | number
PThread: ThreadManager
}

Expand All @@ -37,7 +37,7 @@ declare const process: any
export var ENVIRONMENT_IS_NODE = typeof process === 'object' && process !== null && typeof process.versions === 'object' && process.versions !== null && typeof process.versions.node === 'string'
export var ENVIRONMENT_IS_PTHREAD = Boolean(options.childThread)
export var reuseWorker = Boolean(options.reuseWorker)
export var waitThreadStart = Boolean(options.waitThreadStart)
export var waitThreadStart = typeof options.waitThreadStart === 'number' ? options.waitThreadStart : Boolean(options.waitThreadStart)

export var wasmInstance: WebAssembly.Instance
export var wasmModule: WebAssembly.Module
Expand Down
2 changes: 1 addition & 1 deletion packages/emnapi/src/core/scope.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ declare interface CreateOptions {
childThread?: boolean
reuseWorker?: boolean
asyncWorkPoolSize?: number
waitThreadStart?: boolean
waitThreadStart?: boolean | number
onCreateWorker?: () => any
print?: (str: string) => void
printErr?: (str: string) => void
Expand Down
2 changes: 1 addition & 1 deletion packages/test/util.js
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ function loadPath (request, options) {
: -RUNTIME_UV_THREADPOOL_SIZE,
filename: request,
reuseWorker: true,
waitThreadStart: true,
waitThreadStart: 1000,
onCreateWorker () {
return new Worker(join(__dirname, './worker.js'), {
env: process.env,
Expand Down
4 changes: 2 additions & 2 deletions packages/wasi-threads/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ export type {
} from './wasi-threads'
export { WASIThreads } from './wasi-threads'

export { MessageHandler } from './worker'
export type { OnLoadData, HandleOptions } from './worker'
export { ThreadMessageHandler } from './worker'
export type { InstantiatePayload, ThreadMessageHandlerOptions } from './worker'

export { createInstanceProxy } from './proxy'
4 changes: 2 additions & 2 deletions packages/wasi-threads/src/util.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@ export const ENVIRONMENT_IS_NODE = typeof process === 'object' && process !== nu
typeof process.versions === 'object' && process.versions !== null &&
typeof process.versions.node === 'string'

export function getPostMessage (options: { postMessage?: (message: any) => void }): ((message: any) => void) | undefined {
return typeof options.postMessage === 'function'
export function getPostMessage (options?: { postMessage?: (message: any) => void }): ((message: any) => void) | undefined {
return typeof options?.postMessage === 'function'
? options.postMessage
: typeof postMessage === 'function'
? postMessage
Expand Down
23 changes: 16 additions & 7 deletions packages/wasi-threads/src/wasi-threads.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ export interface BaseOptions {

/** @public */
export interface MainThreadBaseOptions extends BaseOptions {
waitThreadStart?: boolean
waitThreadStart?: boolean | number
}

/** @public */
Expand Down Expand Up @@ -71,9 +71,9 @@ export class WASIThreads {
}
}

let waitThreadStart = false
let waitThreadStart: boolean | number = false
if ('waitThreadStart' in options) {
waitThreadStart = Boolean(options.waitThreadStart)
waitThreadStart = typeof options.waitThreadStart === 'number' ? options.waitThreadStart : Boolean(options.waitThreadStart)
}

const postMessage = getPostMessage(options as ChildThreadOptions)
Expand Down Expand Up @@ -130,8 +130,10 @@ export class WASIThreads {
return isError ? -result : result
}

const shouldWait = waitThreadStart || (waitThreadStart === 0)

let sab: Int32Array | undefined
if (waitThreadStart) {
if (shouldWait) {
sab = new Int32Array(new SharedArrayBuffer(16 + 8192))
Atomics.store(sab, 0, 0)
}
Expand Down Expand Up @@ -160,8 +162,15 @@ export class WASIThreads {
}
}
})
if (waitThreadStart) {
Atomics.wait(sab!, 0, 0)
if (shouldWait) {
if (typeof waitThreadStart === 'number') {
const waitResult = Atomics.wait(sab!, 0, 0, waitThreadStart)
if (waitResult === 'timed-out') {
throw new Error('Spawning thread timed out. Please check if the worker is created successfully and if message is handled properly in the worker.')
}
} else {
Atomics.wait(sab!, 0, 0)
}
const r = Atomics.load(sab!, 0)
if (r > 1) {
throw deserizeErrorFromBuffer(sab!.buffer as SharedArrayBuffer)!
Expand All @@ -187,7 +196,7 @@ export class WASIThreads {
Atomics.notify(struct, 1)

PThread!.runningWorkers.push(worker)
if (!waitThreadStart) {
if (!shouldWait) {
worker.whenLoaded.catch((err: any) => {
delete worker.whenLoaded
PThread!.cleanThread(worker, tid, true)
Expand Down
46 changes: 22 additions & 24 deletions packages/wasi-threads/src/worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import type { WorkerMessageEvent } from './thread-manager'
import { getPostMessage, serizeErrorToBuffer } from './util'

/** @public */
export interface OnLoadData {
export interface InstantiatePayload {
wasmModule: WebAssembly.Module
wasmMemory: WebAssembly.Memory
sab?: Int32Array
Expand All @@ -15,34 +15,38 @@ export interface OnStartData {
}

/** @public */
export interface HandleOptions {
onLoad (data: OnLoadData): WebAssembly.WebAssemblyInstantiatedSource | PromiseLike<WebAssembly.WebAssemblyInstantiatedSource>
export interface ThreadMessageHandlerOptions {
onLoad?: (data: InstantiatePayload) => WebAssembly.WebAssemblyInstantiatedSource | PromiseLike<WebAssembly.WebAssemblyInstantiatedSource>
postMessage?: (message: any) => void
}

/** @public */
export class MessageHandler {
onLoad: (data: OnLoadData) => WebAssembly.WebAssemblyInstantiatedSource | PromiseLike<WebAssembly.WebAssemblyInstantiatedSource>
instance: WebAssembly.Instance | undefined
messagesBeforeLoad: any[]
postMessage: (message: any) => void

public constructor (options: HandleOptions) {
const onLoad = options.onLoad
export class ThreadMessageHandler {
protected instance: WebAssembly.Instance | undefined
private messagesBeforeLoad: any[]
protected postMessage: (message: any) => void
protected onLoad?: (data: InstantiatePayload) => WebAssembly.WebAssemblyInstantiatedSource | PromiseLike<WebAssembly.WebAssemblyInstantiatedSource>

public constructor (options?: ThreadMessageHandlerOptions) {
const postMsg = getPostMessage(options)
if (typeof onLoad !== 'function') {
throw new TypeError('options.onLoad is not a function')
}
if (typeof postMsg !== 'function') {
throw new TypeError('options.postMessage is not a function')
}
this.onLoad = onLoad
this.postMessage = postMsg
this.onLoad = options?.onLoad
this.instance = undefined
// this.module = undefined
this.messagesBeforeLoad = []
}

/** @virtual */
public instantiate (data: InstantiatePayload): WebAssembly.WebAssemblyInstantiatedSource | PromiseLike<WebAssembly.WebAssemblyInstantiatedSource> {
if (typeof this.onLoad === 'function') {
return this.onLoad(data)
}
throw new Error('ThreadMessageHandler.prototype.instantiate is not implemented')
}

/** @virtual */
public handle (e: WorkerMessageEvent): void {
if (e?.data?.__emnapi__) {
Expand All @@ -59,12 +63,11 @@ export class MessageHandler {
}
}

private _load (payload: OnLoadData): void {
private _load (payload: InstantiatePayload): void {
if (this.instance !== undefined) return
const onLoad = this.onLoad
let source: WebAssembly.WebAssemblyInstantiatedSource | PromiseLike<WebAssembly.WebAssemblyInstantiatedSource>
try {
source = onLoad(payload)
source = this.instantiate(payload)
} catch (err) {
this._loaded(err, null, payload)
return
Expand Down Expand Up @@ -103,10 +106,7 @@ export class MessageHandler {
})
}

/** @virtual */
protected onLoadSuccess (_source: WebAssembly.WebAssemblyInstantiatedSource): void {}

protected _loaded (err: Error | null, source: WebAssembly.WebAssemblyInstantiatedSource | null, payload: OnLoadData): void {
protected _loaded (err: Error | null, source: WebAssembly.WebAssemblyInstantiatedSource | null, payload: InstantiatePayload): void {
if (err) {
notifyPthreadCreateResult(payload.sab, 2, err)
throw err
Expand All @@ -128,8 +128,6 @@ export class MessageHandler {

this.instance = instance

this.onLoadSuccess(source)

const postMessage = this.postMessage!
postMessage({
__emnapi__: {
Expand Down
Loading

0 comments on commit a619356

Please sign in to comment.