Skip to content

Commit 1eae992

Browse files
authored
Add buffered exporter for IPFIX messages (#409)
SendDataRecords was added in #391 as a way to export data records more efficiently. However, this method is not convenient to use for clients which need to mutate the records after sending them (as is the case when aggregation is used). To work around this issue, we introduce a buffered exporter. The buffered exporter wraps the standard exporter process. Whenever a data record is added, it is serialized to a byte buffer immediately, and the buffer is sent when it reaches the maximum message size. Note that for convenience the buffered exporter also supports JSON records, but in this case it just acts as a passthrough to the regular ("unbuffered") exporter process. Signed-off-by: Antonin Bas <[email protected]>
1 parent c11d299 commit 1eae992

File tree

2 files changed

+354
-0
lines changed

2 files changed

+354
-0
lines changed

pkg/exporter/buffered.go

Lines changed: 186 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,186 @@
1+
// Copyright 2025 VMware, Inc.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
package exporter
16+
17+
import (
18+
"bytes"
19+
"encoding/binary"
20+
"fmt"
21+
"time"
22+
23+
"github.com/vmware/go-ipfix/pkg/entities"
24+
)
25+
26+
// BufferedIPFIXExporter wraps an ExportingProcess instance and supports buffering data records
27+
// before sending them. BufferedIPFIXExporter is not safe for usage by multiple goroutines. There
28+
// should be a single BufferedIPFIXExporter created for a given ExportingProcess.
29+
// While the BufferedIPFIXExporter supports sending JSON records, in that case it mostly acts as a
30+
// passthrough to the underlying ExportingProcess (no actual buffering).
31+
type BufferedIPFIXExporter struct {
32+
ep *ExportingProcess
33+
templateSet entities.Set
34+
// maps templateID to the corresponding buffer for data records. Note that entries are never
35+
// deleted from this map.
36+
messages map[uint16]*bufferedMessage
37+
jsonBuffer bytes.Buffer
38+
}
39+
40+
type bufferedMessage struct {
41+
ep *ExportingProcess
42+
templateID uint16
43+
buffer []byte
44+
numRecords int
45+
}
46+
47+
func newBufferedMessage(ep *ExportingProcess, templateID uint16) *bufferedMessage {
48+
m := &bufferedMessage{
49+
ep: ep,
50+
templateID: templateID,
51+
buffer: make([]byte, 0, ep.maxMsgSize),
52+
numRecords: 0,
53+
}
54+
m.reset()
55+
return m
56+
}
57+
58+
// NewBufferedIPFIXExporter creates a BufferedIPFIXExporter .
59+
func NewBufferedIPFIXExporter(ep *ExportingProcess) *BufferedIPFIXExporter {
60+
bufferedExporter := &BufferedIPFIXExporter{
61+
ep: ep,
62+
templateSet: entities.NewSet(false),
63+
}
64+
if !ep.sendJSONRecord {
65+
bufferedExporter.messages = make(map[uint16]*bufferedMessage)
66+
}
67+
return bufferedExporter
68+
}
69+
70+
func (e *BufferedIPFIXExporter) addTemplateRecord(record entities.Record) error {
71+
e.templateSet.ResetSet()
72+
e.templateSet.PrepareSet(entities.Template, entities.TemplateSetID)
73+
// It's important to use the method from ExporterProcess, for template management purposes.
74+
_, err := e.ep.SendSet(e.templateSet)
75+
return err
76+
}
77+
78+
func (e *BufferedIPFIXExporter) addDataRecord(record entities.Record) error {
79+
templateID := record.GetTemplateID()
80+
m, ok := e.messages[templateID]
81+
if ok {
82+
return m.addRecord(record)
83+
}
84+
m = newBufferedMessage(e.ep, templateID)
85+
e.messages[templateID] = m
86+
return m.addRecord(record)
87+
}
88+
89+
// AddRecord adds a record to be sent to the destination collector. If it is a template record, then
90+
// it will be sent to the collector right away. If it is a data record, it will be added to the
91+
// buffer. If adding the record to the buffer would cause the buffer length to exceed the max
92+
// message size, the buffer is flushed first. Note that because data records are serialized to the
93+
// buffer immediately, it is safe for the provided record to be mutated as soon as this function
94+
// returns.
95+
func (e *BufferedIPFIXExporter) AddRecord(record entities.Record) error {
96+
recordType := record.GetRecordType()
97+
if recordType == entities.Template {
98+
// We don't send templates for JSON records
99+
if e.ep.sendJSONRecord {
100+
return nil
101+
}
102+
return e.addTemplateRecord(record)
103+
} else if recordType == entities.Data {
104+
if e.ep.sendJSONRecord {
105+
_, _, err := e.ep.createAndSendJSONRecords([]entities.Record{record}, &e.jsonBuffer)
106+
return err
107+
}
108+
return e.addDataRecord(record)
109+
}
110+
return fmt.Errorf("invalid record type: %v", recordType)
111+
}
112+
113+
// Flush sends all buffered data records immediately.
114+
func (e *BufferedIPFIXExporter) Flush() error {
115+
if e.ep.sendJSONRecord {
116+
return nil
117+
}
118+
for _, m := range e.messages {
119+
if err := m.flush(); err != nil {
120+
return err
121+
}
122+
}
123+
return nil
124+
}
125+
126+
func (m *bufferedMessage) addRecord(record entities.Record) error {
127+
recordLength := record.GetRecordLength()
128+
if len(m.buffer)+recordLength > m.ep.maxMsgSize {
129+
if m.numRecords == 0 {
130+
return fmt.Errorf("record is too big to fit into single message")
131+
}
132+
if _, err := m.sendMessage(); err != nil {
133+
return err
134+
}
135+
}
136+
var err error
137+
m.buffer, err = record.AppendToBuffer(m.buffer)
138+
if err != nil {
139+
return err
140+
}
141+
m.numRecords += 1
142+
return nil
143+
}
144+
145+
func (m *bufferedMessage) flush() error {
146+
if m.numRecords == 0 {
147+
return nil
148+
}
149+
_, err := m.sendMessage()
150+
return err
151+
}
152+
153+
func (m *bufferedMessage) reset() {
154+
const headerLength = entities.MsgHeaderLength + entities.SetHeaderLen
155+
m.buffer = m.buffer[:headerLength]
156+
m.numRecords = 0
157+
}
158+
159+
func encodeMessageHeader(buf []byte, version, length uint16, exportTime, seqNumber, obsDomainID uint32) {
160+
bigEndian := binary.BigEndian
161+
bigEndian.PutUint16(buf, version)
162+
bigEndian.PutUint16(buf[2:], length)
163+
bigEndian.PutUint32(buf[4:], exportTime)
164+
bigEndian.PutUint32(buf[8:], seqNumber)
165+
bigEndian.PutUint32(buf[12:], obsDomainID)
166+
}
167+
168+
func encodeSetHeader(buf []byte, templateID, length uint16) {
169+
bigEndian := binary.BigEndian
170+
bigEndian.PutUint16(buf, templateID)
171+
bigEndian.PutUint16(buf[2:], length)
172+
}
173+
174+
func (m *bufferedMessage) sendMessage() (int, error) {
175+
now := time.Now()
176+
m.ep.seqNumber = m.ep.seqNumber + uint32(m.numRecords)
177+
msgLen := len(m.buffer)
178+
encodeMessageHeader(m.buffer, 10, uint16(msgLen), uint32(now.Unix()), m.ep.seqNumber, m.ep.obsDomainID)
179+
encodeSetHeader(m.buffer[entities.MsgHeaderLength:], m.templateID, uint16(msgLen-entities.MsgHeaderLength))
180+
n, err := m.ep.connToCollector.Write(m.buffer)
181+
if err != nil {
182+
return n, err
183+
}
184+
m.reset()
185+
return n, nil
186+
}

pkg/exporter/buffered_test.go

Lines changed: 168 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,168 @@
1+
// Copyright 2025 VMware, Inc.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
package exporter
16+
17+
import (
18+
"net"
19+
"testing"
20+
"time"
21+
22+
"github.com/stretchr/testify/assert"
23+
"github.com/stretchr/testify/require"
24+
25+
"github.com/vmware/go-ipfix/pkg/entities"
26+
"github.com/vmware/go-ipfix/pkg/registry"
27+
)
28+
29+
func TestBufferedExporter(t *testing.T) {
30+
// Create local server for testing
31+
udpAddr, err := net.ResolveUDPAddr("udp", "127.0.0.1:0")
32+
require.NoError(t, err)
33+
conn, err := net.ListenUDP("udp", udpAddr)
34+
require.NoError(t, err)
35+
t.Log("Created local server on random available port for testing")
36+
37+
receivedLengthsCh := make(chan int, 10)
38+
go func() {
39+
defer conn.Close()
40+
b := make([]byte, 512)
41+
for {
42+
n, err := conn.Read(b)
43+
if err != nil {
44+
return
45+
}
46+
receivedLengthsCh <- n
47+
}
48+
}()
49+
50+
// Create exporter using local server info
51+
input := ExporterInput{
52+
CollectorAddress: conn.LocalAddr().String(),
53+
CollectorProtocol: conn.LocalAddr().Network(),
54+
ObservationDomainID: 1,
55+
MaxMsgSize: 512,
56+
}
57+
exporter, err := InitExportingProcess(input)
58+
require.NoError(t, err)
59+
t.Logf("Created exporter connecting to local server with address: %s", conn.LocalAddr().String())
60+
defer exporter.CloseConnToCollector()
61+
62+
bufferedExporter := NewBufferedIPFIXExporter(exporter)
63+
64+
// Create template record with two fields
65+
templateID := exporter.NewTemplateID()
66+
elements := make([]entities.InfoElementWithValue, 0)
67+
ieSrc, err := registry.GetInfoElement("sourceIPv4Address", registry.IANAEnterpriseID)
68+
require.NoError(t, err, "Did not find the element with name sourceIPv4Address")
69+
elements = append(elements, entities.NewIPAddressInfoElement(ieSrc, nil))
70+
ieDst, err := registry.GetInfoElement("destinationIPv4Address", registry.IANAEnterpriseID)
71+
require.NoError(t, err, "Did not find the element with name destinationIPv4Address")
72+
elements = append(elements, entities.NewIPAddressInfoElement(ieDst, nil))
73+
template := entities.NewTemplateRecordFromElements(templateID, elements, false)
74+
75+
require.NoError(t, bufferedExporter.AddRecord(template))
76+
select {
77+
case <-receivedLengthsCh:
78+
break
79+
case <-time.After(100 * time.Millisecond):
80+
require.Fail(t, "Expected template not received")
81+
}
82+
83+
record := func() entities.Record {
84+
elements := []entities.InfoElementWithValue{
85+
entities.NewIPAddressInfoElement(ieSrc, net.ParseIP("1.2.3.4")),
86+
entities.NewIPAddressInfoElement(ieDst, net.ParseIP("5.6.7.8")),
87+
}
88+
return entities.NewDataRecordFromElements(templateID, elements, false)
89+
}()
90+
// Each record will be 8B. The message size has been set to 512B above.
91+
// The overheade per message is 16 (message header) + 4 (set header).
92+
// So we can fit 61 records per message.
93+
// If we send 200 records, we will need 4 messages.
94+
for range 200 {
95+
require.Equal(t, 8, record.GetRecordLength()) // sanity check
96+
require.NoError(t, bufferedExporter.AddRecord(record))
97+
}
98+
require.NoError(t, bufferedExporter.Flush())
99+
100+
timerCh := time.After(100 * time.Millisecond)
101+
for _, expectedBytesReceived := range []int{508, 508, 508, 156} {
102+
select {
103+
case bytesReceived := <-receivedLengthsCh:
104+
assert.Equal(t, expectedBytesReceived, bytesReceived)
105+
case <-timerCh:
106+
require.Fail(t, "Expected message not received")
107+
}
108+
}
109+
}
110+
111+
func BenchmarkBufferedExporter(b *testing.B) {
112+
// Create local server for testing
113+
udpAddr, err := net.ResolveUDPAddr("udp", "127.0.0.1:0")
114+
require.NoError(b, err)
115+
conn, err := net.ListenUDP("udp", udpAddr)
116+
require.NoError(b, err)
117+
118+
go func() {
119+
defer conn.Close()
120+
b := make([]byte, 512)
121+
for {
122+
if _, err := conn.Read(b); err != nil {
123+
return
124+
}
125+
}
126+
}()
127+
128+
// Create exporter using local server info
129+
input := ExporterInput{
130+
CollectorAddress: conn.LocalAddr().String(),
131+
CollectorProtocol: conn.LocalAddr().Network(),
132+
ObservationDomainID: 1,
133+
MaxMsgSize: 512,
134+
}
135+
exporter, err := InitExportingProcess(input)
136+
require.NoError(b, err)
137+
b.Logf("Created exporter connecting to local server with address: %s", conn.LocalAddr().String())
138+
defer exporter.CloseConnToCollector()
139+
140+
bufferedExporter := NewBufferedIPFIXExporter(exporter)
141+
142+
// Create template record with two fields
143+
templateID := exporter.NewTemplateID()
144+
elements := make([]entities.InfoElementWithValue, 0)
145+
ieSrc, err := registry.GetInfoElement("sourceIPv4Address", registry.IANAEnterpriseID)
146+
require.NoError(b, err, "Did not find the element with name sourceIPv4Address")
147+
elements = append(elements, entities.NewIPAddressInfoElement(ieSrc, nil))
148+
ieDst, err := registry.GetInfoElement("destinationIPv4Address", registry.IANAEnterpriseID)
149+
require.NoError(b, err, "Did not find the element with name destinationIPv4Address")
150+
elements = append(elements, entities.NewIPAddressInfoElement(ieDst, nil))
151+
template := entities.NewTemplateRecordFromElements(templateID, elements, false)
152+
153+
require.NoError(b, bufferedExporter.AddRecord(template))
154+
155+
record := func() entities.Record {
156+
elements := []entities.InfoElementWithValue{
157+
entities.NewIPAddressInfoElement(ieSrc, net.ParseIP("1.2.3.4")),
158+
entities.NewIPAddressInfoElement(ieDst, net.ParseIP("5.6.7.8")),
159+
}
160+
return entities.NewDataRecordFromElements(templateID, elements, false)
161+
}()
162+
163+
b.ResetTimer()
164+
165+
for range b.N {
166+
require.NoError(b, bufferedExporter.AddRecord(record))
167+
}
168+
}

0 commit comments

Comments
 (0)