@@ -24,7 +24,12 @@ use http::{
24
24
header:: { AUTHORIZATION , CONTENT_LENGTH } ,
25
25
Method , Request , Uri ,
26
26
} ;
27
- use hyper_util:: client:: legacy:: { Client , ClientBuilder , HttpConnector } ;
27
+ use http_body_util:: BodyExt ;
28
+ use hyper:: body:: Body ;
29
+ use hyper_util:: {
30
+ client:: legacy:: { connect:: HttpConnector , Client } ,
31
+ rt:: TokioExecutor ,
32
+ } ;
28
33
use lading_throttle:: Throttle ;
29
34
use metrics:: { counter, gauge} ;
30
35
use once_cell:: sync:: OnceCell ;
@@ -248,7 +253,7 @@ impl SplunkHec {
248
253
/// Function will panic if it is unable to create HTTP requests for the
249
254
/// target.
250
255
pub async fn spin ( mut self ) -> Result < ( ) , Error > {
251
- let client: Client < HttpConnector , Body > = Client :: builder ( )
256
+ let client = Client :: builder ( TokioExecutor :: new ( ) )
252
257
. pool_max_idle_per_host ( self . parallel_connections as usize )
253
258
. retry_canceled_requests ( false )
254
259
. build_http ( ) ;
@@ -283,10 +288,10 @@ impl SplunkHec {
283
288
let uri = uri. clone( ) ;
284
289
285
290
let blk = rcv. next( ) . await . expect( "failed to advance through blocks" ) ; // actually advance through the blocks
286
- let body = Body :: from ( blk. bytes. clone( ) ) ;
291
+ let body = crate :: full ( blk. bytes. clone( ) ) ;
287
292
let block_length = blk. bytes. len( ) ;
288
293
289
- let request: Request < Body > = Request :: builder( )
294
+ let request = Request :: builder( )
290
295
. method( Method :: POST )
291
296
. uri( uri)
292
297
. header( AUTHORIZATION , format!( "Splunk {}" , self . token) )
@@ -317,15 +322,20 @@ impl SplunkHec {
317
322
}
318
323
}
319
324
320
- async fn send_hec_request (
325
+ async fn send_hec_request < B > (
321
326
permit : SemaphorePermit < ' _ > ,
322
327
block_length : usize ,
323
328
labels : Vec < ( String , String ) > ,
324
329
channel : Channel ,
325
- client : Client < HttpConnector > ,
326
- request : Request < Body > ,
330
+ client : Client < HttpConnector , B > ,
331
+ request : Request < B > ,
327
332
shutdown : lading_signal:: Watcher ,
328
- ) -> Result < ( ) , Error > {
333
+ ) -> Result < ( ) , Error >
334
+ where
335
+ B : Body + Send + ' static + Unpin ,
336
+ B :: Data : Send ,
337
+ B :: Error : Into < Box < dyn std:: error:: Error + Send + Sync > > ,
338
+ {
329
339
counter ! ( "requests_sent" , & labels) . increment ( 1 ) ;
330
340
let work = client. request ( request) ;
331
341
@@ -340,7 +350,7 @@ async fn send_hec_request(
340
350
let mut status_labels = labels. clone( ) ;
341
351
status_labels. push( ( "status_code" . to_string( ) , status. as_u16( ) . to_string( ) ) ) ;
342
352
counter!( "request_ok" , & status_labels) . increment( 1 ) ;
343
- let body_bytes = body. collect( ) . await ?. to_bytes( ) ;
353
+ let body_bytes = body. boxed ( ) . collect( ) . await ?. to_bytes( ) ;
344
354
let hec_ack_response =
345
355
serde_json:: from_slice:: <HecAckResponse >( & body_bytes) . expect( "unable to parse response body" ) ;
346
356
channel. send( ready( hec_ack_response. ack_id) ) . await ?;
0 commit comments