Skip to content

Commit 2a27dc1

Browse files
authored
fix: minor connect cleanup (#2877)
1 parent 46b1b0f commit 2a27dc1

File tree

3 files changed

+80
-62
lines changed

3 files changed

+80
-62
lines changed

lib/dispatcher/client-h1.js

Lines changed: 45 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,8 @@ const {
4949
kMaxResponseSize,
5050
kListeners,
5151
kOnError,
52-
kResume
52+
kResume,
53+
kHTTPContext
5354
} = require('../core/symbols.js')
5455

5556
const constants = require('../llhttp/constants.js')
@@ -403,6 +404,7 @@ class Parser {
403404
removeAllListeners(socket)
404405

405406
client[kSocket] = null
407+
client[kHTTPContext] = null // TODO (fix): This is hacky...
406408
client[kQueue][client[kRunningIdx]++] = null
407409
client.emit('disconnect', client[kUrl], [client], new InformationalError('upgrade'))
408410

@@ -643,6 +645,8 @@ function onParserTimeout (parser) {
643645
}
644646

645647
async function connectH1 (client, socket) {
648+
client[kSocket] = socket
649+
646650
if (!llhttpInstance) {
647651
llhttpInstance = await llhttpPromise
648652
llhttpPromise = null
@@ -706,6 +710,7 @@ async function connectH1 (client, socket) {
706710
const err = this[kError] || new SocketError('closed', util.getSocketInfo(this))
707711

708712
client[kSocket] = null
713+
client[kHTTPContext] = null // TODO (fix): This is hacky...
709714

710715
if (client.destroyed) {
711716
assert(client[kPending] === 0)
@@ -733,6 +738,11 @@ async function connectH1 (client, socket) {
733738
client[kResume]()
734739
})
735740

741+
let closed = false
742+
socket.on('close', () => {
743+
closed = true
744+
})
745+
736746
return {
737747
version: 'h1',
738748
defaultPipelining: 1,
@@ -742,38 +752,48 @@ async function connectH1 (client, socket) {
742752
resume () {
743753
resumeH1(client)
744754
},
745-
destroy () {
755+
destroy (err, callback) {
756+
if (closed) {
757+
queueMicrotask(callback)
758+
} else {
759+
socket.destroy(err).on('close', callback)
760+
}
761+
},
762+
get destroyed () {
763+
return socket.destroyed
746764
},
747765
busy (request) {
748766
if (socket[kWriting] || socket[kReset] || socket[kBlocking]) {
749767
return true
750768
}
751769

752-
if (client[kRunning] > 0 && !request.idempotent) {
753-
// Non-idempotent request cannot be retried.
754-
// Ensure that no other requests are inflight and
755-
// could cause failure.
756-
return true
757-
}
758-
759-
if (client[kRunning] > 0 && (request.upgrade || request.method === 'CONNECT')) {
760-
// Don't dispatch an upgrade until all preceding requests have completed.
761-
// A misbehaving server might upgrade the connection before all pipelined
762-
// request has completed.
763-
return true
764-
}
770+
if (request) {
771+
if (client[kRunning] > 0 && !request.idempotent) {
772+
// Non-idempotent request cannot be retried.
773+
// Ensure that no other requests are inflight and
774+
// could cause failure.
775+
return true
776+
}
765777

766-
if (client[kRunning] > 0 && util.bodyLength(request.body) !== 0 &&
767-
(util.isStream(request.body) || util.isAsyncIterable(request.body) || util.isFormDataLike(request.body))) {
768-
// Request with stream or iterator body can error while other requests
769-
// are inflight and indirectly error those as well.
770-
// Ensure this doesn't happen by waiting for inflight
771-
// to complete before dispatching.
778+
if (client[kRunning] > 0 && (request.upgrade || request.method === 'CONNECT')) {
779+
// Don't dispatch an upgrade until all preceding requests have completed.
780+
// A misbehaving server might upgrade the connection before all pipelined
781+
// request has completed.
782+
return true
783+
}
772784

773-
// Request with stream or iterator body cannot be retried.
774-
// Ensure that no other requests are inflight and
775-
// could cause failure.
776-
return true
785+
if (client[kRunning] > 0 && util.bodyLength(request.body) !== 0 &&
786+
(util.isStream(request.body) || util.isAsyncIterable(request.body) || util.isFormDataLike(request.body))) {
787+
// Request with stream or iterator body can error while other requests
788+
// are inflight and indirectly error those as well.
789+
// Ensure this doesn't happen by waiting for inflight
790+
// to complete before dispatching.
791+
792+
// Request with stream or iterator body cannot be retried.
793+
// Ensure that no other requests are inflight and
794+
// could cause failure.
795+
return true
796+
}
777797
}
778798

779799
return false

lib/dispatcher/client-h2.js

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,8 @@ const {
5555
} = http2
5656

5757
async function connectH2 (client, socket) {
58+
client[kSocket] = socket
59+
5860
if (!h2ExperimentalWarned) {
5961
h2ExperimentalWarned = true
6062
process.emitWarning('H2 support is experimental, expect them to change at any time.', {
@@ -114,6 +116,11 @@ async function connectH2 (client, socket) {
114116
util.destroy(this, new SocketError('other side closed', util.getSocketInfo(this)))
115117
})
116118

119+
let closed = false
120+
socket.on('close', () => {
121+
closed = true
122+
})
123+
117124
return {
118125
version: 'h2',
119126
defaultPipelining: Infinity,
@@ -124,8 +131,16 @@ async function connectH2 (client, socket) {
124131
resume () {
125132

126133
},
127-
destroy (err) {
134+
destroy (err, callback) {
128135
session.destroy(err)
136+
if (closed) {
137+
queueMicrotask(callback)
138+
} else {
139+
socket.destroy(err).on('close', callback)
140+
}
141+
},
142+
get destroyed () {
143+
return socket.destroyed
129144
},
130145
busy () {
131146
return false

lib/dispatcher/client.js

Lines changed: 19 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -17,17 +17,14 @@ const {
1717
const buildConnector = require('../core/connect.js')
1818
const {
1919
kUrl,
20-
kReset,
2120
kServerName,
2221
kClient,
2322
kBusy,
2423
kConnect,
25-
kBlocking,
2624
kResuming,
2725
kRunning,
2826
kPending,
2927
kSize,
30-
kWriting,
3128
kQueue,
3229
kConnected,
3330
kConnecting,
@@ -38,7 +35,6 @@ const {
3835
kRunningIdx,
3936
kError,
4037
kPipelining,
41-
kSocket,
4238
kKeepAliveTimeoutValue,
4339
kMaxHeadersSize,
4440
kKeepAliveMaxTimeout,
@@ -216,7 +212,6 @@ class Client extends DispatcherBase {
216212
: [createRedirectInterceptor({ maxRedirections })]
217213
this[kUrl] = util.parseOrigin(url)
218214
this[kConnector] = connect
219-
this[kSocket] = null
220215
this[kPipelining] = pipelining != null ? pipelining : 1
221216
this[kMaxHeadersSize] = maxHeaderSize || http.maxHeaderSize
222217
this[kKeepAliveDefaultTimeout] = keepAliveTimeout == null ? 4e3 : keepAliveTimeout
@@ -277,13 +272,12 @@ class Client extends DispatcherBase {
277272
}
278273

279274
get [kConnected] () {
280-
return !!this[kSocket] && !this[kConnecting] && !this[kSocket].destroyed
275+
return !!this[kHTTPContext] && !this[kConnecting] && !this[kHTTPContext].destroyed
281276
}
282277

283278
get [kBusy] () {
284-
const socket = this[kSocket]
285-
return (
286-
(socket && (socket[kReset] || socket[kWriting] || socket[kBlocking])) ||
279+
return Boolean(
280+
this[kHTTPContext]?.busy(null) ||
287281
(this[kSize] >= (getPipelining(this) || 1)) ||
288282
this[kPending] > 0
289283
)
@@ -346,13 +340,9 @@ class Client extends DispatcherBase {
346340
resolve(null)
347341
}
348342

349-
if (this[kHTTPContext] != null) {
350-
this[kHTTPContext].destroy(err)
343+
if (this[kHTTPContext]) {
344+
this[kHTTPContext].destroy(err, callback)
351345
this[kHTTPContext] = null
352-
}
353-
354-
if (this[kSocket]) {
355-
this[kSocket].destroy(err).on('close', callback)
356346
} else {
357347
queueMicrotask(callback)
358348
}
@@ -386,7 +376,7 @@ function onError (client, err) {
386376

387377
async function connect (client) {
388378
assert(!client[kConnecting])
389-
assert(!client[kSocket])
379+
assert(!client[kHTTPContext])
390380

391381
let { host, hostname, protocol, port } = client[kUrl]
392382

@@ -441,21 +431,24 @@ async function connect (client) {
441431
return
442432
}
443433

444-
client[kConnecting] = false
445-
446434
assert(socket)
447435

448-
client[kHTTPContext] = socket.alpnProtocol === 'h2'
449-
? await connectH2(client, socket)
450-
: await connectH1(client, socket)
436+
try {
437+
client[kHTTPContext] = socket.alpnProtocol === 'h2'
438+
? await connectH2(client, socket)
439+
: await connectH1(client, socket)
440+
} catch (err) {
441+
socket.destroy().on('error', () => {})
442+
throw err
443+
}
444+
445+
client[kConnecting] = false
451446

452447
socket[kCounter] = 0
453448
socket[kMaxRequests] = client[kMaxRequests]
454449
socket[kClient] = client
455450
socket[kError] = null
456451

457-
client[kSocket] = socket
458-
459452
if (channels.connected.hasSubscribers) {
460453
channels.connected.publish({
461454
connectParams: {
@@ -546,8 +539,6 @@ function _resume (client, sync) {
546539
return
547540
}
548541

549-
const socket = client[kSocket]
550-
551542
if (client[kHTTPContext]) {
552543
client[kHTTPContext].resume()
553544
}
@@ -580,27 +571,19 @@ function _resume (client, sync) {
580571
}
581572

582573
client[kServerName] = request.servername
583-
584-
if (socket && socket.servername !== request.servername) {
585-
util.destroy(socket, new InformationalError('servername changed'))
586-
return
587-
}
574+
client[kHTTPContext]?.destroy(new InformationalError('servername changed'))
588575
}
589576

590577
if (client[kConnecting]) {
591578
return
592579
}
593580

594-
if (!socket) {
581+
if (!client[kHTTPContext]) {
595582
connect(client)
596583
return
597584
}
598585

599-
if (socket.destroyed) {
600-
return
601-
}
602-
603-
if (!client[kHTTPContext]) {
586+
if (client[kHTTPContext].destroyed) {
604587
return
605588
}
606589

0 commit comments

Comments
 (0)