Skip to content

Commit 63eded0

Browse files
committed
enhancement(antithesis): Expand SDK assertions, hammer sketch
This commit expands the SDK asserts in the SUT, allowing us to flag more invariant flaws in runs. Of particular interest this commit introduces a new driver, the 'sketchburst'. The goal of this driver is to transmit sketch load in a way that 'bursts' load that will fall into a small number of ddsketch bins. This commit is also a demonstration of what antithesis sdk assertions without some manner of shim or other way of tidying them up looks like in a codebase, per recent review discussions.
1 parent bc9e3b9 commit 63eded0

37 files changed

Lines changed: 1022 additions & 124 deletions

File tree

Cargo.lock

Lines changed: 6 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

bin/agent-data-plane/src/main.rs

Lines changed: 51 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,30 @@ async fn main() -> Result<(), GenericError> {
5050
#[cfg(feature = "antithesis")]
5151
antithesis_sdk::antithesis_init();
5252

53+
// Report any panic to Antithesis before the process aborts. The antithesis build sets
54+
// `panic = "abort"`, and the captured container logs do not surface ADP's stderr, so the default
55+
// panic message is invisible in triage. Route the panic payload and site through the SDK, which
56+
// writes to sdk.jsonl -- a channel that survives the abort -- then chain to the default hook so a
57+
// local build still prints normally.
58+
#[cfg(feature = "antithesis")]
59+
{
60+
let default_hook = std::panic::take_hook();
61+
std::panic::set_hook(Box::new(move |info| {
62+
let location = info.location().map_or_else(String::new, |l| l.to_string());
63+
let payload = info.payload();
64+
let message = payload
65+
.downcast_ref::<&str>()
66+
.map(|s| (*s).to_string())
67+
.or_else(|| payload.downcast_ref::<String>().cloned())
68+
.unwrap_or_else(|| "<non-string panic payload>".to_string());
69+
antithesis_sdk::assert_unreachable!(
70+
"agent-data-plane panicked",
71+
&serde_json::json!({ "message": message, "location": location })
72+
);
73+
default_hook(info);
74+
}));
75+
}
76+
5377
let cli: Cli = argh::from_env();
5478

5579
// Print version and exit early without requiring config.
@@ -61,14 +85,27 @@ async fn main() -> Result<(), GenericError> {
6185
// Load our "bootstrap" configuration -- static configuration on disk or from environment variables -- so we can
6286
// initialize basic subsystems before executing the given subcommand.
6387
let bootstrap_config_path = cli.config_file.unwrap_or_else(PlatformSettings::get_config_file_path);
64-
let bootstrap_config = ConfigurationLoader::default()
88+
let loaded = ConfigurationLoader::default()
6589
.with_key_aliases(KEY_ALIASES)
6690
.from_yaml(&bootstrap_config_path)
67-
.error_context("Failed to load Datadog Agent configuration file during bootstrap.")?
68-
.add_providers([DatadogRemapper::new()])
69-
.from_environment(PlatformSettings::get_env_var_prefix())
70-
.error_context("Environment variable prefix should not be empty.")?
71-
.bootstrap_generic();
91+
.error_context("Failed to load Datadog Agent configuration file during bootstrap.")
92+
.and_then(|loader| {
93+
loader
94+
.add_providers([DatadogRemapper::new()])
95+
.from_environment(PlatformSettings::get_env_var_prefix())
96+
.error_context("Environment variable prefix should not be empty.")
97+
});
98+
99+
// Classify a graceful config rejection (process exit 1) versus a clean boot. A false evaluation
100+
// carries the rejection reason, so triage shows which sampled `datadog.yaml` ADP refused and why.
101+
#[cfg(feature = "antithesis")]
102+
antithesis_sdk::assert_always_or_unreachable!(
103+
loaded.is_ok(),
104+
"agent-data-plane boots under sampled config",
105+
&serde_json::json!({ "phase": "config_load", "error": loaded.as_ref().err().map(|e| format!("{e:?}")) })
106+
);
107+
108+
let bootstrap_config = loaded?.bootstrap_generic();
72109

73110
// Translate the bootstrap configuration into ADP's logging configuration, applying ADP-specific rules
74111
// (per-subagent log file key, never sharing a file with the Core Agent).
@@ -157,6 +194,14 @@ async fn run_inner(
157194
}
158195
Err(e) => {
159196
error!("{:?}", e);
197+
// Same property as the config-load gate: a run-setup error is also a graceful
198+
// exit-1 under this sampled config, distinguished by `phase` in the details.
199+
#[cfg(feature = "antithesis")]
200+
antithesis_sdk::assert_always_or_unreachable!(
201+
false,
202+
"agent-data-plane boots under sampled config",
203+
&serde_json::json!({ "phase": "run_setup", "error": format!("{e:?}") })
204+
);
160205
Some(1)
161206
}
162207
};

lib/ddsketch/Cargo.toml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,10 +9,12 @@ repository = { workspace = true }
99
workspace = true
1010

1111
[features]
12+
antithesis = ["dep:antithesis_sdk", "antithesis_sdk/full"]
1213
ddsketch_extended = []
1314
serde = ["dep:serde", "smallvec/serde"]
1415

1516
[dependencies]
17+
antithesis_sdk = { workspace = true, optional = true }
1618
datadog-protos = { workspace = true }
1719
float-cmp = { workspace = true, features = ["ratio"] }
1820
ordered-float = { workspace = true }

lib/ddsketch/src/agent/sketch.rs

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -186,6 +186,9 @@ impl DDSketch {
186186
}
187187

188188
fn adjust_basic_stats(&mut self, v: f64, n: u64) {
189+
#[cfg(feature = "antithesis")]
190+
antithesis_sdk::assert_always!(v.is_finite(), "DDSketch sample is finite at insert");
191+
189192
if v < self.min {
190193
self.min = v;
191194
}
@@ -197,6 +200,9 @@ impl DDSketch {
197200
self.count += n;
198201
self.sum += v * n as f64;
199202

203+
#[cfg(feature = "antithesis")]
204+
antithesis_sdk::assert_always!(self.sum.is_finite(), "sketch sum stays finite after insert");
205+
200206
if n == 1 {
201207
self.avg += (v - self.avg) / self.count as f64;
202208
} else {
@@ -711,6 +717,20 @@ fn trim_left(bins: &mut SmallVec<[Bin; 4]>, bin_limit: u16) {
711717

712718
// Drop the removed prefix, leaving exactly bin_limit bins.
713719
bins.drain(0..num_to_remove);
720+
721+
// We only reach here when the sketch exceeded `bin_limit` and collapsed; the drain leaves exactly `bin_limit`
722+
// bins. This is the one place every mutating method routes through, so asserting here guards the bin-count bound
723+
// for insert / insert_n / merge / interpolation alike. The reachability anchor proves a real corpus actually hits
724+
// the collapse, else the bound passes vacuously.
725+
#[cfg(feature = "antithesis")]
726+
{
727+
antithesis_sdk::assert_reachable!("trim_left collapsed bins");
728+
antithesis_sdk::assert_always_less_than_or_equal_to!(
729+
bins.len(),
730+
bin_limit,
731+
"sketch bin count within bin_limit"
732+
);
733+
}
714734
}
715735

716736
#[allow(clippy::cast_possible_truncation)]

lib/saluki-components/Cargo.toml

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,16 @@ workspace = true
1212
default = []
1313
config-test-support = []
1414
fips = ["saluki-io/fips"]
15-
antithesis = ["dep:antithesis_sdk", "antithesis_sdk/full"]
15+
antithesis = [
16+
"dep:antithesis_sdk",
17+
"antithesis_sdk/full",
18+
"ddsketch/antithesis",
19+
"saluki-config/antithesis",
20+
"saluki-context/antithesis",
21+
"saluki-core/antithesis",
22+
"saluki-io/antithesis",
23+
"stringtheory/antithesis",
24+
]
1625

1726
[dependencies]
1827
antithesis_sdk = { workspace = true, optional = true }

lib/saluki-components/src/sources/dogstatsd/mod.rs

Lines changed: 44 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1677,28 +1677,59 @@ async fn dispatch_events(mut event_buffer: EventsBuffer, source_context: &Source
16771677
// Dispatch any eventd events, if present.
16781678
if event_buffer.has_event_type(EventType::EventD) {
16791679
let eventd_events = event_buffer.extract(Event::is_eventd);
1680-
if let Err(e) = source_context
1681-
.dispatcher()
1682-
.buffered_named("events")
1680+
let events_output = source_context.dispatcher().buffered_named("events");
1681+
1682+
// The `events` output is always wired in the DSD topology, so a missing output is a structural invariant
1683+
// violation that crashes this fail-stop component. Surface it to Antithesis too.
1684+
#[cfg(feature = "antithesis")]
1685+
if events_output.is_err() {
1686+
antithesis_sdk::assert_unreachable!("dsd 'events' output missing at dispatch", &serde_json::json!({}));
1687+
}
1688+
1689+
if let Err(e) = events_output
16831690
.expect("events output should always exist")
16841691
.send_all(eventd_events)
16851692
.await
16861693
{
16871694
error!(%listen_addr, error = %e, "Failed to dispatch eventd events.");
1695+
1696+
// Silent-loss anchor: dispatch failure increments no counter, so this is the only in-SUT signal that the
1697+
// failure path ran. Keeps the no-silent-loss properties non-vacuous.
1698+
#[cfg(feature = "antithesis")]
1699+
antithesis_sdk::assert_sometimes!(
1700+
true,
1701+
"dsd dispatch failed mid-buffer",
1702+
&serde_json::json!({ "stream": "events" })
1703+
);
16881704
}
16891705
}
16901706

16911707
// Dispatch any service check events, if present.
16921708
if event_buffer.has_event_type(EventType::ServiceCheck) {
16931709
let service_check_events = event_buffer.extract(Event::is_service_check);
1694-
if let Err(e) = source_context
1695-
.dispatcher()
1696-
.buffered_named("service_checks")
1710+
let service_checks_output = source_context.dispatcher().buffered_named("service_checks");
1711+
1712+
#[cfg(feature = "antithesis")]
1713+
if service_checks_output.is_err() {
1714+
antithesis_sdk::assert_unreachable!(
1715+
"dsd 'service_checks' output missing at dispatch",
1716+
&serde_json::json!({})
1717+
);
1718+
}
1719+
1720+
if let Err(e) = service_checks_output
16971721
.expect("service checks output should always exist")
16981722
.send_all(service_check_events)
16991723
.await
17001724
{
17011725
error!(%listen_addr, error = %e, "Failed to dispatch service check events.");
1726+
1727+
#[cfg(feature = "antithesis")]
1728+
antithesis_sdk::assert_sometimes!(
1729+
true,
1730+
"dsd dispatch failed mid-buffer",
1731+
&serde_json::json!({ "stream": "service_checks" })
1732+
);
17021733
}
17031734
}
17041735

@@ -1710,6 +1741,13 @@ async fn dispatch_events(mut event_buffer: EventsBuffer, source_context: &Source
17101741
.await
17111742
{
17121743
error!(%listen_addr, error = %e, "Failed to dispatch metric events.");
1744+
1745+
#[cfg(feature = "antithesis")]
1746+
antithesis_sdk::assert_sometimes!(
1747+
true,
1748+
"dsd dispatch failed mid-buffer",
1749+
&serde_json::json!({ "stream": "metrics" })
1750+
);
17131751
}
17141752
}
17151753
}

lib/saluki-components/src/sources/dogstatsd/replay/reader.rs

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -93,6 +93,16 @@ impl TrafficCaptureReader {
9393
// The writer emits a zero-length prefix to mark the start of the tagger state trailer; treat
9494
// that (and any size that would overrun the buffer) as the end of the record stream.
9595
if size == 0 || self.offset + size > self.contents.len() {
96+
// A zero-length prefix is the legitimate trailer marker. A non-zero `size` that overruns the buffer is a
97+
// corrupt/oversized length prefix being silently read as clean EOF, which drops every following
98+
// well-formed record. Surface the corrupt case as distinct from a real trailer.
99+
#[cfg(feature = "antithesis")]
100+
antithesis_sdk::assert_always_or_unreachable!(
101+
size == 0,
102+
"replay read_next stopped at the real trailer, not on a corrupt length prefix",
103+
&serde_json::json!({ "size": size, "offset": self.offset, "len": self.contents.len() })
104+
);
105+
96106
return Ok(None);
97107
}
98108

lib/saluki-components/src/transforms/aggregate/mod.rs

Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -569,8 +569,27 @@ impl AggregationState {
569569
}
570570

571571
fn insert(&mut self, timestamp: u64, metric: Metric) -> bool {
572+
// The context map is hard-capped at `context_limit` and no path grows it past the cap. This is the one
573+
// non-advisory runtime memory bound, so we assert it as an invariant under Antithesis. The numeric form hands
574+
// the search the margin to the limit as a gradient.
575+
#[cfg(feature = "antithesis")]
576+
antithesis_sdk::assert_always_less_than_or_equal_to!(
577+
self.contexts.len(),
578+
self.context_limit,
579+
"aggregate context map within context_limit",
580+
&serde_json::json!({ "len": self.contexts.len(), "limit": self.context_limit })
581+
);
582+
572583
// If we haven't seen this context yet, and it would put us over the limit to insert it, then return early.
573584
if !self.contexts.contains_key(metric.context()) && self.contexts.len() >= self.context_limit {
585+
// Anti-vacuity anchor: prove a run actually reaches the cap, else the invariant above passes trivially.
586+
#[cfg(feature = "antithesis")]
587+
antithesis_sdk::assert_sometimes!(
588+
true,
589+
"aggregate context limit breached",
590+
&serde_json::json!({ "limit": self.context_limit })
591+
);
592+
574593
self.context_limit_breached = true;
575594
return false;
576595
}
@@ -632,11 +651,45 @@ impl AggregationState {
632651
if self.last_flush != 0 {
633652
let start = align_to_bucket_start(self.last_flush, bucket_width_secs);
634653

654+
// Clock-skew guards. Bucketing reads the wall clock while the flush cadence is monotonic, so a wall-clock
655+
// jump is not bounded by the flush interval. A backward jump empties the zero-value range (a silent counter
656+
// gap); a forward jump makes the loop below run once per bucket across the whole jumped span — O(jump) work
657+
// and allocation. Assert before the loop so a flood fails fast rather than after the damage is done.
658+
#[cfg(feature = "antithesis")]
659+
{
660+
// Generous versus the normal cadence (default 15s flush over a 10s bucket yields 1-2 buckets); a bound
661+
// this large trips only on a multi-hour wall-clock jump, never on a slow-but-sane flush.
662+
const MAX_ZERO_VALUE_BUCKETS_PER_FLUSH: u64 = 10_000;
663+
antithesis_sdk::assert_always!(
664+
current_time >= self.last_flush,
665+
"aggregate flush wall-clock did not move backward",
666+
&serde_json::json!({ "current_time": current_time, "last_flush": self.last_flush })
667+
);
668+
antithesis_sdk::assert_always_less_than_or_equal_to!(
669+
current_time.saturating_sub(self.last_flush) / bucket_width_secs.get(),
670+
MAX_ZERO_VALUE_BUCKETS_PER_FLUSH,
671+
"aggregate zero-value bucket span bounded across a flush",
672+
&serde_json::json!({
673+
"current_time": current_time,
674+
"last_flush": self.last_flush,
675+
"bucket_width_secs": bucket_width_secs.get()
676+
})
677+
);
678+
}
679+
635680
for bucket_start in (start..current_time).step_by(bucket_width_secs.get() as usize) {
636681
if is_bucket_closed(current_time, bucket_start, bucket_width_secs, flush_open_buckets) {
637682
zero_value_buckets.push((bucket_start, MetricValues::counter((bucket_start, 0.0))));
638683
}
639684
}
685+
686+
// Anti-vacuity anchor: prove the idle-counter zero-value path actually runs in some timeline.
687+
#[cfg(feature = "antithesis")]
688+
antithesis_sdk::assert_sometimes!(
689+
!zero_value_buckets.is_empty(),
690+
"aggregate flush generated zero-value counter buckets",
691+
&serde_json::json!({ "count": zero_value_buckets.len() })
692+
);
640693
}
641694

642695
// Iterate over each context we're tracking, and flush any values that are in buckets which are now closed.

lib/saluki-config/Cargo.toml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,11 @@ repository = { workspace = true }
88
[lints]
99
workspace = true
1010

11+
[features]
12+
antithesis = ["dep:antithesis_sdk", "antithesis_sdk/full"]
13+
1114
[dependencies]
15+
antithesis_sdk = { workspace = true, optional = true }
1216
figment = { workspace = true, features = ["env"] }
1317
saluki-error = { workspace = true }
1418
serde = { workspace = true }

lib/saluki-config/src/dynamic/watcher.rs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,13 @@ impl FieldUpdateWatcher {
5959
// Ignore other key changes.
6060
Ok(_) => continue,
6161
Err(broadcast::error::RecvError::Lagged(_)) => {
62+
// A Lagged drop on the bounded broadcast means a config update was lost with no re-read, so live
63+
// filtering can stay permanently stale. Surface it.
64+
#[cfg(feature = "antithesis")]
65+
antithesis_sdk::assert_unreachable!(
66+
"filter config update Lagged-dropped with no re-read",
67+
&serde_json::json!({ "key": self.key.to_string() })
68+
);
6269
warn!(
6370
"FieldUpdateWatcher dropped events for key: {}. Continuing to wait for the next event.",
6471
self.key

0 commit comments

Comments
 (0)