From 7d9e01e070c2517f1ec8700af514e6dba8fe7e63 Mon Sep 17 00:00:00 2001
From: Nick Ripley
Date: Wed, 5 Feb 2025 12:20:22 -0500
Subject: [PATCH] WIP: internal/traceprof: make endpoint call counting efficient
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

Switch to a data structure designed for efficient concurrent updates
for endpoint call counting. This is a prerequisite for enabling the
feature by default.

Use github.com/puzpuzpuz/xsync/v3 for this: its MapOf type is a
concurrent hash map designed for highly concurrent access, and its
Counter type is a striped counter in the spirit of Java's LongAdder.
Together they let us count endpoint hits without serializing every
request on a single mutex.

Benchmark results on my M1 Max MacBook Pro:

goos: darwin
goarch: arm64
pkg: gopkg.in/DataDog/dd-trace-go.v1/internal/traceprof
cpu: Apple M1 Max
                                  │  before.txt   │              after.txt               │
                                  │    sec/op     │    sec/op     vs base                │
EndpointCounter/enabled=true-10     139.800n ± 11%   5.000n ± 18%  -96.42% (p=0.000 n=10)
EndpointCounter/enabled=false-10     0.2716n ±  5%  0.2682n ±  1%        ~ (p=0.118 n=10)
geomean                               6.162n         1.158n        -81.21%

                                  │ before.txt │              after.txt               │
                                  │    B/op    │    B/op     vs base                  │
EndpointCounter/enabled=true-10     0.000 ± 0%    0.000 ± 0%       ~ (p=1.000 n=10) ¹
EndpointCounter/enabled=false-10    0.000 ± 0%    0.000 ± 0%       ~ (p=1.000 n=10) ¹
geomean                                        ²               +0.00%                ²
¹ all samples are equal
² summaries must be >0 to compute geomean

                                  │ before.txt │              after.txt               │
                                  │ allocs/op  │  allocs/op   vs base                 │
EndpointCounter/enabled=true-10     0.000 ± 0%    0.000 ± 0%       ~ (p=1.000 n=10) ¹
EndpointCounter/enabled=false-10    0.000 ± 0%    0.000 ± 0%       ~ (p=1.000 n=10) ¹
geomean                                        ²               +0.00%                ²
¹ all samples are equal
² summaries must be >0 to compute geomean
---
 go.mod                                      |  1 +
 go.sum                                      |  2 +
 internal/traceprof/endpoint_counter.go      | 71 +++++++++++++++------
 internal/traceprof/endpoint_counter_test.go | 19 ++++++
 4 files changed, 75 insertions(+), 18 deletions(-)

diff --git a/go.mod b/go.mod
index 21ac06b4e0..1032f6aa36 100644
--- a/go.mod
+++ b/go.mod
@@ -81,6 +81,7 @@ require (
 	github.com/miekg/dns v1.1.55
 	github.com/mitchellh/mapstructure v1.5.1-0.20231216201459-8508981c8b6c
 	github.com/opentracing/opentracing-go v1.2.0
+	github.com/puzpuzpuz/xsync/v3 v3.5.0
 	github.com/redis/go-redis/v9 v9.1.0
 	github.com/richardartoul/molecule v1.0.1-0.20240531184615-7ca0df43c0b3
 	github.com/segmentio/kafka-go v0.4.42
diff --git a/go.sum b/go.sum
index 9ea20e2ff9..4f7fcef157 100644
--- a/go.sum
+++ b/go.sum
@@ -855,6 +855,8 @@ github.com/prometheus/procfs v0.0.2/go.mod h1:TjEm7ze935MbeOT/UhFTIMYKhuLP4wbCsT
 github.com/prometheus/procfs v0.0.8/go.mod h1:7Qr8sr6344vo1JqZ6HhLceV9o3AJ1Ff+GxbHq6oeK9A=
 github.com/prometheus/procfs v0.15.0 h1:A82kmvXJq2jTu5YUhSGNlYoxh85zLnKgPz4bMZgI5Ek=
 github.com/prometheus/procfs v0.15.0/go.mod h1:Y0RJ/Y5g5wJpkTisOtqwDSo4HwhGmLB4VQSw2sQJLHk=
+github.com/puzpuzpuz/xsync/v3 v3.5.0 h1:i+cMcpEDY1BkNm7lPDkCtE4oElsYLn+EKF8kAu2vXT4=
+github.com/puzpuzpuz/xsync/v3 v3.5.0/go.mod h1:VjzYrABPabuM4KyBh1Ftq6u8nhwY5tBPKP9jpmh0nnA=
 github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475 h1:N/ElC8H3+5XpJzTSTfLsJV/mx9Q9g7kxmchpfZyxgzM=
 github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4=
 github.com/redis/go-redis/v9 v9.1.0 h1:137FnGdk+EQdCbye1FW+qOEcY5S+SpY9T0NiuqvtfMY=
diff --git a/internal/traceprof/endpoint_counter.go b/internal/traceprof/endpoint_counter.go
index 7d2c3e4a65..6629ca9a64 100644
--- a/internal/traceprof/endpoint_counter.go
+++ b/internal/traceprof/endpoint_counter.go
@@ -8,6 +8,8 @@ package traceprof
 import (
 	"sync"
 	"sync/atomic"
+
+	"github.com/puzpuzpuz/xsync/v3"
 )
 
 // globalEndpointCounter is shared between the profiler and the tracer.
@@ -38,7 +40,11 @@ func GlobalEndpointCounter() *EndpointCounter {
 // NewEndpointCounter returns a new NewEndpointCounter that will track hit
 // counts for up to limit endpoints. A limit of <= 0 indicates no limit.
 func NewEndpointCounter(limit int) *EndpointCounter {
-	return &EndpointCounter{enabled: 1, limit: limit, counts: map[string]uint64{}}
+	return &EndpointCounter{
+		enabled: 1,
+		limit:   limit,
+		counts:  xsync.NewMapOf[string, *xsync.Counter](),
+	}
 }
 
 // EndpointCounter counts hits per endpoint.
@@ -50,7 +56,7 @@ func NewEndpointCounter(limit int) *EndpointCounter {
 type EndpointCounter struct {
 	enabled uint64
 	mu      sync.Mutex
-	counts  map[string]uint64
+	counts  *xsync.MapOf[string, *xsync.Counter]
 	limit   int
 }
 
@@ -69,30 +75,59 @@ func (e *EndpointCounter) Inc(endpoint string) {
 		return
 	}
 
-	// Acquire lock until func returns
-	e.mu.Lock()
-	defer e.mu.Unlock()
-
-	// Don't add another endpoint to the map if the limit is reached. See
-	// globalEndpointCounter comment.
-	count, ok := e.counts[endpoint]
-	if !ok && e.limit > 0 && len(e.counts) >= e.limit {
+	count, ok := e.counts.Load(endpoint)
+	if !ok {
+		// If we haven't seen this endpoint yet, add it. Another
+		// goroutine might be racing to add it, so use
+		// LoadOrStore: we only store if this goroutine "wins"
+		// the race, at the cost of one wasted Counter
+		// allocation if it "loses". In microbenchmarks this
+		// appears to be faster than a single LoadOrCompute.
+		// Note that ok is re-assigned here: LoadOrStore reports
+		// whether the entry already existed, and the limit
+		// check below should only apply when this goroutine
+		// actually added the entry.
+		// TODO: the tests pass either way; add coverage for this.
+		count, ok = e.counts.LoadOrStore(endpoint, xsync.NewCounter())
+	}
+	if !ok && e.limit > 0 && e.counts.Size() > e.limit {
+		// If adding this counter pushed the map over the
+		// limit, delete it again.
+		// TODO: this is racy: another goroutine might add a
+		// different endpoint and push the map over the limit
+		// _after_ this one, yet we see the over-limit size and
+		// delete our endpoint _before_ the other goroutine
+		// runs its own check. Does that matter in practice?
+		e.counts.Delete(endpoint)
 		return
 	}
 
 	// Increment the endpoint count
-	e.counts[endpoint] = count + 1
+	count.Inc()
+	return
 }
 
 // GetAndReset returns the hit counts for all endpoints and resets their counts
 // back to 0.
 func (e *EndpointCounter) GetAndReset() map[string]uint64 {
-	// Acquire lock until func returns
-	e.mu.Lock()
-	defer e.mu.Unlock()
-
-	// Return current counts and reset internal map.
-	counts := e.counts
-	e.counts = make(map[string]uint64)
+	// Try to right-size the allocation.
+	counts := make(map[string]uint64, e.counts.Size())
+	e.counts.Range(func(key string, _ *xsync.Counter) bool {
+		// TODO: in https://github.com/felixge/countermap/blob/main/xsync_map_counter_map.go,
+		// Felix reads the value and then deletes the key. Using
+		// LoadAndDelete instead ensures we don't miss updates
+		// to the count for the endpoint: we either see them
+		// here or in the next cycle. We could also reset the
+		// counter in place rather than deleting it when we
+		// aren't at the size limit; a Swap operation on
+		// xsync.Counter would make that convenient.
+		v, ok := e.counts.LoadAndDelete(key)
+		if ok {
+			// ok should always be true unless GetAndReset is
+			// being called concurrently somewhere else.
+			counts[key] = uint64(v.Value())
+		}
+		return true
+	})
 	return counts
 }
diff --git a/internal/traceprof/endpoint_counter_test.go b/internal/traceprof/endpoint_counter_test.go
index 81875327dc..88206951b8 100644
--- a/internal/traceprof/endpoint_counter_test.go
+++ b/internal/traceprof/endpoint_counter_test.go
@@ -7,6 +7,7 @@ package traceprof
 
 import (
 	"fmt"
+	"sync"
 	"testing"
 
 	"github.com/stretchr/testify/require"
@@ -27,6 +28,24 @@ func TestEndpointCounter(t *testing.T) {
 		require.Equal(t, map[string]uint64{}, ec.GetAndReset())
 	})
 
+	t.Run("fixed limit parallel", func(t *testing.T) {
+		ec := NewEndpointCounter(10)
+		var wg sync.WaitGroup
+		for i := range 10 {
+			wg.Add(1)
+			go func() {
+				defer wg.Done()
+				for j := range 10 {
+					// Non-overlapping keys, so 100 distinct endpoints
+					// exercise the limit under concurrent insertion.
+					ec.Inc(fmt.Sprintf("%d", i*10+j))
+				}
+			}()
+		}
+		wg.Wait()
+		require.Len(t, ec.GetAndReset(), 10)
+	})
+
 	t.Run("no limit", func(t *testing.T) {
 		ec := NewEndpointCounter(-1)
 		for i := 0; i < 100; i++ {
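
For reviewers who haven't used xsync before, here is a minimal standalone sketch (not part of the patch) of the counting pattern the new Inc and GetAndReset code relies on: one xsync.Counter per key inside an xsync.MapOf, LoadOrStore to create counters under racing writers, and LoadAndDelete to snapshot-and-reset without losing increments. The file name, demo key, and goroutine counts below are invented for illustration.

// xsync_counting_sketch.go -- standalone illustration only, not part of the patch.
package main

import (
	"fmt"
	"sync"

	"github.com/puzpuzpuz/xsync/v3"
)

func main() {
	// One striped counter per key, stored in a concurrent map.
	m := xsync.NewMapOf[string, *xsync.Counter]()

	var wg sync.WaitGroup
	for i := 0; i < 4; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for j := 0; j < 1000; j++ {
				// LoadOrStore either returns the existing counter or
				// stores the freshly allocated one; losing the race
				// only wastes one small allocation.
				c, _ := m.LoadOrStore("GET /demo", xsync.NewCounter())
				c.Inc()
			}
		}()
	}
	wg.Wait()

	// Snapshot and reset, mirroring GetAndReset: LoadAndDelete ensures
	// increments that land after the read are kept for the next cycle.
	snapshot := make(map[string]uint64, m.Size())
	m.Range(func(key string, _ *xsync.Counter) bool {
		if c, ok := m.LoadAndDelete(key); ok {
			snapshot[key] = uint64(c.Value())
		}
		return true
	})
	fmt.Println(snapshot) // map[GET /demo:4000]
}

Losing the LoadOrStore race only costs one short-lived Counter allocation, which is why Inc stays allocation-free on the hot path once an endpoint already exists.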
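
The large win in the enabled=true benchmark comes largely from replacing a mutex-protected map of plain integers with striped counters. The sketch below is an illustration of that effect, not code from the repository and not the repository's own BenchmarkEndpointCounter; the package and benchmark names are invented. A single shared word serializes all cores on one cache line, while xsync.Counter spreads increments across stripes.

// striped_counter_sketch_test.go -- illustration only; package and benchmark
// names are invented and this file is not part of the patch.
package sketch

import (
	"sync/atomic"
	"testing"

	"github.com/puzpuzpuz/xsync/v3"
)

// All goroutines hammer one shared cache line.
func BenchmarkSharedAtomic(b *testing.B) {
	var n int64
	b.RunParallel(func(pb *testing.PB) {
		for pb.Next() {
			atomic.AddInt64(&n, 1)
		}
	})
}

// xsync.Counter stripes the value, so parallel increments mostly touch
// distinct cache lines; Value() sums the stripes when a total is needed.
func BenchmarkStripedCounter(b *testing.B) {
	c := xsync.NewCounter()
	b.RunParallel(func(pb *testing.PB) {
		for pb.Next() {
			c.Inc()
		}
	})
}

Running such benchmarks with go test -bench and comparing runs with benchstat produces tables in the same format as the before.txt/after.txt output quoted in the commit message.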