Skip to content
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
216 changes: 216 additions & 0 deletions lib/handler/cache-deduplication-handler.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,216 @@
'use strict'

/**
* @typedef {import('../../types/dispatcher.d.ts').default.DispatchHandler} DispatchHandler
*/

/**
* Handler that buffers response data and notifies multiple waiting handlers.
* Used for request deduplication in the cache interceptor.
*
* @implements {DispatchHandler}
*/
class CacheDeduplicationHandler {
/**
* @type {DispatchHandler}
*/
#primaryHandler

/**
* @type {DispatchHandler[]}
*/
#waitingHandlers = []

/**
* @type {Buffer[]}
*/
#chunks = []

/**
* @type {number}
*/
#statusCode = 0

/**
* @type {Record<string, string | string[]>}
*/
#headers = {}

/**
* @type {string}
*/
#statusMessage = ''

/**
* @type {boolean}
*/
#aborted = false

/**
* @type {import('../../types/dispatcher.d.ts').default.DispatchController | null}
*/
#controller = null

/**
* @type {(() => void) | null}
*/
#onComplete = null

/**
* @param {DispatchHandler} primaryHandler The primary handler (CacheHandler)
* @param {() => void} onComplete Callback when request completes
*/
constructor (primaryHandler, onComplete) {
this.#primaryHandler = primaryHandler
this.#onComplete = onComplete
}

/**
* Add a waiting handler that will receive the buffered response
* @param {DispatchHandler} handler
*/
addWaitingHandler (handler) {
this.#waitingHandlers.push(handler)
}

/**
* @param {() => void} abort
* @param {any} context
*/
onRequestStart (controller, context) {
this.#controller = controller
this.#primaryHandler.onRequestStart?.(controller, context)
}

/**
* @param {import('../../types/dispatcher.d.ts').default.DispatchController} controller
* @param {number} statusCode
* @param {import('../../types/header.d.ts').IncomingHttpHeaders} headers
* @param {Socket} socket
*/
onRequestUpgrade (controller, statusCode, headers, socket) {
this.#primaryHandler.onRequestUpgrade?.(controller, statusCode, headers, socket)
}

/**
* @param {import('../../types/dispatcher.d.ts').default.DispatchController} controller
* @param {number} statusCode
* @param {Record<string, string | string[]>} headers
* @param {string} statusMessage
*/
onResponseStart (controller, statusCode, headers, statusMessage) {
this.#statusCode = statusCode
this.#headers = headers
this.#statusMessage = statusMessage
this.#primaryHandler.onResponseStart?.(controller, statusCode, headers, statusMessage)
}

/**
* @param {import('../../types/dispatcher.d.ts').default.DispatchController} controller
* @param {Buffer} chunk
*/
onResponseData (controller, chunk) {
// Buffer the chunk for waiting handlers
this.#chunks.push(Buffer.from(chunk))
this.#primaryHandler.onResponseData?.(controller, chunk)
}

/**
* @param {import('../../types/dispatcher.d.ts').default.DispatchController} controller
* @param {object} trailers
*/
onResponseEnd (controller, trailers) {
this.#primaryHandler.onResponseEnd?.(controller, trailers)
this.#notifyWaitingHandlers()
this.#onComplete?.()
}

/**
* @param {import('../../types/dispatcher.d.ts').default.DispatchController} controller
* @param {Error} err
*/
onResponseError (controller, err) {
this.#aborted = true
this.#primaryHandler.onResponseError?.(controller, err)
this.#notifyWaitingHandlersError(err)
this.#onComplete?.()
}

/**
* Notify all waiting handlers with the buffered response
*/
#notifyWaitingHandlers () {
const body = Buffer.concat(this.#chunks)

for (const handler of this.#waitingHandlers) {
// Create a simple controller for each waiting handler
const waitingController = {
resume () {},
pause () {},
get paused () { return false },
get aborted () { return false },
get reason () { return null },
abort () {}
}

try {
handler.onRequestStart?.(waitingController, null)

if (waitingController.aborted) {
continue
}

handler.onResponseStart?.(
waitingController,
this.#statusCode,
this.#headers,
this.#statusMessage
)

if (waitingController.aborted) {
continue
}

if (body.length > 0) {
handler.onResponseData?.(waitingController, body)
}

handler.onResponseEnd?.(waitingController, {})
} catch {
// Ignore errors from waiting handlers
}
}

this.#waitingHandlers = []
this.#chunks = []
}

/**
* Notify all waiting handlers of an error
* @param {Error} err
*/
#notifyWaitingHandlersError (err) {
for (const handler of this.#waitingHandlers) {
const waitingController = {
resume () {},
pause () {},
get paused () { return false },
get aborted () { return true },
get reason () { return err },
abort () {}
}

try {
handler.onRequestStart?.(waitingController, null)
handler.onResponseError?.(waitingController, err)
} catch {
// Ignore errors from waiting handlers
}
}

this.#waitingHandlers = []
this.#chunks = []
}
}

module.exports = CacheDeduplicationHandler
61 changes: 54 additions & 7 deletions lib/interceptor/cache.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,18 @@

const assert = require('node:assert')
const { Readable } = require('node:stream')
const diagnosticsChannel = require('node:diagnostics_channel')
const util = require('../core/util')
const CacheHandler = require('../handler/cache-handler')
const CacheDeduplicationHandler = require('../handler/cache-deduplication-handler')
const MemoryCacheStore = require('../cache/memory-cache-store')
const CacheRevalidationHandler = require('../handler/cache-revalidation-handler')
const { assertCacheStore, assertCacheMethods, makeCacheKey, normalizeHeaders, parseCacheControlHeader } = require('../util/cache.js')
const { assertCacheStore, assertCacheMethods, makeCacheKey, normalizeHeaders, parseCacheControlHeader, makeDeduplicationKey } = require('../util/cache.js')
const { AbortError } = require('../core/errors.js')

