diff --git a/pkg/exporter/buffered.go b/pkg/exporter/buffered.go new file mode 100644 index 0000000..315686c --- /dev/null +++ b/pkg/exporter/buffered.go @@ -0,0 +1,186 @@ +// Copyright 2025 VMware, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package exporter + +import ( + "bytes" + "encoding/binary" + "fmt" + "time" + + "github.com/vmware/go-ipfix/pkg/entities" +) + +// BufferedIPFIXExporter wraps an ExportingProcess instance and supports buffering data records +// before sending them. BufferedIPFIXExporter is not safe for usage by multiple goroutines. There +// should be a single BufferedIPFIXExporter created for a given ExportingProcess. +// While the BufferedIPFIXExporter supports sending JSON records, in that case it mostly acts as a +// passthrough to the underlying ExportingProcess (no actual buffering). +type BufferedIPFIXExporter struct { + ep *ExportingProcess + templateSet entities.Set + // maps templateID to the corresponding buffer for data records. Note that entries are never + // deleted from this map. + messages map[uint16]*bufferedMessage + jsonBuffer bytes.Buffer +} + +type bufferedMessage struct { + ep *ExportingProcess + templateID uint16 + buffer []byte + numRecords int +} + +func newBufferedMessage(ep *ExportingProcess, templateID uint16) *bufferedMessage { + m := &bufferedMessage{ + ep: ep, + templateID: templateID, + buffer: make([]byte, 0, ep.maxMsgSize), + numRecords: 0, + } + m.reset() + return m +} + +// NewBufferedIPFIXExporter creates a BufferedIPFIXExporter . +func NewBufferedIPFIXExporter(ep *ExportingProcess) *BufferedIPFIXExporter { + bufferedExporter := &BufferedIPFIXExporter{ + ep: ep, + templateSet: entities.NewSet(false), + } + if !ep.sendJSONRecord { + bufferedExporter.messages = make(map[uint16]*bufferedMessage) + } + return bufferedExporter +} + +func (e *BufferedIPFIXExporter) addTemplateRecord(record entities.Record) error { + e.templateSet.ResetSet() + e.templateSet.PrepareSet(entities.Template, entities.TemplateSetID) + // It's important to use the method from ExporterProcess, for template management purposes. + _, err := e.ep.SendSet(e.templateSet) + return err +} + +func (e *BufferedIPFIXExporter) addDataRecord(record entities.Record) error { + templateID := record.GetTemplateID() + m, ok := e.messages[templateID] + if ok { + return m.addRecord(record) + } + m = newBufferedMessage(e.ep, templateID) + e.messages[templateID] = m + return m.addRecord(record) +} + +// AddRecord adds a record to be sent to the destination collector. If it is a template record, then +// it will be sent to the collector right away. If it is a data record, it will be added to the +// buffer. If adding the record to the buffer would cause the buffer length to exceed the max +// message size, the buffer is flushed first. Note that because data records are serialized to the +// buffer immediately, it is safe for the provided record to be mutated as soon as this function +// returns. +func (e *BufferedIPFIXExporter) AddRecord(record entities.Record) error { + recordType := record.GetRecordType() + if recordType == entities.Template { + // We don't send templates for JSON records + if e.ep.sendJSONRecord { + return nil + } + return e.addTemplateRecord(record) + } else if recordType == entities.Data { + if e.ep.sendJSONRecord { + _, _, err := e.ep.createAndSendJSONRecords([]entities.Record{record}, &e.jsonBuffer) + return err + } + return e.addDataRecord(record) + } + return fmt.Errorf("invalid record type: %v", recordType) +} + +// Flush sends all buffered data records immediately. +func (e *BufferedIPFIXExporter) Flush() error { + if e.ep.sendJSONRecord { + return nil + } + for _, m := range e.messages { + if err := m.flush(); err != nil { + return err + } + } + return nil +} + +func (m *bufferedMessage) addRecord(record entities.Record) error { + recordLength := record.GetRecordLength() + if len(m.buffer)+recordLength > m.ep.maxMsgSize { + if m.numRecords == 0 { + return fmt.Errorf("record is too big to fit into single message") + } + if _, err := m.sendMessage(); err != nil { + return err + } + } + var err error + m.buffer, err = record.AppendToBuffer(m.buffer) + if err != nil { + return err + } + m.numRecords += 1 + return nil +} + +func (m *bufferedMessage) flush() error { + if m.numRecords == 0 { + return nil + } + _, err := m.sendMessage() + return err +} + +func (m *bufferedMessage) reset() { + const headerLength = entities.MsgHeaderLength + entities.SetHeaderLen + m.buffer = m.buffer[:headerLength] + m.numRecords = 0 +} + +func encodeMessageHeader(buf []byte, version, length uint16, exportTime, seqNumber, obsDomainID uint32) { + bigEndian := binary.BigEndian + bigEndian.PutUint16(buf, version) + bigEndian.PutUint16(buf[2:], length) + bigEndian.PutUint32(buf[4:], exportTime) + bigEndian.PutUint32(buf[8:], seqNumber) + bigEndian.PutUint32(buf[12:], obsDomainID) +} + +func encodeSetHeader(buf []byte, templateID, length uint16) { + bigEndian := binary.BigEndian + bigEndian.PutUint16(buf, templateID) + bigEndian.PutUint16(buf[2:], length) +} + +func (m *bufferedMessage) sendMessage() (int, error) { + now := time.Now() + m.ep.seqNumber = m.ep.seqNumber + uint32(m.numRecords) + msgLen := len(m.buffer) + encodeMessageHeader(m.buffer, 10, uint16(msgLen), uint32(now.Unix()), m.ep.seqNumber, m.ep.obsDomainID) + encodeSetHeader(m.buffer[entities.MsgHeaderLength:], m.templateID, uint16(msgLen-entities.MsgHeaderLength)) + n, err := m.ep.connToCollector.Write(m.buffer) + if err != nil { + return n, err + } + m.reset() + return n, nil +} diff --git a/pkg/exporter/buffered_test.go b/pkg/exporter/buffered_test.go new file mode 100644 index 0000000..1fe7229 --- /dev/null +++ b/pkg/exporter/buffered_test.go @@ -0,0 +1,168 @@ +// Copyright 2025 VMware, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package exporter + +import ( + "net" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "github.com/vmware/go-ipfix/pkg/entities" + "github.com/vmware/go-ipfix/pkg/registry" +) + +func TestBufferedExporter(t *testing.T) { + // Create local server for testing + udpAddr, err := net.ResolveUDPAddr("udp", "127.0.0.1:0") + require.NoError(t, err) + conn, err := net.ListenUDP("udp", udpAddr) + require.NoError(t, err) + t.Log("Created local server on random available port for testing") + + receivedLengthsCh := make(chan int, 10) + go func() { + defer conn.Close() + b := make([]byte, 512) + for { + n, err := conn.Read(b) + if err != nil { + return + } + receivedLengthsCh <- n + } + }() + + // Create exporter using local server info + input := ExporterInput{ + CollectorAddress: conn.LocalAddr().String(), + CollectorProtocol: conn.LocalAddr().Network(), + ObservationDomainID: 1, + MaxMsgSize: 512, + } + exporter, err := InitExportingProcess(input) + require.NoError(t, err) + t.Logf("Created exporter connecting to local server with address: %s", conn.LocalAddr().String()) + defer exporter.CloseConnToCollector() + + bufferedExporter := NewBufferedIPFIXExporter(exporter) + + // Create template record with two fields + templateID := exporter.NewTemplateID() + elements := make([]entities.InfoElementWithValue, 0) + ieSrc, err := registry.GetInfoElement("sourceIPv4Address", registry.IANAEnterpriseID) + require.NoError(t, err, "Did not find the element with name sourceIPv4Address") + elements = append(elements, entities.NewIPAddressInfoElement(ieSrc, nil)) + ieDst, err := registry.GetInfoElement("destinationIPv4Address", registry.IANAEnterpriseID) + require.NoError(t, err, "Did not find the element with name destinationIPv4Address") + elements = append(elements, entities.NewIPAddressInfoElement(ieDst, nil)) + template := entities.NewTemplateRecordFromElements(templateID, elements, false) + + require.NoError(t, bufferedExporter.AddRecord(template)) + select { + case <-receivedLengthsCh: + break + case <-time.After(100 * time.Millisecond): + require.Fail(t, "Expected template not received") + } + + record := func() entities.Record { + elements := []entities.InfoElementWithValue{ + entities.NewIPAddressInfoElement(ieSrc, net.ParseIP("1.2.3.4")), + entities.NewIPAddressInfoElement(ieDst, net.ParseIP("5.6.7.8")), + } + return entities.NewDataRecordFromElements(templateID, elements, false) + }() + // Each record will be 8B. The message size has been set to 512B above. + // The overheade per message is 16 (message header) + 4 (set header). + // So we can fit 61 records per message. + // If we send 200 records, we will need 4 messages. + for range 200 { + require.Equal(t, 8, record.GetRecordLength()) // sanity check + require.NoError(t, bufferedExporter.AddRecord(record)) + } + require.NoError(t, bufferedExporter.Flush()) + + timerCh := time.After(100 * time.Millisecond) + for _, expectedBytesReceived := range []int{508, 508, 508, 156} { + select { + case bytesReceived := <-receivedLengthsCh: + assert.Equal(t, expectedBytesReceived, bytesReceived) + case <-timerCh: + require.Fail(t, "Expected message not received") + } + } +} + +func BenchmarkBufferedExporter(b *testing.B) { + // Create local server for testing + udpAddr, err := net.ResolveUDPAddr("udp", "127.0.0.1:0") + require.NoError(b, err) + conn, err := net.ListenUDP("udp", udpAddr) + require.NoError(b, err) + + go func() { + defer conn.Close() + b := make([]byte, 512) + for { + if _, err := conn.Read(b); err != nil { + return + } + } + }() + + // Create exporter using local server info + input := ExporterInput{ + CollectorAddress: conn.LocalAddr().String(), + CollectorProtocol: conn.LocalAddr().Network(), + ObservationDomainID: 1, + MaxMsgSize: 512, + } + exporter, err := InitExportingProcess(input) + require.NoError(b, err) + b.Logf("Created exporter connecting to local server with address: %s", conn.LocalAddr().String()) + defer exporter.CloseConnToCollector() + + bufferedExporter := NewBufferedIPFIXExporter(exporter) + + // Create template record with two fields + templateID := exporter.NewTemplateID() + elements := make([]entities.InfoElementWithValue, 0) + ieSrc, err := registry.GetInfoElement("sourceIPv4Address", registry.IANAEnterpriseID) + require.NoError(b, err, "Did not find the element with name sourceIPv4Address") + elements = append(elements, entities.NewIPAddressInfoElement(ieSrc, nil)) + ieDst, err := registry.GetInfoElement("destinationIPv4Address", registry.IANAEnterpriseID) + require.NoError(b, err, "Did not find the element with name destinationIPv4Address") + elements = append(elements, entities.NewIPAddressInfoElement(ieDst, nil)) + template := entities.NewTemplateRecordFromElements(templateID, elements, false) + + require.NoError(b, bufferedExporter.AddRecord(template)) + + record := func() entities.Record { + elements := []entities.InfoElementWithValue{ + entities.NewIPAddressInfoElement(ieSrc, net.ParseIP("1.2.3.4")), + entities.NewIPAddressInfoElement(ieDst, net.ParseIP("5.6.7.8")), + } + return entities.NewDataRecordFromElements(templateID, elements, false) + }() + + b.ResetTimer() + + for range b.N { + require.NoError(b, bufferedExporter.AddRecord(record)) + } +}