Skip to content

Fix: removeSubscription hangs after Runloop crashes#1712

Open
acrow wants to merge 2 commits into
zio:masterfrom
acrow:fix/runloop-done-deadlock
Open

Fix: removeSubscription hangs after Runloop crashes#1712
acrow wants to merge 2 commits into
zio:masterfrom
acrow:fix/runloop-done-deadlock

Conversation

@acrow

@acrow acrow commented Jun 8, 2026

Copy link
Copy Markdown

Problem

When the Runloop crashes (e.g. after TopicAuthorizationException exhausts retries), scope finalizers for active subscriptions call removeSubscription →
offerAndAwaitCommand. At this point the Runloop.make scope finalizer may be concurrently shutting down the commandQueue. This causes commandQueue.offer to
block, leaving removeSubscription hung indefinitely. In turn, flatMapPar waits for the inner fiber to finish, runDrain never returns, and Consumer.consumeWith hangs forever.

Fix

Add runloopDone: Promise[Throwable, Nothing] that is completed (failed with the runloop cause) when the runloop exits. offerAndAwaitCommand checks
runloopDone.isDone first:

  • If already done → skip commandQueue.offer entirely and return runloopDone.await directly
  • If not done → offer to queue and race promise.await against runloopDone.await

Also adds dataQueue.offer(Exit.fail(cause)) in halt() so partition stream fibers that are between polls detect the failure on their next poll cycle.

RunloopAccess: changed removeSubscription.orDie to .ignore since failure is expected when the runloop has already crashed.

Tests

Added RunloopSpec test: removeSubscription does not hang after Runloop crashes — verifies that removeSubscription completes promptly (does not time out) after the runloop crashes.

Comment thread zio-kafka/src/main/scala/zio/kafka/consumer/internal/Runloop.scala Outdated
@acrow acrow force-pushed the fix/runloop-done-deadlock branch from a31bd27 to cb6f6c9 Compare June 10, 2026 02:40

@erikvanoosten erikvanoosten left a comment

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I found the problem that caused partition streams to dead lock. Please see the comments for a solution.

The new unit test fails now though. Do you want to take another look at that?

Comment thread zio-kafka/src/main/scala/zio/kafka/consumer/internal/PartitionStreamControl.scala Outdated
Comment thread zio-kafka/src/main/scala/zio/kafka/consumer/internal/PartitionStreamControl.scala Outdated
Comment thread zio-kafka/src/main/scala/zio/kafka/consumer/internal/PartitionStreamControl.scala Outdated
Comment on lines +74 to +84
private[internal] def offerAndAwaitCommand[E <: Throwable, A](f: Promise[E, A] => RunloopCommand): Task[A] =
runloopDone.isDone.flatMap {
case true => runloopDone.await
case false =>
(Promise
.make[E, A]
.flatMap { promise =>
commandQueue.offer(f(promise)) *> promise.await
}: Task[A])
.raceFirst(runloopDone.await)
}

@erikvanoosten erikvanoosten Jun 10, 2026

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

During normal (non-error) shut down, runloop does not end the streams directly, only via the unsubscribe command. Therefore, skipping the command is problematic, the tests fail on it.

Suggested change
private[internal] def offerAndAwaitCommand[E <: Throwable, A](f: Promise[E, A] => RunloopCommand): Task[A] =
runloopDone.isDone.flatMap {
case true => runloopDone.await
case false =>
(Promise
.make[E, A]
.flatMap { promise =>
commandQueue.offer(f(promise)) *> promise.await
}: Task[A])
.raceFirst(runloopDone.await)
}
private[internal] def offerAndAwaitCommand[E <: Throwable, A](f: Promise[E, A] => RunloopCommand): Task[A] =
runloopDone.await.raceFirst {
for {
promise <- Promise.make[E, A]
_ <- commandQueue.offer(f(promise))
r <- promise.await
} yield r
}

Comment thread zio-kafka/src/main/scala/zio/kafka/consumer/internal/Runloop.scala Outdated
Comment on lines +580 to +582
// Signal runloopDone FIRST so that removeSubscription calls in partition
// stream finalizers can return immediately without offering to commandQueue.
_ <- runloopDone.failCause(cause).ignore

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

