Skip to content

Commit 3f188fd

Browse files
authored
Fix possible deadlock with remote configuration expiring during insertion (#888)
* Fix possible deadlock with remote configuration expiring during insertion In this case the dead handler (internally holding a lock to the service map) attempting to remove (needs to acquire the configurations map) while add_runtime (which holds a lock to the configurations map) is called (needs to acquire the services map). Also doing a trivial re-order in session_info to not retain remote config invariants across runtimes and application locks. Signed-off-by: Bob Weinand <[email protected]> * Print memory statistics on memory exceeded Signed-off-by: Bob Weinand <[email protected]> * Fix possible deadlock with simultaneous remote config insertion and removal The services lock must never be acquired before the runtimes lock. Signed-off-by: Bob Weinand <[email protected]>
1 parent f7543b9 commit 3f188fd

File tree

15 files changed

+106
-74
lines changed

15 files changed

+106
-74
lines changed

ddtelemetry/src/metrics.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,7 @@ pub struct MetricBuckets {
6363
distributions: HashMap<BucketKey, DDSketch>,
6464
}
6565

66-
#[derive(Default, Serialize, Deserialize)]
66+
#[derive(Debug, Default, Serialize, Deserialize)]
6767
pub struct MetricBucketStats {
6868
pub buckets: u32,
6969
pub series: u32,

ddtelemetry/src/worker/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -127,7 +127,7 @@ pub struct TelemetryWorker {
127127
data: TelemetryWorkerData,
128128
}
129129

130-
#[derive(Default, Serialize, Deserialize)]
130+
#[derive(Debug, Default, Serialize, Deserialize)]
131131
pub struct TelemetryWorkerStats {
132132
pub dependencies_stored: u32,
133133
pub dependencies_unflushed: u32,

remote-config/src/fetch/fetcher.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -83,7 +83,7 @@ pub struct ConfigFetcherState<S> {
8383
pub expire_unused_files: bool,
8484
}
8585

86-
#[derive(Default, Serialize, Deserialize)]
86+
#[derive(Debug, Default, Serialize, Deserialize)]
8787
pub struct ConfigFetcherStateStats {
8888
pub active_files: u32,
8989
}

remote-config/src/fetch/multitarget.rs

Lines changed: 41 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ use std::fmt::Debug;
1717
use std::hash::Hash;
1818
use std::ops::Add;
1919
use std::sync::atomic::{AtomicU32, AtomicU64, Ordering};
20-
use std::sync::{Arc, Mutex};
20+
use std::sync::{Arc, Mutex, MutexGuard};
2121
use std::time::Duration;
2222
use tokio::select;
2323
use tokio::sync::Semaphore;
@@ -54,7 +54,7 @@ where
5454
fetcher_semaphore: Semaphore,
5555
}
5656

57-
#[derive(Default, Serialize, Deserialize)]
57+
#[derive(Debug, Default, Serialize, Deserialize)]
5858
pub struct MultiTargetStats {
5959
known_runtimes: u32,
6060
starting_fetchers: u32,
@@ -109,6 +109,10 @@ pub trait MultiTargetHandlers<S> {
109109

110110
fn expired(&self, target: &Arc<Target>);
111111

112+
/// Called when a fetcher has no active runtimes.
113+
/// Beware: This function may be called at any time, e.g. another thread might be attempting to
114+
/// call add_runtime() right when no other runtime was left. Be careful with the locking here.
115+
/// Will not be called multiple times, unless this specific case was encountered.
112116
fn dead(&self);
113117
}
114118

@@ -162,7 +166,12 @@ where
162166
uuid::Uuid::new_v4().to_string()
163167
}
164168

165-
fn remove_target(self: &Arc<Self>, runtime_id: &str, target: &Arc<Target>) {
169+
fn remove_target(
170+
self: &Arc<Self>,
171+
runtime_id: &str,
172+
target: &Arc<Target>,
173+
runtimes: MutexGuard<HashMap<String, RuntimeInfo<N>>>,
174+
) {
166175
let mut services = self.services.lock().unwrap();
167176
// "goto" like handling to drop the known_service borrow and be able to change services
168177
'service_handling: {
@@ -181,7 +190,8 @@ where
181190
let target = target.clone();
182191
tokio::spawn(async move {
183192
future.await;
184-
this.remove_target(runtime_id.as_str(), &target);
193+
let runtimes = this.runtimes.lock().unwrap();
194+
this.remove_target(runtime_id.as_str(), &target, runtimes);
185195
});
186196
return;
187197
}
@@ -213,7 +223,7 @@ where
213223
_ => {
214224
if *known_service.fetcher.runtime_id.lock().unwrap() == runtime_id {
215225
'changed_rt_id: {
216-
for (id, runtime) in self.runtimes.lock().unwrap().iter() {
226+
for (id, runtime) in runtimes.iter() {
217227
if runtime.targets.len() == 1
218228
&& runtime.targets.contains_key(target)
219229
{
@@ -366,7 +376,7 @@ where
366376
notify_target,
367377
targets: HashMap::from([(target.clone(), 1)]),
368378
};
369-
self.add_target(info.targets.len() > 1, e.key(), target.clone());
379+
self.add_target(false, e.key(), target.clone());
370380
e.insert(info);
371381
}
372382
}
@@ -375,31 +385,29 @@ where
375385

376386
pub fn delete_runtime(self: &Arc<Self>, runtime_id: &str, target: &Arc<Target>) {
377387
trace!("Removing remote config runtime: {target:?} with runtime id {runtime_id}");
378-
{
379-
let mut runtimes = self.runtimes.lock().unwrap();
380-
let last_removed = {
381-
let info = match runtimes.get_mut(runtime_id) {
382-
None => return,
383-
Some(i) => i,
384-
};
385-
match info.targets.entry(target.clone()) {
386-
Entry::Occupied(mut e) => {
387-
if *e.get() == 1 {
388-
e.remove();
389-
} else {
390-
*e.get_mut() -= 1;
391-
return;
392-
}
388+
let mut runtimes = self.runtimes.lock().unwrap();
389+
let last_removed = {
390+
let info = match runtimes.get_mut(runtime_id) {
391+
None => return,
392+
Some(i) => i,
393+
};
394+
match info.targets.entry(target.clone()) {
395+
Entry::Occupied(mut e) => {
396+
if *e.get() == 1 {
397+
e.remove();
398+
} else {
399+
*e.get_mut() -= 1;
400+
return;
393401
}
394-
Entry::Vacant(_) => unreachable!("Missing target runtime"),
395402
}
396-
info.targets.is_empty()
397-
};
398-
if last_removed {
399-
runtimes.remove(runtime_id);
403+
Entry::Vacant(_) => unreachable!("Missing target runtime"),
400404
}
405+
info.targets.is_empty()
406+
};
407+
if last_removed {
408+
runtimes.remove(runtime_id);
401409
}
402-
Self::remove_target(self, runtime_id, target);
410+
Self::remove_target(self, runtime_id, target, runtimes);
403411
}
404412

405413
/// Sets the apply state on a stored file.
@@ -510,18 +518,18 @@ where
510518

511519
this.storage.storage.expired(&fetcher.target);
512520

513-
{
521+
let is_dead = {
514522
// scope lock before await
515523
trace!(
516524
"Remove {:?} from services map at fetcher end",
517525
fetcher.target
518526
);
519527
let mut services = this.services.lock().unwrap();
520528
services.remove(&fetcher.target);
521-
if services.is_empty() && this.pending_async_insertions.load(Ordering::Relaxed) == 0
522-
{
523-
this.storage.storage.dead();
524-
}
529+
services.is_empty() && this.pending_async_insertions.load(Ordering::Relaxed) == 0
530+
};
531+
if is_dead {
532+
this.storage.storage.dead();
525533
}
526534
remove_completer.complete(()).await;
527535
});
@@ -570,7 +578,7 @@ where
570578
(starting, active, inactive, removing)
571579
};
572580
MultiTargetStats {
573-
known_runtimes: self.runtimes.lock().unwrap().len() as u32,
581+
known_runtimes: self.active_runtimes() as u32,
574582
starting_fetchers,
575583
active_fetchers,
576584
inactive_fetchers,

remote-config/src/fetch/shared.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -140,7 +140,7 @@ where
140140
run_id: Arc<RunnersGeneration>,
141141
}
142142

143-
#[derive(Default, Serialize, Deserialize)]
143+
#[derive(Debug, Default, Serialize, Deserialize)]
144144
pub struct RefcountingStorageStats {
145145
pub inactive_files: u32,
146146
pub fetcher: ConfigFetcherStateStats,

sidecar/src/entry.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -93,7 +93,8 @@ where
9393
let server = SidecarServer::default();
9494
let (shutdown_complete_tx, shutdown_complete_rx) = mpsc::channel::<()>(1);
9595

96-
let watchdog_handle = Watchdog::from_receiver(shutdown_complete_rx).spawn_watchdog();
96+
let watchdog_handle =
97+
Watchdog::from_receiver(shutdown_complete_rx).spawn_watchdog(server.clone());
9798
let telemetry_handle = self_telemetry(server.clone(), watchdog_handle);
9899

99100
listener(Box::new({

sidecar/src/log.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,7 @@ unsafe impl<K, V> Sync for TemporarilyRetainedMap<K, V> where
6666
{
6767
}
6868

69-
#[derive(Serialize, Deserialize)]
69+
#[derive(Debug, Serialize, Deserialize)]
7070
pub struct TemporarilyRetainedMapStats {
7171
pub elements: u32,
7272
pub live_counters: u32,

sidecar/src/service/debugger_diagnostics_bookkeeper.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -136,7 +136,7 @@ impl Drop for DebuggerDiagnosticsBookkeeper {
136136
}
137137
}
138138

139-
#[derive(Serialize, Deserialize)]
139+
#[derive(Debug, Serialize, Deserialize)]
140140
pub struct DebuggerDiagnosticsBookkeeperStats {
141141
runtime_ids: u32,
142142
total_probes: u32,

sidecar/src/service/remote_configs.rs

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -115,7 +115,18 @@ impl RemoteConfigs {
115115
e.insert(ShmRemoteConfigs::new(
116116
invariants.clone(),
117117
Box::new(move || {
118-
this.lock().unwrap().remove(&invariants);
118+
// try_lock: if the lock is held _right now_, it means that an insertion is
119+
// in progress. In that case we can just ignore the Err() and leave it.
120+
// Otherwise we have to check whether it's actually really dead and possibly
121+
// re-insert.
122+
123+
if let Ok(mut unlocked) = this.try_lock() {
124+
if let Some(active) = unlocked.remove(&invariants) {
125+
if !active.is_dead() {
126+
unlocked.insert(invariants.clone(), active);
127+
}
128+
}
129+
}
119130
}),
120131
poll_interval,
121132
))

sidecar/src/service/session_info.rs

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -272,14 +272,16 @@ impl SessionInfo {
272272
}
273273
}
274274

275-
let invariants = self.get_remote_config_invariants();
276-
let version = invariants
277-
.as_ref()
278-
.map(|i| i.tracer_version.as_str())
279-
.unwrap_or("0.0.0");
280275
if let Some(runtime) = self.lock_runtimes().get(runtime_id) {
281276
if let Some(app) = runtime.lock_applications().get_mut(&queue_id) {
282-
let (tags, new_tags) = app.get_debugger_tags(&version, runtime_id);
277+
let (tags, new_tags) = {
278+
let invariants = self.get_remote_config_invariants();
279+
let version = invariants
280+
.as_ref()
281+
.map(|i| i.tracer_version.as_str())
282+
.unwrap_or("0.0.0");
283+
app.get_debugger_tags(&version, runtime_id)
284+
};
283285
let sender = match debugger_type {
284286
DebuggerType::Diagnostics => app.debugger_diagnostics_payload_sender.clone(),
285287
DebuggerType::Logs => app.debugger_logs_payload_sender.clone(),

sidecar/src/service/sidecar_server.rs

Lines changed: 18 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -63,8 +63,8 @@ fn no_response() -> NoResponse {
6363
future::ready(())
6464
}
6565

66-
#[derive(Serialize, Deserialize)]
67-
struct SidecarStats {
66+
#[derive(Debug, Serialize, Deserialize)]
67+
pub struct SidecarStats {
6868
trace_flusher: TraceFlusherStats,
6969
sessions: u32,
7070
session_counter_size: u32,
@@ -306,7 +306,7 @@ impl SidecarServer {
306306
}
307307
}
308308

309-
async fn compute_stats(&self) -> SidecarStats {
309+
pub async fn compute_stats(&self) -> SidecarStats {
310310
let mut telemetry_stats_errors = 0;
311311
let telemetry_stats = join_all({
312312
let sessions = self.lock_sessions();
@@ -943,25 +943,24 @@ impl SidecarInterface for SidecarServer {
943943
let notify_target = RemoteConfigNotifyTarget {
944944
pid: session.pid.load(Ordering::Relaxed),
945945
};
946+
let invariants = session
947+
.get_remote_config_invariants()
948+
.as_ref()
949+
.expect("Expecting remote config invariants to be set early")
950+
.clone();
946951
let runtime_info = session.get_runtime(&instance_id.runtime_id);
947952
let mut applications = runtime_info.lock_applications();
948953
let app = applications.entry(queue_id).or_default();
949-
app.remote_config_guard = Some(
950-
self.remote_configs.add_runtime(
951-
session
952-
.get_remote_config_invariants()
953-
.as_ref()
954-
.expect("Expecting remote config invariants to be set early")
955-
.clone(),
956-
*session.remote_config_interval.lock().unwrap(),
957-
instance_id.runtime_id,
958-
notify_target,
959-
env_name.clone(),
960-
service_name,
961-
app_version.clone(),
962-
global_tags.clone(),
963-
),
964-
);
954+
app.remote_config_guard = Some(self.remote_configs.add_runtime(
955+
invariants,
956+
*session.remote_config_interval.lock().unwrap(),
957+
instance_id.runtime_id,
958+
notify_target,
959+
env_name.clone(),
960+
service_name,
961+
app_version.clone(),
962+
global_tags.clone(),
963+
));
965964
app.set_metadata(env_name, app_version, global_tags);
966965

967966
no_response()

sidecar/src/service/telemetry/enqueued_telemetry_stats.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ use serde::{Deserialize, Serialize};
55
use std::iter::Sum;
66
use std::ops::Add;
77

8-
#[derive(Default, Serialize, Deserialize)]
8+
#[derive(Debug, Default, Serialize, Deserialize)]
99
/// `EnqueuedTelemetryStats`contains the count of stored and unflushed dependencies, configurations,
1010
/// and integrations. It also keeps track of the count of metrics, points, actions, and computed
1111
/// dependencies.

sidecar/src/service/tracing/trace_flusher.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ const DEFAULT_MIN_FORCE_DROP_SIZE_BYTES: u32 = 10_000_000;
3030
/// `TraceFlusherStats` holds stats of the trace flusher like the count of allocated shared memory
3131
/// for agent config, agent config writers, last used entries in agent configs, and the size of send
3232
/// data.
33-
#[derive(Serialize, Deserialize)]
33+
#[derive(Debug, Serialize, Deserialize)]
3434
pub(crate) struct TraceFlusherStats {
3535
pub(crate) agent_config_allocated_shm: u32,
3636
pub(crate) agent_config_writers: u32,

sidecar/src/shm_remote_config.rs

Lines changed: 12 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -119,7 +119,7 @@ struct ConfigFileStorage {
119119
/// All writers
120120
writers: Arc<Mutex<HashMap<Arc<Target>, RemoteConfigWriter>>>,
121121
#[allow(clippy::type_complexity)]
122-
on_dead: Arc<Mutex<Option<Box<dyn FnOnce() + Sync + Send>>>>,
122+
on_dead: Arc<Mutex<Option<Box<dyn Fn() + Sync + Send>>>>,
123123
}
124124

125125
struct StoredShmFile {
@@ -260,7 +260,7 @@ impl MultiTargetHandlers<StoredShmFile> for ConfigFileStorage {
260260
.on_dead
261261
.lock()
262262
.unwrap()
263-
.take()
263+
.as_ref()
264264
.expect("The MultiTargetHandler must not be used anymore once on_dead is called"))(
265265
);
266266
}
@@ -303,9 +303,13 @@ pub struct ShmRemoteConfigs<N: NotifyTarget + 'static>(
303303
// pertaining to that env refcounting RemoteConfigIdentifier tuples by their unique runtime_id
304304

305305
impl<N: NotifyTarget + 'static> ShmRemoteConfigs<N> {
306+
/// The on_dead arg will be called when a fetcher has no active runtimes.
307+
/// Beware: This function may be called at any time, e.g. another thread might be attempting to
308+
/// call add_runtime() right when no other runtime was left. Be careful with the locking here.
309+
/// Will not be called multiple times, unless this specific case was encountered.
306310
pub fn new(
307311
invariants: ConfigInvariants,
308-
on_dead: Box<dyn FnOnce() + Sync + Send>,
312+
on_dead: Box<dyn Fn() + Sync + Send>,
309313
interval: Duration,
310314
) -> Self {
311315
let storage = ConfigFileStorage {
@@ -641,10 +645,13 @@ mod tests {
641645
let server = RemoteConfigServer::spawn();
642646

643647
let (on_dead, on_dead_completer) = ManualFuture::new();
648+
let on_dead_completer = Arc::new(Mutex::new(Some(on_dead_completer)));
644649
let shm = ShmRemoteConfigs::new(
645650
server.dummy_invariants(),
646-
Box::new(|| {
647-
tokio::spawn(on_dead_completer.complete(()));
651+
Box::new(move || {
652+
if let Some(completer) = on_dead_completer.lock().unwrap().take() {
653+
tokio::spawn(completer.complete(()));
654+
}
648655
}),
649656
Duration::from_millis(10),
650657
);

0 commit comments

Comments
 (0)