43 changes: 43 additions & 0 deletions docs/docs/api/DiagnosticsChannel.md

@@ -268,3 +268,46 @@
```js
diagnosticsChannel.channel('undici:proxy:connected').subscribe(({ socket, connectParams }) => {
// const { origin, port, path, signal, headers, servername } = connectParams
})
```

## `undici:cache:pending-requests`

This message is published when the cache interceptor's pending request deduplication map changes. This is useful for monitoring and debugging request deduplication behavior.

The cache interceptor automatically deduplicates concurrent requests for the same cacheable resource. When multiple identical requests are made while one is already in-flight, only one request is sent to the origin server, and all waiting handlers receive the same response.

```js
import diagnosticsChannel from 'diagnostics_channel'

diagnosticsChannel.channel('undici:cache:pending-requests').subscribe(({ type, size, key }) => {
console.log(type) // 'added' or 'removed'
console.log(size) // current number of pending requests
console.log(key) // the deduplication key for this request
})
```

### Event Properties

- `type` (`string`): Either `'added'` when a new pending request is registered, or `'removed'` when a pending request completes (successfully or with an error).
- `size` (`number`): The current number of pending requests after the change.
- `key` (`string`): The deduplication key for the request, composed of the origin, method, path, and request headers.

### Example: Monitoring Request Deduplication

```js
import diagnosticsChannel from 'diagnostics_channel'

const channel = diagnosticsChannel.channel('undici:cache:pending-requests')

channel.subscribe(({ type, size, key }) => {
if (type === 'added') {
console.log(`New pending request: ${key} (${size} total pending)`)
} else {
console.log(`Request completed: ${key} (${size} remaining)`)
}
})
```

This can be useful for:
- Verifying that request deduplication is working as expected
- Monitoring the number of concurrent in-flight requests
- Debugging cache behavior in production environments
24 changes: 24 additions & 0 deletions docs/docs/api/Dispatcher.md

