Commit ee6fe6b

Summary of changes in this PR:

* Added a new global `set_connection_limit!` function for controlling the global connection limit that will be applied to all requests. This is one way to resolve #1033. I added a deprecation warning when passing `connect_limit` to individual requests. So usage would be: call `HTTP.set_connection_limit!`, and any time this is called, it changes the global value (see the usage sketch after this list).
* Added a try-finally in `keepalive!` around our global IO lock usage, just for good housekeeping.
* Refactored `try_with_timeout` to use a `Threads.Condition` and `Threads.@spawn` instead of the non-threaded versions; this seems cleaner and lets us avoid `@async` when it isn't needed. Note that I included a change in StreamRequest.jl that wraps all the actual write/read IO operations in a `fetch(@async dostuff())`, because this currently prevents code in that task from migrating across threads, which is important for OpenSSL usage where error handling is done per thread. I don't love the solution, but it seems ok for now.
* Refactored a few of the stream IO functions so that we always know the number of bytes downloaded, whether held in memory or written to an IO, so we can log them and use them in verbose logging to give bit-rate calculations.
* Ok, the big one: I rewrote the internal implementation of the `ConnectionPool.ConnectionPools.Pod` `acquire`/`release` functions. Under really heavy workloads, there was a ton of contention on the Pod lock. I also observed at least one "hang" where GDB backtraces seemed to indicate that a task somehow failed/died/hung while trying to make a new connection _while holding the Pod lock_, which then meant that no other requests could ever make progress. The new implementation includes a lock-free "fast path" where an existing connection that can be reused doesn't require taking any lock. It uses a lock-free concurrent Stack implementation copied from JuliaConcurrent/ConcurrentCollections.jl (it doesn't seem actively maintained and it's not much code, so I just copied it). The rest of the `acquire`/`release` code is now modeled after `Base.Event` in how releasing always acquires the lock and slow-path acquires also take the lock, to ensure fairness and no deadlocks. I've included some benchmark results on a variety of heavy workloads [here](https://everlasting-mahogany-a5f.notion.site/Issue-heavy-load-perf-degradation-1cd275c75037481a9cd6378b8303cfb3) that show some great improvements, a bulk of which are attributable to reduced contention when acquiring/releasing connections during requests. The other key change in this rewrite is that we ensure we _do not_ hold any locks while _making new connections_, both to avoid the possibility of the lock ever getting "stuck" and because it isn't necessary: the pod is just in charge of keeping track of numbers and doesn't need to worry about whether the connection was actually made yet or not (if it fails, it will be immediately released back and retried). Overall, the code is also _much_ simpler, which I think is a huge win, because the old code was always pretty scary to dig into.
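A minimal usage sketch of the new global limit (assuming `HTTP` is loaded; the request below is only illustrative):

```julia
using HTTP

# Set the global connection limit once; it now applies to every request
# that acquires a connection from the shared pool.
HTTP.set_connection_limit!(16)

# Requests no longer need a per-request `connect_limit` keyword
# (passing one now triggers a deprecation warning).
resp = HTTP.get("https://example.com")
```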
1 parent 75c1b25 commit ee6fe6b

12 files changed: +279 / -245 lines changed

src/ConnectionPool.jl

Lines changed: 31 additions & 17 deletions
```diff
@@ -19,22 +19,21 @@ remotely closed, a connection will be reused.
 """
 module ConnectionPool
 
-export Connection, newconnection, releaseconnection, getrawstream, inactiveseconds, shouldtimeout, set_default_connection_limit!
+export Connection, newconnection, releaseconnection, getrawstream, inactiveseconds, shouldtimeout, set_default_connection_limit!, set_connection_limit!
 
 using Sockets, LoggingExtras, NetworkOptions
 using MbedTLS: SSLConfig, SSLContext, setup!, associate!, hostname!, handshake!
 using MbedTLS, OpenSSL
 using ..IOExtras, ..Conditions, ..Exceptions
 
-const default_connection_limit = Ref(8)
 const nolimit = typemax(Int)
 
-set_default_connection_limit!(n) = default_connection_limit[] = n
-
 taskid(t=current_task()) = string(hash(t) & 0xffff, base=16, pad=4)
 
 include("connectionpools.jl")
 using .ConnectionPools
+set_default_connection_limit!(n) = ConnectionPools.connection_limit[] = n
+set_connection_limit!(n) = ConnectionPools.connection_limit[] = n
 
 """
     Connection
@@ -364,15 +363,15 @@ or create a new `Connection` if required.
 function newconnection(::Type{T},
                        host::AbstractString,
                        port::AbstractString;
-                       connection_limit=default_connection_limit[],
+                       connection_limit=nothing,
                        forcenew::Bool=false,
                        idle_timeout=typemax(Int),
                        require_ssl_verification::Bool=NetworkOptions.verify_host(host, "SSL"),
                        kw...) where {T <: IO}
     return acquire(
         getpool(T),
         (T, host, port, require_ssl_verification, true);
-        max_concurrent_connections=Int(connection_limit),
+        max_concurrent_connections=connection_limit,
        forcenew=forcenew,
        idle_timeout=Int(idle_timeout)) do
            Connection(host, port,
@@ -383,16 +382,21 @@ function newconnection(::Type{T},
     end
 end
 
-releaseconnection(c::Connection{T}, reuse) where {T} =
+function releaseconnection(c::Connection{T}, reuse) where {T}
+    c.timestamp = time()
     release(getpool(T), connectionkey(c), c; return_for_reuse=reuse)
+end
 
 function keepalive!(tcp)
     Base.iolock_begin()
-    Base.check_open(tcp)
-    err = ccall(:uv_tcp_keepalive, Cint, (Ptr{Nothing}, Cint, Cuint),
-                tcp.handle, 1, 1)
-    Base.uv_error("failed to set keepalive on tcp socket", err)
-    Base.iolock_end()
+    try
+        Base.check_open(tcp)
+        err = ccall(:uv_tcp_keepalive, Cint, (Ptr{Nothing}, Cint, Cuint),
+                    tcp.handle, 1, 1)
+        Base.uv_error("failed to set keepalive on tcp socket", err)
+    finally
+        Base.iolock_end()
+    end
     return
 end
 
@@ -431,9 +435,14 @@ function getconnection(::Type{TCPSocket},
     return if connect_timeout > 0
         tcp = Sockets.TCPSocket()
         Sockets.connect!(tcp, addr, p)
-        try_with_timeout(() -> checkconnected(tcp), connect_timeout, () -> close(tcp)) do
-            Sockets.wait_connected(tcp)
-            keepalive && keepalive!(tcp)
+        try
+            try_with_timeout(connect_timeout) do
+                Sockets.wait_connected(tcp)
+                keepalive && keepalive!(tcp)
+            end
+        catch
+            close(tcp)
+            rethrow()
         end
         return tcp
     else
@@ -545,8 +554,13 @@ function sslupgrade(::Type{IOType}, c::Connection{T},
     # if the upgrade fails, an error will be thrown and the original c will be closed
     # in ConnectionRequest
     tls = if readtimeout > 0
-        try_with_timeout(() -> shouldtimeout(c, readtimeout), readtimeout, () -> close(c)) do
-            sslconnection(IOType, c.io, host; require_ssl_verification=require_ssl_verification, kw...)
+        try
+            try_with_timeout(readtimeout) do
+                sslconnection(IOType, c.io, host; require_ssl_verification=require_ssl_verification, kw...)
+            end
+        catch
+            close(c)
+            rethrow()
         end
     else
         sslconnection(IOType, c.io, host; require_ssl_verification=require_ssl_verification, kw...)
```
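The call-site changes above all follow the same shape: the old callback-style `try_with_timeout(shouldtimeout, delay, iftimeout)` invocations become a plain `try`/`catch` in which the caller closes its own resource and rethrows. A distilled sketch of that pattern (the `resource`/`do_io` names are hypothetical stand-ins for e.g. `tcp`/`Sockets.wait_connected`, not actual HTTP.jl code):

```julia
# Sketch only: mirrors the new call-site pattern shown in the diff above.
function io_with_timeout(do_io, resource, timeout)
    try
        # try_with_timeout now only needs the timeout; it throws if the
        # work doesn't finish in time (or if the work itself errors).
        return try_with_timeout(timeout) do
            do_io(resource)
        end
    catch
        # cleanup moved from the old `iftimeout` callback to the caller
        close(resource)
        rethrow()
    end
end
```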

src/Exceptions.jl

Lines changed: 9 additions & 31 deletions
```diff
@@ -25,41 +25,19 @@ macro $(:try)(exes...)
 end
 end # @eval
 
-function try_with_timeout(f, shouldtimeout, delay, iftimeout=() -> nothing)
-    @assert delay > 0
-    cond = Condition()
-    # execute f async
-    t = @async try
-        notify(cond, f())
-    catch e
-        @debugv 1 "error executing f in try_with_timeout"
-        isopen(timer) && notify(cond, e, error = true)
-    end
-    # start a timer
-    timer = Timer(delay; interval=delay / 10) do tm
+function try_with_timeout(f, timeout)
+    ch = Channel(0)
+    timer = Timer(tm -> close(ch, TimeoutError(timeout)), timeout)
+    Threads.@spawn begin
         try
-            if shouldtimeout()
-                @debugv 1 "❗️ Timeout: $delay"
-                close(tm)
-                iftimeout()
-                notify(cond, TimeoutError(delay), error = true)
-            end
+            put!(ch, $f())
         catch e
-            @debugv 1 "callback error in try_with_timeout"
-            close(tm)
-            notify(cond, e, error = true)
+            close(ch, CapturedException(e, catch_backtrace()))
+        finally
+            close(timer)
         end
     end
-    try
-        res = wait(cond)
-        @debugv 1 "try_with_timeout finished with: $res"
-        res
-    catch e
-        @debugv 1 "try_with_timeout failed with: $e"
-        rethrow()
-    finally
-        close(timer)
-    end
+    return take!(ch)
 end
 
 abstract type HTTPError <: Exception end
```
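In the new implementation, the timer closes the zero-size `Channel` with a `TimeoutError`, the spawned task either `put!`s the result or closes the channel with a `CapturedException`, and the blocking `take!` surfaces whichever happens first. Caller-side usage looks roughly like this (a sketch; `slow_operation` is a hypothetical placeholder):

```julia
result = try
    try_with_timeout(5.0) do
        slow_operation()   # must finish within ~5 seconds
    end
catch e
    # e is a TimeoutError if the timer fired first, or a CapturedException
    # wrapping whatever the work itself threw
    rethrow()
end
```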

src/Streams.jl

Lines changed: 10 additions & 4 deletions
```diff
@@ -281,7 +281,7 @@ end
 
 @noinline function bufcheck(buf::Base.GenericIOBuffer, n)
     requested_buffer_capacity = (buf.append ? buf.size : (buf.ptr - 1)) + n
-    requested_buffer_capacity > length(buf.data) && throw(ArgumentError("Unable to grow response stream IOBuffer large enough for response body size"))
+    requested_buffer_capacity > length(buf.data) && throw(ArgumentError("Unable to grow response stream IOBuffer $(length(buf.data)) large enough for response body size: $requested_buffer_capacity"))
 end
 
 function Base.readbytes!(http::Stream, buf::Base.GenericIOBuffer, n=bytesavailable(http))
@@ -299,19 +299,25 @@ function Base.readbytes!(http::Stream, buf::Base.GenericIOBuffer, n=bytesavailab
     return n
 end
 
-Base.read(http::Stream, buf::Base.GenericIOBuffer=PipeBuffer()) = take!(readall!(http, buf))
+function Base.read(http::Stream, buf::Base.GenericIOBuffer=PipeBuffer())
+    readall!(http, buf)
+    return take!(buf)
+end
 
 function readall!(http::Stream, buf::Base.GenericIOBuffer=PipeBuffer())
+    n = 0
     if ntoread(http) == unknown_length
         while !eof(http)
-            readbytes!(http, buf)
+            n += readbytes!(http, buf)
         end
     else
+        # even if we know the length, we still need to read until eof
+        # because Transfer-Encoding: chunked comes in piece-by-piece
        while !eof(http)
            readbytes!(http, buf, ntoread(http))
        end
    end
-    return buf
+    return n
 end
 
 function Base.readuntil(http::Stream, f::Function)::ByteView
```
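`readall!` now returns the number of bytes appended to `buf` instead of returning the buffer itself, which is what lets the upper layers record `:nbytes` for bandwidth logging. A caller-side sketch (the `http` stream here is a placeholder, and the logging call is illustrative):

```julia
buf = PipeBuffer()
n = readall!(http, buf)      # fills `buf` in place, returns the byte count
@info "downloaded $n bytes"  # HTTP.jl itself uses @debugv for this
```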

src/clientlayers/ConnectionRequest.jl

Lines changed: 4 additions & 1 deletion
```diff
@@ -91,7 +91,7 @@ function connectionlayer(handler)
             target_url = URI(target_url, port=80) # if there is no port info, connect_tunnel will fail
         end
         r = if readtimeout > 0
-            try_with_timeout(() -> shouldtimeout(io, readtimeout), readtimeout, () -> close(io)) do
+            try_with_timeout(readtimeout) do
                 connect_tunnel(io, target_url, req)
             end
         else
@@ -110,6 +110,9 @@ function connectionlayer(handler)
         stream = Stream(req.response, io)
         return handler(stream; readtimeout=readtimeout, kw...)
     catch e
+        if e isa TaskFailedException
+            e = e.task.result
+        end
         @debugv 1 "❗️ ConnectionLayer $e. Closing: $io"
         shouldreuse = false
         @try Base.IOError close(io)
```
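Because the request body now runs inside a spawned/`@async` task (see StreamRequest.jl below), failures surface at `fetch`/`wait` as a `TaskFailedException`; the unwrap above recovers the original error for logging and retry decisions. A minimal illustration of that behavior (not HTTP.jl code):

```julia
t = Threads.@spawn error("boom")
try
    fetch(t)
catch e
    # e.task.result holds the original ErrorException("boom")
    e isa TaskFailedException && @show e.task.result
end
```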

src/clientlayers/MessageRequest.jl

Lines changed: 8 additions & 0 deletions
```diff
@@ -4,6 +4,7 @@ using URIs
 using ..IOExtras, ..Messages, ..Parsers, ..Exceptions
 using ..Messages, ..Parsers
 using ..Strings: HTTPVersion
+using LoggingExtras
 
 export messagelayer
 
@@ -23,6 +24,7 @@ function messagelayer(handler)
     return function(method::String, url::URI, headers, body; copyheaders::Bool=true, response_stream=nothing, http_version=HTTPVersion(1, 1), kw...)
         req = Request(method, resource(url), mkreqheaders(headers, copyheaders), body; url=url, version=http_version, responsebody=response_stream)
         local resp
+        start_time = time()
         try
             resp = handler(req; response_stream=response_stream, kw...)
         catch e
@@ -38,6 +40,12 @@ function messagelayer(handler)
                     write(resp.body, resp.request.context[:response_body])
                 end
             end
+            if @isdefined(resp)
+                end_time = time()
+                bytes = get(resp.request.context, :nbytes, 0)
+                mbits_per_second = bytes == 0 ? 0 : (((8 * bytes) / 1e9) / (end_time - start_time))
+                @debugv 1 "Request complete with bandwidth: $(mbits_per_second) Gbps"
+            end
         end
     end
 end
```
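The formula above converts the downloaded byte count into gigabits per second (matching the `Gbps` in the log message): multiply by 8 bits per byte, divide by 1e9, divide by the elapsed seconds. A worked example with illustrative numbers:

```julia
bytes   = 250_000_000                   # 250 MB response body (example value)
elapsed = 2.0                           # seconds between start_time and end_time
rate    = (8 * bytes) / 1e9 / elapsed   # == 1.0, i.e. the request averaged 1 Gbps
```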

src/clientlayers/RetryRequest.jl

Lines changed: 1 addition & 0 deletions
```diff
@@ -77,6 +77,7 @@ function retrylayer(handler)
 end
 
 isrecoverable(ex) = true
+isrecoverable(ex::ConnectError) = ex.error isa Sockets.DNSError && ex.error.code == EAI_AGAIN ? false : true
 isrecoverable(ex::StatusError) = retryable(ex.status)
 
 function _retry_check(s, ex, req, check)
```

src/clientlayers/StreamRequest.jl

Lines changed: 12 additions & 5 deletions
```diff
@@ -17,7 +17,11 @@ immediately so that the transmission can be aborted if the `Response` status
 indicates that the server does not wish to receive the message body.
 [RFC7230 6.5](https://tools.ietf.org/html/rfc7230#section-6.5).
 """
-function streamlayer(stream::Stream; iofunction=nothing, decompress::Union{Nothing, Bool}=nothing, kw...)::Response
+function streamlayer(stream::Stream; kw...)::Response
+    return fetch(@async _streamlayer(stream; kw...))
+end
+
+function _streamlayer(stream::Stream; iofunction=nothing, decompress::Union{Nothing, Bool}=nothing, kw...)::Response
     response = stream.message
     req = response.request
     io = stream.stream
@@ -132,6 +136,7 @@ end
 const IOBuffers = Union{IOBuffer, Base.GenericIOBuffer{SubArray{UInt8, 1, Vector{UInt8}, Tuple{UnitRange{Int64}}, true}}}
 
 function readbody!(stream::Stream, res::Response, buf_or_stream)
+    n = 0
     if !iserror(res)
         if isbytes(res.body)
             if length(res.body) > 0
@@ -144,29 +149,31 @@ function readbody!(stream::Stream, res::Response, buf_or_stream)
                     # if it's a BufferStream, the response body was gzip encoded
                     # so using the default write is fastest because it utilizes
                     # readavailable under the hood, for which BufferStream is optimized
-                    write(body, buf_or_stream)
+                    n = write(body, buf_or_stream)
                 elseif buf_or_stream isa Stream
                     # for HTTP.Stream, there's already an optimized read method
                     # that just needs an IOBuffer to write into
-                    readall!(buf_or_stream, body)
+                    n = readall!(buf_or_stream, body)
                 else
                     error("unreachable")
                 end
             else
                 res.body = read(buf_or_stream)
+                n = length(res.body)
             end
         elseif res.body isa Base.GenericIOBuffer && buf_or_stream isa Stream
            # optimization for IOBuffer response_stream to avoid temporary allocations
-            readall!(buf_or_stream, res.body)
+            n = readall!(buf_or_stream, res.body)
         else
-            write(res.body, buf_or_stream)
+            n = write(res.body, buf_or_stream)
         end
     else
         # read the response body into the request context so that it can be
         # read by the user if they want to or set later if
        # we end up not retrying/redirecting/etc.
        res.request.context[:response_body] = read(buf_or_stream)
     end
+    res.request.context[:nbytes] = n
 end
 
 end # module StreamRequest
```
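The `fetch(@async _streamlayer(...))` wrapper relies on `@async` tasks being sticky: they stay on the thread that scheduled them, whereas `Threads.@spawn` tasks may run elsewhere, which matters because OpenSSL keeps its error state per thread. A small illustration of the difference (not HTTP.jl code):

```julia
t1 = @async Threads.threadid()          # sticky: runs on the scheduling thread
t2 = Threads.@spawn Threads.threadid()  # may run on (or migrate to) another thread
fetch(t1), fetch(t2)
```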

src/clientlayers/TimeoutRequest.jl

Lines changed: 1 addition & 2 deletions
```diff
@@ -16,8 +16,7 @@ function timeoutlayer(handler)
             # skip
             return handler(stream; kw...)
         end
-        io = stream.stream
-        return try_with_timeout(() -> shouldtimeout(io, readtimeout), readtimeout, () -> close(io)) do
+        return try_with_timeout(readtimeout) do
             handler(stream; kw...)
         end
     end
```

src/concurrentstack.jl

Lines changed: 75 additions & 0 deletions
```diff
@@ -0,0 +1,75 @@
+@static if VERSION < v"1.7"
+
+mutable struct Node{T}
+    value::T
+    next::Union{Node{T},Nothing}
+end
+
+Node{T}(value::T) where {T} = Node{T}(value, nothing)
+
+mutable struct ConcurrentStack{T}
+    lock::ReentrantLock
+    next::Union{Node{T},Nothing}
+end
+
+ConcurrentStack{T}() where {T} = ConcurrentStack{T}(ReentrantLock(), nothing)
+
+function Base.push!(stack::ConcurrentStack{T}, v) where {T}
+    v === nothing && throw(ArgumentError("cannot push nothing onto a ConcurrentStack"))
+    v = convert(T, v)
+    node = Node{T}(v)
+    lock(stack.lock) do
+        node.next = stack.next
+        stack.next = node
+    end
+    return stack
+end
+
+function Base.pop!(stack::ConcurrentStack)
+    lock(stack.lock) do
+        node = stack.next
+        node === nothing && return nothing
+        stack.next = node.next
+        return node.value
+    end
+end
+
+else
+
+mutable struct Node{T}
+    value::T
+    @atomic next::Union{Node{T},Nothing}
+end
+
+Node{T}(value::T) where {T} = Node{T}(value, nothing)
+
+mutable struct ConcurrentStack{T}
+    @atomic next::Union{Node{T},Nothing}
+end
+
+ConcurrentStack{T}() where {T} = ConcurrentStack{T}(nothing)
+
+function Base.push!(stack::ConcurrentStack{T}, v) where {T}
+    v === nothing && throw(ArgumentError("cannot push nothing onto a ConcurrentStack"))
+    v = convert(T, v)
+    node = Node{T}(v)
+    next = @atomic stack.next
+    while true
+        @atomic node.next = next
+        next, ok = @atomicreplace(stack.next, next => node)
+        ok && break
+    end
+    return stack
+end
+
+function Base.pop!(stack::ConcurrentStack)
+    while true
+        node = @atomic stack.next
+        node === nothing && return nothing
+        next = @atomic node.next
+        next, ok = @atomicreplace(stack.next, node => next)
+        ok && return node.value
+    end
+end
+
+end # @static if VERSION < v"1.7"
```
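Basic semantics of the vendored stack, as used by the pool's lock-free fast path (illustrative):

```julia
stack = ConcurrentStack{Int}()
push!(stack, 1)
push!(stack, 2)
pop!(stack)    # == 2 (LIFO)
pop!(stack)    # == 1
pop!(stack)    # === nothing when empty, rather than throwing
# push!/pop! are safe to call from concurrent tasks; on Julia ≥ 1.7 they are
# lock-free via @atomic/@atomicreplace, otherwise guarded by a ReentrantLock.
```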
