Skip to content

Commit d08caab

Browse files
committed
Replace mutex with channel for TelemetryCounter
1 parent 1829e93 commit d08caab

File tree

5 files changed

+58
-66
lines changed

5 files changed

+58
-66
lines changed

counter/counter.go

Lines changed: 50 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -19,95 +19,107 @@ import "encoding/json"
1919

2020
// TelemetryCounter tracks the number of times a set of resource and attribute dimensions have been seen.
2121
type TelemetryCounter struct {
22-
resources map[string]*ResourceCounter
22+
resources map[string]*resourceCounter
23+
commands chan func()
2324
}
2425

2526
// NewTelemetryCounter creates a new TelemetryCounter.
2627
func NewTelemetryCounter() *TelemetryCounter {
27-
return &TelemetryCounter{
28-
resources: make(map[string]*ResourceCounter),
28+
t := &TelemetryCounter{
29+
resources: make(map[string]*resourceCounter),
30+
commands: make(chan func()),
2931
}
32+
go t.run()
33+
return t
3034
}
3135

32-
// Add increments the counter with the supplied dimensions.
33-
func (t *TelemetryCounter) Add(resource, attributes map[string]any) {
34-
key := getDimensionKey(resource)
35-
if _, ok := t.resources[key]; !ok {
36-
t.resources[key] = NewResourceCounter(resource)
36+
// run listens for commands to modify or read the resources.
37+
func (t *TelemetryCounter) run() {
38+
for cmd := range t.commands {
39+
cmd()
3740
}
38-
39-
t.resources[key].Add(attributes)
4041
}
4142

42-
// Resources returns a map of resource ID to a counter for that resource.
43-
func (t TelemetryCounter) Resources() map[string]*ResourceCounter {
44-
return t.resources
43+
// Add increments the counter with the supplied dimensions.
44+
func (t *TelemetryCounter) Add(resource, attributes map[string]any) {
45+
t.commands <- func() {
46+
key := getDimensionKey(resource)
47+
if _, ok := t.resources[key]; !ok {
48+
t.resources[key] = newResourceCounter(resource)
49+
}
50+
t.resources[key].add(attributes)
51+
}
4552
}
4653

47-
// Reset resets the counter.
48-
func (t *TelemetryCounter) Reset() {
49-
t.resources = make(map[string]*ResourceCounter)
54+
// Resources returns a map of resource ID to a counter for that resource and resets the counter.
55+
func (t *TelemetryCounter) Resources() map[string]*resourceCounter {
56+
result := make(chan map[string]*resourceCounter)
57+
t.commands <- func() {
58+
result <- t.resources
59+
t.resources = make(map[string]*resourceCounter) // Reset the counter
60+
}
61+
return <-result
5062
}
5163

52-
// ResourceCounter dimensions the counter by resource.
53-
type ResourceCounter struct {
64+
// resourceCounter dimensions the counter by resource.
65+
type resourceCounter struct {
5466
values map[string]any
55-
attributes map[string]*AttributeCounter
67+
attributes map[string]*attributeCounter
5668
}
5769

58-
// NewResourceCounter creates a new ResourceCounter.
59-
func NewResourceCounter(values map[string]any) *ResourceCounter {
60-
return &ResourceCounter{
70+
// newResourceCounter creates a new ResourceCounter.
71+
func newResourceCounter(values map[string]any) *resourceCounter {
72+
return &resourceCounter{
6173
values: values,
62-
attributes: map[string]*AttributeCounter{},
74+
attributes: map[string]*attributeCounter{},
6375
}
6476
}
6577

66-
// Add increments the counter with the supplied dimensions.
67-
func (r *ResourceCounter) Add(attributes map[string]any) {
78+
// add increments the counter with the supplied dimensions.
79+
func (r *resourceCounter) add(attributes map[string]any) {
6880
key := getDimensionKey(attributes)
6981
if _, ok := r.attributes[key]; !ok {
70-
r.attributes[key] = NewAttributeCounter(attributes)
82+
r.attributes[key] = newAttributeCounter(attributes)
7183
}
7284

73-
r.attributes[key].Add()
85+
r.attributes[key].add()
7486
}
7587

7688
// Attributes returns a map of attribute set ID to a counter for that attribute set.
77-
func (r ResourceCounter) Attributes() map[string]*AttributeCounter {
89+
func (r resourceCounter) Attributes() map[string]*attributeCounter {
7890
return r.attributes
7991
}
8092

8193
// Values returns the raw map value of the resource that this counter counts.
82-
func (r ResourceCounter) Values() map[string]any {
94+
func (r resourceCounter) Values() map[string]any {
8395
return r.values
8496
}
8597

86-
// AttributeCounter dimensions the counter by attributes.
87-
type AttributeCounter struct {
98+
// attributeCounter dimensions the counter by attributes.
99+
type attributeCounter struct {
88100
values map[string]any
89101
count int
90102
}
91103

92-
// NewAttributeCounter creates a new AttributeCounter.
93-
func NewAttributeCounter(values map[string]any) *AttributeCounter {
94-
return &AttributeCounter{
104+
// newAttributeCounter creates a new AttributeCounter.
105+
func newAttributeCounter(values map[string]any) *attributeCounter {
106+
return &attributeCounter{
95107
values: values,
96108
}
97109
}
98110

99-
// Add increments the counter.
100-
func (a *AttributeCounter) Add() {
111+
// add increments the counter.
112+
func (a *attributeCounter) add() {
101113
a.count++
102114
}
103115

104116
// Count returns the number of counts for this attribute counter.
105-
func (a AttributeCounter) Count() int {
117+
func (a attributeCounter) Count() int {
106118
return a.count
107119
}
108120

109121
// Values returns the attribute map that this counter tracks.
110-
func (a AttributeCounter) Values() map[string]any {
122+
func (a attributeCounter) Values() map[string]any {
111123
return a.values
112124
}
113125

counter/counter_test.go

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -42,10 +42,13 @@ func TestLogCounter(t *testing.T) {
4242
attrKey1 := getDimensionKey(attrMap1)
4343
attrKey2 := getDimensionKey(attrMap2)
4444

45-
require.Equal(t, 10, counter.resources[resourceKey1].attributes[attrKey1].count)
46-
require.Equal(t, 5, counter.resources[resourceKey1].attributes[attrKey2].count)
47-
require.Equal(t, 1, counter.resources[resourceKey2].attributes[attrKey1].count)
45+
resources := counter.Resources()
4846

49-
counter.Reset()
50-
require.Len(t, counter.resources, 0)
47+
require.Equal(t, 10, resources[resourceKey1].attributes[attrKey1].count)
48+
require.Equal(t, 5, resources[resourceKey1].attributes[attrKey2].count)
49+
require.Equal(t, 1, resources[resourceKey2].attributes[attrKey1].count)
50+
51+
// Ensure that the counter has been reset
52+
resources = counter.Resources()
53+
require.Len(t, resources, 0)
5154
}

processor/datapointcountprocessor/processor.go

Lines changed: 0 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,6 @@ type metricCountProcessor struct {
4242
logger *zap.Logger
4343
cancel context.CancelFunc
4444
wg sync.WaitGroup
45-
mux sync.Mutex
4645
}
4746

4847
// newExprProcessor returns a new processor with expr expressions.
@@ -110,8 +109,6 @@ func (p *metricCountProcessor) Shutdown(_ context.Context) error {
110109

111110
// ConsumeMetrics processes the metrics.
112111
func (p *metricCountProcessor) ConsumeMetrics(ctx context.Context, m pmetric.Metrics) error {
113-
p.mux.Lock()
114-
defer p.mux.Unlock()
115112

116113
if p.isOTTL() {
117114
p.consumeMetricsOTTL(ctx, m)
@@ -202,9 +199,6 @@ func (p *metricCountProcessor) sendMetrics(ctx context.Context) {
202199

203200
// createMetrics creates metrics from the counter. The counter is reset after the metrics are created.
204201
func (p *metricCountProcessor) createMetrics() pmetric.Metrics {
205-
p.mux.Lock()
206-
defer p.mux.Unlock()
207-
208202
metrics := pmetric.NewMetrics()
209203
for _, resource := range p.counter.Resources() {
210204
resourceMetrics := metrics.ResourceMetrics().AppendEmpty()
@@ -231,9 +225,6 @@ func (p *metricCountProcessor) createMetrics() pmetric.Metrics {
231225

232226
}
233227
}
234-
235-
p.counter.Reset()
236-
237228
return metrics
238229
}
239230

processor/logcountprocessor/processor.go

Lines changed: 0 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,6 @@ type logCountProcessor struct {
4343
logger *zap.Logger
4444
cancel context.CancelFunc
4545
wg sync.WaitGroup
46-
mux sync.Mutex
4746
}
4847

4948
// newProcessor returns a new processor.
@@ -105,8 +104,6 @@ func (p *logCountProcessor) Shutdown(_ context.Context) error {
105104

106105
// ConsumeLogs processes the logs.
107106
func (p *logCountProcessor) ConsumeLogs(ctx context.Context, pl plog.Logs) error {
108-
p.mux.Lock()
109-
defer p.mux.Unlock()
110107

111108
if p.isOTTL() {
112109
p.consumeLogsOTTL(ctx, pl)
@@ -176,16 +173,12 @@ func (p *logCountProcessor) handleMetricInterval(ctx context.Context) {
176173

177174
// sendMetrics sends metrics to the consumer.
178175
func (p *logCountProcessor) sendMetrics(ctx context.Context) {
179-
p.mux.Lock()
180-
defer p.mux.Unlock()
181176

182177
metrics := p.createMetrics()
183178
if metrics.ResourceMetrics().Len() == 0 {
184179
return
185180
}
186181

187-
p.counter.Reset()
188-
189182
if err := routereceiver.RouteMetrics(ctx, p.config.Route, metrics); err != nil {
190183
p.logger.Error("Failed to send metrics", zap.Error(err))
191184
}

processor/spancountprocessor/processor.go

Lines changed: 0 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,6 @@ type spanCountProcessor struct {
4343
logger *zap.Logger
4444
cancel context.CancelFunc
4545
wg sync.WaitGroup
46-
mux sync.Mutex
4746
}
4847

4948
// newProcessor returns a new processor.
@@ -106,8 +105,6 @@ func (p *spanCountProcessor) Shutdown(_ context.Context) error {
106105

107106
// ConsumeMetrics processes the metrics.
108107
func (p *spanCountProcessor) ConsumeTraces(ctx context.Context, t ptrace.Traces) error {
109-
p.mux.Lock()
110-
defer p.mux.Unlock()
111108

112109
if p.isOTTL() {
113110
p.consumeTracesOTTL(ctx, t)
@@ -184,16 +181,12 @@ func (p *spanCountProcessor) handleMetricInterval(ctx context.Context) {
184181

185182
// sendMetrics sends metrics to the consumer.
186183
func (p *spanCountProcessor) sendMetrics(ctx context.Context) {
187-
p.mux.Lock()
188-
defer p.mux.Unlock()
189184

190185
metrics := p.createMetrics()
191186
if metrics.ResourceMetrics().Len() == 0 {
192187
return
193188
}
194189

195-
p.counter.Reset()
196-
197190
if err := routereceiver.RouteMetrics(ctx, p.config.Route, metrics); err != nil {
198191
p.logger.Error("Failed to send metrics", zap.Error(err))
199192
}

0 commit comments

Comments
 (0)