@@ -1213,6 +1213,30 @@
The `cache` interceptor implements client-side response caching as described in
- `methods` - The [**safe** HTTP methods](https://www.rfc-editor.org/rfc/rfc9110#section-9.2.1) to cache the response of.
- `cacheByDefault` - The default expiration time to use for responses that have no explicit expiration and for which no heuristic expiry can be computed. If this isn't present, such responses will not be cached. Default `undefined`.
- `type` - The [type of cache](https://developer.mozilla.org/en-US/docs/Web/HTTP/Guides/Caching#types_of_caches) for Undici to act as. Can be `shared` or `private`. Default `shared`. `private` means privately cacheable responses will also be cached, and may therefore be shared between different users of your application.
- `deduplication` - Enable request deduplication. When `true`, concurrent identical requests will be deduplicated so only one request is sent to the origin server. Default `false`.

**Request Deduplication**

When enabled via the `deduplication: true` option, the cache interceptor deduplicates concurrent requests for the same cacheable resource. When multiple identical requests are made while one is already in-flight, only one request is sent to the origin server, and all waiting handlers receive the same response. This reduces server load and improves performance.

```js
const { Client, interceptors } = require("undici");
const { cache } = interceptors;

const client = new Client("http://example.com").compose(
cache({ deduplication: true })
);
```

Requests are considered identical if they have the same:
- Origin
- HTTP method
- Path
- Request headers (used for cache key generation)

All deduplicated requests receive the complete response including status code, headers, and body.

For observability, request deduplication events are published to the `undici:cache:pending-requests` [diagnostics channel](/docs/docs/api/DiagnosticsChannel.md#undicicachepending-requests).

## Instance Events

216 changes: 216 additions & 0 deletions lib/handler/cache-deduplication-handler.js

@@ -0,0 +1,216 @@
'use strict'

/**
* @typedef {import('../../types/dispatcher.d.ts').default.DispatchHandler} DispatchHandler
*/

/**
* Handler that buffers response data and notifies multiple waiting handlers.
* Used for request deduplication in the cache interceptor.
*
* @implements {DispatchHandler}
*/
class CacheDeduplicationHandler {
/**
* @type {DispatchHandler}
*/
#primaryHandler

/**
* @type {DispatchHandler[]}
*/
#waitingHandlers = []

/**
* @type {Buffer[]}
*/
#chunks = []

/**
* @type {number}
*/
#statusCode = 0

/**
* @type {Record<string, string | string[]>}
*/
#headers = {}

/**
* @type {string}
*/
#statusMessage = ''

/**
* @type {boolean}
*/
#aborted = false

/**
* @type {import('../../types/dispatcher.d.ts').default.DispatchController | null}
*/
#controller = null

/**
* @type {(() => void) | null}
*/
#onComplete = null

/**
* @param {DispatchHandler} primaryHandler The primary handler (CacheHandler)
* @param {() => void} onComplete Callback when request completes
*/
constructor (primaryHandler, onComplete) {
this.#primaryHandler = primaryHandler
this.#onComplete = onComplete
}

/**
* Add a waiting handler that will receive the buffered response
* @param {DispatchHandler} handler
*/
addWaitingHandler (handler) {
this.#waitingHandlers.push(handler)
}

/**
* @param {import('../../types/dispatcher.d.ts').default.DispatchController} controller
* @param {any} context
*/
onRequestStart (controller, context) {
this.#controller = controller
this.#primaryHandler.onRequestStart?.(controller, context)
}

/**
* @param {import('../../types/dispatcher.d.ts').default.DispatchController} controller
* @param {number} statusCode
* @param {import('../../types/header.d.ts').IncomingHttpHeaders} headers
* @param {import('net').Socket} socket
*/
onRequestUpgrade (controller, statusCode, headers, socket) {
this.#primaryHandler.onRequestUpgrade?.(controller, statusCode, headers, socket)
}

/**
* @param {import('../../types/dispatcher.d.ts').default.DispatchController} controller
* @param {number} statusCode
* @param {Record<string, string | string[]>} headers
* @param {string} statusMessage
*/
onResponseStart (controller, statusCode, headers, statusMessage) {
this.#statusCode = statusCode
this.#headers = headers
this.#statusMessage = statusMessage
this.#primaryHandler.onResponseStart?.(controller, statusCode, headers, statusMessage)
}

/**
* @param {import('../../types/dispatcher.d.ts').default.DispatchController} controller
* @param {Buffer} chunk
*/
onResponseData (controller, chunk) {
// Buffer the chunk for waiting handlers
this.#chunks.push(Buffer.from(chunk))
this.#primaryHandler.onResponseData?.(controller, chunk)
}

/**
* @param {import('../../types/dispatcher.d.ts').default.DispatchController} controller
* @param {object} trailers
*/
onResponseEnd (controller, trailers) {
this.#primaryHandler.onResponseEnd?.(controller, trailers)
this.#notifyWaitingHandlers()
this.#onComplete?.()
}

/**
* @param {import('../../types/dispatcher.d.ts').default.DispatchController} controller
* @param {Error} err
*/
onResponseError (controller, err) {
this.#aborted = true
this.#primaryHandler.onResponseError?.(controller, err)
this.#notifyWaitingHandlersError(err)
this.#onComplete?.()
}

/**
* Notify all waiting handlers with the buffered response
*/
#notifyWaitingHandlers () {
const body = Buffer.concat(this.#chunks)

for (const handler of this.#waitingHandlers) {
// Create a simple controller for each waiting handler
const waitingController = {
resume () {},
pause () {},
get paused () { return false },
get aborted () { return false },
get reason () { return null },
abort () {}
}

try {
handler.onRequestStart?.(waitingController, null)

if (waitingController.aborted) {
continue
}

handler.onResponseStart?.(
waitingController,
this.#statusCode,
this.#headers,
this.#statusMessage
)

if (waitingController.aborted) {
continue
}

if (body.length > 0) {
handler.onResponseData?.(waitingController, body)
}

handler.onResponseEnd?.(waitingController, {})
} catch {
// Ignore errors from waiting handlers
}
}

this.#waitingHandlers = []
this.#chunks = []
}

/**
* Notify all waiting handlers of an error
* @param {Error} err
*/
#notifyWaitingHandlersError (err) {
for (const handler of this.#waitingHandlers) {
const waitingController = {
resume () {},
pause () {},
get paused () { return false },
get aborted () { return true },
get reason () { return err },
abort () {}
}

try {
handler.onRequestStart?.(waitingController, null)
handler.onResponseError?.(waitingController, err)
} catch {
// Ignore errors from waiting handlers
}
}

this.#waitingHandlers = []
this.#chunks = []
}
}

module.exports = CacheDeduplicationHandler