Skip to content

Commit

Permalink
Add functionality to return an internally managed error channel when …
Browse files Browse the repository at this point in the history
…creating a Sender
  • Loading branch information
ktkenny committed Jan 23, 2024
1 parent a802799 commit b021bdb
Show file tree
Hide file tree
Showing 4 changed files with 242 additions and 29 deletions.
48 changes: 34 additions & 14 deletions config.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,20 +18,21 @@ import (

// Config describes the libkflow configuration.
type Config struct {
email string
token string
capture Capture
proxy *url.URL
api *url.URL
flow *url.URL
metrics *url.URL
sample int
timeout time.Duration
retries int
logger interface{}
program string
version string
registry go_metrics.Registry
email string
token string
capture Capture
proxy *url.URL
api *url.URL
flow *url.URL
metrics *url.URL
sample int
timeout time.Duration
retries int
logger interface{}
program string
version string
registry go_metrics.Registry
useInternalErrors bool

metricsPrefix string
metricsInterval time.Duration
Expand Down Expand Up @@ -154,6 +155,10 @@ func (c *Config) OverrideRegistry(registry go_metrics.Registry) {
c.registry = registry
}

func (c *Config) WithInternalErrors() {
c.useInternalErrors = true
}

func (c *Config) client() *api.Client {
return api.NewClient(api.ClientConfig{
Email: c.email,
Expand All @@ -167,6 +172,21 @@ func (c *Config) client() *api.Client {
})
}

func (c *Config) startWithInternalErrors(client *api.Client, dev *api.Device) (*Sender, <-chan error, error) {
errChan := make(chan error)
sender, err := c.start(client, dev, errChan)
if err != nil {
close(errChan)
return nil, nil, err
}

if c.useInternalErrors {
sender.useInternalErrors = true
}

return sender, errChan, nil
}

func (c *Config) start(client *api.Client, dev *api.Device, errors chan<- error) (*Sender, error) {
if c.metricsInterval == 0 {
c.metricsInterval = 60 * time.Second
Expand Down
23 changes: 23 additions & 0 deletions lib.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,19 @@ func NewSenderWithDeviceName(name string, errors chan<- error, cfg *Config) (*Se
return cfg.start(client, d, errors)
}

// NewSenderWithDeviceNameWithErrors creates a new flow Sender given a device name address and Config.
// The returned error channel is managed internally and will be closed after Sender.Stop() is called.
// If the timeout of Sender.Stop() is reached, it will return before the internal error channel is closed
func NewSenderWithDeviceNameWithErrors(name string, cfg *Config) (*Sender, <-chan error, error) {
client := cfg.client()
d, err := lookupdev(client.GetDeviceByName(name))
if err != nil {
return nil, nil, err
}

return cfg.startWithInternalErrors(client, d)
}

// NewSenderWithNewDevice creates a new device given device creation parameters,
// and then creates a new flow Sender with that device, the error channel, and
// the Config.
Expand All @@ -80,6 +93,16 @@ func NewSenderWithNewDevice(dev *api.DeviceCreate, errors chan<- error, cfg *Con
return cfg.start(client, d, errors)
}

func NewSenderWithNewDeviceWithErrors(dev *api.DeviceCreate, cfg *Config) (*Sender, <-chan error, error) {
client := cfg.client()
d, err := client.CreateDevice(dev)
if err != nil {
return nil, nil, err
}

return cfg.startWithInternalErrors(client, d)
}

func NewSenderWithNewSiteAndDevice(siteAndDevice *api.SiteAndDeviceCreate, errors chan<- error, cfg *Config) (*Sender, error) {
client := cfg.client()
d, err := client.CreateDeviceAndSite(siteAndDevice)
Expand Down
168 changes: 167 additions & 1 deletion lib_test.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
package libkflow_test

import (
"net/http"
"net/http/httptest"
"net/url"
"sync"
"sync/atomic"
"testing"
"time"
Expand All @@ -10,6 +13,7 @@ import (
"github.com/kentik/libkflow"
"github.com/kentik/libkflow/api"
"github.com/kentik/libkflow/api/test"
"github.com/kentik/libkflow/flow"
metrics2 "github.com/kentik/libkflow/metrics"
"github.com/stretchr/testify/assert"
"go.uber.org/goleak"
Expand Down Expand Up @@ -54,8 +58,170 @@ func TestNewSenderWithDeviceName(t *testing.T) {
assert.Nil(err)
}

func TestNewSenderWithDeviceNameLeaks(t *testing.T) {
func TestNewSenderWithDeviceNameWithErrors_NoErrs(t *testing.T) {
client, server, device, err := test.NewClientServer()
if err != nil {
t.Fatal(err)
}

apiurl = server.URL(test.API)
flowurl = server.URL(test.FLOW)
metricsurl = server.URL(test.TSDB)

email = client.Email
token = client.Token

config := libkflow.NewConfig(email, token, "test", "0.0.1")
config.OverrideURLs(apiurl, flowurl, metricsurl)

l := stubLeveledLogger{}

registry := metrics.NewRegistry()
metrics2.StartWithSetConf(registry, &l, metricsurl.String(), email, token, "chf")
config.OverrideRegistry(registry)
config.WithInternalErrors()

s, errors, err := libkflow.NewSenderWithDeviceNameWithErrors(device.Name, config)
assert.NoError(t, err)

errorsFromChan := make([]error, 0)

wg := sync.WaitGroup{}
wg.Add(1)
go func() {
for err := range errors {
errorsFromChan = append(errorsFromChan, err)
}
wg.Done()
}()

for i := 0; i < 5; i++ {
s.Send(&flow.Flow{
TimestampNano: time.Now().UnixNano(),
})
}

s.Stop(time.Second)

wg.Wait()

assert.Len(t, errorsFromChan, 0)
}

func TestNewSenderWithDeviceNameWithErrors_WithErrs(t *testing.T) {
client, server, device, err := test.NewClientServer()
if err != nil {
t.Fatal(err)
}

flowServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(400)
}))

apiurl = server.URL(test.API)
flowurl = server.URL(flowServer.URL)
metricsurl = server.URL(test.TSDB)

email = client.Email
token = client.Token

config := libkflow.NewConfig(email, token, "test", "0.0.1")
config.OverrideURLs(apiurl, flowurl, metricsurl)

l := stubLeveledLogger{}

registry := metrics.NewRegistry()
metrics2.StartWithSetConf(registry, &l, metricsurl.String(), email, token, "chf")
config.OverrideRegistry(registry)
config.WithInternalErrors()

s, errors, err := libkflow.NewSenderWithDeviceNameWithErrors(device.Name, config)
assert.NoError(t, err)

errorsFromChan := make([]error, 0)

wg := sync.WaitGroup{}
wg.Add(1)
go func() {
for err := range errors {
errorsFromChan = append(errorsFromChan, err)
}
wg.Done()
}()

s.Send(&flow.Flow{
TimestampNano: time.Now().UnixNano(),
})

s.Stop(time.Second)

wg.Wait()

assert.Len(t, errorsFromChan, 1)
}

func TestNewSenderWithDeviceName_WithErrs_NoPanic(t *testing.T) {
client, server, device, err := test.NewClientServer()
if err != nil {
t.Fatal(err)
}

flowServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(400)
time.Sleep(time.Second)
}))

apiurl = server.URL(test.API)
flowurl = server.URL(flowServer.URL)
metricsurl = server.URL(test.TSDB)

email = client.Email
token = client.Token

config := libkflow.NewConfig(email, token, "test", "0.0.1")
config.OverrideURLs(apiurl, flowurl, metricsurl)

l := stubLeveledLogger{}

registry := metrics.NewRegistry()
metrics2.StartWithSetConf(registry, &l, metricsurl.String(), email, token, "chf")
config.OverrideRegistry(registry)

errors := make(chan error)

s, err := libkflow.NewSenderWithDeviceName(device.Name, errors, config)
assert.NoError(t, err)

errorsFromChan := make([]error, 0)

wg := sync.WaitGroup{}
wg.Add(1)
go func() {
defer wg.Done()
for {
select {
case <-time.After(time.Second):
return
case err := <-errors:
errorsFromChan = append(errorsFromChan, err)
}
}
}()

for i := 0; i < 100000; i++ {
s.Send(&flow.Flow{
TimestampNano: time.Now().UnixNano(),
})
}

s.Stop(time.Second * 0)

wg.Wait()

assert.Len(t, errorsFromChan, 1)
}

func TestNewSenderWithDeviceNameLeaks(t *testing.T) {
client, server, device, err := test.NewClientServer()
if err != nil {
t.Fatal(err)
Expand Down
32 changes: 18 additions & 14 deletions send.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,20 +20,21 @@ import (

// A Sender aggregates and transmits flow information to Kentik.
type Sender struct {
agg *agg.Agg
exit chan struct{}
url *url.URL
timeout time.Duration
client *api.Client
sample int
ticker *time.Ticker
tickerCtx context.Context
tickerCancelFunc context.CancelFunc
workers sync.WaitGroup
dns chan []byte
Device *api.Device
Errors chan<- error
Metrics *metrics.Metrics
agg *agg.Agg
exit chan struct{}
url *url.URL
timeout time.Duration
client *api.Client
sample int
ticker *time.Ticker
tickerCtx context.Context
tickerCancelFunc context.CancelFunc
workers sync.WaitGroup
dns chan []byte
Device *api.Device
Errors chan<- error
useInternalErrors bool
Metrics *metrics.Metrics
}

func newSender(url *url.URL, timeout time.Duration) *Sender {
Expand Down Expand Up @@ -196,6 +197,9 @@ func (s *Sender) monitor() {
s.ticker.Stop()
s.tickerCancelFunc()
s.Metrics.Unregister()
if s.useInternalErrors {
close(s.Errors)
}
s.exit <- struct{}{}
log.Debugf("sender stopped")
return
Expand Down

0 comments on commit b021bdb

Please sign in to comment.