Skip to content

Commit ffc96bc

Browse files
quinnjvtjnash
andauthored
Ensure read/readavailable for BufferStream are threadsafe (#57211)
It looks like these methods were just missed while overloading for BufferStream. There's also `readbytes!` where the current implementation will fallback to the `LibuvStream` implementation that is currently not threadsafe. What's the best approach there since the implementation is quite a bit more involved? Just duplicate the code but for BufferStream? Should we take the BufferStream lock and invoke the LibuvStream method? Open to ideas there. Also open to suggestions for having tests here? Not easy to simulate the data race of writing and calling readavailable. The fix here will unblock JuliaWeb/HTTP.jl#1213 (I'll probably do some compat shim there until this is fully released). Thanks to @oscardssmith for rubber ducking this issue with me. Probably most helpfully reviewed by @vtjnash. --------- Co-authored-by: Jameson Nash <[email protected]>
1 parent 2f0a523 commit ffc96bc

File tree

1 file changed

+58
-0
lines changed

1 file changed

+58
-0
lines changed

base/stream.jl

+58
Original file line numberDiff line numberDiff line change
@@ -1559,6 +1559,64 @@ function wait_readnb(s::BufferStream, nb::Int)
15591559
end
15601560
end
15611561

1562+
function readavailable(this::BufferStream)
1563+
bytes = lock(this.cond) do
1564+
wait_readnb(this, 1)
1565+
buf = this.buffer
1566+
@assert buf.seekable == false
1567+
take!(buf)
1568+
end
1569+
return bytes
1570+
end
1571+
1572+
function read(stream::BufferStream)
1573+
bytes = lock(stream.cond) do
1574+
wait_close(stream)
1575+
take!(stream.buffer)
1576+
end
1577+
return bytes
1578+
end
1579+
1580+
function readbytes!(s::BufferStream, a::Vector{UInt8}, nb::Int)
1581+
sbuf = s.buffer
1582+
@assert sbuf.seekable == false
1583+
@assert sbuf.maxsize >= nb
1584+
1585+
function wait_locked(s, buf, nb)
1586+
while bytesavailable(buf) < nb
1587+
s.readerror === nothing || throw(s.readerror)
1588+
isopen(s) || break
1589+
s.status != StatusEOF || break
1590+
wait_readnb(s, nb)
1591+
end
1592+
end
1593+
1594+
bytes = lock(s.cond) do
1595+
if nb <= SZ_UNBUFFERED_IO # Under this limit we are OK with copying the array from the stream's buffer
1596+
wait_locked(s, sbuf, nb)
1597+
end
1598+
if bytesavailable(sbuf) >= nb
1599+
nread = readbytes!(sbuf, a, nb)
1600+
else
1601+
initsize = length(a)
1602+
newbuf = PipeBuffer(a, maxsize=nb)
1603+
newbuf.size = newbuf.offset # reset the write pointer to the beginning
1604+
nread = try
1605+
s.buffer = newbuf
1606+
write(newbuf, sbuf)
1607+
wait_locked(s, newbuf, nb)
1608+
bytesavailable(newbuf)
1609+
finally
1610+
s.buffer = sbuf
1611+
end
1612+
_take!(a, _unsafe_take!(newbuf))
1613+
length(a) >= initsize || resize!(a, initsize)
1614+
end
1615+
return nread
1616+
end
1617+
return bytes
1618+
end
1619+
15621620
show(io::IO, s::BufferStream) = print(io, "BufferStream(bytes waiting=", bytesavailable(s.buffer), ", isopen=", isopen(s), ")")
15631621

15641622
function readuntil(s::BufferStream, c::UInt8; keep::Bool=false)

0 commit comments

Comments
 (0)