Skip to content

Commit

Permalink
Merge branch 'main' into nicholas.hulston/fix-span-link-optional-fields
Browse files Browse the repository at this point in the history
  • Loading branch information
nhulston authored Feb 19, 2025
2 parents ade67b1 + 3f188fd commit 818bbed
Show file tree
Hide file tree
Showing 15 changed files with 106 additions and 74 deletions.
2 changes: 1 addition & 1 deletion ddtelemetry/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ pub struct MetricBuckets {
distributions: HashMap<BucketKey, DDSketch>,
}

#[derive(Default, Serialize, Deserialize)]
#[derive(Debug, Default, Serialize, Deserialize)]
pub struct MetricBucketStats {
pub buckets: u32,
pub series: u32,
Expand Down
2 changes: 1 addition & 1 deletion ddtelemetry/src/worker/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ pub struct TelemetryWorker {
data: TelemetryWorkerData,
}

#[derive(Default, Serialize, Deserialize)]
#[derive(Debug, Default, Serialize, Deserialize)]
pub struct TelemetryWorkerStats {
pub dependencies_stored: u32,
pub dependencies_unflushed: u32,
Expand Down
2 changes: 1 addition & 1 deletion remote-config/src/fetch/fetcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ pub struct ConfigFetcherState<S> {
pub expire_unused_files: bool,
}

#[derive(Default, Serialize, Deserialize)]
#[derive(Debug, Default, Serialize, Deserialize)]
pub struct ConfigFetcherStateStats {
pub active_files: u32,
}
Expand Down
74 changes: 41 additions & 33 deletions remote-config/src/fetch/multitarget.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ use std::fmt::Debug;
use std::hash::Hash;
use std::ops::Add;
use std::sync::atomic::{AtomicU32, AtomicU64, Ordering};
use std::sync::{Arc, Mutex};
use std::sync::{Arc, Mutex, MutexGuard};
use std::time::Duration;
use tokio::select;
use tokio::sync::Semaphore;
Expand Down Expand Up @@ -54,7 +54,7 @@ where
fetcher_semaphore: Semaphore,
}

#[derive(Default, Serialize, Deserialize)]
#[derive(Debug, Default, Serialize, Deserialize)]
pub struct MultiTargetStats {
known_runtimes: u32,
starting_fetchers: u32,
Expand Down Expand Up @@ -109,6 +109,10 @@ pub trait MultiTargetHandlers<S> {

fn expired(&self, target: &Arc<Target>);

/// Called when a fetcher has no active runtimes.
/// Beware: this function may be called at any time; e.g. another thread might be attempting to
/// call add_runtime() right as the last runtime is removed. Be careful with the locking here.
/// It will not be called multiple times unless that specific race is encountered.
fn dead(&self);
}

Expand Down Expand Up @@ -162,7 +166,12 @@ where
uuid::Uuid::new_v4().to_string()
}

fn remove_target(self: &Arc<Self>, runtime_id: &str, target: &Arc<Target>) {
fn remove_target(
self: &Arc<Self>,
runtime_id: &str,
target: &Arc<Target>,
runtimes: MutexGuard<HashMap<String, RuntimeInfo<N>>>,
) {
let mut services = self.services.lock().unwrap();
// "goto" like handling to drop the known_service borrow and be able to change services
'service_handling: {
Expand All @@ -181,7 +190,8 @@ where
let target = target.clone();
tokio::spawn(async move {
future.await;
this.remove_target(runtime_id.as_str(), &target);
let runtimes = this.runtimes.lock().unwrap();
this.remove_target(runtime_id.as_str(), &target, runtimes);
});
return;
}
Expand Down Expand Up @@ -213,7 +223,7 @@ where
_ => {
if *known_service.fetcher.runtime_id.lock().unwrap() == runtime_id {
'changed_rt_id: {
for (id, runtime) in self.runtimes.lock().unwrap().iter() {
for (id, runtime) in runtimes.iter() {
if runtime.targets.len() == 1
&& runtime.targets.contains_key(target)
{
Expand Down Expand Up @@ -366,7 +376,7 @@ where
notify_target,
targets: HashMap::from([(target.clone(), 1)]),
};
self.add_target(info.targets.len() > 1, e.key(), target.clone());
self.add_target(false, e.key(), target.clone());
e.insert(info);
}
}
Expand All @@ -375,31 +385,29 @@ where

pub fn delete_runtime(self: &Arc<Self>, runtime_id: &str, target: &Arc<Target>) {
trace!("Removing remote config runtime: {target:?} with runtime id {runtime_id}");
{
let mut runtimes = self.runtimes.lock().unwrap();
let last_removed = {
let info = match runtimes.get_mut(runtime_id) {
None => return,
Some(i) => i,
};
match info.targets.entry(target.clone()) {
Entry::Occupied(mut e) => {
if *e.get() == 1 {
e.remove();
} else {
*e.get_mut() -= 1;
return;
}
let mut runtimes = self.runtimes.lock().unwrap();
let last_removed = {
let info = match runtimes.get_mut(runtime_id) {
None => return,
Some(i) => i,
};
match info.targets.entry(target.clone()) {
Entry::Occupied(mut e) => {
if *e.get() == 1 {
e.remove();
} else {
*e.get_mut() -= 1;
return;
}
Entry::Vacant(_) => unreachable!("Missing target runtime"),
}
info.targets.is_empty()
};
if last_removed {
runtimes.remove(runtime_id);
Entry::Vacant(_) => unreachable!("Missing target runtime"),
}
info.targets.is_empty()
};
if last_removed {
runtimes.remove(runtime_id);
}
Self::remove_target(self, runtime_id, target);
Self::remove_target(self, runtime_id, target, runtimes);
}

/// Sets the apply state on a stored file.
Expand Down Expand Up @@ -510,18 +518,18 @@ where

this.storage.storage.expired(&fetcher.target);

{
let is_dead = {
// scope lock before await
trace!(
"Remove {:?} from services map at fetcher end",
fetcher.target
);
let mut services = this.services.lock().unwrap();
services.remove(&fetcher.target);
if services.is_empty() && this.pending_async_insertions.load(Ordering::Relaxed) == 0
{
this.storage.storage.dead();
}
services.is_empty() && this.pending_async_insertions.load(Ordering::Relaxed) == 0
};
if is_dead {
this.storage.storage.dead();
}
remove_completer.complete(()).await;
});
Expand Down Expand Up @@ -570,7 +578,7 @@ where
(starting, active, inactive, removing)
};
MultiTargetStats {
known_runtimes: self.runtimes.lock().unwrap().len() as u32,
known_runtimes: self.active_runtimes() as u32,
starting_fetchers,
active_fetchers,
inactive_fetchers,
Expand Down
2 changes: 1 addition & 1 deletion remote-config/src/fetch/shared.rs
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ where
run_id: Arc<RunnersGeneration>,
}

#[derive(Default, Serialize, Deserialize)]
#[derive(Debug, Default, Serialize, Deserialize)]
pub struct RefcountingStorageStats {
pub inactive_files: u32,
pub fetcher: ConfigFetcherStateStats,
Expand Down
3 changes: 2 additions & 1 deletion sidecar/src/entry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,8 @@ where
let server = SidecarServer::default();
let (shutdown_complete_tx, shutdown_complete_rx) = mpsc::channel::<()>(1);

let watchdog_handle = Watchdog::from_receiver(shutdown_complete_rx).spawn_watchdog();
let watchdog_handle =
Watchdog::from_receiver(shutdown_complete_rx).spawn_watchdog(server.clone());
let telemetry_handle = self_telemetry(server.clone(), watchdog_handle);

listener(Box::new({
Expand Down
2 changes: 1 addition & 1 deletion sidecar/src/log.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ unsafe impl<K, V> Sync for TemporarilyRetainedMap<K, V> where
{
}

#[derive(Serialize, Deserialize)]
#[derive(Debug, Serialize, Deserialize)]
pub struct TemporarilyRetainedMapStats {
pub elements: u32,
pub live_counters: u32,
Expand Down
2 changes: 1 addition & 1 deletion sidecar/src/service/debugger_diagnostics_bookkeeper.rs
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ impl Drop for DebuggerDiagnosticsBookkeeper {
}
}

#[derive(Serialize, Deserialize)]
#[derive(Debug, Serialize, Deserialize)]
pub struct DebuggerDiagnosticsBookkeeperStats {
runtime_ids: u32,
total_probes: u32,
Expand Down
13 changes: 12 additions & 1 deletion sidecar/src/service/remote_configs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,18 @@ impl RemoteConfigs {
e.insert(ShmRemoteConfigs::new(
invariants.clone(),
Box::new(move || {
this.lock().unwrap().remove(&invariants);
// try_lock: if the lock is held _right now_, it means that an insertion is
// in progress. In that case we can just ignore the Err() and leave it.
// Otherwise we have to check whether it's actually dead and possibly
// re-insert.

if let Ok(mut unlocked) = this.try_lock() {
if let Some(active) = unlocked.remove(&invariants) {
if !active.is_dead() {
unlocked.insert(invariants.clone(), active);
}
}
}
}),
poll_interval,
))
Expand Down
14 changes: 8 additions & 6 deletions sidecar/src/service/session_info.rs
Original file line number Diff line number Diff line change
Expand Up @@ -272,14 +272,16 @@ impl SessionInfo {
}
}

let invariants = self.get_remote_config_invariants();
let version = invariants
.as_ref()
.map(|i| i.tracer_version.as_str())
.unwrap_or("0.0.0");
if let Some(runtime) = self.lock_runtimes().get(runtime_id) {
if let Some(app) = runtime.lock_applications().get_mut(&queue_id) {
let (tags, new_tags) = app.get_debugger_tags(&version, runtime_id);
let (tags, new_tags) = {
let invariants = self.get_remote_config_invariants();
let version = invariants
.as_ref()
.map(|i| i.tracer_version.as_str())
.unwrap_or("0.0.0");
app.get_debugger_tags(&version, runtime_id)
};
let sender = match debugger_type {
DebuggerType::Diagnostics => app.debugger_diagnostics_payload_sender.clone(),
DebuggerType::Logs => app.debugger_logs_payload_sender.clone(),
Expand Down
37 changes: 18 additions & 19 deletions sidecar/src/service/sidecar_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,8 +63,8 @@ fn no_response() -> NoResponse {
future::ready(())
}

#[derive(Serialize, Deserialize)]
struct SidecarStats {
#[derive(Debug, Serialize, Deserialize)]
pub struct SidecarStats {
trace_flusher: TraceFlusherStats,
sessions: u32,
session_counter_size: u32,
Expand Down Expand Up @@ -306,7 +306,7 @@ impl SidecarServer {
}
}

async fn compute_stats(&self) -> SidecarStats {
pub async fn compute_stats(&self) -> SidecarStats {
let mut telemetry_stats_errors = 0;
let telemetry_stats = join_all({
let sessions = self.lock_sessions();
Expand Down Expand Up @@ -943,25 +943,24 @@ impl SidecarInterface for SidecarServer {
let notify_target = RemoteConfigNotifyTarget {
pid: session.pid.load(Ordering::Relaxed),
};
let invariants = session
.get_remote_config_invariants()
.as_ref()
.expect("Expecting remote config invariants to be set early")
.clone();
let runtime_info = session.get_runtime(&instance_id.runtime_id);
let mut applications = runtime_info.lock_applications();
let app = applications.entry(queue_id).or_default();
app.remote_config_guard = Some(
self.remote_configs.add_runtime(
session
.get_remote_config_invariants()
.as_ref()
.expect("Expecting remote config invariants to be set early")
.clone(),
*session.remote_config_interval.lock().unwrap(),
instance_id.runtime_id,
notify_target,
env_name.clone(),
service_name,
app_version.clone(),
global_tags.clone(),
),
);
app.remote_config_guard = Some(self.remote_configs.add_runtime(
invariants,
*session.remote_config_interval.lock().unwrap(),
instance_id.runtime_id,
notify_target,
env_name.clone(),
service_name,
app_version.clone(),
global_tags.clone(),
));
app.set_metadata(env_name, app_version, global_tags);

no_response()
Expand Down
2 changes: 1 addition & 1 deletion sidecar/src/service/telemetry/enqueued_telemetry_stats.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use serde::{Deserialize, Serialize};
use std::iter::Sum;
use std::ops::Add;

#[derive(Default, Serialize, Deserialize)]
#[derive(Debug, Default, Serialize, Deserialize)]
/// `EnqueuedTelemetryStats` contains the count of stored and unflushed dependencies, configurations,
/// and integrations. It also keeps track of the count of metrics, points, actions, and computed
/// dependencies.
Expand Down
2 changes: 1 addition & 1 deletion sidecar/src/service/tracing/trace_flusher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ const DEFAULT_MIN_FORCE_DROP_SIZE_BYTES: u32 = 10_000_000;
/// `TraceFlusherStats` holds stats of the trace flusher like the count of allocated shared memory
/// for agent config, agent config writers, last used entries in agent configs, and the size of send
/// data.
#[derive(Serialize, Deserialize)]
#[derive(Debug, Serialize, Deserialize)]
pub(crate) struct TraceFlusherStats {
pub(crate) agent_config_allocated_shm: u32,
pub(crate) agent_config_writers: u32,
Expand Down
17 changes: 12 additions & 5 deletions sidecar/src/shm_remote_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ struct ConfigFileStorage {
/// All writers
writers: Arc<Mutex<HashMap<Arc<Target>, RemoteConfigWriter>>>,
#[allow(clippy::type_complexity)]
on_dead: Arc<Mutex<Option<Box<dyn FnOnce() + Sync + Send>>>>,
on_dead: Arc<Mutex<Option<Box<dyn Fn() + Sync + Send>>>>,
}

struct StoredShmFile {
Expand Down Expand Up @@ -260,7 +260,7 @@ impl MultiTargetHandlers<StoredShmFile> for ConfigFileStorage {
.on_dead
.lock()
.unwrap()
.take()
.as_ref()
.expect("The MultiTargetHandler must not be used anymore once on_dead is called"))(
);
}
Expand Down Expand Up @@ -303,9 +303,13 @@ pub struct ShmRemoteConfigs<N: NotifyTarget + 'static>(
// pertaining to that env refcounting RemoteConfigIdentifier tuples by their unique runtime_id

impl<N: NotifyTarget + 'static> ShmRemoteConfigs<N> {
/// The on_dead arg will be called when a fetcher has no active runtimes.
/// Beware: this function may be called at any time; e.g. another thread might be attempting to
/// call add_runtime() right as the last runtime is removed. Be careful with the locking here.
/// It will not be called multiple times unless that specific race is encountered.
pub fn new(
invariants: ConfigInvariants,
on_dead: Box<dyn FnOnce() + Sync + Send>,
on_dead: Box<dyn Fn() + Sync + Send>,
interval: Duration,
) -> Self {
let storage = ConfigFileStorage {
Expand Down Expand Up @@ -641,10 +645,13 @@ mod tests {
let server = RemoteConfigServer::spawn();

let (on_dead, on_dead_completer) = ManualFuture::new();
let on_dead_completer = Arc::new(Mutex::new(Some(on_dead_completer)));
let shm = ShmRemoteConfigs::new(
server.dummy_invariants(),
Box::new(|| {
tokio::spawn(on_dead_completer.complete(()));
Box::new(move || {
if let Some(completer) = on_dead_completer.lock().unwrap().take() {
tokio::spawn(completer.complete(()));
}
}),
Duration::from_millis(10),
);
Expand Down
Loading

0 comments on commit 818bbed

Please sign in to comment.