Skip to content

Commit

Permalink
adds support for Senders to use a shared registry which is managed ex…
Browse files Browse the repository at this point in the history
…ternally to libkflow
  • Loading branch information
ktkenny committed Jan 20, 2024
1 parent 0ed6079 commit 4515751
Show file tree
Hide file tree
Showing 32 changed files with 1,347 additions and 75 deletions.
6 changes: 6 additions & 0 deletions api/test/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ type Server struct {
res chan *api.DNSResponse
mux *mux.Router
listener net.Listener
cron *cron.Cron
}

var (
Expand Down Expand Up @@ -109,6 +110,7 @@ func (s *Server) Serve(email, token string, dev *api.Device) error {
s.mux.HandleFunc(API+"/devices", s.wrap(s.devices))

c := cron.New()
s.cron = c
c.AddFunc("* * * * * *", func() {
flows := atomic.SwapUint64(&flowCounter, 0)
packets := atomic.SwapUint64(&packetCounter, 0)
Expand All @@ -122,6 +124,10 @@ func (s *Server) Serve(email, token string, dev *api.Device) error {
return http.Serve(s.listener, s.mux)
}

func (s *Server) Close() {
s.cron.Stop()
}

func (s *Server) URL(path string) *url.URL {
url, _ := url.Parse(fmt.Sprintf("http://%s:%d%s", s.Host, s.Port, path))
return url
Expand Down
47 changes: 30 additions & 17 deletions config.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"strconv"
"time"

go_metrics "github.com/kentik/kit/go/legacy/go-metrics"
"github.com/kentik/libkflow/agg"
"github.com/kentik/libkflow/api"
"github.com/kentik/libkflow/log"
Expand All @@ -17,19 +18,20 @@ import (

// Config describes the libkflow configuration.
type Config struct {
email string
token string
capture Capture
proxy *url.URL
api *url.URL
flow *url.URL
metrics *url.URL
sample int
timeout time.Duration
retries int
logger interface{}
program string
version string
email string
token string
capture Capture
proxy *url.URL
api *url.URL
flow *url.URL
metrics *url.URL
sample int
timeout time.Duration
retries int
logger interface{}
program string
version string
registry go_metrics.Registry

metricsPrefix string
metricsInterval time.Duration
Expand Down Expand Up @@ -147,6 +149,11 @@ func (c *Config) SetMetricsInterval(dur time.Duration) {
c.metricsInterval = dur
}

// OverrideRegistry allows setting a registry which will act as a shared registry between multiple Senders.
func (c *Config) OverrideRegistry(registry go_metrics.Registry) {
c.registry = registry
}

func (c *Config) client() *api.Client {
return api.NewClient(api.ClientConfig{
Email: c.email,
Expand All @@ -164,18 +171,24 @@ func (c *Config) start(client *api.Client, dev *api.Device, errors chan<- error)
if c.metricsInterval == 0 {
c.metricsInterval = 60 * time.Second
}
metrics := c.NewMetrics(dev)
metrics.Start(c.metrics.String(), c.email, c.token, c.metricsPrefix, c.metricsInterval, c.proxy)

agg, err := agg.NewAgg(time.Second, dev.MaxFlowRate, metrics)
var senderMetrics *metrics.Metrics
if c.registry == nil {
senderMetrics = c.NewMetrics(dev)
senderMetrics.Start(c.metrics.String(), c.email, c.token, c.metricsPrefix, c.metricsInterval, c.proxy)
} else {
senderMetrics = metrics.NewWithRegistry(c.registry, dev.CompanyID, dev.ID, c.program, c.version)
}

agg, err := agg.NewAgg(time.Second, dev.MaxFlowRate, senderMetrics)
if err != nil {
return nil, fmt.Errorf("agg setup error: %s", err)
}

sender := newSender(c.flow, c.timeout)
sender.Errors = errors
sender.sample = c.sample
sender.Metrics = metrics
sender.Metrics = senderMetrics

if c.capture.Device != "" {
nif, err := net.InterfaceByName(c.capture.Device)
Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ require (
github.com/robfig/cron v0.0.0-20160927164231-9585fd555638
github.com/stretchr/testify v1.8.0
github.com/tinylib/msgp v1.1.6
go.uber.org/goleak v1.3.0
zombiezen.com/go/capnproto2 v2.18.2+incompatible
)

Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -1025,6 +1025,8 @@ go.uber.org/atomic v1.5.0/go.mod h1:sABNBOSYdrvTF6hTgEIbc7YasKWGhgEQZyfxyTvoXHQ=
go.uber.org/atomic v1.7.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc=
go.uber.org/automaxprocs v1.4.0/go.mod h1:/mTEdr7LvHhs0v7mjdxDreTz1OG5zdZGqgOnhWiR/+Q=
go.uber.org/goleak v1.1.10/go.mod h1:8a7PlsEVH3e/a/GLqe5IIrQx6GzcnRmZEufDUTk4A7A=
go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto=
go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE=
go.uber.org/multierr v1.1.0/go.mod h1:wR5kodmAFQ0UK8QlbwjlSNy0Z68gJhDJUG5sjR94q/0=
go.uber.org/multierr v1.3.0/go.mod h1:VgVr7evmIr6uPjLBxg28wmKNXyqE9akIJ5XnfpiKl+4=
go.uber.org/multierr v1.6.0/go.mod h1:cdWPpRnG4AhwMwsgIHip0KRBQjJy5kYEpYjJxpXp9iU=
Expand Down
88 changes: 88 additions & 0 deletions lib_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,17 @@ package libkflow_test

import (
"net/url"
"sync/atomic"
"testing"
"time"

"github.com/kentik/kit/go/legacy/go-metrics"
"github.com/kentik/libkflow"
"github.com/kentik/libkflow/api"
"github.com/kentik/libkflow/api/test"
metrics2 "github.com/kentik/libkflow/metrics"
"github.com/stretchr/testify/assert"
"go.uber.org/goleak"
)

func TestNewSenderWithDeviceID(t *testing.T) {
Expand Down Expand Up @@ -49,6 +54,74 @@ func TestNewSenderWithDeviceName(t *testing.T) {
assert.Nil(err)
}

func TestNewSenderWithDeviceNameLeaks(t *testing.T) {

client, server, device, err := test.NewClientServer()
if err != nil {
t.Fatal(err)
}
assert := assert.New(t)

apiurl = server.URL(test.API)
flowurl = server.URL(test.FLOW)
metricsurl = server.URL(test.TSDB)

email = client.Email
token = client.Token

errors := make(chan error, 100)
config := libkflow.NewConfig(email, token, "test", "0.0.1")
config.OverrideURLs(apiurl, flowurl, metricsurl)

l := stubLeveledLogger{}

registry := metrics.NewRegistry()
metrics2.StartWithSetConf(registry, &l, metricsurl.String(), email, token, "chf")
config.OverrideRegistry(registry)

// kick off the tick go routines from the go metrics library
// these are only started once per process
_ = metrics.NewMeter()

ignore := goleak.IgnoreCurrent()

s, err := libkflow.NewSenderWithDeviceName(device.Name, errors, config)
assert.NoError(err)
assert.NotNil(s)
s.Stop(time.Second)
s.Metrics.Unregister()

s, err = libkflow.NewSenderWithDeviceName(device.Name, errors, config)
assert.NoError(err)
assert.NotNil(s)
s.Stop(time.Second)
s.Metrics.Unregister()

s, err = libkflow.NewSenderWithDeviceName(device.Name, errors, config)
assert.NoError(err)
assert.NotNil(s)
s.Stop(time.Second)
s.Metrics.Unregister()

s, err = libkflow.NewSenderWithDeviceName(device.Name, errors, config)
assert.NoError(err)
assert.NotNil(s)
s.Stop(time.Second)
s.Metrics.Unregister()

s, err = libkflow.NewSenderWithDeviceName(device.Name, errors, config)
assert.NoError(err)
assert.NotNil(s)
s.Stop(time.Second)
s.Metrics.Unregister()

server.Close()

time.Sleep(time.Second)

goleak.VerifyNone(t, ignore)
}

func TestNewSenderFromDevice(t *testing.T) {
dev, assert := setupLibTest(t)

Expand Down Expand Up @@ -101,3 +174,18 @@ var (
email string
token string
)

type stubLogger struct {
count uint32
}

func (s *stubLogger) Printf(string, ...interface{}) { atomic.AddUint32(&s.count, 1) }

type stubLeveledLogger struct {
count uint32
}

func (s *stubLeveledLogger) Errorf(string, string, ...interface{}) { atomic.AddUint32(&s.count, 1) }
func (s *stubLeveledLogger) Infof(string, string, ...interface{}) { atomic.AddUint32(&s.count, 1) }
func (s *stubLeveledLogger) Debugf(string, string, ...interface{}) { atomic.AddUint32(&s.count, 1) }
func (s *stubLeveledLogger) Warnf(string, string, ...interface{}) { atomic.AddUint32(&s.count, 1) }
29 changes: 25 additions & 4 deletions metrics/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,14 @@ type Metrics struct {
}

func New(companyID int, deviceID int, program, version string) *Metrics {
// libkflow creates its own go-metrics Registry, which hold only its
// own metrics (or ones that its clients create with
reg := metrics.NewRegistry()
return NewWithRegistry(reg, companyID, deviceID, program, version)
}

// NewWithRegistry returns a new Metrics but allows a specific registry to be used rather than creating a new one
func NewWithRegistry(reg metrics.Registry, companyID int, deviceID int, program, version string) *Metrics {
name := func(key string) string {
return fmt.Sprintf("client_%s", key)
}
Expand All @@ -45,10 +53,6 @@ func New(companyID int, deviceID int, program, version string) *Metrics {
"did": strconv.Itoa(deviceID),
}

// libkflow creates its own go-metrics Registry, which hold only its
// own metrics (or ones that its clients create with
reg := metrics.NewRegistry()

return &Metrics{
reg: reg,
TotalFlowsIn: metrics.GetOrRegisterMeter(name("Total"), reg),
Expand All @@ -61,6 +65,10 @@ func New(companyID int, deviceID int, program, version string) *Metrics {
}
}

func StartWithSetConf(registry metrics.Registry, logger cmetrics.Logger, url, email, token string, prefix string) {
cmetrics.SetConfWithRegistry(url, logger, prefix, "chf", nil, nil, &email, &token, registry)
}

func (m *Metrics) Start(url, email, token string, prefix string, interval time.Duration, proxy *url.URL) {
proxyURL := ""
if proxy != nil {
Expand All @@ -83,6 +91,19 @@ func (m *Metrics) Start(url, email, token string, prefix string, interval time.D
})
}

func (m *Metrics) Unregister() {
name := func(key string) string {
return fmt.Sprintf("client_%s", key)
}

m.reg.Unregister(name("Total"))
m.reg.Unregister(name("DownsampleFPS"))
m.reg.Unregister(name("OrigSampleRate"))
m.reg.Unregister(name("NewSampleRate"))
m.reg.Unregister(name("RateLimitDrops"))
m.reg.Unregister(name("BytesSent"))
}

func NewMeter() metrics.Meter {
return metrics.NewMeter()
}
Expand Down
Loading

0 comments on commit 4515751

Please sign in to comment.