Skip to content

Commit

Permalink
use device ID from API call
Browse files Browse the repository at this point in the history
  • Loading branch information
wg committed Nov 21, 2016
1 parent e413e32 commit 09adee4
Show file tree
Hide file tree
Showing 3 changed files with 22 additions and 3 deletions.
2 changes: 1 addition & 1 deletion src/github.com/kentik/libkflow/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ func kflowSend(cflow *C.kflow) C.int {

for i, c := range customs {
name := C.GoString(c.name)
id, ok := sender.Customs[name]
id, ok := sender.Device.Customs[name]
if !ok {
return C.EKFLOWNOCUSTOM
}
Expand Down
5 changes: 3 additions & 2 deletions src/github.com/kentik/libkflow/send.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ type Sender struct {
Timeout time.Duration
Client *api.Client
Verbose int
Customs api.CustomColumns
Device *api.Device
workers sync.WaitGroup
}

Expand All @@ -42,7 +42,7 @@ func (s *Sender) Start(agg *agg.Agg, client *api.Client, device *api.Device, n i

s.Agg = agg
s.URL.RawQuery = q.Encode()
s.Customs = device.Customs
s.Device = device
s.Client = client
s.workers.Add(n)

Expand All @@ -62,6 +62,7 @@ func (s *Sender) Segment() *capnp.Segment {

func (s *Sender) Send(flow *chf.CHF) {
s.debug("sending flow to aggregator")
flow.SetDeviceId(uint32(s.Device.ID))
s.Agg.Add(flow)
}

Expand Down
18 changes: 18 additions & 0 deletions src/github.com/kentik/libkflow/send_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,24 @@ func TestSender(t *testing.T) {
assert.Equal(expected.String(), msgs.At(0).String())
}

func TestSenderFields(t *testing.T) {
sender, server, assert := setup(t)

expected, err := chf.NewCHF(sender.Segment())
if err != nil {
t.Fatal(err)
}
sender.Send(&expected)

msgs, err := (<-server.Flows()).Msgs()
if err != nil {
t.Fatal(err)
}
actual := msgs.At(0)

assert.EqualValues(sender.Device.ID, actual.DeviceId())
}

func TestSenderStop(t *testing.T) {
sender, _, assert := setup(t)
stopped := sender.Stop(100 * time.Millisecond)
Expand Down

0 comments on commit 09adee4

Please sign in to comment.