Skip to content

Commit d2cadce

Browse files
Make caching stall tests serial
Also temp ignore the active RUSTSECs until the internal dependency bumps are synced.
1 parent 181b3ad commit d2cadce

4 files changed

Lines changed: 176 additions & 146 deletions

File tree

.cargo/audit.toml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
[advisories]
2+
ignore = ["RUSTSEC-2026-0097", "RUSTSEC-2026-0098", "RUSTSEC-2026-0099"]

pingora-proxy/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,7 @@ futures-util = "0.3"
5353
serde = { version = "1.0", features = ["derive"] }
5454
serde_json = "1.0"
5555
serde_yaml = "0.9"
56+
serial_test = "3.0"
5657

5758
[target.'cfg(unix)'.dev-dependencies]
5859
hyperlocal = "0.8"
Lines changed: 173 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,173 @@
1+
// Copyright 2026 Cloudflare, Inc.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
mod utils;
16+
17+
use utils::server_utils::init;
18+
19+
use reqwest::StatusCode;
20+
use serial_test::serial;
21+
use std::time::Duration;
22+
use tokio::time::sleep;
23+
24+
// Tests to run in serial, due to interference or resource consumption.
25+
26+
#[tokio::test]
27+
#[serial]
28+
async fn test_caching_when_downstream_stalls() {
29+
use std::net::ToSocketAddrs;
30+
use tokio::io::{AsyncReadExt, AsyncWriteExt};
31+
use tokio::net::TcpStream;
32+
33+
init();
34+
let url = "http://127.0.0.1:6148/unique/test_caching_when_downstream_stalls/download/";
35+
36+
// Connection 1: read 10KiB then stall, holding the cache lock while
37+
// the proxy populates cache from upstream.
38+
let slow_task = tokio::spawn(async move {
39+
let addr = "127.0.0.1:6148".to_socket_addrs().unwrap().next().unwrap();
40+
let mut stream = TcpStream::connect(&addr).await.unwrap();
41+
42+
let request = concat!(
43+
"GET /unique/test_caching_when_downstream_stalls/download/ HTTP/1.1\r\n",
44+
"Host: 127.0.0.1:6148\r\n",
45+
"x-lock: true\r\n",
46+
"x-set-cache-control: public, max-age=60\r\n",
47+
"\r\n",
48+
);
49+
stream.write_all(request.as_bytes()).await.unwrap();
50+
51+
let mut buf = [0; 10 * 1024];
52+
let mut b = &mut buf[..];
53+
while !b.is_empty() {
54+
let n = stream.read(b).await.unwrap();
55+
b = &mut b[n..]
56+
}
57+
58+
// Hold the stalled connection open long enough
59+
sleep(Duration::from_secs(10)).await;
60+
});
61+
62+
// Give connection 1 time to acquire the cache lock
63+
sleep(Duration::from_secs(1)).await;
64+
65+
// Connection 2: should get a cache hit once the proxy finishes
66+
// populating cache from upstream (independent of stall).
67+
let start = tokio::time::Instant::now();
68+
let res = reqwest::Client::new()
69+
.get(url)
70+
.header("x-lock", "true")
71+
.header("x-set-cache-control", "public, max-age=60")
72+
.timeout(Duration::from_secs(8))
73+
.send()
74+
.await
75+
.unwrap();
76+
77+
assert_eq!(res.status(), StatusCode::OK);
78+
let headers = res.headers();
79+
assert_eq!(headers["x-cache-status"], "hit");
80+
81+
// If the cache was populated fast enough (before connection 2 arrived),
82+
// there is no lock contention and x-cache-lock-time-ms is absent.
83+
// If there was contention, the wait should be short.
84+
if let Some(lock_ms) = headers.get("x-cache-lock-time-ms") {
85+
let ms: u64 = lock_ms.to_str().unwrap().parse().unwrap();
86+
assert!(
87+
ms < 2000,
88+
"lock wait {ms}ms should be well under the 2s timeout"
89+
);
90+
}
91+
92+
assert_eq!(
93+
res.text().await.unwrap(),
94+
String::from("A").repeat(4 * 1024 * 1024)
95+
);
96+
97+
let elapsed = start.elapsed();
98+
assert!(
99+
elapsed < Duration::from_secs(5),
100+
"second request took {elapsed:?}, should be fast"
101+
);
102+
103+
// Don't wait for the slow connection
104+
slow_task.abort();
105+
}
106+
107+
// Same as test_caching_when_downstream_stalls but the proxy connects
108+
// to the origin over H2 (via the x-h2 header).
109+
//
110+
#[tokio::test]
111+
#[serial]
112+
async fn test_caching_h2_upstream_when_downstream_stalls() {
113+
use std::net::ToSocketAddrs;
114+
use tokio::io::{AsyncReadExt, AsyncWriteExt};
115+
use tokio::net::TcpStream;
116+
117+
init();
118+
let url =
119+
"http://127.0.0.1:6148/unique/test_caching_h2_upstream_when_downstream_stalls/download/";
120+
121+
let slow_task = tokio::spawn(async move {
122+
let addr = "127.0.0.1:6148".to_socket_addrs().unwrap().next().unwrap();
123+
let mut stream = TcpStream::connect(&addr).await.unwrap();
124+
125+
let request = concat!(
126+
"GET /unique/test_caching_h2_upstream_when_downstream_stalls/download/ HTTP/1.1\r\n",
127+
"Host: 127.0.0.1:6148\r\n",
128+
"x-h2: true\r\n",
129+
"x-lock: true\r\n",
130+
"x-set-cache-control: public, max-age=60\r\n",
131+
"\r\n",
132+
);
133+
stream.write_all(request.as_bytes()).await.unwrap();
134+
135+
let mut buf = [0; 10 * 1024];
136+
let mut b = &mut buf[..];
137+
while !b.is_empty() {
138+
let n = stream.read(b).await.unwrap();
139+
b = &mut b[n..]
140+
}
141+
142+
sleep(Duration::from_secs(10)).await;
143+
});
144+
145+
sleep(Duration::from_secs(1)).await;
146+
147+
let start = tokio::time::Instant::now();
148+
let res = reqwest::Client::new()
149+
.get(url)
150+
.header("x-h2", "true")
151+
.header("x-lock", "true")
152+
.header("x-set-cache-control", "public, max-age=60")
153+
.timeout(Duration::from_secs(8))
154+
.send()
155+
.await
156+
.unwrap();
157+
158+
assert_eq!(res.status(), StatusCode::OK);
159+
let headers = res.headers();
160+
assert_eq!(headers["x-cache-status"], "hit");
161+
assert_eq!(
162+
res.text().await.unwrap(),
163+
String::from("A").repeat(4 * 1024 * 1024)
164+
);
165+
166+
let elapsed = start.elapsed();
167+
assert!(
168+
elapsed < Duration::from_secs(5),
169+
"second request took {elapsed:?}, should be fast (upstream-speed-bound)"
170+
);
171+
172+
slow_task.abort();
173+
}

