@@ -33,10 +33,15 @@ function streamlayer(stream::Stream; iofunction=nothing, decompress::Union{Nothi
33
33
try
34
34
@sync begin
35
35
if iofunction === nothing
36
+ # use a lock here for request.context changes (this is currently the only places
37
+ # where multiple threads may modify/change context at the same time)
38
+ lock = ReentrantLock ()
36
39
Threads. @spawn try
37
- writebody (stream, req)
40
+ writebody (stream, req, lock )
38
41
finally
39
- req. context[:write_duration_ms ] = get (req. context, :write_duration_ms , 0.0 ) + ((time () - write_start) * 1000 )
42
+ Base. @lock lock begin
43
+ req. context[:write_duration_ms ] = get (req. context, :write_duration_ms , 0.0 ) + ((time () - write_start) * 1000 )
44
+ end
40
45
@debugv 2 " client closewrite"
41
46
closewrite (stream)
42
47
end
@@ -48,7 +53,9 @@ function streamlayer(stream::Stream; iofunction=nothing, decompress::Union{Nothi
48
53
readbody (stream, response, decompress)
49
54
end
50
55
finally
51
- req. context[:read_duration_ms ] = get (req. context, :read_duration_ms , 0.0 ) + ((time () - read_start) * 1000 )
56
+ Base. @lock lock begin
57
+ req. context[:read_duration_ms ] = get (req. context, :read_duration_ms , 0.0 ) + ((time () - read_start) * 1000 )
58
+ end
52
59
@debugv 2 " client closeread"
53
60
closeread (stream)
54
61
end
@@ -77,14 +84,16 @@ function streamlayer(stream::Stream; iofunction=nothing, decompress::Union{Nothi
77
84
return response
78
85
end
79
86
80
- function writebody (stream:: Stream , req:: Request )
87
+ function writebody (stream:: Stream , req:: Request , lock )
81
88
if ! isbytes (req. body)
82
89
n = writebodystream (stream, req. body)
83
90
closebody (stream)
84
91
else
85
92
n = write (stream, req. body)
86
93
end
87
- stream. message. request. context[:nbytes_written ] = n
94
+ Base. @lock lock begin
95
+ req. context[:nbytes_written ] = n
96
+ end
88
97
return n
89
98
end
90
99
0 commit comments