@@ -3450,6 +3450,19 @@ Worker::Actor::Actor(const Worker& worker,
3450
3450
3451
3451
void Worker::Actor::ensureConstructed (IoContext& context) {
3452
3452
KJ_IF_SOME (info, impl->classInstance .tryGet <ActorClassInfo*>()) {
3453
+ // IMPORTANT: We need to set the state to "Initializing" synchronously, before
3454
+ // ensureConstructedImpl() actually executes and acquires the input lock.
3455
+ // This prevents multiple concurrent initialization attempts if multiple calls to
3456
+ // ensureConstructed() arrive back-to-back.
3457
+ //
3458
+ // This doesn't create a race condition with getHandler() because InputGate::wait()
3459
+ // synchronously adds the caller to the wait queue, even though it completes
3460
+ // asynchronously. Any call to getHandler() that arrives after this point will
3461
+ // have to wait for the input lock, which is only acquired and released by
3462
+ // ensureConstructedImpl() when it completes initialization.
3463
+ //
3464
+ // So the "actor still initializing" error in getHandler() should be impossible
3465
+ // unless a code path is bypassing the input lock mechanism.
3453
3466
context.addWaitUntil (ensureConstructedImpl (context, *info));
3454
3467
impl->classInstance = Impl::Initializing ();
3455
3468
}
@@ -3458,56 +3471,56 @@ void Worker::Actor::ensureConstructed(IoContext& context) {
3458
3471
kj::Promise<void > Worker::Actor::ensureConstructedImpl (IoContext& context, ActorClassInfo& info) {
3459
3472
InputGate::Lock inputLock = co_await impl->inputGate .wait ();
3460
3473
3461
- bool containerRunning = false ;
3462
- KJ_IF_SOME (c, impl->container ) {
3463
- // We need to do an RPC to check if the container is running.
3464
- // TODO(perf): It would be nice if we could have started this RPC earlier, e.g. in parallel
3465
- // with starting the script, and also if we could save the status across hibernations. But
3466
- // that would require some refactoring, and this RPC should (eventally) be local, so it's
3467
- // not a huge deal.
3468
- auto status = co_await c.statusRequest (capnp::MessageSize{4 , 0 }).send ();
3469
- containerRunning = status.getRunning ();
3470
- }
3474
+ try {
3475
+ bool containerRunning = false ;
3476
+ KJ_IF_SOME (c, impl->container ) {
3477
+ // We need to do an RPC to check if the container is running.
3478
+ // TODO(perf): It would be nice if we could have started this RPC earlier, e.g. in parallel
3479
+ // with starting the script, and also if we could save the status across hibernations. But
3480
+ // that would require some refactoring, and this RPC should (eventally) be local, so it's
3481
+ // not a huge deal.
3482
+ auto status = co_await c.statusRequest (capnp::MessageSize{4 , 0 }).send ();
3483
+ containerRunning = status.getRunning ();
3484
+ }
3471
3485
3472
- co_await context
3473
- .run (
3474
- [this , &info, containerRunning](Worker::Lock& lock) {
3475
- jsg::Lock& js = lock;
3486
+ co_await context.run ([this , &info, containerRunning](Worker::Lock& lock) {
3487
+ jsg::Lock& js = lock;
3476
3488
3477
- kj::Maybe<jsg::Ref<api::DurableObjectStorage>> storage;
3478
- KJ_IF_SOME (c, impl->actorCache ) {
3479
- storage = impl->makeStorage (lock, worker->getIsolate ().getApi (), *c);
3480
- }
3481
- auto handler = info.cls (lock,
3482
- jsg::alloc<api::DurableObjectState>(cloneId (),
3483
- jsg::JsRef<jsg::JsValue>(
3484
- js, KJ_ASSERT_NONNULL (lock.getWorker ().impl ->ctxExports ).addRef (js)),
3485
- kj::mv (storage), kj::mv (impl->container ), containerRunning),
3486
- KJ_ASSERT_NONNULL (lock.getWorker ().impl ->env ).addRef (js));
3487
-
3488
- // HACK: We set handler.env to undefined because we already passed the real env into the
3489
- // constructor, and we want the handler methods to act like they take just one parameter.
3490
- // We do the same for handler.ctx, as ExecutionContext related tasks are performed
3491
- // on the actor's state field instead.
3492
- handler.env = js.v8Ref (js.v8Undefined ());
3493
- handler.ctx = kj::none;
3494
- handler.missingSuperclass = info.missingSuperclass ;
3495
-
3496
- impl->classInstance = kj::mv (handler);
3497
- }, kj::mv (inputLock))
3498
- .catch_ ([this ](kj::Exception&& e) {
3499
- auto msg = e.getDescription ();
3489
+ kj::Maybe<jsg::Ref<api::DurableObjectStorage>> storage;
3490
+ KJ_IF_SOME (c, impl->actorCache ) {
3491
+ storage = impl->makeStorage (lock, worker->getIsolate ().getApi (), *c);
3492
+ }
3493
+ auto handler = info.cls (lock,
3494
+ jsg::alloc<api::DurableObjectState>(cloneId (),
3495
+ jsg::JsRef<jsg::JsValue>(
3496
+ js, KJ_ASSERT_NONNULL (lock.getWorker ().impl ->ctxExports ).addRef (js)),
3497
+ kj::mv (storage), kj::mv (impl->container ), containerRunning),
3498
+ KJ_ASSERT_NONNULL (lock.getWorker ().impl ->env ).addRef (js));
3499
+
3500
+ // HACK: We set handler.env to undefined because we already passed the real env into the
3501
+ // constructor, and we want the handler methods to act like they take just one parameter.
3502
+ // We do the same for handler.ctx, as ExecutionContext related tasks are performed
3503
+ // on the actor's state field instead.
3504
+ handler.env = js.v8Ref (js.v8Undefined ());
3505
+ handler.ctx = kj::none;
3506
+ handler.missingSuperclass = info.missingSuperclass ;
3507
+
3508
+ impl->classInstance = kj::mv (handler);
3509
+ }, kj::mv (inputLock));
3510
+ } catch (...) {
3511
+ // Get the KJ exception
3512
+ auto e = kj::getCaughtExceptionAsKj ();
3500
3513
3514
+ auto msg = e.getDescription ();
3501
3515
if (!msg.startsWith (" broken." _kj) && !msg.startsWith (" remote.broken." _kj)) {
3502
3516
// If we already set up a brokenness reason, we shouldn't override it.
3503
-
3504
3517
auto description = jsg::annotateBroken (msg, " broken.constructorFailed" );
3505
3518
e.setDescription (kj::mv (description));
3506
3519
}
3507
3520
3508
3521
impl->constructorFailedPaf .fulfiller ->reject (kj::cp (e));
3509
3522
impl->classInstance = kj::mv (e);
3510
- });
3523
+ }
3511
3524
}
3512
3525
3513
3526
Worker::Actor::~Actor () noexcept (false ) {
0 commit comments