Skip to content

Commit 5a76f35

Browse files
committed
Move SendFlows to separate struct for more obvious usage
1 parent 75fd301 commit 5a76f35

5 files changed

Lines changed: 148 additions & 124 deletions

File tree

config.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -198,7 +198,7 @@ func (c *Config) start(client *api.Client, dev *api.Device, errors chan<- error)
198198
return nil, fmt.Errorf("agg setup error: %s", err)
199199
}
200200

201-
sender := newSender(c.flow, c.timeout)
201+
sender := newSender(c.flow)
202202
sender.Errors = errors
203203
sender.sample = c.sample
204204
sender.Metrics = senderMetrics

direct_sender.go

Lines changed: 97 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,97 @@
1+
package libkflow
2+
3+
import (
4+
"bytes"
5+
"compress/gzip"
6+
"fmt"
7+
"net/url"
8+
9+
capnp "zombiezen.com/go/capnproto2"
10+
11+
"github.com/kentik/libkflow/api"
12+
"github.com/kentik/libkflow/flow"
13+
)
14+
15+
// messagePrefix is an 80-byte prefix for the message header when sending kflow to the Kentik API. This is a deprecated
16+
// header, but the bytes must remain for backwards compatibility with the Kentik API.
17+
var messagePrefix = [80]byte{}
18+
19+
// A DirectFlowSender transmits flows to Kentik.
20+
type DirectFlowSender struct {
21+
client *api.Client
22+
23+
url string
24+
deviceID uint32
25+
}
26+
27+
// NewDirectSender creates a new DirectFlowSender.
28+
func NewDirectSender(client *api.Client, device *api.Device, url *url.URL) *DirectFlowSender {
29+
return &DirectFlowSender{
30+
client: client,
31+
32+
url: createURLString(*url, device.ClientID()),
33+
deviceID: uint32(device.ID),
34+
}
35+
}
36+
37+
// SendFlows sends the flows to the Kentik API, returning the number of bytes sent as the payload. The device ID on
38+
// the flows is set to the device ID of the sender, regardless of what it was previously set to. This is to ensure all
39+
// data matches the expectations of the downstream URL/API.
40+
//
41+
// This will directly send the slice of flows without any additional downsampling or rate limiting. This does not
42+
// contribute to the underlying Send call.
43+
func (s *DirectFlowSender) SendFlows(flows []flow.Flow) (int64, error) {
44+
if len(flows) == 0 {
45+
return 0, nil
46+
}
47+
// ensure all flows have the device ID set; otherwise it may not be properly queried
48+
for i := range flows {
49+
flows[i].DeviceId = s.deviceID
50+
}
51+
52+
// ensure the sample rate is matching the kentik api expectations
53+
flow.NormalizeSampleRate(flows, 0)
54+
55+
// serialize the data
56+
_, segment, err := capnp.NewMessage(capnp.SingleSegment(nil))
57+
if err != nil {
58+
return 0, fmt.Errorf("failed to create capn proto segment: %w", err)
59+
}
60+
message, err := flow.ToCapnProtoMessage(flows, segment)
61+
if err != nil {
62+
return 0, fmt.Errorf("failed to convert flows to capn proto: %w", err)
63+
}
64+
65+
// write the data with additional gzip compression
66+
buf := &bytes.Buffer{}
67+
z := gzip.NewWriter(buf)
68+
_, err = z.Write(messagePrefix[:])
69+
if err != nil {
70+
return 0, fmt.Errorf("failed to write empty message header: %w", err)
71+
}
72+
err = capnp.NewPackedEncoder(z).Encode(message)
73+
if err != nil {
74+
return 0, fmt.Errorf("failed to encode packed capn proto message: %w", err)
75+
}
76+
err = z.Close()
77+
if err != nil {
78+
return 0, fmt.Errorf("failed to close gzip writer: %w", err)
79+
}
80+
81+
// send the compressed and packed message to the Kentik API
82+
payloadLength := int64(len(buf.Bytes()))
83+
err = s.client.SendFlow(s.url, buf)
84+
if err != nil {
85+
return 0, err
86+
}
87+
return payloadLength, nil
88+
}
89+
90+
// createURLString creates the full URL string to use when sending data to the Kentik API.
91+
func createURLString(u url.URL, clientID string) string {
92+
q := u.Query()
93+
q.Set("sid", "0")
94+
q.Set("sender_id", clientID)
95+
u.RawQuery = q.Encode()
96+
return u.String()
97+
}

