Add early rejection of IOs if too many Downstairs are inactive (#1565)
This aborts the IO before passing it to the Downstairs, so it's not
assigned a `JobId` or put into the `ActiveJobs` map. The most noticeable
change is that writes are now fast-err'd instead of fast-acked if > 1
Downstairs is inactive.
mkeeter authored Jan 28, 2025
1 parent df17b64 commit 86a2ce1
Showing 5 changed files with 305 additions and 3 deletions.
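
For orientation before the per-file hunks: the gating added in `upstairs/src/upstairs.rs` compares the number of clients still accepting IO (per `active_client_count` below) against a per-operation minimum. The following is a condensed sketch of that logic, not the literal diff; `Op`, `required_clients`, and `check_early_rejection` are illustrative names that do not exist in the commit.

```rust
// Per-operation minimums, mirroring the checks added in upstairs.rs below.
enum Op {
    Read,
    Write,
    Flush { snapshot: bool },
}

fn required_clients(op: &Op) -> usize {
    match op {
        Op::Read => 1,                      // one reachable replica can serve a read
        Op::Write => 2,                     // writes need 2/3 Downstairs
        Op::Flush { snapshot: false } => 2, // plain flushes also need 2/3
        Op::Flush { snapshot: true } => 3,  // snapshot flushes must land on all 3
    }
}

/// Rejects an operation up front when too few clients accept IO,
/// producing an error string in the same shape as the new code path.
fn check_early_rejection(accepting: usize, op: &Op) -> Result<(), String> {
    let required = required_clients(op);
    if accepting < required {
        Err(format!(
            "too many inactive clients: need {required}, got {accepting}"
        ))
    } else {
        Ok(())
    }
}

fn main() {
    // With only one accepting Downstairs, writes are rejected but reads pass.
    assert!(check_early_rejection(1, &Op::Write).is_err());
    assert!(check_early_rejection(1, &Op::Read).is_ok());
    // Snapshot flushes require all three Downstairs.
    assert!(check_early_rejection(2, &Op::Flush { snapshot: true }).is_err());
}
```

Because the rejection happens before a `JobId` is assigned, nothing is entered into `ActiveJobs`; the error goes straight back to the guest, as the new tests in `dummy_downstairs_tests.rs` exercise.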
14 changes: 13 additions & 1 deletion upstairs/src/client.rs
@@ -1891,7 +1891,19 @@ impl DownstairsClient {
self.client_delay_us.load(Ordering::Relaxed)
}

/// Looks up the region UUID
/// Checks whether the client is in a state where it can accept IO
pub(crate) fn is_accepting_io(&self) -> bool {
matches!(
self.state,
DsState::Active
| DsState::LiveRepair
| DsState::Connecting {
mode: ConnectionMode::Offline,
..
}
)
}

pub(crate) fn id(&self) -> Option<Uuid> {
self.region_uuid
}
7 changes: 7 additions & 0 deletions upstairs/src/downstairs.rs
@@ -3326,6 +3326,13 @@ impl Downstairs {
}
}

/// Returns the number of clients that can accept IO
///
/// A client can accept IO if it is in the `Active` or `LiveRepair` state,
/// or is `Connecting` in `Offline` mode (see `is_accepting_io`).
pub fn active_client_count(&self) -> usize {
self.clients.iter().filter(|c| c.is_accepting_io()).count()
}

/// Wrapper for marking a single job as done from the given client
///
/// This can be used to test handling of job acks, etc
254 changes: 252 additions & 2 deletions upstairs/src/dummy_downstairs_tests.rs
@@ -12,6 +12,8 @@ use crate::guest::Guest;
use crate::up_main;
use crate::BlockIO;
use crate::Buffer;
use crate::ClientFaultReason;
use crate::ClientStopReason;
use crate::ConnectionMode;
use crate::CrucibleError;
use crate::DsState;
@@ -33,6 +35,7 @@ use crucible_protocol::JobId;
use crucible_protocol::Message;
use crucible_protocol::ReadBlockContext;
use crucible_protocol::ReadResponseHeader;
use crucible_protocol::SnapshotDetails;
use crucible_protocol::WriteHeader;

use bytes::BytesMut;
@@ -289,6 +292,35 @@ impl DownstairsHandle {
}
}

/// Awaits a `Message::Flush` and sends a `FlushAck` with an `IoError`
///
/// Returns the flush number for further checks.
///
/// # Panics
/// If a non-flush message arrives
pub async fn err_flush(&mut self) -> u64 {
match self.recv().await.unwrap() {
Message::Flush {
job_id,
flush_number,
upstairs_id,
..
} => {
self.send(Message::FlushAck {
upstairs_id,
session_id: self.upstairs_session_id.unwrap(),
job_id,
result: Err(CrucibleError::IoError("oh no".to_string())),
})
.unwrap();
flush_number
}
m => {
panic!("saw non flush {m:?}");
}
}
}

/// Awaits a `Message::Write { .. }` and sends a `WriteAck`
///
/// Returns the job ID for further checks.
@@ -311,6 +343,23 @@ impl DownstairsHandle {
}
}

/// Awaits a `Message::Write` and sends a `WriteAck` with an `IoError`
pub async fn err_write(&mut self) -> JobId {
match self.recv().await.unwrap() {
Message::Write { header, .. } => {
self.send(Message::WriteAck {
upstairs_id: header.upstairs_id,
session_id: self.upstairs_session_id.unwrap(),
job_id: header.job_id,
result: Err(CrucibleError::IoError("oh no".to_string())),
})
.unwrap();
header.job_id
}
m => panic!("saw non write: {m:?}"),
}
}

/// Awaits a `Message::Barrier { .. }` and sends a `BarrierAck`
///
/// Returns the job ID for further checks.
@@ -358,7 +407,7 @@
job_id,
blocks: Ok(vec![block]),
},
data: data.clone(),
data,
})
.unwrap();
job_id
@@ -811,7 +860,7 @@ async fn run_live_repair(mut harness: TestHarness) {
job_id,
blocks: Ok(vec![block]),
},
data: data.clone(),
data,
}) {
Ok(()) => panic!("DS1 should be disconnected"),
Err(e) => {
@@ -3055,3 +3104,204 @@ async fn test_bytes_based_barrier() {
harness.ds2.ack_flush().await;
harness.ds3.ack_flush().await;
}

fn assert_faulted(s: &DsState) {
match s {
DsState::Stopping(ClientStopReason::Fault(
ClientFaultReason::RequestedFault,
))
| DsState::Connecting {
mode: ConnectionMode::Faulted,
..
} => (),
_ => panic!("invalid state: expected faulted, got {s:?}"),
}
}

/// Test for early rejection of writes if > 1 Downstairs is unavailable
#[tokio::test]
async fn fast_write_rejection() {
let mut harness = TestHarness::new().await;

let write_buf = BytesMut::from(vec![1; 4096].as_slice());
harness
.guest
.write(BlockIndex(0), write_buf.clone())
.await
.unwrap();

harness.ds1().err_write().await;
harness.ds2.ack_write().await;
harness.ds3.ack_write().await;
tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
let ds = harness.guest.downstairs_state().await.unwrap();
assert_faulted(&ds[ClientId::new(0)]);
assert_eq!(ds[ClientId::new(1)], DsState::Active);
assert_eq!(ds[ClientId::new(2)], DsState::Active);

// Send a second write, which should still work (because we have 2/3 ds)
harness
.guest
.write(BlockIndex(0), write_buf.clone())
.await
.unwrap();
harness.ds2.err_write().await;
harness.ds3.ack_write().await;
tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
let ds = harness.guest.downstairs_state().await.unwrap();
assert_faulted(&ds[ClientId::new(0)]);
assert_faulted(&ds[ClientId::new(1)]);
assert_eq!(ds[ClientId::new(2)], DsState::Active);

// Subsequent writes should be rejected immediately
let r = harness.guest.write(BlockIndex(0), write_buf.clone()).await;
assert!(
matches!(r, Err(CrucibleError::IoError(..))),
"expected IoError, got {r:?}"
);
}

/// Make sure reads work with only 1x Downstairs
#[tokio::test]
async fn read_with_one_fault() {
let mut harness = TestHarness::new().await;

// Use a write to fault DS0 (XXX why do read errors not fault a DS?)
let write_buf = BytesMut::from(vec![1; 4096].as_slice());
harness
.guest
.write(BlockIndex(0), write_buf.clone())
.await
.unwrap();
harness.ds1().err_write().await;
harness.ds2.ack_write().await;
harness.ds3.ack_write().await;
tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
let ds = harness.guest.downstairs_state().await.unwrap();
assert_faulted(&ds[ClientId::new(0)]);
assert_eq!(ds[ClientId::new(1)], DsState::Active);
assert_eq!(ds[ClientId::new(2)], DsState::Active);

// Check that reads still work
let h = harness.spawn(|guest| async move {
let mut buffer = Buffer::new(1, 512);
guest.read(BlockIndex(0), &mut buffer).await.unwrap();
});
harness.ds2.ack_read().await;
h.await.unwrap(); // we have > 1x reply, so the read will return
harness.ds3.ack_read().await;

// Take out DS2 next
harness
.guest
.write(BlockIndex(0), write_buf.clone())
.await
.unwrap();
harness.ds2.err_write().await;
harness.ds3.ack_write().await;
tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
let ds = harness.guest.downstairs_state().await.unwrap();
assert_faulted(&ds[ClientId::new(0)]);
assert_faulted(&ds[ClientId::new(1)]);
assert_eq!(ds[ClientId::new(2)], DsState::Active);

// Reads still work with 1x Downstairs
let h = harness.spawn(|guest| async move {
let mut buffer = Buffer::new(1, 512);
guest.read(BlockIndex(0), &mut buffer).await.unwrap();
});
harness.ds3.ack_read().await;
h.await.unwrap(); // we have > 1x reply, so the read will return
}

/// Test early rejection of reads with 0x running Downstairs
#[tokio::test]
async fn fast_read_rejection() {
let mut harness = TestHarness::new().await;

// Use a write to fault DS0 (XXX why do read errors not fault a DS?)
let write_buf = BytesMut::from(vec![1; 4096].as_slice());
harness
.guest
.write(BlockIndex(0), write_buf.clone())
.await
.unwrap();
harness.ds1().err_write().await;
harness.ds2.err_write().await;
harness.ds3.err_write().await;
tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
let ds = harness.guest.downstairs_state().await.unwrap();
assert_faulted(&ds[ClientId::new(0)]);
assert_faulted(&ds[ClientId::new(1)]);
assert_faulted(&ds[ClientId::new(2)]);

// Reads should return errors immediately
let mut buffer = Buffer::new(1, 512);
match harness.guest.read(BlockIndex(0), &mut buffer).await {
Err(CrucibleError::IoError(s)) => {
assert!(s.contains("too many inactive clients"))
}
r => panic!("expected IoError, got {r:?}"),
}
}

/// Test for early rejection of flushes
#[tokio::test]
async fn fast_flush_rejection() {
let mut harness = TestHarness::new().await;

let h = harness.spawn(|guest| async move {
guest.flush(None).await.unwrap();
});
harness.ds1().err_flush().await;
harness.ds2.ack_flush().await;
harness.ds3.ack_flush().await;
h.await.unwrap();
tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
let ds = harness.guest.downstairs_state().await.unwrap();
assert_faulted(&ds[ClientId::new(0)]);
assert_eq!(ds[ClientId::new(1)], DsState::Active);
assert_eq!(ds[ClientId::new(2)], DsState::Active);

// A flush with snapshot should fail immediately
match harness
.guest
.flush(Some(SnapshotDetails {
snapshot_name: "hiiiii".to_string(),
}))
.await
{
Err(CrucibleError::IoError(s)) => {
assert!(s.contains("too many inactive clients"))
}
r => panic!("expected IoError, got {r:?}"),
}

// A non-snapshot flush should still succeed
let h = harness.spawn(|guest| async move {
guest.flush(None).await.unwrap();
});
harness.ds2.ack_flush().await;
harness.ds3.ack_flush().await;
h.await.unwrap();

// Use a flush to take out another downstairs
let h = harness.spawn(|guest| async move { guest.flush(None).await });
harness.ds2.ack_flush().await;
harness.ds3.err_flush().await;
let r = h.await.unwrap();
assert!(r.is_err());
tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
let ds = harness.guest.downstairs_state().await.unwrap();
assert_faulted(&ds[ClientId::new(0)]);
assert_eq!(ds[ClientId::new(1)], DsState::Active);
assert_faulted(&ds[ClientId::new(2)]);

// Subsequent flushes should fail immediately
match harness.guest.flush(None).await {
Err(CrucibleError::IoError(s)) => {
assert!(s.contains("too many inactive clients"))
}
r => panic!("expected IoError, got {r:?}"),
}
}
5 changes: 5 additions & 0 deletions upstairs/src/lib.rs
@@ -952,6 +952,11 @@ impl DownstairsIO {

let bad_job = match &self.work {
IOop::Read { .. } => wc.done == 0,
// Flushes with snapshots must be good on all 3x Downstairs
IOop::Flush {
snapshot_details: Some(..),
..
} => wc.skipped + wc.error > 0,
IOop::Write { .. }
| IOop::WriteUnwritten { .. }
| IOop::Flush { .. }
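
The `DownstairsIO` change above tightens the ack criterion for flushes that carry snapshot details: such a flush counts as failed as soon as any Downstairs skipped or errored it, rather than tolerating a single failure. A minimal sketch of just that rule, with `WorkCounts` as a hypothetical stand-in for the real `wc` counters:

```rust
/// Hypothetical stand-in for the per-job completion counters (`wc` above).
struct WorkCounts {
    done: usize,
    skipped: usize,
    error: usize,
}

/// Mirrors the new match arm: a flush with snapshot details is "bad"
/// as soon as any client skipped it or returned an error.
fn snapshot_flush_failed(wc: &WorkCounts) -> bool {
    wc.skipped + wc.error > 0
}

fn main() {
    // All three Downstairs completed the snapshot flush: it is good.
    assert!(!snapshot_flush_failed(&WorkCounts { done: 3, skipped: 0, error: 0 }));
    // One skip is enough to fail a flush that carries a snapshot.
    assert!(snapshot_flush_failed(&WorkCounts { done: 2, skipped: 1, error: 0 }));
}
```

This pairs with the `required = 3` check in `upstairs.rs`: snapshot flushes are rejected early unless all three Downstairs are accepting IO, and even then they fail if any Downstairs does not complete them.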
28 changes: 28 additions & 0 deletions upstairs/src/upstairs.rs
@@ -1140,6 +1140,15 @@ impl Upstairs {
done.send_err(CrucibleError::UpstairsInactive);
return;
}

let n = self.downstairs.active_client_count();
let required = if snapshot_details.is_some() { 3 } else { 2 };
if n < required {
done.send_err(CrucibleError::IoError(format!(
"too many inactive clients: need {required}, got {n}"
)));
return;
}
self.submit_flush(Some(done), snapshot_details, Some(io_guard));
}
BlockOp::ReplaceDownstairs { id, old, new, done } => {
@@ -1351,6 +1360,17 @@ impl Upstairs {
return;
}

let n = self.downstairs.active_client_count();
if n < 1 {
res.send_err((
data,
CrucibleError::IoError(format!(
"too many inactive clients: need 1, got {n}"
)),
));
return;
}

/*
* Get the next ID for the guest work struct we will make at the
* end. This ID is also put into the IO struct we create that
@@ -1467,6 +1487,14 @@ impl Upstairs {
return None;
}

let n = self.downstairs.active_client_count();
if n < 2 {
res.send_err(CrucibleError::IoError(format!(
"too many inactive clients: need 2, got {n}"
)));
return None;
}

/*
* Verify IO is in range for our region
*/
