Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,16 @@

All notable changes to this project will be documented in this file.

## Unreleased

**🚀 Features**

* Add `Session::h1_set_same_connection_followup` for following an HTTP/1.1 redirect on a single pooled upstream connection; disables cache for the skipped intermediate hop when caching is enabled.

**📚 Documentation**

* `ProxyHttp::response_filter`: point to the new follow-up API for same-connection redirect handling.

## [0.8.0](https://github.com/cloudflare/pingora/compare/0.7.0...0.8.0) - 2026-03-02


Expand Down
4 changes: 4 additions & 0 deletions pingora-core/src/upstreams/peer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -366,6 +366,10 @@ impl Peer for BasicPeer {
fn reuse_hash(&self) -> u64 {
    // The reuse key covers both the address and the SNI: when TLS is in
    // play, reusing a pooled connection across different SNI hostnames on
    // the same IP:port would be wrong. For non-TLS peers the SNI is empty,
    // which keeps the historical (address-only) behavior.
    let mut h = AHasher::default();
    self._address.hash(&mut h);
    self.sni.hash(&mut h);
    h.finish()
}

Expand Down
70 changes: 63 additions & 7 deletions pingora-proxy/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,13 @@ use pingora_core::server::ShutdownWatch;
use pingora_core::upstreams::peer::{HttpPeer, Peer};
use pingora_error::{Error, ErrorSource, ErrorType::*, OrErr, Result};

/// Whether a retried upstream error is a benign, deliberately-followed 3xx
/// redirect hop (its context is tagged `redirect_follow_hop:`), and should
/// therefore be logged quietly instead of as a proxy failure.
fn is_benign_upstream_retry_log(e: &Error) -> bool {
    let is_redirect_status = match &e.etype {
        HTTPStatus(code) => (300..400).contains(code),
        _ => false,
    };
    if !is_redirect_status {
        return false;
    }
    e.context
        .as_ref()
        .map(|c| c.as_str().starts_with("redirect_follow_hop:"))
        .unwrap_or(false)
}

const TASK_BUFFER_SIZE: usize = 4;

mod proxy_cache;
Expand Down Expand Up @@ -480,6 +487,8 @@ pub struct Session {
upstream_body_bytes_received: usize,
/// Upstream write pending time. Set by proxy layer (HTTP/1.x only).
upstream_write_pending_time: Duration,
h1_skip_downstream_response: bool,
h1_same_connection_followup: Option<RequestHeader>,
/// Flag that is set when the shutdown process has begun.
shutdown_flag: Arc<AtomicBool>,
}
Expand All @@ -502,6 +511,8 @@ impl Session {
downstream_modules_ctx: downstream_modules.build_ctx(),
upstream_body_bytes_received: 0,
upstream_write_pending_time: Duration::ZERO,
h1_skip_downstream_response: false,
h1_same_connection_followup: None,
shutdown_flag,
}
}
Expand Down Expand Up @@ -742,6 +753,23 @@ impl Session {
self.upstream_write_pending_time = d;
}

/// Queue a follow-up request to be sent on the same HTTP/1.1 upstream
/// connection once the current upstream response body has been consumed.
/// The current response is NOT forwarded downstream. Intended to be called
/// from `response_filter`, e.g. after observing a 3xx with a same-origin
/// `Location` header. If caching is enabled, it is disabled for this
/// (skipped) hop so the intermediate response is never stored.
pub fn h1_set_same_connection_followup(&mut self, request: RequestHeader) {
    self.h1_same_connection_followup = Some(request);
    self.h1_skip_downstream_response = true;
    // Caching the skipped hop would store a response the client never
    // receives; turn the cache off for this request if it is active.
    if self.cache.enabled() {
        self.cache
            .disable(NoCacheReason::Custom("H1SameConnectionRedirect"));
    }
}

/// Whether the upstream response currently being proxied should be withheld
/// from the downstream client. Set when a same-connection follow-up request
/// has been queued via `h1_set_same_connection_followup`.
pub(crate) fn h1_skip_downstream_for_upstream_response(&self) -> bool {
    self.h1_skip_downstream_response
}