direct_sender_test.go

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
package libkflow
2+
3+
import (
4+
"math/rand"
5+
"testing"
6+
7+
"github.com/stretchr/testify/assert"
8+
"github.com/stretchr/testify/require"
9+
10+
"github.com/kentik/libkflow/api/test"
11+
"github.com/kentik/libkflow/flow"
12+
)
13+
14+
func TestDirectFlowSender_SendFlows(t *testing.T) {
15+
client, server, device, err := test.NewClientServer()
16+
require.NoError(t, err)
17+
url := server.URL(test.FLOW)
18+
19+
directSender := NewDirectSender(client, device, url)
20+
21+
expected1 := flow.Flow{
22+
DeviceId: uint32(device.ID),
23+
SrcAs: rand.Uint32(),
24+
DstAs: rand.Uint32(),
25+
SampleAdj: true,
26+
}
27+
expected2 := flow.Flow{
28+
DeviceId: uint32(device.ID),
29+
SrcAs: rand.Uint32(),
30+
DstAs: rand.Uint32(),
31+
SampleAdj: true,
32+
}
33+
34+
flows := []flow.Flow{expected1, expected2}
35+
n, err := directSender.SendFlows(flows)
36+
require.NoError(t, err)
37+
38+
msgs, err := receive(server)
39+
if err != nil {
40+
t.Fatal(err)
41+
}
42+
assert.Equal(t, len(flows), msgs.Len())
43+
assert.Greater(t, n, int64(0))
44+
assert.Equal(t, flowToCHF(expected1, t).String(), msgs.At(0).String())
45+
assert.Equal(t, flowToCHF(expected2, t).String(), msgs.At(1).String())
46+
}

send.go

Lines changed: 2 additions & 89 deletions
Original file line numberDiff line numberDiff line change
@@ -20,16 +20,11 @@ import (
2020
"github.com/kentik/libkflow/metrics"
2121
)
2222