With the changes in offerAndAwaitCommand I think this should go last. Either way, with the other changes, the comment is no longer applicable.

@acrow acrow force-pushed the fix/runloop-done-deadlock branch from cb6f6c9 to e6b5499 Compare June 11, 2026 02:19
@acrow

acrow commented Jun 11, 2026

Copy link
Copy Markdown
Author

Hi @erikvanoosten, thank you for finding the real root cause — the dataQueue.poll not racing with interruptionPromise was indeed the missing piece. All your suggestions have
been applied in the latest commit:

  • dataQueue.poll now races with interruptionPromise.await.mapError(Option.apply) ✓
  • mapError moved inside the interruptionPromise branch in requestAndAwaitData ✓
  • dataQueue.offer(Exit.fail(...)) reverted from halt() ✓
  • Error message changed to "Consumer shutting down" ✓
  • runloopDone.failCause moved to last in catchAllCause ✓

Regarding offerAndAwaitCommand — we kept the isDone check because our live tests hang without it, even with your raceFirst-only approach. We believe the reason is that removeSubscription is called from inside ZIO.addFinalizer, which runs uninterruptibly. When runloopDone.await wins the race, raceFirst interrupts the right fiber and waits for its finalizers. But the right fiber is inside an uninterruptible context, so the interrupt signal is masked and promise.await blocks forever — raceFirst never returns.

The isDone check avoids this entirely: if the runloop is already done, we never start the right fiber, so there is nothing to interrupt.

Regarding the unit test — the RunloopSpec test removeSubscription does not hang after Runloop crashes passes with all your changes applied. The test runs outside a finalizer context so it does not hit the uninterruptible issue.

@erikvanoosten

erikvanoosten commented Jun 14, 2026

Copy link
Copy Markdown
Collaborator

@acrow Thanks for that analysis.

I agree that the problem is that removeSubscription is called from inside ZIO.addFinalizer. I experimented with wrapping the call with interruptible like this:

// RunloopAccess subscribe
      _ <- ZIO.addFinalizer {
             ZIO.interruptible {
               withRunloopZIO(requireRunning = false)(_.removeSubscription(subscription).ignore) <*
                 diagnostics.emit(DiagnosticEvent.SubscriptionFinalized)
             }
           }

and that works a lot better, but there are still failing tests. In addition, it doesn't solve the race condition.

The race condition can occur because Runloop.offerAndAwaitCommand checks runloopDone from outside the fiber that runs the 'runloop'. What if we move the check to inside the fiber that runs the runloop? That is, from where we handle the command (method handleCommand).

Thinking again, this brings other problems... We are just moving the race condition; how do we make sure the runloop actually processes the queued commands?
... 🤔 ... 🤔

This might work:

  1. We keep Runloop.offerAndAwaitCommand as proposed in this PR.
  2. In Runloop's catchAllCause, we add some code that loops through all pending commands in commandQueue and fails the open promises.

Note: with this the race condition doesn't go away, we just make it less likely to result in a problem. Step 2 should be done last inside Runloop's catchAllCause, to make the chances of a missed command as small as possible.

The only way to really make the race condition go away is by having 2-way communication between Runloop.offerAndAwaitCommand and the runloop... Not sure how to do it yet.

We'll need to do step 2. also at end of the runloop (the second place where we call runloopDone.fail). (Perhaps move calling runloopDone.fail and emptying the command queue to a separate method...)

WDYT?

A side note: Runloop.shouldPoll should also check runloopDone:

  private def shouldPoll(state: State): UIO[Boolean] =
    for {
      runloopIsDone      <- runloopDone.isDone
      pendingCommitCount <- committer.pendingCommitCount
    } yield !runloopIsDone &&
      state.subscriptionState.isSubscribed &&
      (state.pendingRequests.nonEmpty || pendingCommitCount > 0 || state.assignedStreams.isEmpty)

@acrow

acrow commented Jun 15, 2026

Copy link
Copy Markdown
Author

Hi @erikvanoosten, thank you for the detailed analysis and the suggestions.

