From dcc1aabfcfca3ef41455a572964e70fb1217b539 Mon Sep 17 00:00:00 2001
From: Utkarsh Umesan Pillai <66651184+utpilla@users.noreply.github.com>
Date: Mon, 18 Nov 2024 01:31:31 +0000
Subject: [PATCH] Use flurry's concurrent hashmap

---
 opentelemetry-sdk/Cargo.toml                  |  1 +
 opentelemetry-sdk/src/metrics/internal/mod.rs | 75 +++++++++----------
 opentelemetry/src/common.rs                   | 47 +++++++++++-
 3 files changed, 83 insertions(+), 40 deletions(-)

diff --git a/opentelemetry-sdk/Cargo.toml b/opentelemetry-sdk/Cargo.toml
index 39928fecb3..8e36678092 100644
--- a/opentelemetry-sdk/Cargo.toml
+++ b/opentelemetry-sdk/Cargo.toml
@@ -29,6 +29,7 @@ tokio = { workspace = true, features = ["rt", "time"], optional = true }
 tokio-stream = { workspace = true, optional = true }
 http = { workspace = true, optional = true }
 tracing = {workspace = true, optional = true}
+flurry = "0.5.1"
 
 [package.metadata.docs.rs]
 all-features = true
diff --git a/opentelemetry-sdk/src/metrics/internal/mod.rs b/opentelemetry-sdk/src/metrics/internal/mod.rs
index 8b6136d7ce..1fa00f4190 100644
--- a/opentelemetry-sdk/src/metrics/internal/mod.rs
+++ b/opentelemetry-sdk/src/metrics/internal/mod.rs
@@ -6,11 +6,10 @@ mod precomputed_sum;
 mod sum;
 
 use core::fmt;
-use std::collections::{HashMap, HashSet};
-use std::mem::take;
-use std::ops::{Add, AddAssign, DerefMut, Sub};
+use std::collections::HashSet;
+use std::ops::{Add, AddAssign, Sub};
 use std::sync::atomic::{AtomicBool, AtomicI64, AtomicU64, AtomicUsize, Ordering};
-use std::sync::{Arc, RwLock};
+use std::sync::Arc;
 
 use aggregate::is_under_cardinality_limit;
 pub(crate) use aggregate::{AggregateBuilder, ComputeAggregation, Measure};
@@ -49,10 +48,12 @@ pub(crate) trait Aggregator {
 /// updates to the underlying value trackers should be performed.
 pub(crate) struct ValueMap<A>
 where
-    A: Aggregator,
+    A: Aggregator + Send + Sync,
 {
     /// Trackers store the values associated with different attribute sets.
-    trackers: RwLock<HashMap<Vec<KeyValue>, Arc<A>>>,
+    trackers: flurry::HashMap<Vec<KeyValue>, Arc<A>>,
+    /// Lock to ensure that only one writer can write to the `trackers` map at a time.
+    write_lock: std::sync::Mutex<()>,
     /// Number of different attribute set stored in the `trackers` map.
     count: AtomicUsize,
     /// Indicates whether a value with no attributes has been stored.
@@ -65,11 +66,12 @@ where
 
 impl<A> ValueMap<A>
 where
-    A: Aggregator,
+    A: Aggregator + Send + Sync,
 {
     fn new(config: A::InitConfig) -> Self {
         ValueMap {
-            trackers: RwLock::new(HashMap::new()),
+            trackers: flurry::HashMap::new(),
+            write_lock: std::sync::Mutex::new(()),
             has_no_attribute_value: AtomicBool::new(false),
             no_attribute_tracker: A::create(&config),
             count: AtomicUsize::new(0),
@@ -84,51 +86,46 @@ where
             return;
         }
 
-        let Ok(trackers) = self.trackers.read() else {
-            return;
-        };
-
         // Try to retrieve and update the tracker with the attributes in the provided order first
-        if let Some(tracker) = trackers.get(attributes) {
+        let hashmap_ref = self.trackers.pin();
+        if let Some(tracker) = hashmap_ref.get(attributes) {
             tracker.update(value);
             return;
         }
 
         // Try to retrieve and update the tracker with the attributes sorted.
         let sorted_attrs = AttributeSet::from(attributes).into_vec();
-        if let Some(tracker) = trackers.get(sorted_attrs.as_slice()) {
+        if let Some(tracker) = hashmap_ref.get(sorted_attrs.as_slice()) {
             tracker.update(value);
             return;
         }
 
-        // Give up the read lock before acquiring the write lock.
-        drop(trackers);
-
-        let Ok(mut trackers) = self.trackers.write() else {
+        let Ok(_write_lock) = self.write_lock.lock() else {
             return;
         };
 
         // Recheck both the provided and sorted orders after acquiring the write lock
         // in case another thread has pushed an update in the meantime.
-        if let Some(tracker) = trackers.get(attributes) {
+        if let Some(tracker) = hashmap_ref.get(attributes) {
             tracker.update(value);
-        } else if let Some(tracker) = trackers.get(sorted_attrs.as_slice()) {
+        } else if let Some(tracker) = hashmap_ref.get(sorted_attrs.as_slice()) {
             tracker.update(value);
         } else if is_under_cardinality_limit(self.count.load(Ordering::SeqCst)) {
             let new_tracker = Arc::new(A::create(&self.config));
             new_tracker.update(value);
 
             // Insert tracker with the attributes in the provided and sorted orders
-            trackers.insert(attributes.to_vec(), new_tracker.clone());
-            trackers.insert(sorted_attrs, new_tracker);
+            hashmap_ref.insert(attributes.to_vec(), new_tracker.clone());
+            hashmap_ref.insert(sorted_attrs, new_tracker);
 
             self.count.fetch_add(1, Ordering::SeqCst);
-        } else if let Some(overflow_value) = trackers.get(STREAM_OVERFLOW_ATTRIBUTES.as_slice()) {
+        } else if let Some(overflow_value) = hashmap_ref.get(STREAM_OVERFLOW_ATTRIBUTES.as_slice())
+        {
             overflow_value.update(value);
         } else {
             let new_tracker = A::create(&self.config);
             new_tracker.update(value);
-            trackers.insert(STREAM_OVERFLOW_ATTRIBUTES.clone(), Arc::new(new_tracker));
+            hashmap_ref.insert(STREAM_OVERFLOW_ATTRIBUTES.clone(), Arc::new(new_tracker));
             otel_warn!( name: "ValueMap.measure",
                 message = "Maximum data points for metric stream exceeded. Entry added to overflow. Subsequent overflows to same metric until next collect will not be logged."
             );
@@ -146,12 +143,9 @@ where
             dest.push(map_fn(vec![], &self.no_attribute_tracker));
         }
 
-        let Ok(trackers) = self.trackers.read() else {
-            return;
-        };
-
         let mut seen = HashSet::new();
-        for (attrs, tracker) in trackers.iter() {
+        let hashmap_ref = self.trackers.pin();
+        for (attrs, tracker) in hashmap_ref.iter() {
             if seen.insert(Arc::as_ptr(tracker)) {
                 dest.push(map_fn(attrs.clone(), tracker));
             }
@@ -172,18 +166,23 @@ where
             ));
         }
 
-        let trackers = match self.trackers.write() {
-            Ok(mut trackers) => {
-                self.count.store(0, Ordering::SeqCst);
-                take(trackers.deref_mut())
-            }
-            Err(_) => todo!(),
+        let Ok(_write_lock) = self.write_lock.lock() else {
+            return;
         };
 
+        self.count.store(0, Ordering::SeqCst);
+        let trackers = self.trackers.clone();
+        self.trackers.pin().clear();
+
+        drop(_write_lock);
+
         let mut seen = HashSet::new();
-        for (attrs, tracker) in trackers.into_iter() {
-            if seen.insert(Arc::as_ptr(&tracker)) {
-                dest.push(map_fn(attrs, tracker.clone_and_reset(&self.config)));
+        for (attrs, tracker) in trackers.pin().into_iter() {
+            if seen.insert(Arc::as_ptr(tracker)) {
+                dest.push(map_fn(
+                    attrs.to_vec(),
+                    tracker.clone_and_reset(&self.config),
+                ));
             }
         }
     }
diff --git a/opentelemetry/src/common.rs b/opentelemetry/src/common.rs
index 85fa6e7f9d..e163377100 100644
--- a/opentelemetry/src/common.rs
+++ b/opentelemetry/src/common.rs
@@ -1,4 +1,5 @@
 use std::borrow::{Borrow, Cow};
+use std::cmp::Ordering;
 use std::sync::Arc;
 use std::{fmt, hash};
 
@@ -163,6 +164,26 @@ pub enum Array {
     String(Vec<StringValue>),
 }
 
+impl Eq for Array {}
+
+impl PartialOrd for Array {
+    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
+        Some(self.cmp(other))
+    }
+}
+
+impl Ord for Array {
+    fn cmp(&self, other: &Self) -> Ordering {
+        match (self, other) {
+            (Array::Bool(a), Array::Bool(b)) => a.cmp(b),
+            (Array::I64(a), Array::I64(b)) => a.cmp(b),
+            (Array::F64(a), Array::F64(b)) => a.partial_cmp(b).unwrap(),
+            (Array::String(a), Array::String(b)) => a.cmp(b),
+            (_a, _b) => todo!(),
+        }
+    }
+}
+
 impl fmt::Display for Array {
     fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
         match self {
@@ -229,9 +250,31 @@ pub enum Value {
     Array(Array),
 }
 
+impl Eq for Value {}
+
+impl PartialOrd for Value {
+    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
+        Some(self.cmp(other))
+    }
+}
+
+impl Ord for Value {
+    fn cmp(&self, other: &Self) -> Ordering {
+        match (self, other) {
+            (Value::Bool(a), Value::Bool(b)) => a.cmp(b),
+            (Value::I64(a), Value::I64(b)) => a.cmp(b),
+            (Value::F64(a), Value::F64(b)) => a.partial_cmp(b).unwrap(),
+            (Value::String(a), Value::String(b)) => a.as_str().cmp(b.as_str()),
+            (Value::Array(a), Value::Array(b)) => a.cmp(b),
+            // (a, b) => a.as_str().cmp(&b.as_str()),
+            (_a, _b) => todo!(),
+        }
+    }
+}
+
 /// Wrapper for string-like values
 #[non_exhaustive]
-#[derive(Clone, PartialEq, Eq, Hash)]
+#[derive(Clone, PartialEq, PartialOrd, Ord, Eq, Hash)]
 pub struct StringValue(OtelString);
 
 impl fmt::Debug for StringValue {
@@ -375,7 +418,7 @@ impl fmt::Display for Value {
 }
 
 /// A key-value pair describing an attribute.
-#[derive(Clone, Debug, PartialEq)]
+#[derive(Clone, Debug, PartialOrd, Ord, PartialEq)]
 #[non_exhaustive]
 pub struct KeyValue {
     /// The attribute name
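
Note (not part of the patch): the concurrency pattern adopted in `ValueMap` above is a flurry map pinned for lock-free reads, plus a plain `Mutex` that serializes only the insert slow path and re-checks the map after the lock is taken. flurry bounds its keys by `Hash + Ord`, which is why the patch derives or implements `Ord` for `KeyValue`, `Value`, and `Array` in common.rs. Below is a minimal, self-contained sketch of the same pattern, assuming flurry 0.5 and hypothetical `Counters`/`Tracker` types that do not exist in this codebase.

use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::{Arc, Mutex};

// Hypothetical stand-in for a metric tracker: one atomic counter per key.
#[derive(Default)]
struct Tracker(AtomicU64);

// Hypothetical aggregation store mirroring the ValueMap layout in the patch.
struct Counters {
    // Concurrent map; reads go through an epoch-pinned guard, no lock taken.
    map: flurry::HashMap<String, Arc<Tracker>>,
    // Only the slow path that inserts a new tracker takes this lock.
    write_lock: Mutex<()>,
}

impl Counters {
    fn new() -> Self {
        Counters {
            map: flurry::HashMap::new(),
            write_lock: Mutex::new(()),
        }
    }

    fn add(&self, key: &str, delta: u64) {
        // Fast path: pin a guard and read without taking any lock.
        let map = self.map.pin();
        if let Some(tracker) = map.get(key) {
            tracker.0.fetch_add(delta, Ordering::Relaxed);
            return;
        }

        // Slow path: serialize writers, then re-check so two racing threads
        // do not create two different trackers for the same key.
        let _guard = self.write_lock.lock().unwrap();
        if let Some(tracker) = map.get(key) {
            tracker.0.fetch_add(delta, Ordering::Relaxed);
        } else {
            let tracker = Arc::new(Tracker::default());
            tracker.0.fetch_add(delta, Ordering::Relaxed);
            map.insert(key.to_owned(), tracker);
        }
    }

    fn snapshot(&self) -> Vec<(String, u64)> {
        self.map
            .pin()
            .iter()
            .map(|(k, v)| (k.clone(), v.0.load(Ordering::Relaxed)))
            .collect()
    }
}

fn main() {
    let counters = Counters::new();
    counters.add("requests", 1);
    counters.add("requests", 2);
    counters.add("errors", 1);
    println!("{:?}", counters.snapshot());
}

The re-check after acquiring `write_lock` mirrors the `measure()` slow path in the patch: concurrent callers racing on a new key end up sharing a single tracker instead of clobbering each other's inserts.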