Skip to content
This repository was archived by the owner on Jan 7, 2025. It is now read-only.

Commit 76b98c6

Browse files
authored
feat: add misragries data structure for statistics (#91)
This pull request implements the Misra-Gries Summary data structure as described in the [Cormode et al. paper](https://people.csail.mit.edu/rrw/6.045-2017/encalgs-mg.pdf). We **refined** the algorithm to ensure that K elements will always be returned, as long as K <= N. As such, this data structure returns **all** elements with frequency f >= (n/k), and may include additional leftovers (i.e. best guesses). It supports a fully parallelizable, memory-bounded MCV computation scheme through an easy API. Includes unit tests on i32s. Next steps: a second sweep to gather the associated frequency of each MCV identified by the Misra-Gries Summary.
1 parent 0efc874 commit 76b98c6

File tree

2 files changed

+203
-0
lines changed

2 files changed

+203
-0
lines changed

optd-gungnir/src/stats.rs

+1
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
11
pub mod hyperloglog;
2+
pub mod misragries;
23
pub mod murmur2;
34
pub mod tdigest;

optd-gungnir/src/stats/misragries.rs

+202
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,202 @@
1+
//! Implementation of the Misra-Gries Summary data structure as described in
2+
//! the Cormode et al. paper: "Misra-Gries Summaries" (2014).
3+
//! We further refine the algorithm to ensure that K elements will always be
4+
//! returned, as long as K <= N.
5+
//! For more details, refer to:
6+
//! https://people.csail.mit.edu/rrw/6.045-2017/encalgs-mg.pdf
7+
8+
use std::{cmp::min, collections::HashMap, hash::Hash};
9+
10+
use itertools::Itertools;
11+
12+
/// The Misra-Gries structure to approximate the k most frequent elements in
13+
/// a stream of N elements. It will always identify elements with frequency
14+
/// f >= (n/k), and include additional leftovers.
15+
pub struct MisraGries<T: PartialEq + Eq + Hash + Clone> {
16+
frequencies: HashMap<T, i32>, // The approximated frequencies of an element T.
17+
k: u16, // The max size of our frequencies hashmap.
18+
}
19+
20+
// Self-contained implementation of the Misra-Gries data structure.
21+
impl<T> MisraGries<T>
22+
where
23+
T: PartialEq + Eq + Hash + Clone,
24+
{
25+
/// Creates and initializes a new empty Misra-Gries.
26+
pub fn new(k: u16) -> Self {
27+
assert!(k > 0);
28+
29+
MisraGries::<T> {
30+
frequencies: HashMap::with_capacity(k as usize),
31+
k,
32+
}
33+
}
34+
35+
// Returns the (key, val) pair of the least frequent element.
36+
// If more than one such element exists, returns an arbitrary one.
37+
// NOTE: Panics if no frequencies exist.
38+
fn find_least_frequent(&self) -> (T, i32) {
39+
let (key, occ) = self.frequencies.iter().min_by_key(|(_, occ)| *occ).unwrap();
40+
(key.clone(), *occ)
41+
}
42+
43+
// Inserts an element occ times into the `self` Misra-Gries structure.
44+
fn insert_element(&mut self, elem: T, occ: i32) {
45+
match self.frequencies.get_mut(&elem) {
46+
Some(freq) => *freq += occ, // Hit.
47+
None => {
48+
if self.frequencies.len() < self.k as usize {
49+
self.frequencies.insert(elem, occ); // Discovery phase.
50+
} else {
51+
// Check if there *will* be an evictable frequency; decrement & insert.
52+
let (smallest_freq_key, smallest_freq) = self.find_least_frequent();
53+
54+
let decr = min(smallest_freq, occ);
55+
if decr > 0 {
56+
for freq in self.frequencies.values_mut() {
57+
*freq -= decr;
58+
}
59+
}
60+
61+
let delta = smallest_freq - occ;
62+
if delta < 0 {
63+
self.frequencies.remove(&smallest_freq_key);
64+
self.frequencies.insert(elem, -delta);
65+
}
66+
}
67+
}
68+
}
69+
}
70+
71+
/// Digests an array of data into the Misra-Gries structure.
72+
pub fn aggregate(&mut self, data: &[T]) {
73+
data.iter()
74+
.for_each(|key| self.insert_element(key.clone(), 1));
75+
}
76+
77+
/// Merges another MisraGries into the current one.
78+
/// Particularly useful for parallel execution.
79+
pub fn merge(&mut self, other: &MisraGries<T>) {
80+
assert!(self.k == other.k);
81+
82+
other
83+
.frequencies
84+
.iter()
85+
.for_each(|(key, occ)| self.insert_element(key.clone(), *occ));
86+
}
87+
88+
/// Returns all elements with frequency f >= (n/k),
89+
/// and may include additional leftovers.
90+
pub fn most_frequent_keys(&self) -> Vec<&T> {
91+
self.frequencies.keys().collect_vec()
92+
}
93+
}
94+
95+
// Start of unit testing section.
#[cfg(test)]
mod tests {
    use std::sync::atomic::{AtomicUsize, Ordering};
    use std::sync::Mutex;

    use super::MisraGries;
    use crossbeam::thread;
    use rand::seq::SliceRandom;
    use rand::{rngs::StdRng, SeedableRng};

    // With k equal to the number of distinct values, every value must be kept.
    #[test]
    fn aggregate_full_size() {
        let data = vec![0, 1, 2, 3];
        let mut misra_gries = MisraGries::<i32>::new(data.len() as u16);

        misra_gries.aggregate(&data);

        let keys = misra_gries.most_frequent_keys();
        // Keys are distinct, so "as many as `data`" plus "all contained in
        // `data`" means every input value was reported.
        assert_eq!(keys.len(), data.len());
        for key in keys {
            assert!(data.contains(key));
        }
    }

    // Same as above, but every value appears twice in the stream (so k is
    // half of the stream length).
    #[test]
    fn aggregate_half_size() {
        let data = vec![0, 1, 2, 3];
        let data_dup = [data.as_slice(), data.as_slice()].concat();

        let mut misra_gries = MisraGries::<i32>::new(data.len() as u16);

        misra_gries.aggregate(&data_dup);

        let keys = misra_gries.most_frequent_keys();
        assert_eq!(keys.len(), data.len());
        for key in keys {
            assert!(data.contains(key));
        }
    }

    // Generates a shuffled array of n distinct elements following a Zipfian
    // distribution based on the provided seed: value `idx` appears
    // `n_distinct / idx` times.
    fn create_zipfian(n_distinct: i32, seed: u64) -> Vec<i32> {
        let mut data = Vec::<i32>::new();
        for idx in 1..=n_distinct {
            let occurrence = n_distinct / idx;
            for _ in 0..occurrence {
                data.push(idx);
            }
        }
        let mut rng = StdRng::seed_from_u64(seed);
        data.shuffle(&mut rng);

        data
    }

    // Verifies the ability of Misra-Gries in identifying the most frequent elements
    // in a dataset following a Zipfian distribution: every value whose count
    // times k reaches the stream length must be reported, and exactly k keys
    // must be returned.
    fn check_zipfian(misra_gries: &MisraGries<i32>, n_distinct: i32) {
        let mfk = misra_gries.most_frequent_keys();
        let k = misra_gries.k as i32;
        let total_length: i32 = (1..=n_distinct).map(|idx| n_distinct / idx).sum();

        assert!((1..=n_distinct)
            .filter(|idx| (n_distinct / idx) * k >= total_length)
            .all(|idx| mfk.contains(&&idx)));
        assert!(mfk.len() == (k as usize));
    }

    #[test]
    fn aggregate_zipfian() {
        let n_distinct = 10000;
        let k = 200;

        let data = create_zipfian(n_distinct, 0);
        let mut misra_gries = MisraGries::<i32>::new(k as u16);

        misra_gries.aggregate(&data);

        check_zipfian(&misra_gries, n_distinct);
    }

    #[test]
    fn merge_zipfians() {
        let n_distinct = 10000;
        let n_jobs = 16;
        let k = 200;

        // Scoped threads may borrow from this stack frame, so a plain `Mutex`
        // is enough; no `Arc` is required to share the result.
        let result_misra_gries = Mutex::new(MisraGries::<i32>::new(k as u16));
        // Hands each job a distinct id, which doubles as its shuffle seed.
        let job_id = AtomicUsize::new(0);
        thread::scope(|s| {
            for _ in 0..n_jobs {
                s.spawn(|_| {
                    let mut local_misra_gries = MisraGries::<i32>::new(k as u16);
                    let curr_job_id = job_id.fetch_add(1, Ordering::SeqCst);

                    let data = create_zipfian(n_distinct, curr_job_id as u64);
                    local_misra_gries.aggregate(&data);

                    check_zipfian(&local_misra_gries, n_distinct);

                    let mut result = result_misra_gries.lock().unwrap();
                    result.merge(&local_misra_gries);
                });
            }
        })
        .unwrap();

        check_zipfian(&result_misra_gries.lock().unwrap(), n_distinct);
    }
}

0 commit comments

Comments
 (0)