Both changes have been applied in the latest commit:

  • drainCommandQueue: after runloopDone is set, we now drain all remaining commands from commandQueue and fail their open promises with the runloop's cause. This is called at the end of both catchAllCause and zipLeft, so the window in which a command can be queued but never answered is as small as possible.
  • shouldPoll guards on runloopDone.isDone: the runloop now stops polling immediately once it has signalled done, rather than relying solely on the takeWhile stream guard.
  • AddSubscription.cont widened from Promise[InvalidSubscriptionUnion, Unit] to Promise[Throwable, Unit] so all three command types (AddSubscription, RemoveSubscription, EndStreamsBySubscription) can be drained uniformly.

We tried to write a unit test that specifically exercises the race window (command queued while catchAllCause is running), but the window is too narrow to hit reliably in a unit test without injecting artificial delays into catchAllCause. The existing test removeSubscription does not hang after Runloop crashes still passes and validates the overall no-hang property. Please let us know if you'd like a different test approach.

@erikvanoosten

Copy link
Copy Markdown
Collaborator

I finally got some time again (not much though) to look at this PR. Did you run the unit tests? When I run them on my laptop some of the test time out. I tried to debug the issue but I have not found the problem yet. Do the tests (sbt test) run on your machine?

@acrow acrow force-pushed the fix/runloop-done-deadlock branch from 092bbbb to 16f3c8f Compare June 22, 2026 07:43
@CLAassistant

CLAassistant commented Jun 22, 2026

Copy link
Copy Markdown

CLA assistant check
All committers have signed the CLA.

When the Runloop crashes, scope finalizers for active subscriptions call
removeSubscription -> offerAndAwaitCommand. The commandQueue may be
concurrently shutting down, causing commandQueue.offer to block and
leaving removeSubscription hung indefinitely. This in turn blocks
flatMapPar from completing its cleanup, so consumeWith never returns.

Fix: add runloopDone: Promise[Throwable, Nothing] that is completed
(failed with the runloop cause) when the runloop exits. offerAndAwaitCommand
checks runloopDone.isDone first — if done, it skips the offer entirely and
returns runloopDone.await directly. For the not-yet-done path it races
promise.await against runloopDone.await so callers unblock as soon as the
runloop exits regardless of which happens first.

Also adds dataQueue.offer(Exit.fail) in halt() so partition stream fibers
that are between polls detect the failure on their next poll cycle.

RunloopAccess: removeSubscription.ignore instead of .orDie since failure
is expected when the runloop has already crashed.
@acrow acrow force-pushed the fix/runloop-done-deadlock branch from 16f3c8f to e2a00a4 Compare June 22, 2026 07:48
@erikvanoosten

erikvanoosten commented Jun 22, 2026

Copy link
Copy Markdown
Collaborator

Oh smart! You swapped the place where runloopDone is checked in offerAndAwaitCommand!

I have some small tweaks standing by. I'll try to find time later today to send those over.

@acrow

acrow commented Jun 22, 2026

Copy link
Copy Markdown
Author

Hi @erikvanoosten, yes we found the issue and have pushed a fix.

The root cause was raceFirst in offerAndAwaitCommand. When runloopDone.await wins the race, raceFirst tries to interrupt the losing fiber (promise.await). But
removeSubscription and endStreamsBySubscription are both called from ZIO.addFinalizer, which is uninterruptible — so the interrupt is masked and raceFirst waits forever for the loser's finalizers. This caused the widespread timeouts.

We replaced raceFirst with a two-pronged approach:

  1. drainCommandQueue in catchAllCause: when the runloop crashes, drain all pending promise-bearing commands from the queue and fail their promises with the crash cause.
    Handles commands already in the queue at crash time.
  2. Post-offer isDone check in offerAndAwaitCommand: after offering the command, check runloopDone.isDone. If the runloop already crashed (and drainCommandQueue ran before our offer), fail the promise immediately. Then simply promise.await — no racing at all.

There is still a narrow race window between commandQueue.offer and the isDone check, but it is effectively closed because runloopDone.failCause is called before drainCommandQueue in catchAllCause — so either drainCommandQueue sees the command, or the isDone check fires.

We also added two simulation tests to RunloopSpec that wrap removeSubscription and endStreamsBySubscription in ZIO.uninterruptible, replicating the ZIO.addFinalizer context.
These would have caught the raceFirst bug earlier.

Results: all 8 RunloopSpec tests and all 44 ConsumerSpec integration tests pass.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants