Skip to content

Commit 044d8c8

Browse files
committed
checkpoint
Signed-off-by: Brian L. Troutwine <[email protected]>
1 parent 8316502 commit 044d8c8

File tree

1 file changed

+13
-8
lines changed

1 file changed

+13
-8
lines changed

lading/src/generator/splunk_hec/acknowledgements.rs

+13-8
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,14 @@
11
use core::slice;
22
use std::time::Duration;
33

4+
use bytes::Bytes;
45
use futures::Future;
56
use http::{header::AUTHORIZATION, Method, Request, StatusCode, Uri};
6-
use hyper::{body::HttpBody, client::HttpConnector, Body, Client};
7+
use http_body_util::{combinators::BoxBody, BodyExt};
8+
use hyper_util::{
9+
client::legacy::{connect::HttpConnector, Client},
10+
rt::TokioExecutor,
11+
};
712
use metrics::counter;
813
use rustc_hash::FxHashMap;
914
use serde::Deserialize;
@@ -87,7 +92,7 @@ impl Channels {
8792
token: String,
8893
ack_settings: AckSettings,
8994
) {
90-
let client: Client<HttpConnector, Body> = Client::builder()
95+
let client = Client::builder(TokioExecutor::new())
9196
.retry_canceled_requests(false)
9297
.build_http();
9398

@@ -130,7 +135,7 @@ impl<'a, V> Iterator for Iter<'a, V> {
130135
struct AckService {
131136
pub(crate) ack_uri: Uri,
132137
pub(crate) token: String,
133-
pub(crate) client: Client<HttpConnector, Body>,
138+
pub(crate) client: Client<HttpConnector, BoxBody<Bytes, hyper::Error>>,
134139
pub(crate) ack_settings: AckSettings,
135140
}
136141

@@ -167,11 +172,11 @@ impl AckService {
167172
if ack_ids.is_empty() {
168173
debug!("tick expired with no acks");
169174
} else {
170-
let body = Body::from(
175+
let body = crate::full(
171176
serde_json::json!({ "acks": ack_ids.keys().collect::<Vec<&u64>>() })
172177
.to_string(),
173178
);
174-
let request: Request<Body> = Request::builder()
179+
let request = Request::builder()
175180
.method(Method::POST)
176181
.uri(self.ack_uri.clone())
177182
.header(AUTHORIZATION, format!("Splunk {}", self.token))
@@ -191,8 +196,8 @@ impl AckService {
191196
}
192197

193198
async fn ack_request(
194-
client: Client<HttpConnector>,
195-
request: Request<Body>,
199+
client: Client<HttpConnector, BoxBody<Bytes, hyper::Error>>,
200+
request: Request<BoxBody<Bytes, hyper::Error>>,
196201
channel_id: String,
197202
ack_ids: &mut FxHashMap<AckId, u64>,
198203
) -> Result<(), Error> {
@@ -202,7 +207,7 @@ async fn ack_request(
202207
let status = parts.status;
203208
counter!("ack_status_request_ok", "channel_id" => channel_id.clone(), "status" => status.to_string()).increment(1);
204209
if status == StatusCode::OK {
205-
let body = body.collect().await?.to_bytes();
210+
let body = body.boxed().collect().await?.to_bytes();
206211
let ack_status = serde_json::from_slice::<HecAckStatusResponse>(&body)?;
207212

208213
let mut ack_ids_acked: u32 = 0;

0 commit comments

Comments
 (0)