Skip to content

Commit 7683516

Browse files
committed
feat: new hooks API
1 parent cae5625 commit 7683516

File tree

2 files changed

+113
-17
lines changed

2 files changed

+113
-17
lines changed

lib/core/request.js

+82-17
Original file line numberDiff line numberDiff line change
@@ -2,9 +2,11 @@
22

33
const {
44
InvalidArgumentError,
5-
NotSupportedError
5+
NotSupportedError,
6+
AbortError
67
} = require('./errors')
78
const assert = require('node:assert')
9+
const { parseHeaders } = require('./util')
810
const {
911
isValidHTTPToken,
1012
isValidHeaderChar,
@@ -25,6 +27,45 @@ const { headerNameLowerCasedRecord } = require('./constants')
2527
const invalidPathRegex = /[^\u0021-\u00ff]/
2628

2729
const kHandler = Symbol('handler')
30+
const kResume = Symbol('resume')
31+
const kAbort = Symbol('abort')
32+
33+
class Controller {
34+
#paused = false
35+
#reason = null
36+
#abort = null
37+
38+
constructor (abort) {
39+
this.#abort = abort
40+
this[kResume] = null
41+
}
42+
43+
pause () {
44+
this.#paused = true
45+
}
46+
47+
resume () {
48+
this.#paused = false
49+
this[kResume]?.()
50+
}
51+
52+
abort (reason) {
53+
this.#reason = reason ?? new AbortError()
54+
this[kAbort]?.(this.#reason)
55+
}
56+
57+
get paused () {
58+
return this.#paused
59+
}
60+
61+
get aborted () {
62+
return this.#reason !== null
63+
}
64+
65+
get reason () {
66+
return this.#reason
67+
}
68+
}
2869

2970
class Request {
3071
constructor (origin, {
@@ -91,6 +132,8 @@ class Request {
91132

92133
this.abort = null
93134

135+
this.controller = new Controller()
136+
94137
if (body == null) {
95138
this.body = null
96139
} else if (isStream(body)) {
@@ -192,12 +235,11 @@ class Request {
192235
}
193236

194237
onBodySent (chunk) {
195-
if (this[kHandler].onBodySent) {
196-
try {
197-
return this[kHandler].onBodySent(chunk)
198-
} catch (err) {
199-
this.abort(err)
200-
}
238+
try {
239+
this[kHandler].onRequestData?.(chunk)
240+
this[kHandler].onBodySent?.(chunk)
241+
} catch (err) {
242+
this.abort(err)
201243
}
202244
}
203245

@@ -206,19 +248,20 @@ class Request {
206248
channels.bodySent.publish({ request: this })
207249
}
208250

209-
if (this[kHandler].onRequestSent) {
210-
try {
211-
return this[kHandler].onRequestSent()
212-
} catch (err) {
213-
this.abort(err)
214-
}
251+
try {
252+
this[kHandler].onRequestEnd?.()
253+
this[kHandler].onRequestSent?.()
254+
} catch (err) {
255+
this.abort(err)
215256
}
216257
}
217258

218259
onConnect (abort) {
219260
assert(!this.aborted)
220261
assert(!this.completed)
221262

263+
this.controller[kAbort] = abort
264+
222265
if (this.error) {
223266
abort(this.error)
224267
} else {
@@ -228,7 +271,12 @@ class Request {
228271
}
229272

230273
onResponseStarted () {
231-
return this[kHandler].onResponseStarted?.()
274+
try {
275+
this[kHandler].onResponseStart?.(this.controller)
276+
return this[kHandler].onResponseStarted?.()
277+
} catch (err) {
278+
this.abort(err)
279+
}
232280
}
233281

234282
onHeaders (statusCode, headers, resume, statusText) {
@@ -239,8 +287,16 @@ class Request {
239287
channels.headers.publish({ request: this, response: { statusCode, headers, statusText } })
240288
}
241289

290+
this.controller[kResume] = resume
291+
242292
try {
243-
return this[kHandler].onHeaders(statusCode, headers, resume, statusText)
293+
this[kHandler].onResponseHeaders?.(parseHeaders(headers), statusCode, statusText)
294+
295+
if (this[kHandler].onHeaders?.(statusCode, headers, () => this.controller.resume(), statusText) === false) {
296+
this.controller.pause()
297+
}
298+
299+
return !this.controller.paused
244300
} catch (err) {
245301
this.abort(err)
246302
}
@@ -251,7 +307,13 @@ class Request {
251307
assert(!this.completed)
252308

253309
try {
254-
return this[kHandler].onData(chunk)
310+
this[kHandler].onResponseData?.(chunk)
311+
312+
if (this[kHandler].onData?.(chunk) === false) {
313+
this.controller.pause()
314+
}
315+
316+
return !this.controller.paused
255317
} catch (err) {
256318
this.abort(err)
257319
return false
@@ -276,7 +338,10 @@ class Request {
276338
}
277339

278340
try {
279-
return this[kHandler].onComplete(trailers)
341+
this[kHandler].onResponseTrailers?.(parseHeaders(trailers))
342+
this[kHandler].onResponseEnd?.()
343+
344+
this[kHandler].onComplete(trailers)
280345
} catch (err) {
281346
// TODO (fix): This might be a bad idea?
282347
this.onError(err)

types/dispatcher.d.ts

+31
Original file line numberDiff line numberDiff line change
@@ -215,6 +215,15 @@ declare namespace Dispatcher {
215215
context: object;
216216
}
217217
export type StreamFactory = (data: StreamFactoryData) => Writable;
218+
export interface Controller {
219+
readonly aborted: boolean;
220+
readonly reason: Error | null;
221+
readonly paused: boolean;
222+
223+
pause(): void;
224+
resume(): void;
225+
abort(reason: Error): void;
226+
}
218227
export interface DispatchHandlers {
219228
/** Invoked before request is dispatched on socket. May be invoked multiple times when a request is retried when the request at the head of the pipeline fails. */
220229
onConnect?(abort: () => void): void;
@@ -232,6 +241,28 @@ declare namespace Dispatcher {
232241
onComplete?(trailers: string[] | null): void;
233242
/** Invoked when a body chunk is sent to the server. May be invoked multiple times for chunked requests */
234243
onBodySent?(chunkSize: number, totalBytesSent: number): void;
244+
245+
// New API
246+
247+
/** Invoked after request is starting to be processed */
248+
onRequestStart?(/* controller: Controller */): void;
249+
/** Invoked after headers data is sent */
250+
// onRequestHeaders?(headers: Record<string, string>): void;
251+
/** Invoked after payload data is sent. */
252+
onRequestData?(chunk: Buffer | string): void;
253+
/** Invoked after request has finished sending */
254+
onRequestEnd?(): void;
255+
256+
/** Invoked after response is starting to be processed */
257+
onResponseStart?(controller: Controller): void;
258+
/** Invoked after headers data has been received */
259+
onResponseHeaders?(headers: Record<string, string>, statusCode: number, statusText?: string): void;
260+
/** Invoked after response payload data is received. */
261+
onResponseData?(chunk: Buffer | string): void;
262+
/** Invoked after trailers data has been received */
263+
onResponseTrailers?(trailers: Record<string, string>): void;
264+
/** Invoked after response has finished */
265+
onResponseEnd?(): void;
235266
}
236267
export type PipelineHandler = (data: PipelineHandlerData) => Readable;
237268
export type HttpMethod = 'GET' | 'HEAD' | 'POST' | 'PUT' | 'DELETE' | 'CONNECT' | 'OPTIONS' | 'TRACE' | 'PATCH';

0 commit comments

Comments
 (0)