diff --git a/src/github.com/kentik/libkflow/main.go b/src/github.com/kentik/libkflow/main.go index b44967d..65d844e 100644 --- a/src/github.com/kentik/libkflow/main.go +++ b/src/github.com/kentik/libkflow/main.go @@ -66,7 +66,7 @@ func kflowSend(cflow *C.kflow) C.int { for i, c := range customs { name := C.GoString(c.name) - id, ok := sender.Customs[name] + id, ok := sender.Device.Customs[name] if !ok { return C.EKFLOWNOCUSTOM } diff --git a/src/github.com/kentik/libkflow/send.go b/src/github.com/kentik/libkflow/send.go index 96d8632..ae1f752 100644 --- a/src/github.com/kentik/libkflow/send.go +++ b/src/github.com/kentik/libkflow/send.go @@ -20,7 +20,7 @@ type Sender struct { Timeout time.Duration Client *api.Client Verbose int - Customs api.CustomColumns + Device *api.Device workers sync.WaitGroup } @@ -42,7 +42,7 @@ func (s *Sender) Start(agg *agg.Agg, client *api.Client, device *api.Device, n i s.Agg = agg s.URL.RawQuery = q.Encode() - s.Customs = device.Customs + s.Device = device s.Client = client s.workers.Add(n) @@ -62,6 +62,7 @@ func (s *Sender) Segment() *capnp.Segment { func (s *Sender) Send(flow *chf.CHF) { s.debug("sending flow to aggregator") + flow.SetDeviceId(uint32(s.Device.ID)) s.Agg.Add(flow) } diff --git a/src/github.com/kentik/libkflow/send_test.go b/src/github.com/kentik/libkflow/send_test.go index ff91850..28bc49f 100644 --- a/src/github.com/kentik/libkflow/send_test.go +++ b/src/github.com/kentik/libkflow/send_test.go @@ -33,6 +33,24 @@ func TestSender(t *testing.T) { assert.Equal(expected.String(), msgs.At(0).String()) } +func TestSenderFields(t *testing.T) { + sender, server, assert := setup(t) + + expected, err := chf.NewCHF(sender.Segment()) + if err != nil { + t.Fatal(err) + } + sender.Send(&expected) + + msgs, err := (<-server.Flows()).Msgs() + if err != nil { + t.Fatal(err) + } + actual := msgs.At(0) + + assert.EqualValues(sender.Device.ID, actual.DeviceId()) +} + func TestSenderStop(t *testing.T) { sender, _, assert := setup(t) stopped := sender.Stop(100 * time.Millisecond)