Skip to content

Commit bc276be

Browse files
committed
deltatocumulative: lockfree
1 parent f67a603 commit bc276be

File tree

6 files changed

+371
-98
lines changed

6 files changed

+371
-98
lines changed

processor/deltatocumulativeprocessor/internal/delta/delta.go

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -30,18 +30,24 @@ func (e ErrOutOfOrder) Error() string {
3030
return fmt.Sprintf("out of order: dropped sample from time=%s, because series is already at time=%s", e.Sample, e.Last)
3131
}
3232

33-
type Type interface {
33+
type Type[Self any] interface {
3434
pmetric.NumberDataPoint | pmetric.HistogramDataPoint | pmetric.ExponentialHistogramDataPoint
3535

3636
StartTimestamp() pcommon.Timestamp
3737
Timestamp() pcommon.Timestamp
38+
39+
CopyTo(Self)
3840
}
3941

4042
// AccumulateInto adds state and dp, storing the result in state
4143
//
4244
// state = state + dp
43-
func AccumulateInto[T Type](state, dp T) error {
45+
func AccumulateInto[T Type[T]](state, dp T) error {
4446
switch {
47+
case state.Timestamp() == 0:
48+
// first sample of series, no state to aggregate with
49+
dp.CopyTo(state)
50+
return nil
4551
case dp.StartTimestamp() < state.StartTimestamp():
4652
// belongs to older series
4753
return ErrOlderStart{Start: state.StartTimestamp(), Sample: dp.StartTimestamp()}
Lines changed: 111 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,111 @@
1+
// Copyright The OpenTelemetry Authors
2+
// SPDX-License-Identifier: Apache-2.0
3+
4+
package maps // import "github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/maps"
5+
6+
import (
7+
"sync"
8+
"sync/atomic"
9+
)
10+
11+
// Concurrent describes operations of a hashmap-like datastructure that is safe
12+
// for concurrent access, as implemented by e.g. [sync.Map]
13+
type Concurrent[K comparable, V any] interface {
14+
Load(K) (V, bool)
15+
LoadOrStore(K, V) (V, bool)
16+
LoadAndDelete(K) (V, bool)
17+
}
18+
19+
var _ Concurrent[any, any] = &sync.Map{}
20+
21+
// Limit applies the limit to m, such that no more the item count never exceeds
22+
// it
23+
//
24+
// try is used to track the limit. its value may temporarliy exceed limit. For
25+
// accurate size measurements, use [Count] instead. try may be shared across
26+
// multiple maps, to enforce a common limit.
27+
func Limit[M Concurrent[K, V], K comparable, V any](m M, limit int64, try *atomic.Int64) *Limited[M, K, V] {
28+
if try == nil {
29+
try = new(atomic.Int64)
30+
}
31+
return &Limited[M, K, V]{
32+
elems: m,
33+
try: try,
34+
limit: limit,
35+
}
36+
}
37+
38+
type Limited[M Concurrent[K, V], K comparable, V any] struct {
39+
elems M
40+
try *atomic.Int64
41+
limit int64
42+
}
43+
44+
var _ Concurrent[any, any] = (*Limited[*sync.Map, any, any])(nil)
45+
46+
func (m *Limited[M, K, V]) Load(k K) (V, bool) {
47+
return m.elems.Load(k)
48+
}
49+
50+
func (m *Limited[M, K, V]) LoadOrStore(k K, def V) (V, bool) {
51+
v, ok := m.elems.Load(k)
52+
if ok {
53+
return v, true
54+
}
55+
56+
sz := m.try.Add(1)
57+
if sz > m.limit {
58+
m.try.Add(-1)
59+
return m.elems.Load(k) // possibly created by now, or *new(T),false
60+
}
61+
62+
v, loaded := m.elems.LoadOrStore(k, def)
63+
if loaded { // already created
64+
m.try.Add(-1)
65+
}
66+
return v, loaded
67+
}
68+
69+
func (m *Limited[M, K, V]) LoadAndDelete(k K) (V, bool) {
70+
v, ok := m.elems.LoadAndDelete(k)
71+
if ok {
72+
m.try.Add(-1)
73+
}
74+
return v, ok
75+
}
76+
77+
// Exceeded reports whether a [Limited.LoadOrStore] failed due to the limit being exceeded.
78+
func Exceeded[T comparable](v T, loaded bool) bool {
79+
return !loaded && v == *new(T)
80+
}
81+
82+
// Count store / delete operations towards given size.
83+
// Size can be shared across multiple maps.
84+
func Count[M Concurrent[K, V], K comparable, V any](m M, size *atomic.Int64) Sized[M, K, V] {
85+
return Sized[M, K, V]{elems: m, size: size}
86+
}
87+
88+
type Sized[M Concurrent[K, V], K comparable, V any] struct {
89+
elems M
90+
size *atomic.Int64
91+
}
92+
93+
func (s Sized[M, K, V]) Load(k K) (V, bool) {
94+
return s.elems.Load(k)
95+
}
96+
97+
func (s Sized[M, K, V]) LoadOrStore(k K, def V) (V, bool) {
98+
v, loaded := s.elems.LoadOrStore(k, def)
99+
if !loaded {
100+
s.size.Add(1)
101+
}
102+
return v, loaded
103+
}
104+
105+
func (s Sized[M, K, V]) LoadAndDelete(k K) (V, bool) {
106+
v, loaded := s.elems.LoadAndDelete(k)
107+
if loaded {
108+
s.size.Add(-1)
109+
}
110+
return v, loaded
111+
}
Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
// Copyright The OpenTelemetry Authors
2+
// SPDX-License-Identifier: Apache-2.0
3+
4+
package maps_test
5+
6+
import (
7+
"fmt"
8+
"sync"
9+
"sync/atomic"
10+
"testing"
11+
12+
"github.com/puzpuzpuz/xsync/v3"
13+
"github.com/stretchr/testify/require"
14+
"go.opentelemetry.io/collector/pdata/pmetric"
15+
16+
"github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/maps"
17+
)
18+
19+
func TestLimit(t *testing.T) {
20+
m := maps.Limit(xsync.NewMapOf[int, pmetric.NumberDataPoint](), 100, new(atomic.Int64))
21+
v := pmetric.NewNumberDataPoint()
22+
23+
var (
24+
load = new(atomic.Int64)
25+
store = new(atomic.Int64)
26+
fail = new(atomic.Int64)
27+
)
28+
29+
var wg sync.WaitGroup
30+
for range 10 {
31+
wg.Add(1)
32+
go func() {
33+
for i := range 110 {
34+
o, loaded := m.LoadOrStore(i, v)
35+
switch {
36+
case maps.Exceeded(o, loaded):
37+
fail.Add(1)
38+
case loaded:
39+
load.Add(1)
40+
case !loaded:
41+
store.Add(1)
42+
}
43+
}
44+
wg.Done()
45+
}()
46+
}
47+
wg.Wait()
48+
49+
fmt.Println(load.Load(), store.Load(), fail.Load())
50+
51+
require.Equal(t, int64(100), store.Load())
52+
require.Equal(t, int64(900), load.Load())
53+
require.Equal(t, int64(100), fail.Load())
54+
}

processor/deltatocumulativeprocessor/internal/telemetry/hot.go

Lines changed: 0 additions & 24 deletions
This file was deleted.

0 commit comments

Comments
 (0)