Skip to content

Commit 6ccef70

Browse files
committed
Handle streaming vs. chunked
1 parent 98a7aff commit 6ccef70

File tree

2 files changed

+38
-33
lines changed

2 files changed

+38
-33
lines changed

requests/src/requests/Model.scala

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,7 @@ trait RequestBlob{
7777
object RequestBlob{
7878
object EmptyRequestBlob extends RequestBlob{
7979
def write(out: java.io.OutputStream): Unit = ()
80+
override def contentLength = Some(0)
8081
}
8182

8283
implicit class ByteSourceRequestBlob[T](x: T)(implicit f: T => geny.Writable) extends RequestBlob{

requests/src/requests/Requester.scala

Lines changed: 37 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,15 @@
11
package requests
22

3-
import java.io.*
4-
import java.net.http.*
5-
import java.net.{UnknownHostException as _, *}
3+
import java.io._
4+
import java.net.http._
5+
import java.net.{UnknownHostException => _, _}
66
import java.nio.ByteBuffer
77
import java.time.Duration
88
import java.util.concurrent.Flow
99
import java.util.function.Supplier
1010
import java.util.zip.{GZIPInputStream, InflaterInputStream}
1111

12-
import scala.collection.JavaConverters.*
12+
import scala.collection.JavaConverters._
1313
import scala.collection.mutable
1414
import scala.concurrent.{ExecutionException, Future}
1515

@@ -200,26 +200,6 @@ case class Requester(verb: String,
200200
.connectTimeout(Duration.ofMillis(connectTimeout))
201201
.build()
202202

203-
val requestBodyInputStream = new PipedInputStream()
204-
205-
val requestBuilder =
206-
HttpRequest.newBuilder()
207-
.uri(url1.toURI)
208-
.method(upperCaseVerb, HttpRequest.BodyPublishers.ofInputStream(new Supplier[InputStream] {
209-
override def get() = requestBodyInputStream
210-
}))
211-
.timeout(Duration.ofMillis(readTimeout))
212-
213-
for ((k, v) <- blobHeaders) requestBuilder.header(k, v)
214-
215-
for ((k, v) <- sess.headers) requestBuilder.header(k, v)
216-
217-
for ((k, v) <- headers) requestBuilder.header(k, v)
218-
219-
for ((k, v) <- compress.headers) requestBuilder.header(k, v)
220-
221-
auth.header.foreach(requestBuilder.header("Authorization", _))
222-
223203
val sessionCookieValues = for {
224204
c <- (sess.cookies ++ cookies).valuesIterator
225205
if !c.hasExpired
@@ -228,20 +208,44 @@ case class Requester(verb: String,
228208
} yield (c.getName, c.getValue)
229209

230210
val allCookies = sessionCookieValues ++ cookieValues
231-
if (allCookies.nonEmpty) {
232-
requestBuilder.header(
233-
"Cookie",
234-
allCookies
211+
212+
val allHeaders =
213+
blobHeaders ++
214+
sess.headers ++
215+
headers ++
216+
compress.headers ++
217+
auth.header.map("Authorization" -> _) ++
218+
(if (allCookies.isEmpty) None
219+
else Some("Cookie" -> allCookies
235220
.map { case (k, v) => s"""$k="$v"""" }
236221
.mkString("; ")
237-
)
238-
}
222+
))
223+
val allHeadersFlat = allHeaders.toList.flatMap { case (k, v) => Seq(k, v) }
224+
225+
val requestBodyInputStream = new PipedInputStream()
226+
val requestBodyOutputStream = new PipedOutputStream(requestBodyInputStream)
227+
228+
val bodyPublisher: HttpRequest.BodyPublisher =
229+
HttpRequest.BodyPublishers.ofInputStream(new Supplier[InputStream] {
230+
override def get() = requestBodyInputStream
231+
})
239232

240-
val request = requestBuilder.build()
233+
val requestBuilder =
234+
HttpRequest.newBuilder()
235+
.uri(url1.toURI)
236+
.timeout(Duration.ofMillis(readTimeout))
237+
.headers(allHeadersFlat: _*)
238+
.method(upperCaseVerb,
239+
data.contentLength match {
240+
case Some(0) => HttpRequest.BodyPublishers.noBody()
241+
case Some(n) if compress == Compress.None => HttpRequest.BodyPublishers.fromPublisher(bodyPublisher, n)
242+
case _ => bodyPublisher
243+
}
244+
)
241245

242-
val fut = httpClient.sendAsync(request, HttpResponse.BodyHandlers.ofInputStream())
246+
val fut = httpClient.sendAsync(requestBuilder.build(), HttpResponse.BodyHandlers.ofInputStream())
243247

244-
usingOutputStream(compress.wrap(new PipedOutputStream(requestBodyInputStream))) { os =>
248+
usingOutputStream(compress.wrap(requestBodyOutputStream)) { os =>
245249
data.write(os)
246250
}
247251

0 commit comments

Comments
 (0)