Skip to content

Commit cf260c1

Browse files
authored
Merge pull request #205 from pwrdrvr/issue-185/improve-close-handling
Issue-185 - Extract close into a function
2 parents 7ee62e8 + 5d0dd92 commit cf260c1

File tree

14 files changed

+619
-123
lines changed

14 files changed

+619
-123
lines changed

.github/workflows/build.yaml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@ jobs:
4747
rust-tests:
4848
needs: check-access
4949
runs-on: ubuntu-latest
50+
timeout-minutes: 10
5051
env:
5152
CARGO_TERM_COLOR: always
5253
steps:

extension/src/lambda_request.rs

Lines changed: 55 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
use rand::Rng;
22
use std::sync::atomic::{AtomicBool, AtomicU64, AtomicUsize, Ordering};
33
use std::sync::Arc;
4+
use tokio::task::JoinHandle;
45

56
use rand::SeedableRng;
67
use tokio::{
@@ -12,8 +13,8 @@ use tokio_rustls::client::TlsStream;
1213
use crate::app_client::AppClient;
1314
use crate::endpoint::Endpoint;
1415
use crate::lambda_request_error::LambdaRequestError;
15-
use crate::messages;
16-
use crate::ping;
16+
use crate::messages::{self, ExitReason};
17+
use crate::ping::{self, send_close_request, PingResult};
1718
use crate::prelude::*;
1819
use crate::router_channel::RouterChannel;
1920
use crate::router_client::RouterClient;
@@ -43,6 +44,7 @@ pub struct LambdaRequest {
4344
requests_in_flight: Arc<AtomicUsize>,
4445
pub count: Arc<AtomicUsize>,
4546
start_time: u64,
47+
last_active_grace_period_ms: u64,
4648
}
4749

4850
impl LambdaRequest {
@@ -60,6 +62,7 @@ impl LambdaRequest {
6062
channel_count: u8,
6163
router_endpoint: Endpoint,
6264
deadline_ms: u64,
65+
last_active_grace_period_ms: u64,
6366
) -> Self {
6467
LambdaRequest {
6568
count: Arc::new(AtomicUsize::new(0)),
@@ -76,6 +79,7 @@ impl LambdaRequest {
7679
rng: rand::rngs::StdRng::from_entropy(),
7780
requests_in_flight: Arc::new(AtomicUsize::new(0)),
7881
start_time: current_time_millis(),
82+
last_active_grace_period_ms,
7983
}
8084
}
8185

@@ -86,7 +90,7 @@ impl LambdaRequest {
8690
router_client: RouterClient,
8791
) -> Result<messages::ExitReason, LambdaRequestError> {
8892
// Send the ping requests in background
89-
let ping_task = tokio::task::spawn(ping::send_ping_requests(
93+
let mut ping_task = Some(tokio::task::spawn(ping::send_ping_requests(
9094
Arc::clone(&self.last_active),
9195
Arc::clone(&self.goaway_received),
9296
router_client.clone(),
@@ -97,7 +101,8 @@ impl LambdaRequest {
97101
self.deadline_ms,
98102
self.cancel_token.clone(),
99103
Arc::clone(&self.requests_in_flight),
100-
));
104+
self.last_active_grace_period_ms,
105+
)));
101106

102107
// Startup the request channels
103108
let channel_futures = (0..self.channel_count)
@@ -169,7 +174,11 @@ impl LambdaRequest {
169174
if err.is_fatal() {
170175
// Try to clean up the ping task
171176
self.cancel_token.cancel();
172-
let _ = tokio::time::timeout(std::time::Duration::from_secs(1), ping_task).await;
177+
let _ = tokio::time::timeout(
178+
std::time::Duration::from_secs(1),
179+
ping_task.take().unwrap(),
180+
)
181+
.await;
173182

174183
return Err(err);
175184
}
@@ -189,12 +198,31 @@ impl LambdaRequest {
189198
err
190199
);
191200

201+
// If we got here we had a panic so we need to ask the router to
202+
// shut down this Lambda invoke
203+
let _ = send_close_request(
204+
Arc::clone(&self.goaway_received),
205+
router_client,
206+
Arc::clone(&self.pool_id),
207+
Arc::clone(&self.lambda_id),
208+
self.router_endpoint.clone(),
209+
)
210+
.await;
211+
192212
// Set the goaway signal so other tasks stop
193213
self.goaway_received.store(true, Ordering::Release);
194214

195215
// Try to clean up the ping task
196216
self.cancel_token.cancel();
197-
let _ = tokio::time::timeout(std::time::Duration::from_secs(1), ping_task).await;
217+
let _ =
218+
tokio::time::timeout(std::time::Duration::from_secs(1), ping_task.take().unwrap()).await;
219+
220+
// Shutdown the ping loop
221+
// Note: we don't really care what happens here because
222+
// we're already in a fatal error state
223+
let _ = self
224+
.wait_for_ping_loop(ping_task.take().unwrap(), exit_reason)
225+
.await;
198226

199227
// This is a bit of a lie: we don't know if the app connection is unreachable
200228
// but we do know we're in a non-deterministic state and that this
@@ -203,18 +231,36 @@ impl LambdaRequest {
203231
}
204232
}
205233

206-
// Wait for the ping loop to exit
234+
// Shutdown the ping loop
235+
exit_reason = self
236+
.wait_for_ping_loop(ping_task.take().unwrap(), exit_reason)
237+
.await
238+
.unwrap_or(exit_reason)
239+
.worse(exit_reason);
240+
241+
Ok(exit_reason)
242+
}
243+
244+
pub async fn wait_for_ping_loop(
245+
&mut self,
246+
ping_task: JoinHandle<Option<PingResult>>,
247+
exit_reason: ExitReason,
248+
) -> Result<ExitReason, LambdaRequestError> {
249+
// Tell the ping loop to stop
207250
self.cancel_token.cancel();
251+
252+
// Wait for the ping loop to exit
208253
match ping_task.await {
209254
Ok(result) => {
210255
// Ping task completed successfully
211256

212257
// If the ping task knows why we exited, use that reason
213258
if let Some(ping_result) = result {
214259
if let Some(ping_result) = ping_result.into() {
215-
exit_reason = exit_reason.worse(ping_result);
260+
return Ok(exit_reason.worse(ping_result));
216261
}
217262
}
263+
Ok(exit_reason)
218264
}
219265
Err(e) => {
220266
log::error!(
@@ -223,11 +269,9 @@ impl LambdaRequest {
223269
e
224270
);
225271
// We'll lump this in as a generic router connection error
226-
return Err(LambdaRequestError::RouterUnreachable.into());
272+
Err(LambdaRequestError::RouterUnreachable)
227273
}
228274
}
229-
230-
Ok(exit_reason)
231275
}
232276

233277
pub fn elapsed(&self) -> u64 {

extension/src/lambda_service.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -188,6 +188,7 @@ impl LambdaService {
188188
channel_count,
189189
request.router_endpoint,
190190
deadline_ms,
191+
self.options.last_active_grace_period_ms,
191192
);
192193

193194
//
@@ -205,7 +206,7 @@ impl LambdaService {
205206
exit_reason
206207
);
207208

208-
resp.exit_reason = exit_reason;
209+
resp.exit_reason = resp.exit_reason.worse(exit_reason);
209210
}
210211
Err(err) => {
211212
log::error!(

extension/src/options.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@ pub struct Options {
4444
pub local_env: bool,
4545
pub force_deadline_secs: Option<Duration>,
4646
pub async_init_timeout: Duration,
47+
pub last_active_grace_period_ms: u64,
4748
}
4849

4950
impl Options {
@@ -81,6 +82,11 @@ impl Options {
8182
local_env: provider.get_var("LAMBDA_DISPATCH_FORCE_DEADLINE").is_ok(),
8283
force_deadline_secs,
8384
async_init_timeout,
85+
last_active_grace_period_ms: provider
86+
.get_var("LAMBDA_DISPATCH_LAST_ACTIVE_GRACE_PERIOD_MS")
87+
.ok()
88+
.and_then(|v| v.parse().ok())
89+
.unwrap_or(250),
8490
}
8591
}
8692
}

extension/src/ping.rs

Lines changed: 74 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,62 @@ impl From<PingResult> for Option<messages::ExitReason> {
4646
}
4747
}
4848

49+
pub async fn send_close_request(
50+
goaway_received: Arc<AtomicBool>,
51+
router_client: RouterClient,
52+
pool_id: PoolId,
53+
lambda_id: LambdaId,
54+
router_endpoint: Endpoint,
55+
) {
56+
let scheme = router_endpoint.scheme();
57+
let host = router_endpoint.host();
58+
let port = router_endpoint.port();
59+
let host_header = router_endpoint.host_header();
60+
61+
let close_url = format!(
62+
"{}://{}:{}/api/chunked/close/{}",
63+
scheme.as_ref(),
64+
host,
65+
port,
66+
lambda_id
67+
);
68+
69+
// Send Close request to router
70+
let (mut close_tx, close_recv) = mpsc::channel::<Result<Frame<Bytes>>>(1);
71+
let boxed_close_body = BodyExt::boxed(StreamBody::new(close_recv));
72+
let close_req = Request::builder()
73+
.uri(&close_url)
74+
.method("GET")
75+
.header(hyper::header::DATE, fmt_http_date(SystemTime::now()))
76+
.header("X-Pool-Id", pool_id.as_ref())
77+
.header("X-Lambda-Id", lambda_id.as_ref());
78+
let close_req = match &host_header {
79+
Cow::Borrowed(v) => close_req.header(hyper::header::HOST, *v),
80+
Cow::Owned(v) => close_req.header(hyper::header::HOST, v),
81+
};
82+
let close_req = close_req.body(boxed_close_body).unwrap();
83+
84+
let router_result = router_client.request(close_req).await;
85+
close_tx.close().await.unwrap();
86+
match router_result {
87+
Ok(mut router_res) => {
88+
// Rip through and discard so the response stream is closed
89+
while router_res.frame().await.is_some() {}
90+
}
91+
Err(err) => {
92+
log::error!(
93+
"PoolId: {}, LambdaId: {} - PingLoop - Close request failed: {:?}",
94+
pool_id,
95+
lambda_id,
96+
err
97+
);
98+
}
99+
}
100+
101+
// Now mark that we are going away, after router has responded to our close request
102+
goaway_received.store(true, Ordering::Release);
103+
}
104+
49105
pub async fn send_ping_requests(
50106
last_active: Arc<AtomicU64>,
51107
goaway_received: Arc<AtomicBool>,
@@ -57,10 +113,11 @@ pub async fn send_ping_requests(
57113
deadline_ms: u64,
58114
cancel_token: tokio_util::sync::CancellationToken,
59115
requests_in_flight: Arc<AtomicUsize>,
116+
last_active_grace_period_ms: u64,
60117
) -> Option<PingResult> {
61118
let mut ping_result = None;
62119
let start_time = time::current_time_millis();
63-
let mut last_ping_time = start_time;
120+
let mut last_ping_time = 0;
64121

65122
let scheme = router_endpoint.scheme();
66123
let host = router_endpoint.host();
@@ -76,17 +133,9 @@ pub async fn send_ping_requests(
76133
port,
77134
lambda_id
78135
);
79-
let close_url = format!(
80-
"{}://{}:{}/api/chunked/close/{}",
81-
scheme.as_ref(),
82-
host,
83-
port,
84-
lambda_id
85-
);
86136

87137
while !goaway_received.load(std::sync::atomic::Ordering::Acquire) && !cancel_token.is_cancelled()
88138
{
89-
let last_active_grace_period_ms = 250;
90139
let close_before_deadline_ms = 15000;
91140
let last_active = last_active.load(Ordering::Acquire);
92141
let last_active_ago_ms = if last_active == 0 {
@@ -137,39 +186,14 @@ pub async fn send_ping_requests(
137186
}
138187

139188
// Send Close request to router
140-
let (mut close_tx, close_recv) = mpsc::channel::<Result<Frame<Bytes>>>(1);
141-
let boxed_close_body = BodyExt::boxed(StreamBody::new(close_recv));
142-
let close_req = Request::builder()
143-
.uri(&close_url)
144-
.method("GET")
145-
.header(hyper::header::DATE, fmt_http_date(SystemTime::now()))
146-
.header("X-Pool-Id", pool_id.as_ref())
147-
.header("X-Lambda-Id", lambda_id.as_ref());
148-
let close_req = match &host_header {
149-
Cow::Borrowed(v) => close_req.header(hyper::header::HOST, *v),
150-
Cow::Owned(v) => close_req.header(hyper::header::HOST, v),
151-
};
152-
let close_req = close_req.body(boxed_close_body).unwrap();
153-
154-
let router_result = router_client.request(close_req).await;
155-
close_tx.close().await.unwrap();
156-
match router_result {
157-
Ok(mut router_res) => {
158-
// Rip through and discard so the response stream is closed
159-
while router_res.frame().await.is_some() {}
160-
}
161-
Err(err) => {
162-
log::error!(
163-
"PoolId: {}, LambdaId: {} - PingLoop - Close request failed: {:?}",
164-
pool_id,
165-
lambda_id,
166-
err
167-
);
168-
}
169-
}
170-
171-
// Now mark that we are going away, after router has responded to our close request
172-
goaway_received.store(true, Ordering::Release);
189+
send_close_request(
190+
Arc::clone(&goaway_received),
191+
router_client.clone(),
192+
Arc::clone(&pool_id),
193+
Arc::clone(&lambda_id),
194+
router_endpoint.clone(),
195+
)
196+
.await;
173197
break;
174198
}
175199

@@ -207,6 +231,8 @@ pub async fn send_ping_requests(
207231
lambda_id
208232
);
209233
ping_result.get_or_insert(PingResult::GoAway);
234+
235+
// This is from a goaway, so we do not need to ask the router to close our invoke
210236
goaway_received.store(true, Ordering::Release);
211237
break;
212238
}
@@ -225,6 +251,9 @@ pub async fn send_ping_requests(
225251
lambda_id,
226252
parts.status
227253
);
254+
255+
// TODO: This is not from a goaway, so we need to ask
256+
// the router to close our invoke
228257
goaway_received.store(true, Ordering::Release);
229258
break;
230259
}
@@ -237,6 +266,9 @@ pub async fn send_ping_requests(
237266
lambda_id,
238267
err
239268
);
269+
270+
// TODO: This is not from a goaway, so we need to ask
271+
// the router to close our invoke
240272
goaway_received.store(true, Ordering::Release);
241273
break;
242274
}

0 commit comments

Comments
 (0)