1
1
use core:: slice;
2
2
use std:: time:: Duration ;
3
3
4
+ use bytes:: Bytes ;
4
5
use futures:: Future ;
5
6
use http:: { header:: AUTHORIZATION , Method , Request , StatusCode , Uri } ;
6
- use hyper:: { body:: HttpBody , client:: HttpConnector , Body , Client } ;
7
+ use http_body_util:: { combinators:: BoxBody , BodyExt } ;
8
+ use hyper_util:: {
9
+ client:: legacy:: { connect:: HttpConnector , Client } ,
10
+ rt:: TokioExecutor ,
11
+ } ;
7
12
use metrics:: counter;
8
13
use rustc_hash:: FxHashMap ;
9
14
use serde:: Deserialize ;
@@ -87,7 +92,7 @@ impl Channels {
87
92
token : String ,
88
93
ack_settings : AckSettings ,
89
94
) {
90
- let client: Client < HttpConnector , Body > = Client :: builder ( )
95
+ let client = Client :: builder ( TokioExecutor :: new ( ) )
91
96
. retry_canceled_requests ( false )
92
97
. build_http ( ) ;
93
98
@@ -130,7 +135,7 @@ impl<'a, V> Iterator for Iter<'a, V> {
130
135
struct AckService {
131
136
pub ( crate ) ack_uri : Uri ,
132
137
pub ( crate ) token : String ,
133
- pub ( crate ) client : Client < HttpConnector , Body > ,
138
+ pub ( crate ) client : Client < HttpConnector , BoxBody < Bytes , hyper :: Error > > ,
134
139
pub ( crate ) ack_settings : AckSettings ,
135
140
}
136
141
@@ -167,11 +172,11 @@ impl AckService {
167
172
if ack_ids. is_empty( ) {
168
173
debug!( "tick expired with no acks" ) ;
169
174
} else {
170
- let body = Body :: from (
175
+ let body = crate :: full (
171
176
serde_json:: json!( { "acks" : ack_ids. keys( ) . collect:: <Vec <& u64 >>( ) } )
172
177
. to_string( ) ,
173
178
) ;
174
- let request: Request < Body > = Request :: builder( )
179
+ let request = Request :: builder( )
175
180
. method( Method :: POST )
176
181
. uri( self . ack_uri. clone( ) )
177
182
. header( AUTHORIZATION , format!( "Splunk {}" , self . token) )
@@ -191,8 +196,8 @@ impl AckService {
191
196
}
192
197
193
198
async fn ack_request (
194
- client : Client < HttpConnector > ,
195
- request : Request < Body > ,
199
+ client : Client < HttpConnector , BoxBody < Bytes , hyper :: Error > > ,
200
+ request : Request < BoxBody < Bytes , hyper :: Error > > ,
196
201
channel_id : String ,
197
202
ack_ids : & mut FxHashMap < AckId , u64 > ,
198
203
) -> Result < ( ) , Error > {
@@ -202,7 +207,7 @@ async fn ack_request(
202
207
let status = parts. status ;
203
208
counter ! ( "ack_status_request_ok" , "channel_id" => channel_id. clone( ) , "status" => status. to_string( ) ) . increment ( 1 ) ;
204
209
if status == StatusCode :: OK {
205
- let body = body. collect ( ) . await ?. to_bytes ( ) ;
210
+ let body = body. boxed ( ) . collect ( ) . await ?. to_bytes ( ) ;
206
211
let ack_status = serde_json:: from_slice :: < HecAckStatusResponse > ( & body) ?;
207
212
208
213
let mut ack_ids_acked: u32 = 0 ;
0 commit comments