diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index cc1bfde..6038193 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -23,8 +23,15 @@ jobs: - uses: Swatinem/rust-cache@v2 with: key: windows-latest-test + - uses: cargo-bins/cargo-binstall@main + - name: Test Setup + run: | + make certs + cargo binstall http-server - name: Test - run: cargo test --features task,subscriber,fixture,task_unstable,io,sync,future,net,tls,rust_tls,timer,fs,zero_copy,mmap,retry + run: | + Start-Process cmd -Args /c,"http-server --tls --tls-key certs/test-certs/server.key --tls-cert certs/test-certs/server.crt --tls-key-algorithm pkcs8" + cargo test --features task,subscriber,fixture,task_unstable,io,sync,future,net,tls,rust_tls,timer,fs,zero_copy,mmap,retry,http-client-json,__skip-http-client-cert-verification test: name: Check ${{ matrix.check }} on (${{ matrix.os }}) runs-on: ${{ matrix.os }} @@ -54,6 +61,7 @@ jobs: - uses: Swatinem/rust-cache@v2 with: key: ${{ matrix.os }}-${{ matrix.check }} + - uses: cargo-bins/cargo-binstall@main - name: Clippy if: ${{ matrix.check == 'clippy' }} run: make check-clippy @@ -68,7 +76,7 @@ jobs: if: ${{ matrix.check == 'test' && matrix.os == 'macos-latest'}} timeout-minutes: 15 # macos-latest is by default on openssl 1.x - run: make PFX_OPTS="" test-all + run: make PFX_OPTS="" CERT_OPTS=cert-patch-macos test-all - name: cargo audit if: ${{ matrix.check == 'audit' }} timeout-minutes: 15 @@ -77,7 +85,7 @@ jobs: token: ${{ secrets.GITHUB_TOKEN }} wasm_test: - name: ${{ matrix.test.name }} on (${{ matrix.os }}) + name: Wasm test on (${{ matrix.os }}) runs-on: ${{ matrix.os }} strategy: # fail-fast: false @@ -98,6 +106,7 @@ jobs: toolchain: ${{ matrix.rust }} profile: minimal target: wasm32-unknown-unknown + - uses: cargo-bins/cargo-binstall@main - name: Run Wasm Test on Linux if: matrix.test == 'linux-wasm' run: | diff --git a/Cargo.lock b/Cargo.lock index fb2f17f..526ba39 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -32,6 +32,16 @@ version = "1.0.75" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a4668cab20f66d8d020e1fbc0ebe47217433c1b6c8f2040faf858554e394ace6" +[[package]] +name = "async-attributes" +version = "1.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a3203e79f4dd9bdda415ed03cf14dae5a2bf775c683a00f94e9cd1faf0f596e5" +dependencies = [ + "quote", + "syn 1.0.109", +] + [[package]] name = "async-channel" version = "1.9.0" @@ -172,6 +182,7 @@ version = "1.12.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "62565bb4402e926b29953c785397c6dc0391b7b446e45008b0049eb43cec6f5d" dependencies = [ + "async-attributes", "async-channel", "async-global-executor", "async-io", @@ -416,6 +427,7 @@ dependencies = [ "futures-lite", "futures-timer", "futures-util", + "http", "hyper", "lazy_static", "log", @@ -423,12 +435,15 @@ dependencies = [ "native-tls", "nix", "num_cpus", + "once_cell", "openssl", "openssl-sys", "pin-project", "pin-utils", "portpicker", "rustls-pemfile", + "serde", + "serde_json", "socket2 0.5.3", "thiserror", "tokio", @@ -438,6 +453,7 @@ dependencies = [ "tracing-wasm", "wasm-bindgen-futures", "wasm-bindgen-test", + "webpki-roots", "ws_stream_wasm", ] @@ -1324,6 +1340,12 @@ dependencies = [ "untrusted", ] +[[package]] +name = "ryu" +version = "1.0.15" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1ad4cc8da4ef723ed60bced201181d83791ad433213d8c24efffda1eec85d741" + [[package]] name = "schannel" version = "0.1.22" @@ -1390,6 +1412,37 @@ version = "0.6.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "cd0b0ec5f1c1ca621c432a25813d8d60c88abe6d3e08a3eb9cf37d97a0fe3d73" +[[package]] +name = "serde" +version = "1.0.193" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "25dd9975e68d0cb5aa1120c288333fc98731bd1dd12f561e468ea4728c042b89" +dependencies = [ + "serde_derive", +] + +[[package]] +name = "serde_derive" +version = "1.0.193" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "43576ca501357b9b071ac53cdc7da8ef0cbd9493d8df094cd821777ea6e894d3" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.29", +] + +[[package]] +name = "serde_json" +version = "1.0.108" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3d1c7e3eac408d115102c4c24ad393e0821bb3a5df4d506a80f85f7a742a526b" +dependencies = [ + "itoa", + "ryu", + "serde", +] + [[package]] name = "sharded-slab" version = "0.1.4" @@ -1840,6 +1893,12 @@ dependencies = [ "wasm-bindgen", ] +[[package]] +name = "webpki-roots" +version = "0.25.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1778a42e8b3b90bff8d0f5032bf22250792889a5cdc752aa0020c84abe3aaf10" + [[package]] name = "winapi" version = "0.3.9" diff --git a/Cargo.toml b/Cargo.toml index cb7518b..7c67bd2 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -55,8 +55,16 @@ http-client = [ "dep:tokio", "dep:hyper", "dep:anyhow", + "dep:webpki-roots", + "dep:http", + "dep:serde", + "dep:serde_json", + "dep:once_cell", ] tokio1 = ["async-std/tokio1"] +http-client-json = ["http-client"] +# useful for tests +__skip-http-client-cert-verification = [] [dependencies] log = "0.4.0" @@ -73,6 +81,11 @@ cfg-if = { version = "1.0.0", optional = true } tokio = { version = "1.33.0", default-features = false, optional = true } hyper = { version = "0.14.27", default-features = false, features = ["client", "http1", "http2"], optional = true } anyhow = { version = "1.0.75", optional = true } +http = { version = "0.2.9", optional = true } +webpki-roots = { version = "0.25.2", optional = true } +serde = { version = "1.0.189", optional = true } +serde_json = { version = "1.0.107", optional = true } +once_cell = { version = "1.18.0", optional = true } thiserror = "1.0.20" fluvio-test-derive = { path = "async-test-derive", version = "0.1.1", optional = true } @@ -98,6 +111,7 @@ async-std = { version = "1.12.0", default-features = false, features = ["unstabl ws_stream_wasm = "0.7.3" [dev-dependencies] +async-std = { version = "1.12.0", features = ["attributes"] } bytes = "1.0.0" lazy_static = "1.2.0" num_cpus = "1.10.1" @@ -107,7 +121,8 @@ tokio-util = { version = "0.7.0", features = ["codec", "compat"] } tokio = { version = "1.17.0", features = ["macros"] } flv-util = { version = "0.5.0", features = ["fixture"] } fluvio-test-derive = { path = "async-test-derive", version = "0.1.0" } -fluvio-future = { path = ".", features = ["net", "fixture", "timer", "fs", "retry"] } +fluvio-future = { path = ".", features = ["net", "fixture", "timer", "fs", "retry", "http-client-json", "__skip-http-client-cert-verification"] } +serde = { version = "1.0.189", features = ["derive"] } [target.'cfg(not(target_arch = "wasm32"))'.dev-dependencies] portpicker = "0.1.1" diff --git a/Makefile b/Makefile index 3ba3143..4e6ad94 100644 --- a/Makefile +++ b/Makefile @@ -1,5 +1,6 @@ RUST_DOCKER_IMAGE=rust:latest -PFX_OPTS ?= "-legacy" +CERT_OPTS ?= +PFX_OPTS ?= "" build-all: cargo build --all-features @@ -8,8 +9,20 @@ build-all: certs: make -C certs generate-certs PFX_OPTS=${PFX_OPTS} -test-all: certs test-derive - cargo test --all-features +cert-patch-macos: + sed -i '' 's/RSA PRIVATE KEY/PRIVATE KEY/' certs/test-certs/server-hs.key + +.PHONY: test-all run-test-all +test-all: certs test-derive setup-http-server run-test-all +run-test-all: + TEST_PORT=$$(cat tmp-PORT) cargo test --all-features + $(MAKE) teardown-http-server + +.PHONY: test-http run-test-http +test-http: certs setup-http-server run-test-http +run-test-http: + TEST_PORT=$$(cat tmp-PORT) cargo test --all-features test_http_client + $(MAKE) teardown-http-server install-wasm-pack: which wasm-pack || curl https://rustwasm.github.io/wasm-pack/installer/init.sh -sSf | sh @@ -54,6 +67,20 @@ install-clippy: install-wasm32: rustup target add wasm32-unknown-unknown +setup-http-server: certs $(CERT_OPTS) + cargo binstall http-server + cargo binstall -y portpicker-cli + portpicker > tmp-PORT + echo Picked port $$(cat tmp-PORT) + http-server --tls \ + --tls-key certs/test-certs/server-hs.key \ + --tls-cert certs/test-certs/server.crt \ + --tls-key-algorithm pkcs8 -p $$(cat tmp-PORT) & + +teardown-http-server: + killall http-server + rm -f tmp-PORT + check-clippy: install-clippy install-wasm32 cargo clippy --all-features cargo check --target wasm32-unknown-unknown --all-features diff --git a/certs/Makefile b/certs/Makefile index 84429a1..b425352 100644 --- a/certs/Makefile +++ b/certs/Makefile @@ -50,7 +50,7 @@ generate-intermediate-ca-crt: generate-intermediate-ca-csr generate-server-key: openssl genrsa -out test-certs/server.key 4096 - + cp test-certs/server.key test-certs/server-hs.key generate-server-csr: generate-server-key openssl req -new -key test-certs/server.key \ diff --git a/src/fs/mmap.rs b/src/fs/mmap.rs index f9f7239..413c441 100644 --- a/src/fs/mmap.rs +++ b/src/fs/mmap.rs @@ -149,7 +149,7 @@ mod tests { #[test_async] async fn test_mmap_write_slice() -> Result<(), IoError> { let index_path = temp_dir().join("test.index"); - ensure_clean_file(&index_path.clone()); + ensure_clean_file(index_path.clone()); let result = MemoryMappedMutFile::create(&index_path, 3).await; assert!(result.is_ok()); @@ -176,7 +176,7 @@ mod tests { #[test_async] async fn test_mmap_write_pair_slice() -> Result<(), IoError> { let index_path = temp_dir().join("pairslice.index"); - ensure_clean_file(&index_path.clone()); + ensure_clean_file(index_path.clone()); let result = MemoryMappedMutFile::create(&index_path, 24).await; assert!(result.is_ok()); @@ -205,7 +205,7 @@ mod tests { #[test_async] async fn test_mmap_write_with_pos() -> Result<(), IoError> { let index_path = temp_dir().join("testpos.index"); - ensure_clean_file(&index_path.clone()); + ensure_clean_file(index_path.clone()); let (mut mm_file, _) = MemoryMappedMutFile::create(&index_path, 10).await?; @@ -227,7 +227,7 @@ mod tests { #[test_async] async fn test_empty_index_read_only() -> Result<(), IoError> { let index_path = temp_dir().join("zerosized.index"); - ensure_clean_file(&index_path.clone()); + ensure_clean_file(index_path.clone()); { let file = OpenOptions::new() .create(true) diff --git a/src/http_client/client.rs b/src/http_client/client.rs new file mode 100644 index 0000000..75976d7 --- /dev/null +++ b/src/http_client/client.rs @@ -0,0 +1,94 @@ +use std::{str::FromStr, sync::Arc}; + +use anyhow::Result; +use async_rustls::rustls::{OwnedTrustAnchor, RootCertStore}; +use hyper::Uri; +use once_cell::sync::Lazy; + +use super::{ + async_std_compat::{self, CompatConnector}, + request::RequestBuilder, +}; + +type HyperClient = Arc>; + +#[derive(Clone)] +pub struct Client { + pub(crate) hyper: HyperClient, +} + +#[cfg_attr(feature = "__skip-http-client-cert-verification", allow(dead_code))] +static ROOT_CERT_STORE: Lazy = Lazy::new(|| { + let mut store = RootCertStore::empty(); + store.add_trust_anchors(webpki_roots::TLS_SERVER_ROOTS.iter().map(|ta| { + OwnedTrustAnchor::from_subject_spki_name_constraints( + ta.subject, + ta.spki, + ta.name_constraints, + ) + })); + store +}); + +impl Default for Client { + fn default() -> Self { + let tls = async_rustls::rustls::ClientConfig::builder().with_safe_defaults(); + + #[cfg(not(feature = "__skip-http-client-cert-verification"))] + let tls = tls.with_root_certificates(ROOT_CERT_STORE.to_owned()); + + #[cfg(feature = "__skip-http-client-cert-verification")] + let tls = + tls.with_custom_certificate_verifier(Arc::new(no_verifier::NoCertificateVerification)); + + let tls = tls.with_no_client_auth(); + + let https = async_std_compat::CompatConnector::new(tls); + + let client: hyper::Client<_, hyper::Body> = hyper::Client::builder() + .executor(async_std_compat::CompatExecutor) + .build(https); + + Client { + hyper: Arc::new(client), + } + } +} + +impl Client { + pub fn new() -> Self { + Self::default() + } + + pub fn get(&self, uri: impl AsRef) -> Result { + let uri = Uri::from_str(uri.as_ref())?; + let req = http::request::Builder::new().uri(uri); + Ok(RequestBuilder::new(self.clone(), req)) + } +} + +#[cfg(feature = "__skip-http-client-cert-verification")] +mod no_verifier { + use std::time::SystemTime; + + use async_rustls::rustls::{ + client::{ServerCertVerified, ServerCertVerifier}, + Certificate, Error, ServerName, + }; + + pub struct NoCertificateVerification; + + impl ServerCertVerifier for NoCertificateVerification { + fn verify_server_cert( + &self, + _end_entity: &Certificate, + _intermediates: &[Certificate], + _server_name: &ServerName, + _scts: &mut dyn Iterator, + _ocsp_response: &[u8], + _now: SystemTime, + ) -> Result { + Ok(ServerCertVerified::assertion()) + } + } +} diff --git a/src/http_client/mod.rs b/src/http_client/mod.rs index 68bafef..9562a53 100644 --- a/src/http_client/mod.rs +++ b/src/http_client/mod.rs @@ -1 +1,13 @@ mod async_std_compat; +pub mod client; +mod request; + +pub use client::Client; +pub use hyper::StatusCode; +pub use request::ResponseExt; + +use hyper::{Body, Response}; + +pub async fn get(uri: impl AsRef) -> Result, anyhow::Error> { + Client::new().get(uri)?.send().await +} diff --git a/src/http_client/request.rs b/src/http_client/request.rs new file mode 100644 index 0000000..6a40c89 --- /dev/null +++ b/src/http_client/request.rs @@ -0,0 +1,83 @@ +use std::{fmt, future::Future, pin::Pin}; + +use anyhow::{anyhow, Error, Result}; +use futures_util::TryFutureExt; +use http::{request::Builder, HeaderName, HeaderValue}; +use hyper::{body::Bytes, Body, Response}; + +use super::client::Client; + +pub struct RequestBuilder { + client: Client, + req_builder: Builder, +} + +impl RequestBuilder { + pub fn new(client: Client, req_builder: Builder) -> Self { + Self { + client, + req_builder, + } + } + + pub fn header(mut self, key: K, value: V) -> RequestBuilder + where + HeaderName: TryFrom, + >::Error: Into, + HeaderValue: TryFrom, + >::Error: Into, + { + self.req_builder = self.req_builder.header(key, value); + self + } + + pub async fn send(self) -> Result> { + let req = self + .req_builder + .header(http::header::USER_AGENT, "fluvio-mini-http/0.1") + .body(hyper::Body::empty()) + .map_err(|err| anyhow!("hyper error: {err:?}"))?; + self.client + .hyper + .request(req) + .map_err(|err| anyhow!("request error: {err:?}")) + .await + } +} + +// TODO: prefer static-dispatch once AFIT got stabilized in Rust v1.75 +type ResponseExtFuture = Pin + Send + 'static>>; + +pub trait ResponseExt { + fn bytes(self) -> ResponseExtFuture>; + + #[cfg(feature = "http-client-json")] + fn json(self) -> ResponseExtFuture> + where + Self: Sized + Send + 'static, + { + let fut = async move { + let bytes = self.bytes().await?; + serde_json::from_slice(&bytes).map_err(|err| anyhow!("serialization error: {err:?}")) + }; + + Box::pin(fut) + } +} + +impl ResponseExt for Response +where + T: hyper::body::HttpBody + Send + 'static, + T::Data: Send, + T::Error: fmt::Debug, +{ + fn bytes(self) -> ResponseExtFuture> { + let fut = async move { + hyper::body::to_bytes(self.into_body()) + .map_err(|err| anyhow!("{err:?}")) + .await + }; + + Box::pin(fut) + } +} diff --git a/src/native_tls.rs b/src/native_tls.rs index 112dc4d..f9933e8 100644 --- a/src/native_tls.rs +++ b/src/native_tls.rs @@ -138,7 +138,6 @@ mod connector { let socket_opts = SocketOpts { keepalive: Some(Default::default()), nodelay: Some(true), - ..Default::default() }; let tcp_stream = stream_with_opts(addr, Some(socket_opts)).await?; let fd = tcp_stream.as_connection_fd(); diff --git a/src/openssl/connector.rs b/src/openssl/connector.rs index 298ab83..3402a31 100644 --- a/src/openssl/connector.rs +++ b/src/openssl/connector.rs @@ -41,7 +41,7 @@ pub mod certs { // copied from https://github.com/sfackler/rust-native-tls/blob/master/src/imp/openssl.rs mod identity_impl { - use openssl::error::ErrorStack; + use crate::openssl::TlsError::CertReadError; use openssl::pkcs12::Pkcs12; use openssl::pkey::{PKey, Private}; use openssl::x509::X509; @@ -54,13 +54,21 @@ pub mod certs { } impl Identity { - pub fn from_pkcs12(buf: &[u8], pass: &str) -> Result { + pub fn from_pkcs12(buf: &[u8], pass: &str) -> anyhow::Result { let pkcs12 = Pkcs12::from_der(buf)?; - let parsed = pkcs12.parse(pass)?; + let parsed = pkcs12 + .parse2(pass) + .map_err(|_| CertReadError(String::from("Couldn't read pkcs12")))?; + let pkey = parsed + .pkey + .ok_or(CertReadError(String::from("Missing private key")))?; + let cert = parsed + .cert + .ok_or(CertReadError(String::from("Missing cert")))?; Ok(Identity { - pkey: parsed.pkey, - cert: parsed.cert, - chain: parsed.chain.into_iter().flatten().collect(), + pkey, + cert, + chain: parsed.ca.into_iter().flatten().collect(), }) } @@ -273,7 +281,6 @@ impl TcpDomainConnector for TlsAnonymousConnector { let socket_opts = SocketOpts { keepalive: Some(Default::default()), nodelay: Some(true), - ..Default::default() }; let tcp_stream = stream_with_opts(domain, Some(socket_opts)).await?; let fd = tcp_stream.as_connection_fd(); diff --git a/src/openssl/error.rs b/src/openssl/error.rs index c782807..9bc0910 100644 --- a/src/openssl/error.rs +++ b/src/openssl/error.rs @@ -6,6 +6,9 @@ use super::async_to_sync_wrapper::AsyncToSyncWrapper; #[derive(Error, Debug)] pub enum Error { + #[error("CertReadError: {0}")] + CertReadError(String), + #[error("OpenSslError: {0}")] OpenSslError(#[from] openssl::error::Error), diff --git a/src/retry.rs b/src/retry.rs index 25391f9..f486015 100644 --- a/src/retry.rs +++ b/src/retry.rs @@ -408,7 +408,7 @@ mod test { .timeout(Duration::from_millis(300)) .await; - assert!(matches!(retry_result, Err(_))); + assert!(retry_result.is_err()); assert!(executed_retries < 10); } diff --git a/test-data/http-client/ip.json b/test-data/http-client/ip.json new file mode 100644 index 0000000..11301d8 --- /dev/null +++ b/test-data/http-client/ip.json @@ -0,0 +1 @@ +{"origin": "192.0.0.1"} diff --git a/tests/test_http_client.rs b/tests/test_http_client.rs new file mode 100644 index 0000000..44085ff --- /dev/null +++ b/tests/test_http_client.rs @@ -0,0 +1,81 @@ +#[cfg(all(any(unix, windows), feature = "http-client"))] +#[cfg(test)] +mod test_http_client { + use anyhow::{Error, Result}; + + use fluvio_future::http_client::{self, ResponseExt, StatusCode}; + use fluvio_future::test_async; + + static DEF_PORT: &str = "7878"; + static SERVER: &str = "https://127.0.0.1"; + static ENV_TEST_PORT: &str = "TEST_PORT"; + + fn https_server_url() -> Result { + let port = std::env::var(ENV_TEST_PORT).unwrap_or(DEF_PORT.to_string()); + let port: u16 = port.parse()?; + let port = port + 1; // http -> https + Ok(format!("{SERVER}:{port}")) + } + + #[test_async] + async fn simple_test() -> Result<(), Error> { + let server_url = https_server_url()?; + let res = http_client::get(&server_url).await; + + let failmsg = + format!("failed to get http-server, did you install and run it? {server_url}"); + let status = res.expect(&failmsg).status(); + assert_eq!(status, StatusCode::OK); + Ok(()) + } + + #[test_async] + async fn get_and_deserialize_to_struct() -> Result<(), Error> { + use std::net::{IpAddr, Ipv4Addr}; + + use serde::Deserialize; + + let server_url = https_server_url()?; + + #[allow(dead_code)] + #[derive(Deserialize, Debug, PartialEq)] + struct Ip { + origin: IpAddr, + } + + let failmsg = + format!("failed to get http-server, did you install and run it? {server_url}"); + let json = http_client::get(format!("{server_url}/test-data/http-client/ip.json")) + .await + .expect(&failmsg) + .json::() + .await + .expect("failed to parse IP address"); + + assert_eq!( + json, + Ip { + origin: IpAddr::V4(Ipv4Addr::new(192, 0, 0, 1)) + } + ); + Ok(()) + } + + // ignored tests used for live local dev sanity check + // cargo test live -- --ignored + #[test_async(ignore)] + async fn live_https() -> Result<(), Error> { + let res = http_client::get("https://hub.infinyon.cloud").await; + + assert!(res.is_ok()); + Ok(()) + } + + #[test_async(ignore)] + async fn live_http_not_supported() -> Result<(), Error> { + let res = http_client::get("http://hub.infinyon.cloud").await; + + assert!(res.is_err()); + Ok(()) + } +}