23-
// messagePrefix is an 80-byte prefix for the message header when sending kflow to the Kentik API. This is a deprecated
24-
// header, but the bytes must remain for backwards compatibility with the Kentik API.
25-
var messagePrefix = [80]byte{}
26-
2723
// A Sender aggregates and transmits flow information to Kentik.
2824
type Sender struct {
2925
agg *agg.Agg
3026
exit chan struct{}
3127
url *url.URL
32-
timeout time.Duration
3328
client *api.Client
3429
sample int
3530
ticker *time.Ticker
@@ -43,12 +38,11 @@ type Sender struct {
4338
Metrics *metrics.Metrics
4439
}
4540

46-
func newSender(url *url.URL, timeout time.Duration) *Sender {
41+
func newSender(url *url.URL) *Sender {
4742
tickerCtx, cancelFunc := context.WithCancel(context.Background())
4843
return &Sender{
4944
exit: make(chan struct{}),
5045
url: url,
51-
timeout: timeout,
5246
ticker: time.NewTicker(20 * time.Minute),
5347
tickerCtx: tickerCtx,
5448
tickerCancelFunc: cancelFunc,
@@ -62,80 +56,6 @@ func (s *Sender) Send(flow *flow.Flow) {
6256
s.agg.Add(flow)
6357
}
6458

65-
// SendFlows sends the flows to the Kentik API, returning the number of bytes sent as the payload. The device ID on
66-
// the flows is set to the device ID of the sender, regardless of what it was previously set to. This is to ensure all
67-
// data matches the expectations of the downstream URL/API.
68-
//
69-
// This will directly send the slice of flows without any additional downsampling or rate limiting. This does not
70-
// contribute to the underlying Send call.
71-
func (s *Sender) SendFlows(flows []flow.Flow) (int64, error) {
72-
s.workers.Add(1)
73-
defer s.workers.Done()
74-
75-
if s.Device == nil {
76-
return 0, fmt.Errorf("device not initialized")
77-
}
78-
if len(flows) == 0 {
79-
return 0, nil
80-
}
81-
decoratedURL, err := s.createURLString()
82-
if err != nil {
83-
return 0, fmt.Errorf("failed to create URL string: %w", err)
84-
}
85-
86-
if s.Metrics != nil {
87-
s.Metrics.TotalFlowsIn.Mark(int64(len(flows)))
88-
}
89-
90-
// ensure all flows have the device ID set; otherwise it may not be properly queried
91-
for i := range flows {
92-
flows[i].DeviceId = uint32(s.Device.ID)
93-
}
94-
95-
// ensure the sample rate is matching the kentik api expectations
96-
flow.NormalizeSampleRate(flows, 0)
97-
98-
// serialize the data
99-
_, segment, err := capnp.NewMessage(capnp.SingleSegment(nil))
100-
if err != nil {
101-
return 0, fmt.Errorf("failed to create capn proto segment: %w", err)
102-
}
103-
message, err := flow.ToCapnProtoMessage(flows, segment)
104-
if err != nil {
105-
return 0, fmt.Errorf("failed to convert flows to capn proto: %w", err)
106-
}
107-
108-
// write the data with additional gzip compression
109-
buf := &bytes.Buffer{}
110-
z := gzip.NewWriter(buf)
111-
_, err = z.Write(messagePrefix[:])
112-
if err != nil {
113-
return 0, fmt.Errorf("failed to write empty message header: %w", err)
114-
}
115-
err = capnp.NewPackedEncoder(z).Encode(message)
116-
if err != nil {
117-
return 0, fmt.Errorf("failed to encode packed capn proto message: %w", err)
118-
}
119-
err = z.Close()
120-
if err != nil {
121-
return 0, fmt.Errorf("failed to close gzip writer: %w", err)
122-
}
123-
124-
// send the compressed and packed message to the Kentik API
125-
payloadLength := int64(len(buf.Bytes()))
126-
err = s.client.SendFlow(decoratedURL, buf)
127-
if err != nil {
128-
return 0, err
129-
}
130-
131-
if s.Metrics != nil {
132-
s.Metrics.TotalFlowsOut.Mark(int64(len(flows)))
133-
s.Metrics.BytesSent.Mark(payloadLength)
134-
}
135-
136-
return payloadLength, nil
137-
}
138-
13959
// Stop requests a graceful shutdown of the Sender.
14060
func (s *Sender) Stop(wait time.Duration) bool {
14161
s.agg.Stop()
@@ -336,12 +256,5 @@ func (s *Sender) createURLString() (string, error) {
336256
if s.url == nil {
337257
return "", fmt.Errorf("url not initialized")
338258
}
339-
340-
// Create a new URL to avoid modifying the original, backed by the config
341-
u := *s.url
342-
q := u.Query()
343-
q.Set("sid", "0")
344-
q.Set("sender_id", s.Device.ClientID())
345-
u.RawQuery = q.Encode()
346-
return u.String(), nil
259+
return createURLString(*s.url, s.Device.ClientID()), nil
347260
}

send_test.go

Lines changed: 2 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -42,38 +42,6 @@ func TestSender(t *testing.T) {
4242
assert.Equal(flowToCHF(expected, t).String(), msgs.At(0).String())
4343
}
4444

45-
func TestSender_SendFlows(t *testing.T) {
46-
sender, server, assert := setup(t)
47-
48-
expected1 := flow.Flow{
49-
DeviceId: uint32(sender.Device.ID),
50-
SrcAs: rand.Uint32(),
51-
DstAs: rand.Uint32(),
52-
SampleAdj: true,
53-
}
54-
expected2 := flow.Flow{
55-
DeviceId: uint32(sender.Device.ID),
56-
SrcAs: rand.Uint32(),
57-
DstAs: rand.Uint32(),
58-
SampleAdj: true,
59-
}
60-
61-
flows := []flow.Flow{expected1, expected2}
62-
n, err := sender.SendFlows(flows)
63-
assert.NoError(err)
64-
65-
msgs, err := receive(server)
66-
if err != nil {
67-
t.Fatal(err)
68-
}
69-
assert.Equal(int64(len(flows)), sender.Metrics.TotalFlowsIn.Count())
70-
assert.Equal(int64(len(flows)), sender.Metrics.TotalFlowsOut.Count())
71-
assert.Equal(n, sender.Metrics.BytesSent.Count())
72-
assert.Equal(len(flows), msgs.Len())
73-
assert.Equal(flowToCHF(expected1, t).String(), msgs.At(0).String())
74-
assert.Equal(flowToCHF(expected2, t).String(), msgs.At(1).String())
75-
}
76-
7745
func TestSenderStop(t *testing.T) {
7846
sender, _, assert := setup(t)
7947
stopped := sender.Stop(100 * time.Millisecond)
@@ -140,7 +108,7 @@ func setup(t testing.TB) (*Sender, *test.Server, *assert.Assertions) {
140108
server.Log.SetOutput(io.Discard)
141109

142110
url := server.URL(test.FLOW)
143-
sender := newSender(url, 1*time.Second)
111+
sender := newSender(url)
144112
sender.Metrics = metrics
145113
sender.start(agg, client, device, 1)
146114

@@ -226,7 +194,7 @@ func TestSender_createURLString(t *testing.T) {
226194
agg, err := agg.NewAgg(10*time.Millisecond, 100, m)
227195
require.NoError(t, err)
228196

229-
sender := newSender(u, 1*time.Second)
197+
sender := newSender(u)
230198
err = sender.start(agg, client, device, 1)
231199
require.NoError(t, err)
232200

0 commit comments

Comments
 (0)