Skip to content

Commit ff672f5

Browse files
authored
Merge pull request #12 from Ja7ad/feat/reservoir_sampling
Feat: Reservoir Sampling
2 parents ff204d8 + d61752e commit ff672f5

17 files changed

+791
-10
lines changed

README.md

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -4,9 +4,11 @@
44
[![codecov](https://codecov.io/gh/Ja7ad/algo/graph/badge.svg?token=9fLKrkUviU)](https://codecov.io/gh/Ja7ad/algo)
55
[![Go Report Card](https://goreportcard.com/badge/github.com/Ja7ad/algo)](https://goreportcard.com/report/github.com/Ja7ad/algo)
66

7-
**`algo`** is a Golang library featuring a variety of **efficient** and **well-optimized** algorithms designed for diverse **problem-solving needs**.
7+
**`algo`** is a Golang library featuring a variety of **efficient** and **well-optimized** algorithms
8+
designed for diverse **problem-solving needs**.
89

910
## 📌 Features
11+
1012
**Optimized Performance** – Algorithms are designed with efficiency in mind.
1113
**Modular Structure** – Each algorithm is in its own package for easy use.
1214
**Well-Documented** – Clear documentation and examples for every algorithm.
@@ -15,10 +17,13 @@
1517

1618
## 📚 Available Algorithms
1719

18-
| Algorithm | Description |
19-
|-----------|-------------|
20-
| [Random Weighted Selection](./rws/README.md) | Selects items randomly based on assigned weights. Useful in load balancing, gaming, and AI. |
21-
20+
| Algorithm | Description |
21+
|--------------------------------------------------|-------------|
22+
| [Random Weighted Selection](./rws/README.md) | Selects items randomly based on assigned weights. Useful in load balancing, gaming, and AI. |
23+
| [Reservoir Sampling Algorithm R](./rs/README.md) | Basic reservoir sampling, replaces elements with probability `k/i`. Efficient for uniform random sampling. |
24+
| [Reservoir Sampling Algorithm L](./rs/README.md) | Optimized reservoir sampling for large `N`, reduces unnecessary replacements using skipping. |
25+
| [Weighted Reservoir Sampling](./rs/README.md) | Selects items with probability proportional to their weights using a heap-based approach. Used in recommendation systems and A/B testing. |
26+
| [Random Sort Reservoir Sampling](./rs/README.md) | Uses a min-heap and random priorities to maintain the top `k` elements in a streaming dataset. |
2227

2328
## 🚀 Installation >= go 1.19
2429

rs/README.md

Lines changed: 147 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,147 @@
1+
# Reservoir sampling
2+
3+
Reservoir sampling is a family of randomized algorithms for choosing a simple random sample, without replacement,
4+
of k items from a population of unknown size n in a single pass over the items. The size of the population n is not
5+
known to the algorithm and is typically too large for all n items to fit into main memory. The population is revealed
6+
to the algorithm over time, and the algorithm cannot look back at previous items. At any point, the current state of
7+
the algorithm must permit extraction of a simple random sample without replacement of size k over the part of
8+
the population seen so far.
9+
10+
## **🔹 Variants of Reservoir Sampling**
11+
While **Algorithm R** is the simplest and most commonly used, there are **other variants** that improve performance in specific cases:
12+
13+
| **Algorithm** | **Description** | **Complexity** |
14+
|--------------------------------|----------------|---------------|
15+
| **Algorithm R** | Basic reservoir sampling, replaces elements with probability `k/i` | **O(N)** |
16+
| **Algorithm L** | Optimized for large `N`, reduces replacements via skipping | **O(N), fewer iterations** |
17+
| **Weighted Reservoir Sampling** | Assigns elements weights, prioritizing selection based on weight | **O(N log k)** (heap-based) |
18+
| **Random Sort Reservoir Sampling** | Uses a min-heap priority queue, selecting `k` elements with highest random priority scores | **O(N log k)** |
19+
20+
## Algorithm Weighted R – Weighted Reservoir Sampling
21+
**Weighted Reservoir Sampling** is an **efficient algorithm** for selecting `k` elements **proportionally to their weights** from a stream of unknown length `N`, using only `O(k)` memory.
22+
23+
This repository implements **Weighted Algorithm R**, an extension of **Jeffrey Vitter's Algorithm R**, which allows weighted sampling using a **heap-based approach**.
24+
25+
> This algorithm uses a **min-heap-based priority selection**, ensuring **O(N log k)** time complexity, making it efficient for large streaming datasets.
26+
27+
## 📊 **Mathematical Formula for Weighted Algorithm R**
28+
29+
### **Problem Definition**
30+
We need to select **`k` elements** from a data stream **of unknown length `N`**, ensuring **each element is selected with a probability proportional to its weight `w_i`**.
31+
32+
### **Algorithm Steps**
33+
1. **Initialize a Min-Heap of Size `k`**
34+
- Store the first `k` elements **with their priority scores**:
35+
\[
36+
$p_i = \frac{w_i}{U_i}$
37+
\]
38+
where \( $U_i$ \) is a uniform random number from **(0,1]**.
39+
40+
2. **Process Remaining Elements (`i > k`)**
41+
- For each new element `s_i`:
42+
- Compute **priority score**:
43+
\[
44+
$p_i = \frac{w_i}{U_i}$
45+
\]
46+
- If `p_i` is greater than the **smallest priority in the heap**, replace the smallest element.
47+
48+
3. **After processing `N` elements**, the reservoir will contain `k` elements **selected proportionally to their weights**.
49+
50+
---
51+
52+
## 🔬 **Probability Proof**
53+
For any element \( $s_i$ \) with weight \( $w_i$ \):
54+
1. The **priority score** is:
55+
\[
56+
$p_i = \frac{w_i}{U_i}$
57+
\]
58+
where \( $U_i \sim U(0,1]$ \).
59+
60+
2. The **probability that `s_i` is among the top `k` elements**:
61+
\[
62+
$P(s_i \text{ is selected}) \propto w_i$
63+
\]
64+
meaning elements with **higher weights** are **more likely to be selected**.
65+
66+
**Conclusion:** Weighted Algorithm R correctly samples elements **proportionally to their weights**, unlike uniform Algorithm R.
67+
68+
---
69+
70+
## 🧪 **Test Case Formula for Weighted Algorithm R**
71+
72+
### **Test Case Design**
73+
To validate Weighted Algorithm R, we must check if:
74+
- **Higher-weight elements are chosen more frequently**.
75+
- **Selection follows the weight distribution over multiple runs**.
76+
77+
### **Mathematical Test**
78+
For `T` independent runs:
79+
- Let `count(s_i)` be the number of times `s_i` appears in the reservoir.
80+
- Expected probability:
81+
\[
82+
$P(s_i) = \frac{w_i}{\sum w_j}$
83+
\]
84+
- Expected occurrence over `T` runs:
85+
\[
86+
$\text{Expected count}(s_i) = T \times \frac{w_i}{\sum w_j}$
87+
\]
88+
- We verify that `count(s_i)` is **statistically close** to this value.
89+
90+
# 🎯 Algorithm L
91+
92+
**Reservoir Sampling** is a technique for randomly selecting `k` elements from a stream of unknown length `N`.
93+
**Algorithm L**, introduced by **Jeffrey Vitter (1985)**, improves upon traditional methods by using an **optimized skipping approach**, significantly reducing the number of random number calls.
94+
95+
### **Problem Definition**
96+
We need to select **`k` elements** from a data stream **of unknown length `N`**, ensuring **each element has an equal probability `k/N`** of being chosen.
97+
98+
### **Algorithm Steps**
99+
1. **Fill the reservoir** with the **first `k` elements**.
100+
2. **Initialize weight factor `W`** using:
101+
102+
$W = \exp\left(\frac{\log(\text{random}())}{k}\right)$
103+
104+
3. **Skip elements efficiently** using the geometric formula:
105+
106+
$\text{skip} = \lfloor \frac{\log(\text{random}())}{\log(1 - W)} \rfloor$
107+
108+
4. **If still in bounds**, **randomly replace** an element in the reservoir.
109+
5. **Update `W`** for the next iteration using:
110+
111+
$W = W \times \exp\left(\frac{\log(\text{random}())}{k}\right)$
112+
113+
6. **Repeat until the end of the stream**.
114+
115+
### **Probability Proof**
116+
For each element \( $s_i$ \), we show that it has an equal probability of being selected:
117+
118+
1. The probability that \( $s_i$ \) **reaches the selection process**:
119+
120+
$P(s_i \text{ is considered}) = \frac{k}{i}$
121+
122+
2. The probability that \( $s_i$ \) **remains in the reservoir** is:
123+
124+
$P(s_i \text{ in final reservoir}) = \frac{k}{N}, \quad \forall i \in \{1, ..., N\}$
125+
126+
This confirms that **Algorithm L ensures uniform selection**.
127+
128+
129+
## 🧪 **Test Case Formula for Algorithm L**
130+
131+
### **Test Case Design**
132+
To validate Algorithm L, we must check if:
133+
- **Each element is chosen with probability `k/N`**.
134+
- **Selection is uniform over multiple runs**.
135+
136+
### **Mathematical Test**
137+
For `T` independent runs:
138+
- Let `count(s_i)` be the number of times `s_i` appears in the reservoir.
139+
- Expected probability:
140+
141+
$P(s_i) = \frac{k}{N}$
142+
143+
- Expected occurrence over `T` runs:
144+
145+
$\text{Expected count}(s_i) = T \times \frac{k}{N}$
146+
147+
- We verify that `count(s_i)` is **statistically close** to this value.

rs/pq.go

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
package rs
2+
3+
// Item represents an element with a priority
4+
type Item[T any] struct {
5+
Value T
6+
Priority float64
7+
}
8+
9+
// PriorityQueue implements a min-heap for Items
10+
type PriorityQueue[T any] []*Item[T]
11+
12+
func (pq PriorityQueue[T]) Len() int { return len(pq) }
13+
func (pq PriorityQueue[T]) Less(i, j int) bool { return pq[i].Priority < pq[j].Priority }
14+
func (pq PriorityQueue[T]) Swap(i, j int) { pq[i], pq[j] = pq[j], pq[i] }
15+
16+
func (pq *PriorityQueue[T]) Push(x any) {
17+
item := x.(*Item[T])
18+
*pq = append(*pq, item)
19+
}
20+
21+
func (pq *PriorityQueue[T]) Pop() any {
22+
old := *pq
23+
n := len(old)
24+
item := old[n-1]
25+
*pq = old[:n-1]
26+
return item
27+
}

rs/pq_test.go

Lines changed: 87 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,87 @@
1+
package rs
2+
3+
import (
4+
"container/heap"
5+
"testing"
6+
)
7+
8+
func TestPriorityQueue_PushPop(t *testing.T) {
9+
pq := &PriorityQueue[int]{}
10+
heap.Init(pq)
11+
12+
items := []struct {
13+
value int
14+
priority float64
15+
}{
16+
{1, 0.8},
17+
{2, 0.5},
18+
{3, 0.2},
19+
{4, 0.9},
20+
{5, 0.1},
21+
}
22+
23+
for _, item := range items {
24+
heap.Push(pq, &Item[int]{Value: item.value, Priority: item.priority})
25+
}
26+
27+
expectedOrder := []int{5, 3, 2, 1, 4}
28+
for i, expected := range expectedOrder {
29+
popped := heap.Pop(pq).(*Item[int])
30+
if popped.Value != expected {
31+
t.Errorf("Expected %d, but got %d at index %d", expected, popped.Value, i)
32+
}
33+
}
34+
}
35+
36+
func TestPriorityQueue_GenericTypes(t *testing.T) {
37+
pq := &PriorityQueue[string]{}
38+
heap.Init(pq)
39+
40+
heap.Push(pq, &Item[string]{Value: "apple", Priority: 0.9})
41+
heap.Push(pq, &Item[string]{Value: "banana", Priority: 0.5})
42+
heap.Push(pq, &Item[string]{Value: "cherry", Priority: 0.1})
43+
44+
expectedOrder := []string{"cherry", "banana", "apple"}
45+
for i, expected := range expectedOrder {
46+
popped := heap.Pop(pq).(*Item[string])
47+
if popped.Value != expected {
48+
t.Errorf("Expected %s, but got %s at index %d", expected, popped.Value, i)
49+
}
50+
}
51+
}
52+
53+
func TestPriorityQueue_Length(t *testing.T) {
54+
pq := &PriorityQueue[float64]{}
55+
heap.Init(pq)
56+
57+
heap.Push(pq, &Item[float64]{Value: 1.5, Priority: 0.3})
58+
heap.Push(pq, &Item[float64]{Value: 2.5, Priority: 0.7})
59+
60+
if pq.Len() != 2 {
61+
t.Errorf("Expected length 2, got %d", pq.Len())
62+
}
63+
64+
heap.Pop(pq)
65+
if pq.Len() != 1 {
66+
t.Errorf("Expected length 1 after pop, got %d", pq.Len())
67+
}
68+
}
69+
70+
func TestPriorityQueue_Empty(t *testing.T) {
71+
pq := &PriorityQueue[int]{}
72+
heap.Init(pq)
73+
74+
if pq.Len() != 0 {
75+
t.Errorf("Expected empty queue, got length %d", pq.Len())
76+
}
77+
78+
heap.Push(pq, &Item[int]{Value: 10, Priority: 0.5})
79+
if pq.Len() != 1 {
80+
t.Errorf("Expected length 1 after push, got %d", pq.Len())
81+
}
82+
83+
heap.Pop(pq)
84+
if pq.Len() != 0 {
85+
t.Errorf("Expected empty queue after pop, got length %d", pq.Len())
86+
}
87+
}

rs/rs_L.go

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
package rs
2+
3+
import (
4+
"math"
5+
"math/rand"
6+
"time"
7+
)
8+
9+
func init() {
10+
rand.New(rand.NewSource(time.Now().UnixNano()))
11+
}
12+
13+
// ReservoirSampleL selects k elements from a stream using Algorithm L
14+
func ReservoirSampleL[T any](stream []T, k int) []T {
15+
if len(stream) < k {
16+
return nil // Not enough elements
17+
}
18+
19+
// Step 1: Fill the initial reservoir
20+
reservoir := make([]T, k)
21+
copy(reservoir, stream[:k])
22+
23+
// Step 2: Initialize weight factor W
24+
W := math.Exp(math.Log(rand.Float64()) / float64(k))
25+
26+
i := k // Current position in the stream
27+
28+
// Step 3: Process remaining elements with skipping
29+
for i < len(stream) {
30+
// Calculate number of elements to skip
31+
skip := int(math.Floor(math.Log(rand.Float64()) / math.Log(1-W)))
32+
i += skip + 1 // Move forward in the stream
33+
34+
// If within bounds, replace a random item in the reservoir
35+
if i < len(stream) {
36+
j := rand.Intn(k) // Random index in the reservoir
37+
reservoir[j] = stream[i]
38+
39+
// Update weight factor W
40+
W *= math.Exp(math.Log(rand.Float64()) / float64(k))
41+
}
42+
}
43+
44+
return reservoir
45+
}

rs/rs_L_example_test.go

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
package rs
2+
3+
import "fmt"
4+
5+
func ExampleReservoirSampleL() {
6+
// Define a sample stream of integers
7+
stream := []int{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20}
8+
9+
// Select 5 random elements using Algorithm L
10+
reservoir := ReservoirSampleL(stream, 5)
11+
12+
// Print the selected reservoir sample
13+
fmt.Println("Selected Reservoir Sample:", reservoir)
14+
}

0 commit comments

Comments
 (0)