/// Is the proxy process in the process of shutting down (e.g. due to graceful upgrade)?
pub fn is_process_shutting_down(&self) -> bool {
self.shutdown_flag.load(Ordering::Acquire)
Expand Down Expand Up @@ -956,13 +984,23 @@ where
break;
}
// only log error that will be retried here, the final error will be logged below
warn!(
"Fail to proxy: {}, tries: {}, retry: {}, {}",
proxy_error.as_ref().unwrap(),
retries,
retry,
self.inner.request_summary(&session, &ctx)
);
if is_benign_upstream_retry_log(proxy_error.as_ref().unwrap()) {
debug!(
"Redirect follow: {}, tries: {}, retry: {}, {}",
proxy_error.as_ref().unwrap(),
retries,
retry,
self.inner.request_summary(&session, &ctx)
);
} else {
warn!(
"Fail to proxy: {}, tries: {}, retry: {}, {}",
proxy_error.as_ref().unwrap(),
retries,
retry,
self.inner.request_summary(&session, &ctx)
);
}
}
None => {
proxy_error = None;
Expand Down Expand Up @@ -1483,3 +1521,21 @@ where
Service::new(name, proxy)
}
}

#[cfg(test)]
mod h1_same_connection_tests {
    use super::{RequestHeader, Session};
    use http::Method;
    use pingora_core::protocols::Stream;

    /// Queuing a follow-up request must flag the in-flight upstream
    /// response as not-for-downstream.
    #[tokio::test]
    async fn followup_skips_intermediate_response_downstream() {
        let request_bytes: &[u8] = b"GET /a HTTP/1.1\r\nHost: t\r\n\r\n";
        let mock_io = tokio_test::io::Builder::new().read(request_bytes).build();
        let mut session = Session::new_h1(Box::new(mock_io) as Stream);
        assert!(session.read_request().await.unwrap());

        let followup = RequestHeader::build(Method::GET, b"/b", None).unwrap();
        session.h1_set_same_connection_followup(followup);
        assert!(session.h1_skip_downstream_for_upstream_response());
    }
}
30 changes: 24 additions & 6 deletions pingora-proxy/src/proxy_h1.rs
Original file line number Diff line number Diff line change
Expand Up @@ -167,14 +167,28 @@ where
return (false, false, Some(e));
}

let (server_session_reuse, client_session_reuse, error) =
self.proxy_1to1(session, client_session, peer, ctx).await;
let mut server_session_reuse;
let mut client_session_reuse;
let mut error: Option<Box<Error>>;

loop {
if let Some(next_req) = session.h1_same_connection_followup.take() {
*session.req_header_mut() = next_req;
session.h1_skip_downstream_response = false;
}

(server_session_reuse, client_session_reuse, error) =
self.proxy_1to1(session, client_session, peer, ctx).await;

if error.is_none() && session.h1_same_connection_followup.is_some() {
continue;
}
break;
}

// Record upstream response body bytes received (payload only) for logging consumers.
let upstream_bytes_total = client_session.body_bytes_received();
session.set_upstream_body_bytes_received(upstream_bytes_total);

// Record upstream write pending time for this session only (delta from baseline).
let current_write_pending = client_session.stream().get_write_pending_time();
let upstream_write_pending = current_write_pending.saturating_sub(initial_write_pending);
session.set_upstream_write_pending_time(upstream_write_pending);
Expand Down Expand Up @@ -324,15 +338,19 @@ where
}
// check error and abort
// otherwise the error is surfaced via write_response_tasks()
if !serve_from_cache.should_send_to_downstream() {
if !serve_from_cache.should_send_to_downstream()
|| session.h1_skip_downstream_for_upstream_response()
{
if let HttpTask::Failed(e) = task {
return Err(e);
}
}
filtered_tasks.push(task);
}

if !serve_from_cache.should_send_to_downstream() {
if !serve_from_cache.should_send_to_downstream()
|| session.h1_skip_downstream_for_upstream_response()
{
// TODO: need to derive response_done from filtered_tasks in case downstream failed already
return Ok(None);
}
Expand Down
3 changes: 3 additions & 0 deletions pingora-proxy/src/proxy_trait.rs
Original file line number Diff line number Diff line change
Expand Up @@ -349,6 +349,9 @@ pub trait ProxyHttp {
///
/// The modification is after caching. This filter is called for all responses including
/// responses served from cache.
///
/// To follow a redirect on the same HTTP/1.1 upstream connection (without using the outer
/// proxy retry path), use [`crate::Session::h1_set_same_connection_followup`].
async fn response_filter(
&self,
_session: &mut Session,
Expand Down
Loading