pingora-proxy/tests/test_upstream.rs

Lines changed: 0 additions & 146 deletions
Original file line numberDiff line numberDiff line change
@@ -2913,152 +2913,6 @@ mod test_cache {
29132913
assert_eq!(res.text().await.unwrap(), "hello world");
29142914
}
29152915

2916-
#[tokio::test]
2917-
async fn test_caching_when_downstream_stalls() {
2918-
use std::net::ToSocketAddrs;
2919-
use tokio::io::{AsyncReadExt, AsyncWriteExt};
2920-
use tokio::net::TcpStream;
2921-
2922-
init();
2923-
let url = "http://127.0.0.1:6148/unique/test_caching_when_downstream_stalls/download/";
2924-
2925-
// Connection 1: read 10KiB then stall, holding the cache lock while
2926-
// the proxy populates cache from upstream.
2927-
let slow_task = tokio::spawn(async move {
2928-
let addr = "127.0.0.1:6148".to_socket_addrs().unwrap().next().unwrap();
2929-
let mut stream = TcpStream::connect(&addr).await.unwrap();
2930-
2931-
let request = concat!(
2932-
"GET /unique/test_caching_when_downstream_stalls/download/ HTTP/1.1\r\n",
2933-
"Host: 127.0.0.1:6148\r\n",
2934-
"x-lock: true\r\n",
2935-
"x-set-cache-control: public, max-age=60\r\n",
2936-
"\r\n",
2937-
);
2938-
stream.write_all(request.as_bytes()).await.unwrap();
2939-
2940-
let mut buf = [0; 10 * 1024];
2941-
let mut b = &mut buf[..];
2942-
while !b.is_empty() {
2943-
let n = stream.read(b).await.unwrap();
2944-
b = &mut b[n..]
2945-
}
2946-
2947-
// Hold the stalled connection open long enough
2948-
sleep(Duration::from_secs(10)).await;
2949-
});
2950-
2951-
// Give connection 1 time to acquire the cache lock
2952-
sleep(Duration::from_secs(1)).await;
2953-
2954-
// Connection 2: should get a cache hit once the proxy finishes
2955-
// populating cache from upstream (independent of stall).
2956-
let start = tokio::time::Instant::now();
2957-
let res = reqwest::Client::new()
2958-
.get(url)
2959-
.header("x-lock", "true")
2960-
.header("x-set-cache-control", "public, max-age=60")
2961-
.timeout(Duration::from_secs(8))
2962-
.send()
2963-
.await
2964-
.unwrap();
2965-
2966-
assert_eq!(res.status(), StatusCode::OK);
2967-
let headers = res.headers();
2968-
assert_eq!(headers["x-cache-status"], "hit");
2969-
2970-
// If the cache was populated fast enough (before connection 2 arrived),
2971-
// there is no lock contention and x-cache-lock-time-ms is absent.
2972-
// If there was contention, the wait should be short.
2973-
if let Some(lock_ms) = headers.get("x-cache-lock-time-ms") {
2974-
let ms: u64 = lock_ms.to_str().unwrap().parse().unwrap();
2975-
assert!(
2976-
ms < 2000,
2977-
"lock wait {ms}ms should be well under the 2s timeout"
2978-
);
2979-
}
2980-
2981-
assert_eq!(
2982-
res.text().await.unwrap(),
2983-
String::from("A").repeat(4 * 1024 * 1024)
2984-
);
2985-
2986-
let elapsed = start.elapsed();
2987-
assert!(
2988-
elapsed < Duration::from_secs(5),
2989-
"second request took {elapsed:?}, should be fast"
2990-
);
2991-
2992-
// Don't wait for the slow connection
2993-
slow_task.abort();
2994-
}
2995-
2996-
// Same as test_caching_when_downstream_stalls but the proxy connects
2997-
// to the origin over H2 (via the x-h2 header).
2998-
//
2999-
#[tokio::test]
3000-
async fn test_caching_h2_upstream_when_downstream_stalls() {
3001-
use std::net::ToSocketAddrs;
3002-
use tokio::io::{AsyncReadExt, AsyncWriteExt};
3003-
use tokio::net::TcpStream;
3004-
3005-
init();
3006-
let url = "http://127.0.0.1:6148/unique/test_caching_h2_upstream_when_downstream_stalls/download/";
3007-
3008-
let slow_task = tokio::spawn(async move {
3009-
let addr = "127.0.0.1:6148".to_socket_addrs().unwrap().next().unwrap();
3010-
let mut stream = TcpStream::connect(&addr).await.unwrap();
3011-
3012-
let request = concat!(
3013-
"GET /unique/test_caching_h2_upstream_when_downstream_stalls/download/ HTTP/1.1\r\n",
3014-
"Host: 127.0.0.1:6148\r\n",
3015-
"x-h2: true\r\n",
3016-
"x-lock: true\r\n",
3017-
"x-set-cache-control: public, max-age=60\r\n",
3018-
"\r\n",
3019-
);
3020-
stream.write_all(request.as_bytes()).await.unwrap();
3021-
3022-
let mut buf = [0; 10 * 1024];
3023-
let mut b = &mut buf[..];
3024-
while !b.is_empty() {
3025-
let n = stream.read(b).await.unwrap();
3026-
b = &mut b[n..]
3027-
}
3028-
3029-
sleep(Duration::from_secs(10)).await;
3030-
});
3031-
3032-
sleep(Duration::from_secs(1)).await;
3033-
3034-
let start = tokio::time::Instant::now();
3035-
let res = reqwest::Client::new()
3036-
.get(url)
3037-
.header("x-h2", "true")
3038-
.header("x-lock", "true")
3039-
.header("x-set-cache-control", "public, max-age=60")
3040-
.timeout(Duration::from_secs(8))
3041-
.send()
3042-
.await
3043-
.unwrap();
3044-
3045-
assert_eq!(res.status(), StatusCode::OK);
3046-
let headers = res.headers();
3047-
assert_eq!(headers["x-cache-status"], "hit");
3048-
assert_eq!(
3049-
res.text().await.unwrap(),
3050-
String::from("A").repeat(4 * 1024 * 1024)
3051-
);
3052-
3053-
let elapsed = start.elapsed();
3054-
assert!(
3055-
elapsed < Duration::from_secs(5),
3056-
"second request took {elapsed:?}, should be fast (upstream-speed-bound)"
3057-
);
3058-
3059-
slow_task.abort();
3060-
}
3061-
30622916
async fn send_vary_req_with_headers_with_dups(
30632917
url: &str,
30642918
vary_field: &str,

0 commit comments

Comments
 (0)