Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Change async fns to return Future + Send. #314

Merged
merged 4 commits into from
Jul 29, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions fastly_compute/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -36,3 +36,7 @@ serde = { version = "1.0.140", features = ["derive"] }
serde_yaml = "0.8.26"
sxg_rs = { path = "../sxg_rs", features = ["rust_signer"] }
tokio = { version = "1.20.1", features = ["rt"] }

[features]
# Unsupported, but necessary to make `cargo some-cmd --all-features` happy.
wasm = ["sxg_rs/wasm"]
3 changes: 2 additions & 1 deletion fastly_compute/src/fetcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,8 @@ impl FastlyFetcher {
}
}

#[async_trait(?Send)]
#[cfg_attr(feature = "wasm", async_trait(?Send))]
#[cfg_attr(not(feature = "wasm"), async_trait)]
impl Fetcher for FastlyFetcher {
async fn fetch(&self, request: HttpRequest) -> Result<HttpResponse> {
let request: ::http::request::Request<Vec<u8>> = request.try_into()?;
Expand Down
3 changes: 2 additions & 1 deletion sxg_rs/src/fetcher/mock_fetcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,8 @@ pub fn create() -> (MockFetcher, MockServer) {
(mock_fetcher, mock_server)
}

#[async_trait(?Send)]
#[cfg_attr(feature = "wasm", async_trait(?Send))]
#[cfg_attr(not(feature = "wasm"), async_trait)]
impl Fetcher for MockFetcher {
async fn fetch(&self, request: HttpRequest) -> Result<HttpResponse> {
let request_url = request.url.clone();
Expand Down
17 changes: 13 additions & 4 deletions sxg_rs/src/fetcher/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,23 @@ pub mod js_fetcher;
pub mod mock_fetcher;

use crate::http::{HttpRequest, HttpResponse};
use crate::utils::{MaybeSend, MaybeSync};
use anyhow::{anyhow, Result};
use async_trait::async_trait;

/// An interface for fetching resources from network.
#[async_trait(?Send)]
pub trait Fetcher {
#[cfg_attr(feature = "wasm", async_trait(?Send))]
#[cfg_attr(not(feature = "wasm"), async_trait)]
pub trait Fetcher: MaybeSend + MaybeSync {
async fn fetch(&self, request: HttpRequest) -> Result<HttpResponse>;
/// Uses `Get` method and returns response body.
async fn get(&self, url: &str) -> Result<Vec<u8>> {
// `where Self: Sized` is because of
// https://github.com/rust-lang/rust/issues/51443 and in particular
// https://docs.rs/async-trait/0.1.36/async_trait/#dyn-traits.
async fn get(&self, url: &str) -> Result<Vec<u8>>
where
Self: Sized,
{
let request = HttpRequest {
body: vec![],
headers: vec![],
Expand All @@ -41,7 +49,8 @@ pub const NULL_FETCHER: NullFetcher = NullFetcher {};

pub struct NullFetcher;

#[async_trait(?Send)]
#[cfg_attr(feature = "wasm", async_trait(?Send))]
#[cfg_attr(not(feature = "wasm"), async_trait)]
impl Fetcher for NullFetcher {
async fn fetch(&self, _request: HttpRequest) -> Result<HttpResponse> {
Err(anyhow!("Not found"))
Expand Down
43 changes: 25 additions & 18 deletions sxg_rs/src/header_integrity.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,16 @@ use crate::fetcher::{Fetcher, NULL_FETCHER};
use crate::headers::Headers;
use crate::http::{HttpRequest, HttpResponse, Method};
use crate::http_cache::{HttpCache, NullCache};
use crate::utils::signed_headers_and_payload;
use crate::utils::{signed_headers_and_payload, MaybeSend, MaybeSync};
use anyhow::{anyhow, Error, Result};
use async_trait::async_trait;
use once_cell::sync::Lazy;
use std::collections::BTreeSet;
use url::Url;

#[async_trait(?Send)]
pub trait HeaderIntegrityFetcher {
#[cfg_attr(feature = "wasm", async_trait(?Send))]
#[cfg_attr(not(feature = "wasm"), async_trait)]
pub trait HeaderIntegrityFetcher: MaybeSend + MaybeSync {
async fn fetch(&self, url: &str) -> Result<String>;
}

Expand Down Expand Up @@ -55,7 +56,8 @@ static ERROR_RESPONSE: Lazy<HttpResponse> = Lazy::new(|| HttpResponse {
status: 406,
});

#[async_trait(?Send)]
#[cfg_attr(feature = "wasm", async_trait(?Send))]
#[cfg_attr(not(feature = "wasm"), async_trait)]
impl<'a, C: HttpCache> HeaderIntegrityFetcher for HeaderIntegrityFetcherImpl<'a, C> {
async fn fetch(&self, url: &str) -> Result<String> {
let integrity_response = match self.cache_get(url).await {
Expand Down Expand Up @@ -174,8 +176,8 @@ pub mod tests {
use crate::fetcher::NULL_FETCHER;
use crate::http_cache::NullCache;
use anyhow::{anyhow, Result};
use std::cell::RefCell;
use std::collections::HashMap;
use std::sync::Mutex;

static EMPTY_SET: Lazy<BTreeSet<String>> = Lazy::new(BTreeSet::new);

Expand All @@ -197,7 +199,8 @@ pub mod tests {

struct FakeFetcher<'a>(&'a HttpResponse);

#[async_trait(?Send)]
#[cfg_attr(feature = "wasm", async_trait(?Send))]
#[cfg_attr(not(feature = "wasm"), async_trait)]
impl<'a> Fetcher for FakeFetcher<'a> {
async fn fetch(&self, _request: HttpRequest) -> Result<HttpResponse> {
Ok(self.0.clone())
Expand All @@ -223,30 +226,31 @@ pub mod tests {
);
}

// RefCell is good enough for our single-threaded, single-task unit tests, but tokio::Mutex
// would be necessary for more complex usage.
struct InMemoryCache<'a>(&'a RefCell<HashMap<String, HttpResponse>>);
struct InMemoryCache<'a>(&'a Mutex<HashMap<String, HttpResponse>>);

#[async_trait(?Send)]
#[cfg_attr(feature = "wasm", async_trait(?Send))]
#[cfg_attr(not(feature = "wasm"), async_trait)]
impl HttpCache for InMemoryCache<'_> {
async fn get(&self, url: &str) -> Result<HttpResponse> {
self.0
.try_borrow()?
.try_lock()
.map_err(|_| anyhow!("locking mutex"))?
.get(url)
.cloned()
.ok_or_else(|| anyhow!("not found"))
}
async fn put(&self, url: &str, response: &HttpResponse) -> Result<()> {
self.0
.try_borrow_mut()?
.try_lock()
.map_err(|_| anyhow!("locking mutex"))?
.insert(url.into(), response.clone());
Ok(())
}
}

#[tokio::test]
async fn gets_header_integrity_from_cache() {
let store = RefCell::new(HashMap::new());
let store = Mutex::new(HashMap::new());
let cache = InMemoryCache(&store);
let response = HttpResponse {
body: b"sha256-blah".to_vec(),
Expand All @@ -266,7 +270,7 @@ pub mod tests {
}
#[tokio::test]
async fn gets_error_from_cache() {
let store = RefCell::new(HashMap::new());
let store = Mutex::new(HashMap::new());
let cache = InMemoryCache(&store);
let response = HttpResponse {
body: b"something went wrong".to_vec(),
Expand All @@ -289,7 +293,7 @@ pub mod tests {
}
#[tokio::test]
async fn puts_into_cache() {
let store = RefCell::new(HashMap::new());
let store = Mutex::new(HashMap::new());

let strip_response_headers = BTreeSet::new();
let fetcher = new_fetcher(
Expand All @@ -300,7 +304,7 @@ pub mod tests {

let _ = fetcher.fetch(TEST_URL).await;
assert_eq!(
store.borrow().get(TEST_URL).unwrap().body,
store.lock().unwrap().get(TEST_URL).unwrap().body,
EXPECTED_HEADER_INTEGRITY.as_bytes(),
);
}
Expand All @@ -312,8 +316,11 @@ pub mod tests {
stream::{self, StreamExt},
};
struct OutOfOrderCache<F: Fn() -> BoxFuture<'static, Result<HttpResponse>>>(F);
#[async_trait(?Send)]
impl<F: Fn() -> BoxFuture<'static, Result<HttpResponse>>> HttpCache for OutOfOrderCache<F> {
#[cfg_attr(feature = "wasm", async_trait(?Send))]
#[cfg_attr(not(feature = "wasm"), async_trait)]
impl<F: Fn() -> BoxFuture<'static, Result<HttpResponse>> + Send + Sync> HttpCache
for OutOfOrderCache<F>
{
async fn get(&self, url: &str) -> Result<HttpResponse> {
println!("get: url = {}", url);
self.0().await
Expand Down
9 changes: 6 additions & 3 deletions sxg_rs/src/http_cache/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,19 +16,22 @@
pub mod js_http_cache;

use crate::http::HttpResponse;
use crate::utils::{MaybeSend, MaybeSync};
use anyhow::{anyhow, Result};
use async_trait::async_trait;

/// An interface for storing HTTP responses in a cache.
#[async_trait(?Send)]
pub trait HttpCache {
#[cfg_attr(feature = "wasm", async_trait(?Send))]
#[cfg_attr(not(feature = "wasm"), async_trait)]
pub trait HttpCache: MaybeSend + MaybeSync {
async fn get(&self, url: &str) -> Result<HttpResponse>;
async fn put(&self, url: &str, response: &HttpResponse) -> Result<()>;
}

pub struct NullCache;

#[async_trait(?Send)]
#[cfg_attr(feature = "wasm", async_trait(?Send))]
#[cfg_attr(not(feature = "wasm"), async_trait)]
impl HttpCache for NullCache {
async fn get(&self, _url: &str) -> Result<HttpResponse> {
Err(anyhow!("No cache entry found in NullCache"))
Expand Down
28 changes: 28 additions & 0 deletions sxg_rs/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -501,4 +501,32 @@ validity_url_dirname: "//.well-known/sxg-validity"
Some(PresetContent::Direct(HttpResponse { status: 404, .. }))
));
}
#[cfg(not(feature = "wasm"))]
#[test]
fn require_send() {
use std::collections::BTreeSet;
// Require async fns to implement Send, so they can be shared across
// threads. This is required by hyper, as used in http_server. See
// https://blog.rust-lang.org/inside-rust/2019/10/11/AsyncAwait-Not-Send-Error-Improvements.html.
// Adding the requirement directly in this test makes it easier to
// diagnose; compiler errors are more specific than when the
// requirement is indirect via hyper. Values don't matter in this test;
// we're only verifying types.
let worker = new_worker();
let runtime = Runtime::default();
fn is_send<T: Send>(_: T) {}
is_send(worker.serve_preset_content(&runtime, "https://my_domain.com/unknown"));
is_send(worker.create_signed_exchange(
&runtime,
CreateSignedExchangeParams {
fallback_url: "",
cert_origin: "",
payload_body: b"",
payload_headers: Headers::new(vec![], &BTreeSet::new()),
skip_process_link: false,
status_code: 200,
header_integrity_cache: http_cache::NullCache {},
},
));
}
}
19 changes: 11 additions & 8 deletions sxg_rs/src/link.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,9 @@ use crate::http_parser::{link::Link, parse_link_header, srcset};
use futures::{stream, stream::StreamExt};
use once_cell::sync::Lazy;
use std::borrow::Cow;
use std::cell::RefCell;
use std::collections::{HashMap, HashSet};
use std::iter::once;
use std::sync::Mutex;
use url::{Origin, Url};

// Filters the link header to comply with
Expand All @@ -26,7 +26,7 @@ pub(crate) async fn process_link_header(
let (preloads, allowed_alt_sxgs) = preloads_and_allowed_alt_sxgs(links, fallback_url);

let fallback_origin = fallback_url.origin();
let directives = RefCell::new(vec![]);
let directives = Mutex::new(vec![]);
stream::iter(preloads)
.for_each_concurrent(None, |link| async {
let link = link;
Expand Down Expand Up @@ -82,15 +82,16 @@ pub(crate) async fn process_link_header(
// If all allowed-alt-sxg directives were found, output the preload and allow
// directives.
if all_allow_sxg {
if let Ok(mut directives) = directives.try_borrow_mut() {
if let Ok(mut directives) = directives.try_lock() {
directives.push(link.clone());
directives.extend_from_slice(&allow_directives);
}
}
})
.await;
directives
.take()
.into_inner()
.unwrap_or_default()
.iter()
.map(|link| link.serialize())
.collect::<Vec<String>>()
Expand Down Expand Up @@ -184,7 +185,8 @@ mod tests {

struct FakeIntegrityFetcher(std::result::Result<String, String>);

#[async_trait(?Send)]
#[cfg_attr(feature = "wasm", async_trait(?Send))]
#[cfg_attr(not(feature = "wasm"), async_trait)]
impl HeaderIntegrityFetcher for FakeIntegrityFetcher {
async fn fetch(&self, _url: &str) -> Result<String> {
self.0.clone().map_err(|e| anyhow!(e))
Expand Down Expand Up @@ -354,9 +356,10 @@ mod tests {
async fn fetch_header_integrity_out_of_order() {
use crate::utils::tests::{out_of_order, OutOfOrderState};
use futures::future::BoxFuture;
struct OutOfOrderFetcher<F: Fn() -> BoxFuture<'static, Result<String>>>(F);
#[async_trait(?Send)]
impl<F: Fn() -> BoxFuture<'static, Result<String>>> HeaderIntegrityFetcher
struct OutOfOrderFetcher<F: Fn() -> BoxFuture<'static, Result<String>> + Send + Sync>(F);
#[cfg_attr(feature = "wasm", async_trait(?Send))]
#[cfg_attr(not(feature = "wasm"), async_trait)]
impl<F: Fn() -> BoxFuture<'static, Result<String>> + Send + Sync> HeaderIntegrityFetcher
for OutOfOrderFetcher<F>
{
async fn fetch(&self, url: &str) -> Result<String> {
Expand Down
3 changes: 2 additions & 1 deletion sxg_rs/src/signature/mock_signer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,8 @@ use async_trait::async_trait;

pub struct MockSigner;

#[async_trait(?Send)]
#[cfg_attr(feature = "wasm", async_trait(?Send))]
#[cfg_attr(not(feature = "wasm"), async_trait)]
impl Signer for MockSigner {
async fn sign(&self, _message: &[u8], format: Format) -> Result<Vec<u8>> {
match format {
Expand Down
6 changes: 4 additions & 2 deletions sxg_rs/src/signature/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ pub mod mock_signer;
pub mod rust_signer;

use crate::structured_header::{ParamItem, ShItem, ShParamList};
use crate::utils::{MaybeSend, MaybeSync};
use anyhow::{anyhow, Error, Result};
use async_trait::async_trait;
use der_parser::ber::{BerObject, BerObjectContent};
Expand All @@ -31,8 +32,9 @@ pub enum Format {
EccAsn1,
}

#[async_trait(?Send)]
pub trait Signer {
#[cfg_attr(feature = "wasm", async_trait(?Send))]
#[cfg_attr(not(feature = "wasm"), async_trait)]
pub trait Signer: MaybeSend + MaybeSync {
/// Signs the message, and returns in the given format.
async fn sign(&self, message: &[u8], format: Format) -> Result<Vec<u8>>;
}
Expand Down
3 changes: 2 additions & 1 deletion sxg_rs/src/signature/rust_signer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,8 @@ impl RustSigner {
}
}

#[async_trait(?Send)]
#[cfg_attr(feature = "wasm", async_trait(?Send))]
#[cfg_attr(not(feature = "wasm"), async_trait)]
impl Signer for RustSigner {
async fn sign(&self, message: &[u8], format: Format) -> Result<Vec<u8>> {
use p256::ecdsa::signature::Signer as _;
Expand Down
9 changes: 6 additions & 3 deletions sxg_rs/src/storage/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,16 @@
#[cfg(feature = "wasm")]
pub mod js_storage;

use crate::utils::{MaybeSend, MaybeSync};
use anyhow::Result;
use async_trait::async_trait;
use std::collections::HashMap;
use std::sync::Arc;
use tokio::sync::RwLock;

#[async_trait(?Send)]
pub trait Storage {
#[cfg_attr(feature = "wasm", async_trait(?Send))]
#[cfg_attr(not(feature = "wasm"), async_trait)]
pub trait Storage: MaybeSend + MaybeSync {
async fn read(&self, k: &str) -> Result<Option<String>>;
async fn write(&self, k: &str, v: &str) -> Result<()>;
}
Expand All @@ -35,7 +37,8 @@ impl InMemoryStorage {
}
}

#[async_trait(?Send)]
#[cfg_attr(feature = "wasm", async_trait(?Send))]
#[cfg_attr(not(feature = "wasm"), async_trait)]
impl Storage for InMemoryStorage {
async fn read(&self, k: &str) -> Result<Option<String>> {
let guard = self.0.read().await;
Expand Down
Loading