// Diagnostic channel for cache deduplication events
const pendingRequestsChannel = diagnosticsChannel.channel('undici:cache:pending-requests')

const nop = () => {}

/**
Expand Down Expand Up @@ -96,14 +101,16 @@ function withinStaleWhileRevalidateWindow (result) {
* @param {import('../../types/dispatcher.d.ts').default.DispatchHandler} handler
* @param {import('../../types/dispatcher.d.ts').default.RequestOptions} opts
* @param {import('../../types/cache-interceptor.d.ts').default.CacheControlDirectives | undefined} reqCacheControl
* @param {Map<string, CacheDeduplicationHandler>} pendingRequests
*/
function handleUncachedResponse (
dispatch,
globalOpts,
cacheKey,
handler,
opts,
reqCacheControl
reqCacheControl,
pendingRequests
) {
if (reqCacheControl?.['only-if-cached']) {
let aborted = false
Expand Down Expand Up @@ -137,7 +144,37 @@ function handleUncachedResponse (
return true
}

return dispatch(opts, new CacheHandler(globalOpts, cacheKey, handler))
// Generate deduplication key
const dedupeKey = makeDeduplicationKey(cacheKey)

// Check if there's already a pending request for this key
const pendingHandler = pendingRequests.get(dedupeKey)
if (pendingHandler) {
// Add this handler to the waiting list
pendingHandler.addWaitingHandler(handler)
return true
}

// Create a new deduplication handler
const cacheHandler = new CacheHandler(globalOpts, cacheKey, handler)
const deduplicationHandler = new CacheDeduplicationHandler(
cacheHandler,
() => {
// Clean up when request completes
pendingRequests.delete(dedupeKey)
if (pendingRequestsChannel.hasSubscribers) {
pendingRequestsChannel.publish({ size: pendingRequests.size, key: dedupeKey, type: 'removed' })
}
}
)

// Register the pending request
pendingRequests.set(dedupeKey, deduplicationHandler)
if (pendingRequestsChannel.hasSubscribers) {
pendingRequestsChannel.publish({ size: pendingRequests.size, key: dedupeKey, type: 'added' })
}

return dispatch(opts, deduplicationHandler)
}

/**
Expand Down Expand Up @@ -229,6 +266,7 @@ function sendCachedValue (handler, opts, result, age, context, isStale) {
* @param {import('../../types/dispatcher.d.ts').default.RequestOptions} opts
* @param {import('../../types/cache-interceptor.d.ts').default.CacheControlDirectives | undefined} reqCacheControl
* @param {import('../../types/cache-interceptor.d.ts').default.GetResult | undefined} result
* @param {Map<string, CacheDeduplicationHandler>} pendingRequests
*/
function handleResult (
dispatch,
Expand All @@ -237,10 +275,11 @@ function handleResult (
handler,
opts,
reqCacheControl,
result
result,
pendingRequests
) {
if (!result) {
return handleUncachedResponse(dispatch, globalOpts, cacheKey, handler, opts, reqCacheControl)
return handleUncachedResponse(dispatch, globalOpts, cacheKey, handler, opts, reqCacheControl, pendingRequests)
}

const now = Date.now()
Expand Down Expand Up @@ -399,6 +438,12 @@ module.exports = (opts = {}) => {

const safeMethodsToNotCache = util.safeHTTPMethods.filter(method => methods.includes(method) === false)

/**
* Map of pending requests for deduplication
* @type {Map<string, CacheDeduplicationHandler>}
*/
const pendingRequests = new Map()

return dispatch => {
return (opts, handler) => {
if (!opts.origin || safeMethodsToNotCache.includes(opts.method)) {
Expand Down Expand Up @@ -433,7 +478,8 @@ module.exports = (opts = {}) => {
handler,
opts,
reqCacheControl,
result
result,
pendingRequests
))
} else {
return handleResult(
Expand All @@ -443,7 +489,8 @@ module.exports = (opts = {}) => {
handler,
opts,
reqCacheControl,
result
result,
pendingRequests
)
}
}
Expand Down
25 changes: 24 additions & 1 deletion lib/util/cache.js
Original file line number Diff line number Diff line change
Expand Up @@ -364,6 +364,28 @@ function assertCacheMethods (methods, name = 'CacheMethods') {
}
}

/**
* Creates a string key for request deduplication purposes.
* This key is used to identify in-flight requests that can be shared.
* @param {import('../../types/cache-interceptor.d.ts').default.CacheKey} cacheKey
* @returns {string}
*/
function makeDeduplicationKey (cacheKey) {
// Create a deterministic string key from the cache key
// Include origin, method, path, and sorted headers
let key = `${cacheKey.origin}:${cacheKey.method}:${cacheKey.path}`

if (cacheKey.headers) {
const sortedHeaders = Object.keys(cacheKey.headers).sort()
for (const header of sortedHeaders) {
const value = cacheKey.headers[header]
key += `:${header}=${Array.isArray(value) ? value.join(',') : value}`
}
}

return key
}

module.exports = {
makeCacheKey,
normalizeHeaders,
Expand All @@ -373,5 +395,6 @@ module.exports = {
parseVaryHeader,
isEtagUsable,
assertCacheMethods,
assertCacheStore
assertCacheStore,
makeDeduplicationKey
}
Loading
Loading