Skip to content

Commit

Permalink
time out stalled streams
Browse files Browse the repository at this point in the history
  • Loading branch information
turbocrime committed Feb 22, 2025
1 parent 5dcd3e5 commit 33cab5f
Show file tree
Hide file tree
Showing 2 changed files with 31 additions and 14 deletions.
5 changes: 5 additions & 0 deletions .changeset/popular-keys-look.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
'@penumbra-zone/transport-dom': patch
---

response streams will now respect the request's timeout configuration..
40 changes: 26 additions & 14 deletions packages/transport-dom/src/create.ts
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ export const createChannelTransport = ({
async unary<I extends Message<I> = AnyMessage, O extends Message<O> = AnyMessage>(
service: ServiceType,
method: MethodInfo<I, O>,
signal: AbortSignal | undefined,
signal: AbortSignal | undefined = AbortSignal.any([]),
timeoutMs: number | undefined = defaultTimeoutMs,
header: HeadersInit | undefined,
input: PartialMessage<I>,
Expand All @@ -160,10 +160,10 @@ export const createChannelTransport = ({
const requestId = crypto.randomUUID();

const requestFailure = new AbortController();
const deadline = timeoutMs ? AbortSignal.timeout(timeoutMs) : undefined;
const requestDeadline = timeoutMs ? AbortSignal.timeout(timeoutMs) : undefined;

const response = Promise.race([
rejectOnSignal(transportFailure.signal, requestFailure.signal, deadline, signal),
rejectOnSignal(transportFailure.signal, requestFailure.signal, requestDeadline, signal),
new Promise<TransportMessage>((resolve, reject) => {
pending.set(requestId, (tev: TransportEvent) => {
if (isTransportMessage(tev, requestId)) {
Expand All @@ -177,13 +177,13 @@ export const createChannelTransport = ({
}),
]).finally(() => pending.delete(requestId));

if (!signal?.aborted) {
if (!signal.aborted) {
try {
switch (method.kind) {
case MethodKind.Unary:
{
const message = Any.pack(new method.I(input)).toJson(jsonOptions);
signal?.addEventListener('abort', () =>
signal.addEventListener('abort', () =>
port?.postMessage({ requestId, abort: true } satisfies TransportAbort),
);

Expand Down Expand Up @@ -219,7 +219,7 @@ export const createChannelTransport = ({
async stream<I extends Message<I> = AnyMessage, O extends Message<O> = AnyMessage>(
service: ServiceType,
method: MethodInfo<I, O>,
signal: AbortSignal | undefined,
signal: AbortSignal | undefined = AbortSignal.any([]),
timeoutMs: number | undefined = defaultTimeoutMs,
header: HeadersInit | undefined,
input: AsyncIterable<PartialMessage<I>>,
Expand All @@ -230,10 +230,10 @@ export const createChannelTransport = ({
const requestId = crypto.randomUUID();

const requestFailure = new AbortController();
const deadline = timeoutMs ? AbortSignal.timeout(timeoutMs) : undefined;
const requestDeadline = timeoutMs ? AbortSignal.timeout(timeoutMs) : undefined;

const response = Promise.race([
rejectOnSignal(transportFailure.signal, requestFailure.signal, deadline, signal),
rejectOnSignal(transportFailure.signal, requestFailure.signal, requestDeadline, signal),
new Promise<TransportStream>((resolve, reject) => {
pending.set(requestId, (tev: TransportEvent) => {
if (isTransportStream(tev, requestId)) {
Expand All @@ -247,7 +247,7 @@ export const createChannelTransport = ({
}),
]).finally(() => pending.delete(requestId));

if (!signal?.aborted) {
if (!signal.aborted) {
try {
switch (method.kind) {
case MethodKind.ServerStreaming:
Expand Down Expand Up @@ -303,24 +303,36 @@ export const createChannelTransport = ({
}
}

const chunkAc = new AbortController();
const chunkDeadline = chunkAc.signal;
const chunkDeadlineExceeded = () => {
console.debug('chunkDeadlineExceeded');
if (timeoutMs) {
chunkAc.abort(ConnectError.from('Stream stalled', Code.DeadlineExceeded));
}
};

return {
service,
method,
stream: true,
header: new Headers((await response).header),
trailer: new Headers((await response).trailer),
message: await response.then(({ stream }) =>
stream.pipeThrough(
message: await response.then(({ stream }) => {
let chunkTimeout = setTimeout(chunkDeadlineExceeded, timeoutMs);
return stream.pipeThrough(
new TransformStream({
transform: (chunk, cont) => {
clearTimeout(chunkTimeout);
chunkTimeout = setTimeout(chunkDeadlineExceeded, timeoutMs);
const o = new method.O();
Any.fromJson(chunk, jsonOptions).unpackTo(o);
cont.enqueue(o);
},
}),
{ signal },
),
),
{ signal: AbortSignal.any([signal, chunkDeadline]) },
);
}),
};
},
};
Expand Down

0 comments on commit 33cab5f

Please sign in to comment.