Skip to content

Commit e42dfd5

Browse files
Ensure a job enqueued on a worker must be run within the same macro task
1 parent 3b92b8c commit e42dfd5

File tree

2 files changed

+215
-20
lines changed

2 files changed

+215
-20
lines changed

Sources/JavaScriptEventLoop/WebWorkerTaskExecutor.swift

+51-20
Original file line numberDiff line numberDiff line change
@@ -87,6 +87,10 @@ import WASILibc
8787
/// }
8888
/// ```
8989
///
90+
/// ## Scheduling invariants
91+
///
92+
/// * Jobs enqueued on a worker are guaranteed to run within the same macrotask in which they were scheduled.
93+
///
9094
/// ## Known limitations
9195
///
9296
/// Currently, the Cooperative Global Executor of Swift runtime has a bug around
@@ -147,10 +151,12 @@ public final class WebWorkerTaskExecutor: TaskExecutor {
147151
enum State: UInt32, AtomicRepresentable {
148152
/// The worker is idle and waiting for a new job.
149153
case idle = 0
154+
/// A wake message is sent to the worker, but it has not been received it yet
155+
case ready = 1
150156
/// The worker is processing a job.
151-
case running = 1
157+
case running = 2
152158
/// The worker is terminated.
153-
case terminated = 2
159+
case terminated = 3
154160
}
155161
let state: Atomic<State> = Atomic(.idle)
156162
/// TODO: Rewrite it to use real queue :-)
@@ -197,32 +203,42 @@ public final class WebWorkerTaskExecutor: TaskExecutor {
197203
func enqueue(_ job: UnownedJob) {
198204
statsIncrement(\.enqueuedJobs)
199205
var locked: Bool
206+
let onTargetThread = Self.currentThread === self
207+
// If it's on the thread and it's idle, we can directly schedule a `Worker/run` microtask.
208+
let desiredState: State = onTargetThread ? .running : .ready
200209
repeat {
201210
let result: Void? = jobQueue.withLockIfAvailable { queue in
202211
queue.append(job)
212+
trace("Worker.enqueue idle -> running")
203213
// Wake up the worker to process a job.
204-
switch state.exchange(.running, ordering: .sequentiallyConsistent) {
205-
case .idle:
206-
if Self.currentThread === self {
214+
trace("Worker.enqueue idle -> \(desiredState)")
215+
switch state.compareExchange(expected: .idle, desired: desiredState, ordering: .sequentiallyConsistent) {
216+
case (true, _):
217+
if onTargetThread {
207218
// Enqueueing a new job to the current worker thread, but it's idle now.
208219
// This is usually the case when a continuation is resumed by JS events
209220
// like `setTimeout` or `addEventListener`.
210221
// We can run the job and subsequently spawned jobs immediately.
211-
// JSPromise.resolve(JSValue.undefined).then { _ in
212-
_ = JSObject.global.queueMicrotask!(
213-
JSOneshotClosure { _ in
214-
self.run()
215-
return JSValue.undefined
216-
}
217-
)
222+
scheduleRunWithinMacroTask()
218223
} else {
219224
let tid = self.tid.load(ordering: .sequentiallyConsistent)
220225
swjs_wake_up_worker_thread(tid)
221226
}
222-
case .running:
227+
case (false, .idle):
228+
preconditionFailure("unreachable: idle -> \(desiredState) should return exchanged=true")
229+
case (false, .ready):
230+
// A wake message is sent to the worker, but it has not been received it yet
231+
if onTargetThread {
232+
// This means the job is enqueued outside of `Worker/run` (typically triggered
233+
// JS microtasks not awaited by Swift), then schedule a `Worker/run` within
234+
// the same macrotask.
235+
state.store(.running, ordering: .sequentiallyConsistent)
236+
scheduleRunWithinMacroTask()
237+
}
238+
case (false, .running):
223239
// The worker is already running, no need to wake up.
224240
break
225-
case .terminated:
241+
case (false, .terminated):
226242
// Will not wake up the worker because it's already terminated.
227243
break
228244
}
@@ -231,7 +247,7 @@ public final class WebWorkerTaskExecutor: TaskExecutor {
231247
} while !locked
232248
}
233249

234-
func scheduleNextRun() {
250+
func scheduleRunWithinMacroTask() {
235251
_ = JSObject.global.queueMicrotask!(
236252
JSOneshotClosure { _ in
237253
self.run()
@@ -265,12 +281,27 @@ public final class WebWorkerTaskExecutor: TaskExecutor {
265281
trace("Worker.start tid=\(tid)")
266282
}
267283

284+
/// On receiving a wake-up message from other thread
285+
func wakeUpFromOtherThread() {
286+
let (exchanged, _) = state.compareExchange(
287+
expected: .ready,
288+
desired: .running,
289+
ordering: .sequentiallyConsistent
290+
)
291+
guard exchanged else {
292+
// `Worker/run` was scheduled on the thread before JS event loop starts
293+
// a macrotask handling wake-up message.
294+
return
295+
}
296+
run()
297+
}
298+
268299
/// Process jobs in the queue.
269300
///
270301
/// Return when the worker has no more jobs to run or terminated.
271302
/// This method must be called from the worker thread after the worker
272303
/// is started by `start(executor:)`.
273-
func run() {
304+
private func run() {
274305
trace("Worker.run")
275306
guard let executor = parentTaskExecutor else {
276307
preconditionFailure("The worker must be started with a parent executor.")
@@ -290,7 +321,7 @@ public final class WebWorkerTaskExecutor: TaskExecutor {
290321
queue.removeFirst()
291322
return job
292323
}
293-
// No more jobs to run now. Wait for a new job to be enqueued.
324+
// No more jobs to run now.
294325
let (exchanged, original) = state.compareExchange(
295326
expected: .running,
296327
desired: .idle,
@@ -301,7 +332,7 @@ public final class WebWorkerTaskExecutor: TaskExecutor {
301332
case (true, _):
302333
trace("Worker.run exited \(original) -> idle")
303334
return nil // Regular case
304-
case (false, .idle):
335+
case (false, .idle), (false, .ready):
305336
preconditionFailure("unreachable: Worker/run running in multiple threads!?")
306337
case (false, .running):
307338
preconditionFailure("unreachable: running -> idle should return exchanged=true")
@@ -657,12 +688,12 @@ func _swjs_enqueue_main_job_from_worker(_ job: UnownedJob) {
657688
@_expose(wasm, "swjs_wake_worker_thread")
658689
#endif
659690
func _swjs_wake_worker_thread() {
660-
WebWorkerTaskExecutor.Worker.currentThread!.run()
691+
WebWorkerTaskExecutor.Worker.currentThread!.wakeUpFromOtherThread()
661692
}
662693

663694
private func trace(_ message: String) {
664695
#if JAVASCRIPTKIT_TRACE
665-
JSObject.global.process.stdout.write("[trace tid=\(swjs_get_worker_thread_id())] \(message)\n")
696+
_ = JSObject.global.console.warn("[trace tid=\(swjs_get_worker_thread_id())] \(message)\n")
666697
#endif
667698
}
668699

Tests/JavaScriptEventLoopTests/WebWorkerTaskExecutorTests.swift

+164
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
#if compiler(>=6.1) && _runtime(_multithreaded)
2+
import Synchronization
23
import XCTest
34
import _CJavaScriptKit // For swjs_get_worker_thread_id
45
@testable import JavaScriptKit
@@ -22,6 +23,7 @@ func pthread_mutex_lock(_ mutex: UnsafeMutablePointer<pthread_mutex_t>) -> Int32
2223
}
2324
#endif
2425

26+
@available(macOS 15.0, iOS 18.0, watchOS 11.0, tvOS 18.0, visionOS 2.0, *)
2527
final class WebWorkerTaskExecutorTests: XCTestCase {
2628
func testTaskRunOnMainThread() async throws {
2729
let executor = try await WebWorkerTaskExecutor(numberOfThreads: 1)
@@ -97,6 +99,168 @@ final class WebWorkerTaskExecutorTests: XCTestCase {
9799
executor.terminate()
98100
}
99101

102+
func testScheduleJobWithinMacroTask1() async throws {
103+
let executor = try await WebWorkerTaskExecutor(numberOfThreads: 1)
104+
defer { executor.terminate() }
105+
106+
final class Context: @unchecked Sendable {
107+
let hasEndedFirstWorkerWakeLoop = Atomic<Bool>(false)
108+
let hasEnqueuedFromMain = Atomic<Bool>(false)
109+
let hasReachedNextMacroTask = Atomic<Bool>(false)
110+
let hasJobBEnded = Atomic<Bool>(false)
111+
let hasJobCEnded = Atomic<Bool>(false)
112+
}
113+
114+
// Scenario 1.
115+
// | Main | Worker |
116+
// | +---------------------+--------------------------+
117+
// | | | Start JS macrotask |
118+
// | | | Start 1st wake-loop |
119+
// | | | Enq JS microtask A |
120+
// | | | End 1st wake-loop |
121+
// | | | Start a JS microtask A |
122+
// time | Enq job B to Worker | [PAUSE] |
123+
// | | | Enq Swift job C |
124+
// | | | End JS microtask A |
125+
// | | | Start 2nd wake-loop |
126+
// | | | Run Swift job B |
127+
// | | | Run Swift job C |
128+
// | | | End 2nd wake-loop |
129+
// v | | End JS macrotask |
130+
// +---------------------+--------------------------+
131+
132+
let context = Context()
133+
Task {
134+
while !context.hasEndedFirstWorkerWakeLoop.load(ordering: .sequentiallyConsistent) {
135+
try! await Task.sleep(nanoseconds: 1_000)
136+
}
137+
// Enqueue job B to Worker
138+
Task(executorPreference: executor) {
139+
XCTAssertFalse(isMainThread())
140+
XCTAssertFalse(context.hasReachedNextMacroTask.load(ordering: .sequentiallyConsistent))
141+
context.hasJobBEnded.store(true, ordering: .sequentiallyConsistent)
142+
}
143+
XCTAssertTrue(isMainThread())
144+
// Resume worker thread to let it enqueue job C
145+
context.hasEnqueuedFromMain.store(true, ordering: .sequentiallyConsistent)
146+
}
147+
148+
// Start worker
149+
await Task(executorPreference: executor) {
150+
// Schedule a new macrotask to detect if the current macrotask has completed
151+
JSObject.global.setTimeout.function!(JSOneshotClosure { _ in
152+
context.hasReachedNextMacroTask.store(true, ordering: .sequentiallyConsistent)
153+
return .undefined
154+
}, 0)
155+
156+
// Enqueue a microtask, not managed by WebWorkerTaskExecutor
157+
JSObject.global.queueMicrotask.function!(JSOneshotClosure { _ in
158+
// Resume the main thread and let it enqueue job B
159+
context.hasEndedFirstWorkerWakeLoop.store(true, ordering: .sequentiallyConsistent)
160+
// Wait until the enqueue has completed
161+
while !context.hasEnqueuedFromMain.load(ordering: .sequentiallyConsistent) {}
162+
// Should be still in the same macrotask
163+
XCTAssertFalse(context.hasReachedNextMacroTask.load(ordering: .sequentiallyConsistent))
164+
// Enqueue job C
165+
Task(executorPreference: executor) {
166+
// Should be still in the same macrotask
167+
XCTAssertFalse(context.hasReachedNextMacroTask.load(ordering: .sequentiallyConsistent))
168+
// Notify that job C has completed
169+
context.hasJobCEnded.store(true, ordering: .sequentiallyConsistent)
170+
}
171+
return .undefined
172+
}, 0)
173+
// Wait until job B, C and the next macrotask have completed
174+
while !context.hasJobBEnded.load(ordering: .sequentiallyConsistent) ||
175+
!context.hasJobCEnded.load(ordering: .sequentiallyConsistent) ||
176+
!context.hasReachedNextMacroTask.load(ordering: .sequentiallyConsistent) {
177+
try! await Task.sleep(nanoseconds: 1_000)
178+
}
179+
}.value
180+
}
181+
182+
func testScheduleJobWithinMacroTask2() async throws {
183+
let executor = try await WebWorkerTaskExecutor(numberOfThreads: 1)
184+
defer { executor.terminate() }
185+
186+
final class Context: @unchecked Sendable {
187+
let hasEndedFirstWorkerWakeLoop = Atomic<Bool>(false)
188+
let hasEnqueuedFromMain = Atomic<Bool>(false)
189+
let hasReachedNextMacroTask = Atomic<Bool>(false)
190+
let hasJobBEnded = Atomic<Bool>(false)
191+
let hasJobCEnded = Atomic<Bool>(false)
192+
}
193+
194+
// Scenario 2.
195+
// (The order of enqueue of job B and C are reversed from Scenario 1)
196+
//
197+
// | Main | Worker |
198+
// | +---------------------+--------------------------+
199+
// | | | Start JS macrotask |
200+
// | | | Start 1st wake-loop |
201+
// | | | Enq JS microtask A |
202+
// | | | End 1st wake-loop |
203+
// | | | Start a JS microtask A |
204+
// | | | Enq Swift job C |
205+
// time | Enq job B to Worker | [PAUSE] |
206+
// | | | End JS microtask A |
207+
// | | | Start 2nd wake-loop |
208+
// | | | Run Swift job B |
209+
// | | | Run Swift job C |
210+
// | | | End 2nd wake-loop |
211+
// v | | End JS macrotask |
212+
// +---------------------+--------------------------+
213+
214+
let context = Context()
215+
Task {
216+
while !context.hasEndedFirstWorkerWakeLoop.load(ordering: .sequentiallyConsistent) {
217+
try! await Task.sleep(nanoseconds: 1_000)
218+
}
219+
// Enqueue job B to Worker
220+
Task(executorPreference: executor) {
221+
XCTAssertFalse(isMainThread())
222+
XCTAssertFalse(context.hasReachedNextMacroTask.load(ordering: .sequentiallyConsistent))
223+
context.hasJobBEnded.store(true, ordering: .sequentiallyConsistent)
224+
}
225+
XCTAssertTrue(isMainThread())
226+
// Resume worker thread to let it enqueue job C
227+
context.hasEnqueuedFromMain.store(true, ordering: .sequentiallyConsistent)
228+
}
229+
230+
// Start worker
231+
await Task(executorPreference: executor) {
232+
// Schedule a new macrotask to detect if the current macrotask has completed
233+
JSObject.global.setTimeout.function!(JSOneshotClosure { _ in
234+
context.hasReachedNextMacroTask.store(true, ordering: .sequentiallyConsistent)
235+
return .undefined
236+
}, 0)
237+
238+
// Enqueue a microtask, not managed by WebWorkerTaskExecutor
239+
JSObject.global.queueMicrotask.function!(JSOneshotClosure { _ in
240+
// Enqueue job C
241+
Task(executorPreference: executor) {
242+
// Should be still in the same macrotask
243+
XCTAssertFalse(context.hasReachedNextMacroTask.load(ordering: .sequentiallyConsistent))
244+
// Notify that job C has completed
245+
context.hasJobCEnded.store(true, ordering: .sequentiallyConsistent)
246+
}
247+
// Resume the main thread and let it enqueue job B
248+
context.hasEndedFirstWorkerWakeLoop.store(true, ordering: .sequentiallyConsistent)
249+
// Wait until the enqueue has completed
250+
while !context.hasEnqueuedFromMain.load(ordering: .sequentiallyConsistent) {}
251+
// Should be still in the same macrotask
252+
XCTAssertFalse(context.hasReachedNextMacroTask.load(ordering: .sequentiallyConsistent))
253+
return .undefined
254+
}, 0)
255+
// Wait until job B, C and the next macrotask have completed
256+
while !context.hasJobBEnded.load(ordering: .sequentiallyConsistent) ||
257+
!context.hasJobCEnded.load(ordering: .sequentiallyConsistent) ||
258+
!context.hasReachedNextMacroTask.load(ordering: .sequentiallyConsistent) {
259+
try! await Task.sleep(nanoseconds: 1_000)
260+
}
261+
}.value
262+
}
263+
100264
func testTaskGroupRunOnSameThread() async throws {
101265
let executor = try await WebWorkerTaskExecutor(numberOfThreads: 3)
102266

0 commit comments

Comments
 (0)