Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

http_server: Set error header, persist OCSP. #344

Merged
merged 2 commits into from
Aug 19, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions http_server/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ edition = "2018"
anyhow = "1.0.62"
async-trait = "0.1.57"
clap = { version = "3.2.17", features = ["derive"] }
fs2 = "0.4.3"
http = "0.2.8"
hyper-reverse-proxy = { git = "https://github.com/felipenoris/hyper-reverse-proxy", rev = "96a398de8522fac07a5e15bd0699f6cd7fa84bce" }
hyper-rustls = "0.23.0"
Expand Down
115 changes: 88 additions & 27 deletions http_server/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
use anyhow::{anyhow, Result};
use async_trait::async_trait;
use clap::Parser;
use fs2::FileExt;
use hyper::{
server::{conn::AddrStream, Server},
service::{make_service_fn, service_fn},
Expand All @@ -24,14 +25,18 @@ use hyper_reverse_proxy::ReverseProxy;
use hyper_trust_dns::{RustlsHttpsConnector, TrustDnsResolver};
use std::boxed::Box;
use std::convert::TryInto;
use std::fs::File;
use std::io::{Read, Write};
use std::net::IpAddr;
use std::net::SocketAddr;
use std::path::PathBuf;
use sxg_rs::{
crypto::CertificateChain,
fetcher::Fetcher,
headers::AcceptFilter,
http::{HttpRequest, HttpResponse, Method},
process_html::ProcessHtmlOption,
storage::Storage,
PresetContent,
};
use url::Url;
Expand All @@ -51,6 +56,11 @@ struct Args {
/// The bind address (ip:port), such as 0.0.0.0:8080.
#[clap(short = 'a', long, default_value = "127.0.0.1:8080")]
bind_addr: String,

/// Path to the directory (must exist) where ACME and OCSP files will be
/// created to manage state.
#[clap(short, long)]
directory: String,
}

type HttpsClient = hyper::Client<
Expand Down Expand Up @@ -112,7 +122,10 @@ struct SelfFetcher {
#[async_trait]
impl Fetcher for SelfFetcher {
async fn fetch(&self, request: HttpRequest) -> Result<HttpResponse> {
let response: Response<Body> = handle(self.client_ip, request, &self.backend).await?;
// TODO: Don't compute header-integrity for resources that are too
// large (see https://twifkak.com/link_tag.large.html).
// Passing "" as directory because links should not refer to preset content.
let response: Response<Body> = handle(self.client_ip, request, &self.backend, "").await?;
// TODO: Do something streaming.
resp_to_vec_body(response).await?.try_into()
}
Expand All @@ -124,21 +137,17 @@ async fn generate_sxg_response(
fallback_url: &str,
payload: HttpResponse,
) -> Result<Response<Body>> {
// TODO: Also transform with is_sxg=false on fallback.
let payload = WORKER.process_html(payload, ProcessHtmlOption { is_sxg: true });

let cert_origin = Url::parse(fallback_url)?.origin().ascii_serialization();
// TODO: Instead of SelfFetcher, make the HeaderIntegrityFetcher a param of
// create_signed_exchange, then make an impl that fetches SXGs (from any
// domain) and computes their header-integrity.
let subresource_fetcher = SelfFetcher {
client_ip,
backend: backend.into(),
};
let runtime = sxg_rs::runtime::Runtime {
now: std::time::SystemTime::now(),
sxg_signer: Box::new(WORKER.create_rust_signer()?),
fetcher: Box::new(subresource_fetcher),
sxg_signer: Box::new(WORKER.create_rust_signer()?),
..Default::default()
};
let sxg = WORKER
Expand All @@ -160,16 +169,56 @@ async fn generate_sxg_response(
Ok(sxg.map(Body::from))
}

async fn serve_preset_content(url: &str) -> Option<PresetContent> {
/// Persistent storage mechanism for OCSP responses & ACME certs. Takes a path
/// to a directory where it will create files for them.
pub struct FileStorage(PathBuf);

#[async_trait]
impl Storage for FileStorage {
async fn read(&self, k: &str) -> Result<Option<String>> {
let path = self.0.join(k);
// This is vulnerable to a TOCTOU bug, where the file is created by
// some current process in between this check and the establishment of
// the lock below. We can't do better, because lock_shared requires the
// file be open in the first place. However, this seems OK. OCSP/ACME
// storage don't require perfect synchronization.
if path.exists() {
let mut f = File::open(path)?;
// Don't do any early returns (e.g. `?`) between lock and unlock.
f.lock_shared()?;
let mut v = String::new();
let ok = f.read_to_string(&mut v);
let _ = f.unlock();
match ok {
Ok(_) => Ok(Some(v)),
Err(e) => Err(anyhow!("error reading file {k}: {e}")),
}
} else {
Ok(None)
}
}
async fn write(&self, k: &str, v: &str) -> Result<()> {
let path = self.0.join(k);
let mut f = File::create(path)?;
// Don't do any early returns (e.g. `?`) between lock and unlock.
f.lock_exclusive()?;
let ret = write!(f, "{}", v);
let _ = f.unlock();
ret.map_err(|e| anyhow!("error writing file {k}: {e}"))
}
}

async fn serve_preset_content(url: &str, directory: &str) -> Option<PresetContent> {
let ocsp_fetcher = HttpsFetcher(&HTTPS_CLIENT);
// TODO: Create a Storage impl that persists across restarts (and maybe
// also between replicas), per
// https://gist.github.com/sleevi/5efe9ef98961ecfb4da8 rule #1. Filesystem
// support should be sufficient.
// Using a Storage impl that persists across restarts (and between
// replicas, if using a networked filesystem), per
// https://gist.github.com/sleevi/5efe9ef98961ecfb4da8 rule #1.
let runtime = sxg_rs::runtime::Runtime {
now: std::time::SystemTime::now(),
sxg_signer: Box::new(WORKER.create_rust_signer().ok()?),
fetcher: Box::new(ocsp_fetcher),
// TODO: Parameterize path.
storage: Box::new(FileStorage(directory.into())),
sxg_signer: Box::new(WORKER.create_rust_signer().ok()?),
..Default::default()
};
WORKER.serve_preset_content(&runtime, url).await
Expand All @@ -189,16 +238,19 @@ enum HandleAction {
// I guess hyper::Client doesn't synthesize :authority from the Host header.
// We can't work around this because http::header::HeaderMap panics with
// InvalidHeaderName when given ":authority" as a key.
async fn handle_impl(client_ip: IpAddr, req: HttpRequest, backend: &str) -> Result<HandleAction> {
// TODO: Proxy unsigned if SXG fails.
async fn handle_impl(
client_ip: IpAddr,
req: HttpRequest,
backend: &str,
directory: &str,
) -> Result<HandleAction> {
// TODO: If over 8MB or MICE fails midstream, send the consumed portion and stream the rest.
// TODO: Wrap errors with additional context before returning.
// TODO: Additional work necessary for ACME support?
let fallback_url: String;
let sxg_payload;
let req_url =
url::Url::parse(&format!("https://{}/", WORKER.config().html_host))?.join(&req.url)?;
match serve_preset_content(&format!("{}", req_url)).await {
match serve_preset_content(&format!("{}", req_url), directory).await {
Some(PresetContent::Direct(response)) => {
let response: Response<Vec<u8>> = response.try_into()?;
return Ok(HandleAction::Respond(response.map(Body::from)));
Expand Down Expand Up @@ -256,42 +308,50 @@ async fn proxy_unsigned(
Ok(payload.map(Body::from))
}

fn set_error_header(err: impl core::fmt::Display, mut resp: Response<Body>) -> Response<Body> {
if let Ok(val) = format!("{err}").try_into() {
resp.headers_mut().insert("sxg-rs-error", val);
}
resp
}

async fn handle(
client_ip: IpAddr,
req: HttpRequest,
backend: &str,
directory: &str,
) -> Result<Response<Body>, http::Error> {
match handle_impl(client_ip, req.clone(), backend).await {
match handle_impl(client_ip, req.clone(), backend, directory).await {
Ok(HandleAction::Respond(resp)) => Ok(resp),
Ok(HandleAction::Sign { url, payload }) => {
generate_sxg_response(client_ip, backend, &url, payload.clone())
.await
.or_else(|_| {
// TODO: Annotate response with error as header.
.or_else(|e| {
let sxg: Result<Response<Vec<u8>>> = payload.try_into();
match sxg {
Ok(sxg) => Ok(sxg.map(Body::from)),
Ok(sxg) => Ok(set_error_header(e, sxg.map(Body::from))),
Err(e) => Response::builder()
.status(StatusCode::INTERNAL_SERVER_ERROR)
.body(Body::from(format!("{:?}", e))),
}
})
}
Err(_) => {
// TODO: Annotate response with error as header.
proxy_unsigned(client_ip, req, backend).await.or_else(|e| {
Err(e) => proxy_unsigned(client_ip, req, backend)
.await
.map(|r| set_error_header(e, r))
.or_else(|e| {
Response::builder()
.status(StatusCode::INTERNAL_SERVER_ERROR)
.body(Body::from(format!("{:?}", e)))
})
}
}),
}
}

async fn handle_or_error(
client_ip: IpAddr,
req: Request<Body>,
backend: String,
directory: String,
) -> Result<Response<Body>, http::Error> {
let req: Result<Request<Vec<u8>>> = req_to_vec_body(req).await;
let req: Result<HttpRequest> = req.and_then(|r| r.try_into());
Expand All @@ -303,7 +363,7 @@ async fn handle_or_error(
.body(Body::from(format!("{:?}", e)));
}
};
handle(client_ip, req.clone(), &backend).await
handle(client_ip, req.clone(), &backend, &directory).await
}

#[tokio::main]
Expand All @@ -314,9 +374,10 @@ async fn main() {
let make_svc = make_service_fn(|conn: &AddrStream| {
let remote_addr = conn.remote_addr().ip();
let backend = args.backend.clone();
let directory = args.directory.clone();
async move {
Ok::<_, http::Error>(service_fn(move |req| {
handle_or_error(remote_addr, req, backend.to_owned())
handle_or_error(remote_addr, req, backend.to_owned(), directory.to_owned())
}))
}
});
Expand Down