Skip to content

Commit 91c5a7b

Browse files
committed
Lading use workspace hyper
This commit adjusts the lading crate to use the workspace hyper with backports and deprecation warnings enabled. Signed-off-by: Brian L. Troutwine <[email protected]>
1 parent 8356ad3 commit 91c5a7b

File tree

8 files changed

+22
-20
lines changed

8 files changed

+22
-20
lines changed

Cargo.toml

+1
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ uuid = { version = "1.11", default-features = false, features = [
3131
"serde",
3232
] }
3333
once_cell = { version = "1.20" }
34+
hyper = { version = "0.14", default-features = false }
3435

3536
[profile.release]
3637
lto = true # Optimize our binary at link stage.

integration/ducks/Cargo.toml

+1-1
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ publish = false
1111
anyhow = "1.0"
1212
bytes = { workspace = true }
1313
entropy = "0.4"
14-
hyper = { version = "0.14", features = ["server", "backports", "deprecated"] }
14+
hyper = { workspace = true, features = ["server", "backports", "deprecated"] }
1515
once_cell = { workspace = true }
1616
shared = { path = "../shared" }
1717
sketches-ddsketch = "0.3"

lading/Cargo.toml

+1-1
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ futures = "0.3.31"
3737
fuser = { version = "0.15", optional = true }
3838
http = "0.2"
3939
http-serde = "1.1"
40-
hyper = { version = "0.14", features = ["client"] }
40+
hyper = { workspace = true, features = ["client", "backports", "deprecated"] }
4141
is_executable = "1.0.4"
4242
metrics = { workspace = true }
4343
metrics-exporter-prometheus = { workspace = true }

lading/src/blackhole/http.rs

+3-2
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,8 @@ use std::{net::SocketAddr, time::Duration};
1010

1111
use http::{header::InvalidHeaderValue, status::InvalidStatusCode, HeaderMap};
1212
use hyper::{
13-
body, header,
13+
body::HttpBody,
14+
header,
1415
server::conn::{AddrIncoming, AddrStream},
1516
service::{make_service_fn, service_fn},
1617
Body, Request, Response, Server, StatusCode,
@@ -137,7 +138,7 @@ async fn srv(
137138

138139
let (parts, body) = req.into_parts();
139140

140-
let bytes = body::to_bytes(body).await?;
141+
let bytes = body.collect().await?.to_bytes();
141142

142143
match crate::codec::decode(parts.headers.get(hyper::header::CONTENT_ENCODING), bytes) {
143144
Err(response) => Ok(response),

lading/src/blackhole/splunk_hec.rs

+3-2
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,8 @@ use std::{
1616
};
1717

1818
use hyper::{
19-
body, header,
19+
body::HttpBody,
20+
header,
2021
server::conn::{AddrIncoming, AddrStream},
2122
service::{make_service_fn, service_fn},
2223
Body, Method, Request, Response, Server, StatusCode,
@@ -93,7 +94,7 @@ async fn srv(
9394
metrics::counter!("requests_received", &*labels).increment(1);
9495

9596
let (parts, body) = req.into_parts();
96-
let bytes = body::to_bytes(body).await?;
97+
let bytes = body.collect().await?.to_bytes();
9798

9899
match crate::codec::decode(parts.headers.get(hyper::header::CONTENT_ENCODING), bytes) {
99100
Err(response) => Ok(response),

lading/src/blackhole/sqs.rs

+2-2
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@
99
use std::{fmt::Write, net::SocketAddr};
1010

1111
use hyper::{
12-
body,
12+
body::HttpBody,
1313
server::conn::{AddrIncoming, AddrStream},
1414
service::{make_service_fn, service_fn},
1515
Body, Request, Response, Server, StatusCode,
@@ -243,7 +243,7 @@ async fn srv(
243243
) -> Result<Response<Body>, Error> {
244244
requests_received.increment(1);
245245

246-
let bytes = body::to_bytes(req).await?;
246+
let bytes = req.collect().await?.to_bytes();
247247
bytes_received.increment(bytes.len() as u64);
248248

249249
let action: Action = serde_qs::from_bytes(&bytes)?;

lading/src/generator/splunk_hec.rs

+9-10
Original file line numberDiff line numberDiff line change
@@ -16,15 +16,15 @@
1616
1717
mod acknowledgements;
1818

19-
use std::{num::NonZeroU32, thread, time::Duration};
19+
use std::{future::ready, num::NonZeroU32, thread, time::Duration};
2020

2121
use acknowledgements::Channels;
2222
use byte_unit::ByteError;
2323
use http::{
2424
header::{AUTHORIZATION, CONTENT_LENGTH},
2525
Method, Request, Uri,
2626
};
27-
use hyper::{client::HttpConnector, Body, Client};
27+
use hyper::{body::HttpBody, client::HttpConnector, Body, Client};
2828
use lading_throttle::Throttle;
2929
use metrics::{counter, gauge};
3030
use once_cell::sync::OnceCell;
@@ -120,6 +120,9 @@ pub enum Error {
120120
/// Wrapper around [`acknowledgements::Error`]
121121
#[error(transparent)]
122122
Acknowledge(#[from] acknowledgements::Error),
123+
/// Wrapper around [`hyper::Error`].
124+
#[error("HTTP error: {0}")]
125+
Hyper(#[from] hyper::Error),
123126
}
124127

125128
/// Defines a task that emits variant lines to a Splunk HEC server controlling
@@ -337,14 +340,10 @@ async fn send_hec_request(
337340
let mut status_labels = labels.clone();
338341
status_labels.push(("status_code".to_string(), status.as_u16().to_string()));
339342
counter!("request_ok", &status_labels).increment(1);
340-
channel
341-
.send(async {
342-
let body_bytes = hyper::body::to_bytes(body).await.expect("unable to convert response body to bytes");
343-
let hec_ack_response =
344-
serde_json::from_slice::<HecAckResponse>(&body_bytes).expect("unable to parse response body");
345-
hec_ack_response.ack_id
346-
})
347-
.await?;
343+
let body_bytes = body.collect().await?.to_bytes();
344+
let hec_ack_response =
345+
serde_json::from_slice::<HecAckResponse>(&body_bytes).expect("unable to parse response body");
346+
channel.send(ready(hec_ack_response.ack_id)).await?;
348347
}
349348
Err(err) => {
350349
let mut error_labels = labels.clone();

lading/src/generator/splunk_hec/acknowledgements.rs

+2-2
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ use std::time::Duration;
33

44
use futures::Future;
55
use http::{header::AUTHORIZATION, Method, Request, StatusCode, Uri};
6-
use hyper::{client::HttpConnector, Body, Client};
6+
use hyper::{body::HttpBody, client::HttpConnector, Body, Client};
77
use metrics::counter;
88
use rustc_hash::FxHashMap;
99
use serde::Deserialize;
@@ -202,7 +202,7 @@ async fn ack_request(
202202
let status = parts.status;
203203
counter!("ack_status_request_ok", "channel_id" => channel_id.clone(), "status" => status.to_string()).increment(1);
204204
if status == StatusCode::OK {
205-
let body = hyper::body::to_bytes(body).await?;
205+
let body = body.collect().await?.to_bytes();
206206
let ack_status = serde_json::from_slice::<HecAckStatusResponse>(&body)?;
207207

208208
let mut ack_ids_acked: u32 = 0;

0 commit comments

Comments
 (0)