Commit 51e7160

Summary of changes in this PR:
* Added a new global `set_connection_limit!` function for controlling the global connection limit that will be applied to all requests. This is one way to resolve #1033. I added a deprecation warning when passing `connection_limit` to individual requests. So usage would be: call `HTTP.set_connection_limit!`, and any time this is called, it changes the global value.
* Added a try-finally in `keepalive!` around our global IO lock usage, just for good housekeeping.
* Refactored `try_with_timeout` to use a `Channel` instead of the non-threaded `@async`; it's much simpler, seems cleaner, and allows us to avoid the usage of `@async` when not needed. Note, however, that I included a change in StreamRequest.jl that wraps all the actual write/read IO operations in a `fetch(@async dostuff())`, because this currently prevents code in this task from migrating across threads, which is important for OpenSSL usage where error handling is done per-thread. I don't love the solution, but it seems ok for now.
* Refactored a few of the stream IO functions so that we always know the number of bytes downloaded, whether in memory or written to an IO, so we can log them and use them in verbose logging to give bit-rate calculations.
* Ok, the big one: I rewrote the internal implementation of the ConnectionPool.ConnectionPools.Pod `acquire`/`release` functions; under really heavy workloads, there was a ton of contention on the Pod lock. I also observed at least one "hang" where GDB backtraces seemed to indicate that somehow a task failed/died/hung while trying to make a new connection _while holding the Pod lock_, which then meant that no other requests could ever make progress.
  The new implementation includes a lock-free "fastpath" where an existing connection that can be re-used doesn't require any lock-taking. It uses a lock-free concurrent Stack implementation copied from JuliaConcurrent/ConcurrentCollections.jl (which doesn't seem actively maintained, and it's not much code, so it was just copied). The rest of the `acquire`/`release` code is now modeled after Base.Event in how releasing always acquires the lock and slow-path acquires also take the lock to ensure fairness and no deadlocks.
  I've included some benchmark results on a variety of heavy workloads [here](https://everlasting-mahogany-a5f.notion.site/Issue-heavy-load-perf-degradation-1cd275c75037481a9cd6378b8303cfb3) that show some great improvements, a bulk of which are attributable to reducing contention when acquiring/releasing connections during requests.
  The other key change included in this rewrite is that we ensure we _do not_ hold any locks while _making new connections_, to avoid the possibility of the lock ever getting "stuck", and because it's not necessary: the pod is in charge of just keeping track of numbers and doesn't need to worry about whether the connection was actually made yet or not (if it fails, it will be immediately released back and retried). Overall, the code is also _much_ simpler, which I think is a huge win, because the old code was always pretty scary to have to dig into.
* Added a new `logerrors::Bool=false` keyword arg that allows doing `@error` logs on errors that may otherwise be "swallowed" when doing retries; it can be helpful to sometimes be able to at least see what kinds of errors are happening.
* Added lots of metrics around time spent in various layers, read vs. write durations, etc.
1 parent 75c1b25 commit 51e7160

25 files changed, +543 -320 lines changed

docs/src/client.md

Lines changed: 11 additions & 1 deletion
@@ -97,7 +97,7 @@ Many remote web services/APIs have rate limits or throttling in place to avoid b
 
 #### `readtimeout`
 
-After a connection is established and a request is sent, a response is expected. If a non-zero value is passed to the `readtimeout` keyword argument, `HTTP.request` will wait to receive a response that many seconds before throwing an error. Note that for chunked or streaming responses, each chunk/packet of bytes received causes the timeout to reset. Passing `readtimeout = 0` disables any timeout checking and is the default.
+After a connection is established and a request is sent, a response is expected. If a non-zero value is passed to the `readtimeout` keyword argument, `HTTP.request` will wait to receive a response that many seconds before throwing an error. Passing `readtimeout = 0` disables any timeout checking and is the default.
 
 ### `status_exception`
 
@@ -150,6 +150,16 @@ Controls the total number of retries that will be attempted. Can also disable al
 
 By default, this keyword argument is `false`, which controls whether non-idempotent requests will be retried (POST or PATCH requests).
 
+#### `retry_delays`
+
+Allows providing a custom `ExponentialBackOff` object to control the delay between retries.
+Default is `ExponentialBackOff(n = retries)`.
+
+#### `retry_check`
+
+Allows providing a custom function to control whether a retry should be attempted.
+The function should accept 5 arguments: the delay state, exception, request, response (an `HTTP.Response` object *if* a request was successfully made, otherwise `nothing`), and `resp_body` response body (which may be `nothing` if there is no response yet, otherwise a `Vector{UInt8}`), and return `true` if a retry should be attempted. So in traditional nomenclature, the function would have the form `f(s, ex, req, resp, resp_body) -> Bool`.
+
 ### Redirect Arguments
 
 #### `redirect`
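As an illustration of the `retry_check` shape documented in this hunk, here is a minimal sketch. The function name `retry_on_overload` and the policy it encodes (retry when there is no response, or on status 429/503) are assumptions made for the example, not something this commit prescribes; it only relies on the response exposing a `status` field, as `HTTP.Response` does. `ExponentialBackOff` comes from Base.

```julia
# Hypothetical retry predicate with the five-argument form
# f(s, ex, req, resp, resp_body) -> Bool described in the docs above.
# `resp` is `nothing` when no response was received.
function retry_on_overload(s, ex, req, resp, resp_body)
    resp === nothing && return true      # no response yet: allow a retry
    return resp.status in (429, 503)     # otherwise retry only on throttling/overload
end

# Custom delays: three retries, starting at 100 ms and capped at 2 s.
delays = ExponentialBackOff(n = 3, first_delay = 0.1, max_delay = 2.0)

# With HTTP.jl loaded, both would be passed as keyword arguments, e.g.
# HTTP.get(url; retry_delays = delays, retry_check = retry_on_overload)
```

The predicate is kept free of HTTP.jl types so it can be exercised with any object that has a `status` field.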

src/ConnectionPool.jl

Lines changed: 66 additions & 41 deletions
@@ -19,22 +19,21 @@ remotely closed, a connection will be reused.
 """
 module ConnectionPool
 
-export Connection, newconnection, releaseconnection, getrawstream, inactiveseconds, shouldtimeout, set_default_connection_limit!
+export Connection, newconnection, releaseconnection, getrawstream, inactiveseconds, shouldtimeout, set_default_connection_limit!, set_connection_limit!
 
 using Sockets, LoggingExtras, NetworkOptions
 using MbedTLS: SSLConfig, SSLContext, setup!, associate!, hostname!, handshake!
 using MbedTLS, OpenSSL
 using ..IOExtras, ..Conditions, ..Exceptions
 
-const default_connection_limit = Ref(8)
 const nolimit = typemax(Int)
 
-set_default_connection_limit!(n) = default_connection_limit[] = n
-
 taskid(t=current_task()) = string(hash(t) & 0xffff, base=16, pad=4)
 
 include("connectionpools.jl")
 using .ConnectionPools
+set_default_connection_limit!(n) = ConnectionPools.connection_limit[] = n
+set_connection_limit!(n) = ConnectionPools.connection_limit[] = n
 
 """
     Connection
@@ -364,35 +363,41 @@ or create a new `Connection` if required.
 function newconnection(::Type{T},
                        host::AbstractString,
                        port::AbstractString;
-                       connection_limit=default_connection_limit[],
+                       connection_limit=nothing,
                        forcenew::Bool=false,
                        idle_timeout=typemax(Int),
                        require_ssl_verification::Bool=NetworkOptions.verify_host(host, "SSL"),
+                       metrics=(connect_duration_ms=Ref(0.0), ssl_connect_duration_ms=Ref(0.0)),
                        kw...) where {T <: IO}
     return acquire(
             getpool(T),
             (T, host, port, require_ssl_verification, true);
-            max_concurrent_connections=Int(connection_limit),
+            max_concurrent_connections=connection_limit,
             forcenew=forcenew,
             idle_timeout=Int(idle_timeout)) do
         Connection(host, port,
             idle_timeout, require_ssl_verification,
             getconnection(T, host, port;
-                require_ssl_verification=require_ssl_verification, kw...)
+                require_ssl_verification=require_ssl_verification, metrics=metrics, kw...)
         )
     end
 end
 
-releaseconnection(c::Connection{T}, reuse) where {T} =
+function releaseconnection(c::Connection{T}, reuse) where {T}
+    c.timestamp = time()
     release(getpool(T), connectionkey(c), c; return_for_reuse=reuse)
+end
 
 function keepalive!(tcp)
     Base.iolock_begin()
-    Base.check_open(tcp)
-    err = ccall(:uv_tcp_keepalive, Cint, (Ptr{Nothing}, Cint, Cuint),
-                                          tcp.handle, 1, 1)
-    Base.uv_error("failed to set keepalive on tcp socket", err)
-    Base.iolock_end()
+    try
+        Base.check_open(tcp)
+        err = ccall(:uv_tcp_keepalive, Cint, (Ptr{Nothing}, Cint, Cuint),
+                                          tcp.handle, 1, 1)
+        Base.uv_error("failed to set keepalive on tcp socket", err)
+    finally
+        Base.iolock_end()
+    end
     return
 end
 
@@ -418,32 +423,38 @@ function getconnection(::Type{TCPSocket},
                        keepalive::Bool=true,
                        connect_timeout::Int=10,
                        readtimeout::Int=0,
+                       metrics=(connect_duration_ms=Ref(0.0), ssl_connect_duration_ms=Ref(0.0)),
                        kw...)::TCPSocket
 
     p::UInt = isempty(port) ? UInt(80) : parse(UInt, port)
     @debugv 2 "TCP connect: $host:$p..."
     addrs = Sockets.getalladdrinfo(host)
     connect_timeout = connect_timeout == 0 && readtimeout > 0 ? readtimeout : connect_timeout
     lasterr = ErrorException("unknown connection error")
-
+    start_time = time()
     for addr in addrs
         try
-            return if connect_timeout > 0
+            if connect_timeout > 0
                 tcp = Sockets.TCPSocket()
                 Sockets.connect!(tcp, addr, p)
-                try_with_timeout(() -> checkconnected(tcp), connect_timeout, () -> close(tcp)) do
-                    Sockets.wait_connected(tcp)
-                    keepalive && keepalive!(tcp)
+                try
+                    try_with_timeout(connect_timeout) do
+                        Sockets.wait_connected(tcp)
+                        keepalive && keepalive!(tcp)
+                    end
+                catch
+                    close(tcp)
+                    rethrow()
                 end
-                return tcp
             else
                 tcp = Sockets.connect(addr, p)
                 keepalive && keepalive!(tcp)
-                tcp
             end
+            return tcp
         catch e
             lasterr = e isa TimeoutError ? ConnectTimeout(host, port) : e
-            continue
+        finally
+            metrics.connect_duration_ms[] = (time() - start_time) * 1000
         end
     end
     # If no connetion could be set up, to any address, throw last error
@@ -498,7 +509,6 @@ function getconnection(::Type{SSLStream},
                        host::AbstractString,
                        port::AbstractString;
                        kw...)::SSLStream
-
     port = isempty(port) ? "443" : port
     @debugv 2 "SSL connect: $host:$port..."
     tcp = getconnection(TCPSocket, host, port; kw...)
@@ -508,32 +518,42 @@ end
 function sslconnection(::Type{SSLStream}, tcp::TCPSocket, host::AbstractString;
     require_ssl_verification::Bool=NetworkOptions.verify_host(host, "SSL"),
     sslconfig::OpenSSL.SSLContext=nosslcontext[],
+    metrics=(connect_duration_ms=Ref(0.0), ssl_connect_duration_ms=Ref(0.0)),
     kw...)::SSLStream
-    if sslconfig === nosslcontext[]
-        sslconfig = global_sslcontext()
+    start_time = time()
+    try
+        if sslconfig === nosslcontext[]
+            sslconfig = global_sslcontext()
+        end
+        # Create SSL stream.
+        ssl_stream = SSLStream(sslconfig, tcp)
+        OpenSSL.hostname!(ssl_stream, host)
+        OpenSSL.connect(ssl_stream; require_ssl_verification)
+        return ssl_stream
+    finally
+        metrics.ssl_connect_duration_ms[] = (time() - start_time) * 1000
     end
-    # Create SSL stream.
-    ssl_stream = SSLStream(sslconfig, tcp)
-    OpenSSL.hostname!(ssl_stream, host)
-    OpenSSL.connect(ssl_stream; require_ssl_verification)
-    return ssl_stream
 end
 
 function sslconnection(::Type{SSLContext}, tcp::TCPSocket, host::AbstractString;
     require_ssl_verification::Bool=NetworkOptions.verify_host(host, "SSL"),
     sslconfig::SSLConfig=nosslconfig,
+    metrics=(connect_duration_ms=Ref(0.0), ssl_connect_duration_ms=Ref(0.0)),
     kw...)::SSLContext
-
-    if sslconfig === nosslconfig
-        sslconfig = global_sslconfig(require_ssl_verification)
+    start_time = time()
+    try
+        if sslconfig === nosslconfig
+            sslconfig = global_sslconfig(require_ssl_verification)
+        end
+        io = SSLContext()
+        setup!(io, sslconfig)
+        associate!(io, tcp)
+        hostname!(io, host)
+        handshake!(io)
+        return io
+    finally
+        metrics.ssl_connect_duration_ms[] = (time() - start_time) * 1000
     end
-
-    io = SSLContext()
-    setup!(io, sslconfig)
-    associate!(io, tcp)
-    hostname!(io, host)
-    handshake!(io)
-    return io
 end
 
 function sslupgrade(::Type{IOType}, c::Connection{T},
@@ -545,8 +565,13 @@ function sslupgrade(::Type{IOType}, c::Connection{T},
     # if the upgrade fails, an error will be thrown and the original c will be closed
     # in ConnectionRequest
     tls = if readtimeout > 0
-        try_with_timeout(() -> shouldtimeout(c, readtimeout), readtimeout, () -> close(c)) do
-            sslconnection(IOType, c.io, host; require_ssl_verification=require_ssl_verification, kw...)
+        try
+            try_with_timeout(readtimeout) do
+                sslconnection(IOType, c.io, host; require_ssl_verification=require_ssl_verification, kw...)
+            end
+        catch
+            close(c)
+            rethrow()
         end
     else
         sslconnection(IOType, c.io, host; require_ssl_verification=require_ssl_verification, kw...)

src/Exceptions.jl

Lines changed: 12 additions & 31 deletions
@@ -25,41 +25,22 @@ macro $(:try)(exes...)
     end
 end # @eval
 
-function try_with_timeout(f, shouldtimeout, delay, iftimeout=() -> nothing)
-    @assert delay > 0
-    cond = Condition()
-    # execute f async
-    t = @async try
-        notify(cond, f())
-    catch e
-        @debugv 1 "error executing f in try_with_timeout"
-        isopen(timer) && notify(cond, e, error = true)
-    end
-    # start a timer
-    timer = Timer(delay; interval=delay / 10) do tm
+function try_with_timeout(f, timeout)
+    ch = Channel(0)
+    timer = Timer(tm -> close(ch, TimeoutError(timeout)), timeout)
+    Threads.@spawn begin
         try
-            if shouldtimeout()
-                @debugv 1 "❗️ Timeout: $delay"
-                close(tm)
-                iftimeout()
-                notify(cond, TimeoutError(delay), error = true)
-            end
+            put!(ch, $f())
         catch e
-            @debugv 1 "callback error in try_with_timeout"
-            close(tm)
-            notify(cond, e, error = true)
+            if !(e isa HTTPError)
+                e = CapturedException(e, catch_backtrace())
+            end
+            close(ch, e)
+        finally
+            close(timer)
         end
     end
-    try
-        res = wait(cond)
-        @debugv 1 "try_with_timeout finished with: $res"
-        res
-    catch e
-        @debugv 1 "try_with_timeout failed with: $e"
-        rethrow()
-    finally
-        close(timer)
-    end
+    return take!(ch)
 end
 
 abstract type HTTPError <: Exception end
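To see the `Channel`-plus-`Timer` pattern from the new `try_with_timeout` in isolation, here is a standalone sketch that runs without HTTP.jl. The names `SketchTimeout` and `with_timeout` are hypothetical stand-ins introduced for this example (the real code uses `TimeoutError` and leaves `HTTPError`s unwrapped); the mechanism is the same: whichever of the timer or the worker task touches the channel first decides what `take!` returns or throws.

```julia
# Hypothetical stand-in for the package's TimeoutError.
struct SketchTimeout <: Exception
    timeout::Float64
end

function with_timeout(f, timeout)
    ch = Channel(0)  # unbuffered: put! blocks until take! is waiting
    # If the timer fires first, closing the channel with an exception
    # makes the pending take! throw that exception.
    timer = Timer(tm -> close(ch, SketchTimeout(timeout)), timeout)
    Threads.@spawn begin
        try
            put!(ch, $f())
        catch e
            # Forward failures (including put! on an already-closed channel).
            close(ch, CapturedException(e, catch_backtrace()))
        finally
            close(timer)
        end
    end
    return take!(ch)
end

fast = with_timeout(() -> 42, 5.0)   # completes well before the timer
```

Note that a timed-out `f` is not cancelled in this sketch; it keeps running in its task and its result is discarded, which is why the callers in the diff close the socket or connection in a `catch` block after a timeout.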

src/HTTP.jl

Lines changed: 33 additions & 7 deletions
@@ -42,6 +42,32 @@ include("StatusCodes.jl") ;using .StatusCodes
 include("Messages.jl") ;using .Messages
 include("cookies.jl") ;using .Cookies
 include("Streams.jl") ;using .Streams
+
+function observe_layer(f, cntnm, durnm, req, args...; kw...)
+    start_time = time()
+    req.context[cntnm] = Base.get(req.context, cntnm, 0) + 1
+    try
+        return f(args...; kw...)
+    finally
+        req.context[durnm] = Base.get(req.context, durnm, 0) + (time() - start_time) * 1000
+        # @info "observed layer = $f, count = $(req.context[cntnm]), duration = $(req.context[durnm])"
+    end
+end
+
+function observe_request_layer(f)
+    nm = nameof(f)
+    cntnm = Symbol(nm, "_count")
+    durnm = Symbol(nm, "_duration_ms")
+    return (req::Request; kw...) -> observe_layer(f, cntnm, durnm, req, req; kw...)
+end
+
+function observe_stream_layer(f)
+    nm = nameof(f)
+    cntnm = Symbol(nm, "_count")
+    durnm = Symbol(nm, "_duration_ms")
+    return (stream::Stream; kw...) -> observe_layer(f, cntnm, durnm, stream.message.request, stream; kw...)
+end
+
 include("clientlayers/MessageRequest.jl"); using .MessageRequest
 include("clientlayers/RedirectRequest.jl"); using .RedirectRequest
 include("clientlayers/DefaultHeadersRequest.jl"); using .DefaultHeadersRequest
@@ -120,8 +146,7 @@ Supported optional keyword arguments:
  - `connect_timeout = 10`, close the connection after this many seconds if it
    is still attempting to connect. Use `connect_timeout = 0` to disable.
  - `connection_limit = 8`, number of concurrent connections allowed to each host:port.
- - `readtimeout = 0`, close the connection if no data is received for this many
-   seconds. Use `readtimeout = 0` to disable.
+ - `readtimeout = 0`, abort a request after this many seconds. Will trigger retries if applicable. Use `readtimeout = 0` to disable.
  - `status_exception = true`, throw `HTTP.StatusError` for response status >= 300.
  - Basic authentication is detected automatically from the provided url's `userinfo` (in the form `scheme://user:password@host`)
    and adds the `Authorization: Basic` header; this can be disabled by passing `basicauth=false`
@@ -139,7 +164,7 @@ Retry arguments:
  - `retry = true`, retry idempotent requests in case of error.
  - `retries = 4`, number of times to retry.
  - `retry_non_idempotent = false`, retry non-idempotent requests too. e.g. POST.
- - `retry_delay = ExponentialBackOff(n = retries)`, provide a custom `ExponentialBackOff` object to control the delay between retries.
+ - `retry_delays = ExponentialBackOff(n = retries)`, provide a custom `ExponentialBackOff` object to control the delay between retries.
  - `retry_check = (s, ex, req, resp, resp_body) -> Bool`, provide a custom function to control whether a retry should be attempted.
    The function should accept 5 arguments: the delay state, exception, request, response (an `HTTP.Response` object *if* a request was
    successfully made, otherwise `nothing`), and `resp_body` response body (which may be `nothing` if there is no response yet, otherwise
@@ -286,6 +311,7 @@ function request(method, url, h=nothing, b=nobody;
     return request(HTTP.stack(), method, url, headers, body, query; kw...)
 end
 
+# layers are applied from left to right, i.e. the first layer is the outermost that is called first, which then calls into the second layer, etc.
 const STREAM_LAYERS = [timeoutlayer, exceptionlayer]
 const REQUEST_LAYERS = [redirectlayer, defaultheaderslayer, basicauthlayer, contenttypedetectionlayer, cookielayer, retrylayer, canonicalizelayer]
 
@@ -398,10 +424,10 @@ function stack(
         inner_stream_layers = streamlayers
         outer_stream_layers = ()
     end
-    layers = foldr((x, y) -> x(y), inner_stream_layers, init=streamlayer)
-    layers2 = foldr((x, y) -> x(y), STREAM_LAYERS, init=layers)
+    layers = foldr((x, y) -> observe_stream_layer(x(y)), inner_stream_layers, init=observe_stream_layer(streamlayer))
+    layers2 = foldr((x, y) -> observe_stream_layer(x(y)), STREAM_LAYERS, init=layers)
     if !isempty(outer_stream_layers)
-        layers2 = foldr((x, y) -> x(y), outer_stream_layers, init=layers2)
+        layers2 = foldr((x, y) -> observe_stream_layer(x(y)), outer_stream_layers, init=layers2)
     end
     # request layers
     # messagelayer must be the 1st/outermost layer to convert initial args to Request
@@ -413,7 +439,7 @@ function stack(
         inner_request_layers = requestlayers
         outer_request_layers = ()
     end
-    layers3 = foldr((x, y) -> x(y), inner_request_layers; init=connectionlayer(layers2))
+    layers3 = foldr((x, y) -> x(y), inner_request_layers; init=observe_request_layer(connectionlayer(layers2)))
     layers4 = foldr((x, y) -> x(y), REQUEST_LAYERS; init=layers3)
     if !isempty(outer_request_layers)
         layers4 = foldr((x, y) -> x(y), outer_request_layers, init=layers4)
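The count/duration bookkeeping that `observe_layer` adds around each layer can be tried out on its own. The sketch below is an assumption-laden simplification: the function name `observe` is hypothetical, and a plain `Dict{Symbol,Any}` stands in for `req.context`, but the try/finally structure mirrors the diff, so time is recorded even when the wrapped call throws.

```julia
# Sketch of the per-layer metrics pattern; `context` stands in for `req.context`.
function observe(f, context::Dict{Symbol,Any}, name::Symbol, args...; kw...)
    cntnm = Symbol(name, "_count")
    durnm = Symbol(name, "_duration_ms")
    start_time = time()
    context[cntnm] = get(context, cntnm, 0) + 1
    try
        return f(args...; kw...)
    finally
        # Runs on normal return and on exceptions, so failed calls are timed too.
        context[durnm] = get(context, durnm, 0.0) + (time() - start_time) * 1000
    end
end

context = Dict{Symbol,Any}()
observe(x -> x + 1, context, :demo, 1)
observe(x -> x + 1, context, :demo, 2)
# context now holds :demo_count and an accumulated :demo_duration_ms
```

Because the counters accumulate across calls, a layer that is re-entered during retries or redirects reports its total count and total time rather than only the last attempt.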
