@@ -356,7 +356,9 @@ QueueMessage::QueueMessage(
356
356
attempts(message.attempts),
357
357
result(result) {}
358
358
359
- jsg::JsValue QueueMessage::getBody (jsg::Lock& js) { return body.getHandle (js); }
359
+ jsg::JsValue QueueMessage::getBody (jsg::Lock& js) {
360
+ return body.getHandle (js);
361
+ }
360
362
361
363
void QueueMessage::retry (jsg::Optional<QueueRetryOptions> options) {
362
364
if (result->ackAll ) {
@@ -459,15 +461,20 @@ void QueueEvent::ackAll() {
459
461
}
460
462
461
463
namespace {
462
- jsg::Ref<QueueEvent> startQueueEvent (EventTarget& globalEventTarget,
464
+
465
+ struct StartQueueEventResponse {
466
+ jsg::Ref<QueueEvent> event = nullptr ;
467
+ kj::Maybe<kj::Promise<void >> exportedHandlerProm;
468
+ bool isServiceWorkerHandler = false ;
469
+ };
470
+
471
+ StartQueueEventResponse startQueueEvent (EventTarget& globalEventTarget,
463
472
kj::OneOf<rpc::EventDispatcher::QueueParams::Reader, QueueEvent::Params> params,
464
473
IoPtr<QueueEventResult> result,
465
474
Worker::Lock& lock,
466
475
kj::Maybe<ExportedHandler&> exportedHandler,
467
476
const jsg::TypeHandler<QueueExportedHandler>& handlerHandler) {
468
477
jsg::Lock& js = lock;
469
- // Start a queue event (called from C++, not JS). Similar to startScheduled(), the caller must
470
- // wait for waitUntil()s to produce the final QueueResult.
471
478
jsg::Ref<QueueEvent> event (nullptr );
472
479
KJ_SWITCH_ONEOF (params) {
473
480
KJ_CASE_ONEOF (p, rpc::EventDispatcher::QueueParams::Reader) {
@@ -478,23 +485,31 @@ jsg::Ref<QueueEvent> startQueueEvent(EventTarget& globalEventTarget,
478
485
}
479
486
}
480
487
488
+ kj::Maybe<kj::Promise<void >> exportedHandlerProm;
489
+ bool isServiceWorkerHandler = false ;
481
490
KJ_IF_SOME (h, exportedHandler) {
482
491
auto queueHandler = KJ_ASSERT_NONNULL (handlerHandler.tryUnwrap (lock, h.self .getHandle (lock)));
483
492
KJ_IF_SOME (f, queueHandler.queue ) {
484
493
auto promise = f (lock, jsg::alloc<QueueController>(event.addRef ()),
485
- jsg::JsValue (h.env .getHandle (js)).addRef (js), h.getCtx ());
486
- event-> waitUntil (promise .then ([event = event.addRef ()]() mutable {
494
+ jsg::JsValue (h.env .getHandle (js)).addRef (js), h.getCtx ())
495
+ .then ([event = event.addRef ()]() mutable {
487
496
event->setCompletionStatus (QueueEvent::CompletedSuccessfully{});
488
497
}, [event = event.addRef ()](kj::Exception&& e) mutable {
489
498
event->setCompletionStatus (QueueEvent::CompletedWithError{kj::cp (e)});
490
499
return kj::mv (e);
491
- }));
500
+ });
501
+ if (FeatureFlags::get (js).getQueueConsumerNoWaitForWaitUntil ()) {
502
+ exportedHandlerProm = kj::mv (promise);
503
+ } else {
504
+ event->waitUntil (kj::mv (promise));
505
+ }
492
506
} else {
493
507
lock.logWarningOnce (" Received a QueueEvent but we lack a handler for QueueEvents. "
494
508
" Did you remember to export a queue() function?" );
495
509
JSG_FAIL_REQUIRE (Error, " Handler does not export a queue() function." );
496
510
}
497
511
} else {
512
+ isServiceWorkerHandler = true ;
498
513
if (globalEventTarget.getHandlerCount (" queue" ) == 0 ) {
499
514
lock.logWarningOnce (" Received a QueueEvent but we lack an event listener for queue events. "
500
515
" Did you remember to call addEventListener(\" queue\" , ...)?" );
@@ -504,8 +519,10 @@ jsg::Ref<QueueEvent> startQueueEvent(EventTarget& globalEventTarget,
504
519
event->setCompletionStatus (QueueEvent::CompletedSuccessfully{});
505
520
}
506
521
507
- return event.addRef ();
522
+ return StartQueueEventResponse{
523
+ kj::mv (event), kj::mv (exportedHandlerProm), isServiceWorkerHandler};
508
524
}
525
+
509
526
} // namespace
510
527
511
528
kj::Promise<WorkerInterface::CustomEvent::Result> QueueCustomEventImpl::run (
@@ -515,6 +532,7 @@ kj::Promise<WorkerInterface::CustomEvent::Result> QueueCustomEventImpl::run(
515
532
kj::TaskSet& waitUntilTasks) {
516
533
incomingRequest->delivered ();
517
534
auto & context = incomingRequest->getContext ();
535
+ KJ_DEFER ({ waitUntilTasks.add (incomingRequest->drain ().attach (kj::mv (incomingRequest))); });
518
536
519
537
kj::String queueName;
520
538
uint32_t batchSize;
@@ -542,6 +560,8 @@ kj::Promise<WorkerInterface::CustomEvent::Result> QueueCustomEventImpl::run(
542
560
// waitUntil'ed callback safely without worrying about whether this coroutine gets canceled.
543
561
struct QueueEventHolder : public kj ::Refcounted {
544
562
jsg::Ref<QueueEvent> event = nullptr ;
563
+ kj::Maybe<kj::Promise<void >> exportedHandlerProm;
564
+ bool isServiceWorkerHandler = false ;
545
565
};
546
566
auto queueEventHolder = kj::refcounted<QueueEventHolder>();
547
567
@@ -555,16 +575,61 @@ kj::Promise<WorkerInterface::CustomEvent::Result> QueueCustomEventImpl::run(
555
575
jsg::AsyncContextFrame::StorageScope traceScope = context.makeAsyncTraceScope (lock);
556
576
557
577
auto & typeHandler = lock.getWorker ().getIsolate ().getApi ().getQueueTypeHandler (lock);
558
- queueEvent-> event = startQueueEvent (lock.getGlobalScope (), kj::mv (params),
578
+ auto startResp = startQueueEvent (lock.getGlobalScope (), kj::mv (params),
559
579
context.addObject (result), lock,
560
580
lock.getExportedHandler (entrypointName, kj::mv (props), context.getActor ()), typeHandler);
581
+ queueEvent->event = kj::mv (startResp.event );
582
+ queueEvent->exportedHandlerProm = kj::mv (startResp.exportedHandlerProm );
583
+ queueEvent->isServiceWorkerHandler = startResp.isServiceWorkerHandler ;
561
584
});
562
585
563
586
auto compatFlags = context.getWorker ().getIsolate ().getApi ().getFeatureFlags ();
564
587
if (compatFlags.getQueueConsumerNoWaitForWaitUntil ()) {
565
588
// The user has opted in to only waiting on their event handler rather than all waitUntil'd
566
589
// promises.
567
- KJ_UNIMPLEMENTED (" TODO(now)" );
590
+ auto timeoutPromise = context.getLimitEnforcer ().limitScheduled ();
591
+ // Start invoking the queue handler. The promise chain here is intended to mimic the behavior of
592
+ // finishScheduled, but only waiting on the promise returned by the event handler rather than on
593
+ // all waitUntil'ed promises.
594
+ auto outcome = co_await runProm
595
+ .then ([queueEvent = kj::addRef (
596
+ *queueEventHolder)]() mutable -> kj::Promise<EventOutcome> {
597
+ // If it returned a promise, wait on the promise.
598
+ KJ_IF_SOME (handlerProm, queueEvent->exportedHandlerProm ) {
599
+ return handlerProm.then ([]() { return EventOutcome::OK; });
600
+ }
601
+ return EventOutcome::OK;
602
+ })
603
+ .catch_ ([](kj::Exception&& e) {
604
+ // If any exceptions were thrown, mark the outcome accordingly.
605
+ return EventOutcome::EXCEPTION;
606
+ })
607
+ .exclusiveJoin (timeoutPromise.then ([] {
608
+ // Join everything against a timeout to ensure queue handlers can't run forever.
609
+ return EventOutcome::EXCEEDED_CPU;
610
+ })).exclusiveJoin (context.onAbort ().then ([] {
611
+ // Also handle anything that might cause the worker to get aborted.
612
+ // This is a change from the outcome we returned on abort before the compat flag, but better
613
+ // matches the behavior of fetch() handlers and the semantics of what's actually happening.
614
+ return EventOutcome::EXCEPTION;
615
+ }, [](kj::Exception&&) { return EventOutcome::EXCEPTION; }));
616
+
617
+ if (outcome == EventOutcome::OK && queueEventHolder->isServiceWorkerHandler ) {
618
+ // HACK: For service-worker syntax, we effectively ignore the compatibility flag and wait
619
+ // for all waitUntil tasks anyway, since otherwise there's no way to do async work from an
620
+ // event listener callback.
621
+ // It'd be nicer if we could fall through to the code below for the non-compat-flag logic in
622
+ // this case, but we don't even know if the worker uses service worker syntax until after
623
+ // runProm resolves, so we just copy the bare essentials here.
624
+ auto result = co_await incomingRequest->finishScheduled ();
625
+ bool completed = result == IoContext_IncomingRequest::FinishScheduledResult::COMPLETED;
626
+ outcome = completed ? context.waitUntilStatus () : EventOutcome::EXCEEDED_CPU;
627
+ }
628
+
629
+ KJ_IF_SOME (status, context.getLimitEnforcer ().getLimitsExceeded ()) {
630
+ outcome = status;
631
+ }
632
+ co_return WorkerInterface::CustomEvent::Result{.outcome = outcome};
568
633
} else {
569
634
// The user has not opted in to the new waitUntil behavior, so we need to add the queue()
570
635
// handler's promise to the waitUntil promises and then wait on them all to finish.
0 commit comments