Description
What version of Elysia is running?
1.4.25
What platform is your computer?
Linux 6.6.87.2-microsoft-standard-WSL2 x86_64 x86_64
What environment are you using
bun 1.3.10
Are you using dynamic mode?
No
What steps can reproduce the bug?
// server
import { Elysia } from 'elysia'

let cancelled = false

const app = new Elysia()
  .get('/stream', () => {
    cancelled = false
    const stream = new ReadableStream({
      pull(controller) {
        controller.enqueue(new TextEncoder().encode('data\n'))
        return new Promise((resolve) => setTimeout(resolve, 100))
      },
      cancel() {
        cancelled = true
      }
    })
    return new Response(stream, {
      headers: {
        'content-type': 'application/octet-stream',
        'transfer-encoding': 'chunked',
        'cache-control': 'no-transform'
      }
    })
  })
  .get('/cancelled', () => Response.json({ cancelled }))
  .listen(3000)

// client
const ac = new AbortController()
const res = await fetch('http://localhost:3000/stream', { signal: ac.signal })
const reader = res.body!.getReader()
await reader.read() // receive first chunk
ac.abort() // simulate client disconnect
await new Promise((r) => setTimeout(r, 1000))
const status = await fetch('http://localhost:3000/cancelled').then((r) => r.json())
console.log(status.cancelled) // prints false, should be true

What is the expected behavior?
After the client disconnects, cancelled should be true. The ReadableStream's cancel() callback should fire so that producers can clean up resources (close DB cursors, stop background work, etc.).
What do you see instead?
cancelled remains false. The stream's cancel() callback is never called.
The following error also appears in the console:
TypeError: Invalid state: Controller is already closed
at start (elysia/src/adapter/utils.ts:271:21)
Additional information
1. The abort listener sets a flag instead of cancelling the reader
streamResponse is an async generator consumed by createStreamHandler via a for await loop
inside start():
// src/adapter/utils.ts
export async function* streamResponse(response: Response) {
  const reader = body.getReader()
  try {
    while (true) {
      const { done, value } = await reader.read() // ← suspended here waiting for next chunk
      if (done) break
      yield decoder.decode(value)
    }
  } finally {
    reader.releaseLock()
  }
}

// src/adapter/utils.ts, inside createStreamHandler → new ReadableStream({ async start(controller) {
request?.signal?.addEventListener('abort', () => {
  end = true // ← BUG: flag is set, but for await is still blocked inside reader.read()
  try { controller.close() } catch {}
})
for await (const chunk of generator) { // generator = streamResponse(response)
  if (end) break // ← never reached while reader.read() is pending
  controller.enqueue(chunk)
}

When the client disconnects, the listener sets end = true, but the for await loop is suspended inside
await reader.read() in streamResponse. The loop cannot observe the flag until reader.read() settles on
its own, which may not happen until the next chunk arrives or the producer times out.
Because the loop cannot break while it is blocked, streamResponse's finally block
(reader.releaseLock()) does not run at disconnect time. Even when it eventually runs,
releaseLock() does not invoke the stream's cancel algorithm, so the cancel() callback on the
original ReadableStream is never called.
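
The blocking behaviour described above can be seen without Elysia or streams at all. The following is a minimal sketch, not Elysia code: `flagAloneDoesNotUnblock`, `producer`, and `deliverChunk` are hypothetical names, and the pending promise stands in for a pending `reader.read()`.

```typescript
// Sketch: a flag checked inside a for await body is only seen once the
// generator yields again. All names here are hypothetical.
async function flagAloneDoesNotUnblock(): Promise<{
  exitedBeforeChunk: boolean
  exitedAfterChunk: boolean
  seen: string[]
}> {
  let end = false
  let deliverChunk: (value: string) => void = () => {}
  const nextChunk = new Promise<string>((resolve) => {
    deliverChunk = resolve
  })

  async function* producer() {
    yield 'first'
    yield await nextChunk // stands in for a pending reader.read()
  }

  const seen: string[] = []
  let exited = false
  const loop = (async () => {
    for await (const chunk of producer()) {
      if (end) break
      seen.push(chunk)
    }
    exited = true
  })()

  await new Promise((r) => setTimeout(r, 10)) // loop consumes 'first', then blocks
  end = true // what the abort listener does today
  await new Promise((r) => setTimeout(r, 50))
  const exitedBeforeChunk = exited // still blocked inside the generator

  deliverChunk('second') // only now does the loop get to check the flag
  await loop
  return { exitedBeforeChunk, exitedAfterChunk: exited, seen }
}
```

Setting the flag has no effect until the producer delivers something, which is why a slow or idle producer keeps the handler alive after the client is gone.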
The fix is to call reader.cancel() directly from the abort listener. It does two things
atomically: it resolves any pending reader.read() with { done: true } (unblocking the loop), and
it invokes the stream's cancel algorithm. Confirmed by test in Bun 1.3.10:
| Abort listener action | Inner cancel() fires? |
|---|---|
| end = true + controller.close() (current) | ✗ (for await stays blocked in reader.read()) |
| reader.cancel() directly | ✓ (immediately resolves pending reader.read() at the same tick) |
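
The difference between releaseLock() and reader.cancel() can also be checked in isolation. This is a small sketch using only the standard Web Streams API; `makeStream`, `releaseLockOnly`, and `cancelReader` are hypothetical helper names, not part of Elysia.

```typescript
// Hypothetical helper: a stream that delivers one chunk and then stays idle,
// recording whether its cancel() callback ran.
function makeStream() {
  const state = { cancelled: false }
  const stream = new ReadableStream<Uint8Array>({
    start(controller) {
      controller.enqueue(new TextEncoder().encode('data\n'))
    },
    cancel() {
      state.cancelled = true
    }
  })
  return { stream, state }
}

// Releasing the lock detaches the reader but does not cancel the stream.
async function releaseLockOnly(): Promise<boolean> {
  const { stream, state } = makeStream()
  const reader = stream.getReader()
  await reader.read() // consume the only chunk
  reader.releaseLock()
  await new Promise((r) => setTimeout(r, 10))
  return state.cancelled
}

// Cancelling the reader settles the pending read and runs cancel().
async function cancelReader(): Promise<{ done: boolean; cancelled: boolean }> {
  const { stream, state } = makeStream()
  const reader = stream.getReader()
  await reader.read() // first chunk
  const pending = reader.read() // no more data, so this stays pending
  await reader.cancel()
  const { done } = await pending
  return { done, cancelled: state.cancelled }
}
```

With these helpers, `releaseLockOnly()` should resolve to `false`, and `cancelReader()` should resolve to `{ done: true, cancelled: true }`.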
2. createResponseHandler routes all chunked responses through streamResponse
// src/adapter/utils.ts
export const createResponseHandler = (handler: CreateHandlerParameter) => {
  const handleStream = createStreamHandler(handler)
  return (response: Response, set: Context['set'], request?: Request) => {
    const newResponse = new Response(response.body, { ... })
    if (
      !newResponse.headers.has('content-length') &&
      newResponse.headers.get('transfer-encoding') === 'chunked'
    )
      return handleStream(
        streamResponse(newResponse), // ← every chunked response goes through here
        ...
      )
    return newResponse
  }
}

Every Response with transfer-encoding: chunked and no content-length goes through this path, so all
such streaming responses are affected.
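
To check whether a given response would take this path, the condition can be pulled out into a predicate. This is only a sketch mirroring the quoted condition; `takesStreamPath` is a hypothetical name and not an Elysia export.

```typescript
// Hypothetical helper mirroring the condition in createResponseHandler.
function takesStreamPath(headers: Headers): boolean {
  return (
    !headers.has('content-length') &&
    headers.get('transfer-encoding') === 'chunked'
  )
}

// The headers from the reproduction above match the condition.
const reproHeaders = new Headers({
  'content-type': 'application/octet-stream',
  'transfer-encoding': 'chunked',
  'cache-control': 'no-transform'
})
const reproIsAffected = takesStreamPath(reproHeaders)
```

Header names are matched case-insensitively by `Headers`, but the value comparison is an exact string match, so a value other than the literal `chunked` would not take this path.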
3. The "Controller is already closed" error is a secondary symptom
Inside createStreamHandler's start(), controller.close() is called in two places:
request?.signal?.addEventListener('abort', () => {
  end = true
  try { controller.close() } catch {} // ← first close
})
// ... for await eventually exits ...
try { controller.close() } catch {} // ← second close → "Controller is already closed"

The second call throws because the abort listener already closed the controller. The error is caught and
silently swallowed; it is a symptom of the same root cause.
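
That a second close() throws can be confirmed with a few lines of standard Web Streams code. This is a sketch only; `secondCloseError` is a hypothetical name.

```typescript
// Sketch: closing an already-closed controller throws a TypeError.
function secondCloseError(): string | null {
  let caught: string | null = null
  new ReadableStream<Uint8Array>({
    start(controller) {
      controller.close() // first close succeeds
      try {
        controller.close() // second close throws
      } catch (err) {
        caught = err instanceof Error ? err.name : String(err)
      }
    }
  })
  return caught
}
```

The exact message text depends on the runtime, so the sketch only records the error's name.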
Suggested fix (tested in Bun 1.3.10)
Expose the reader from streamResponse and call reader.cancel() directly from the abort
listener. reader.cancel() does two things atomically: resolves any pending reader.read() with
{ done: true }, and invokes the stream's cancel algorithm. This unblocks the for await loop
immediately and propagates cancellation to the original ReadableStream.
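
The same idea can be tried outside Elysia first. The following is a minimal standalone sketch, assuming only the standard Web Streams and AbortSignal APIs; `pipeWithAbort` is a hypothetical helper and is not how Elysia structures its handler.

```typescript
// Hypothetical helper: re-streams `source` and cancels it when `signal` aborts.
function pipeWithAbort(
  source: ReadableStream<Uint8Array>,
  signal: AbortSignal
): ReadableStream<Uint8Array> {
  const reader = source.getReader()
  let cancelled = false

  const doCancel = () => {
    if (cancelled) return
    cancelled = true
    // Settles any pending read() with { done: true } and runs source's cancel().
    // The catch ignores a rejection in case the reader is no longer usable.
    reader.cancel().catch(() => {})
  }
  signal.addEventListener('abort', doCancel, { once: true })

  return new ReadableStream<Uint8Array>({
    async pull(controller) {
      const result = await reader.read()
      if (result.done) {
        signal.removeEventListener('abort', doCancel)
        try { controller.close() } catch {} // may already be closed by a cancel
        return
      }
      controller.enqueue(result.value)
    },
    cancel() {
      doCancel() // consumer-side cancel also reaches the source
    }
  })
}
```

In a handler, `signal` would presumably be the request's signal. Because the abort listener calls reader.cancel(), a read that is pending at abort time settles right away instead of waiting for the next chunk.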
// 1. Refactor streamResponse to expose the reader
export function streamResponseCancellable(response: Response) {
  const body = response.body
  if (!body) return null
  const reader = body.getReader()
  const decoder = new TextDecoder()
  async function* gen() {
    try {
      while (true) {
        const { done, value } = await reader.read()
        if (done) break
        if (typeof value === 'string') yield value
        else yield decoder.decode(value)
      }
    } finally {
      reader.releaseLock()
    }
  }
  return { gen: gen(), reader }
}

// 2. In createStreamHandler, replace `end = true` with reader.cancel()
// (keeping the existing start()-based structure unchanged otherwise)
const handle = streamResponseCancellable(response) // { gen, reader }
let cancelled = false
let storedController: ReadableStreamDefaultController | null = null
const doCancel = () => {
  if (cancelled) return
  cancelled = true
  // ← unblocks reader.read() immediately + fires inner cancel();
  //   the catch avoids an unhandled rejection if the reader was already released
  handle.reader.cancel().catch(() => {})
  try { storedController?.close() } catch {}
}
return new ReadableStream({
  async start(controller) {
    storedController = controller
    request?.signal?.addEventListener('abort', doCancel, { once: true })
    try {
      for await (const chunk of handle.gen) {
        controller.enqueue(chunk)
      }
    } catch {}
    try { controller.close() } catch {}
  },
  cancel() {
    doCancel() // ← also propagate web-streams-level cancel (e.g. framework closes the stream)
  }
})

Have you try removing the node_modules and bun.lockb and try again yet?
No response