Skip to content

Commit c11d299

Browse files
authored
Add Path MTU to Exporter inputs (#407)
It is not straightforward for the consumer of the library to determine the maximum message size (for UDP) just based on the PMTU, as the amount to deduct from the PMTU depends on the protocol used for the connection to the exporter (IPv4 or IPv6). The protocol is only known after resolving the collector address, which happens in InitExportingProcess. As a workaround, we add PMTU to the exporter inputs, and let the exporter compute the maximum message size. It is still possible to request a specific maximum message size. If both are provided and the exporter determines that the values are incompatible, an error will be returned. Signed-off-by: Antonin Bas <[email protected]>
1 parent 5bb5e00 commit c11d299

File tree

2 files changed

+153
-12
lines changed

2 files changed

+153
-12
lines changed

pkg/exporter/process.go

Lines changed: 69 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -48,8 +48,7 @@ type templateValue struct {
4848
// 3. Supports only TCP and UDP; one session at a time. SCTP is not supported.
4949
// 4. UDP needs to send PMTU size packets as per RFC7011. In order to guarantee
5050
// this, maxMsgSize should be set correctly. maxMsgSize is the maximum
51-
// payload (IPFIX message) size, not the maximum packet size. You need to
52-
// compute maxMsgSize based on the desired maximum packet size. If
51+
// payload (IPFIX message) size, not the maximum packet size. If
5352
// maxMsgSize is not set correctly, the message may be fragmented.
5453
type ExportingProcess struct {
5554
connToCollector net.Conn
@@ -94,17 +93,68 @@ type ExporterInput struct {
9493
// JSONBufferLen is recommended for sending json records. If not given a
9594
// valid value, we use a default of 5000B
9695
JSONBufferLen int
97-
// For UDP, this should be set by taking into account the PMTU and
98-
// header sizes.
99-
MaxMsgSize int
96+
// MaxMsgSize can be used to provide a custom maximum IPFIX message
97+
// size. If it is omitted, we will use an appropriate default based on
98+
// the configured protocol. For UDP, we want to avoid fragmentation, so
99+
// the MaxMsgSize should be set by taking into account the PMTU and
100+
// header sizes. The recommended approach is to keep MaxMsgSize unset
101+
// and provide the correct PMTU value.
102+
MaxMsgSize int
103+
// PathMTU is used to calculate the maximum message size when the
104+
// protocol is UDP. It is ignored for TCP. If both MaxMsgSize and
105+
// PathMTU are set, and MaxMsgSize is incompatible with the provided
106+
// PathMTU, exporter initialization will fail.
107+
PathMTU int
100108
CheckConnInterval time.Duration
101109
}
102110

111+
func calculateMaxMsgSize(proto string, requestedSize int, pathMTU int, isIPv6 bool) (int, error) {
112+
if requestedSize > 0 && (requestedSize < entities.MinSupportedMsgSize || requestedSize > entities.MaxSocketMsgSize) {
113+
return 0, fmt.Errorf("requested message size should be between %d and %d", entities.MinSupportedMsgSize, entities.MaxSocketMsgSize)
114+
}
115+
if proto == "tcp" {
116+
if requestedSize == 0 {
117+
return entities.MaxSocketMsgSize, nil
118+
} else {
119+
return requestedSize, nil
120+
}
121+
}
122+
// UDP protocol
123+
if pathMTU == 0 {
124+
if requestedSize == 0 {
125+
klog.InfoS("Neither max IPFIX message size nor PMTU were provided, defaulting to min message size", "messageSize", entities.MinSupportedMsgSize)
126+
return entities.MinSupportedMsgSize, nil
127+
}
128+
klog.InfoS("PMTU was not provided, configured message size may cause fragmentation", "messageSize", requestedSize)
129+
return requestedSize, nil
130+
}
131+
// 20-byte IPv4, 8-byte UDP header
132+
mtuDeduction := 28
133+
if isIPv6 {
134+
// An extra 20 bytes for IPv6
135+
mtuDeduction += 20
136+
}
137+
maxMsgSize := pathMTU - mtuDeduction
138+
if maxMsgSize < entities.MinSupportedMsgSize {
139+
return 0, fmt.Errorf("provided PMTU %d is not large enough to accommodate min message size %d", pathMTU, entities.MinSupportedMsgSize)
140+
}
141+
if requestedSize > maxMsgSize {
142+
return 0, fmt.Errorf("requested message size %d exceeds max message size %d calculated from provided PMTU", requestedSize, maxMsgSize)
143+
}
144+
if requestedSize > 0 {
145+
return requestedSize, nil
146+
}
147+
return maxMsgSize, nil
148+
}
149+
103150
// InitExportingProcess takes in collector address(net.Addr format), obsID(observation ID)
104151
// and tempRefTimeout(template refresh timeout). tempRefTimeout is applicable only
105152
// for collectors listening over UDP; unit is seconds. For TCP, you can pass any
106153
// value and it will be ignored. For UDP, if 0 is passed, 600s is used as the default.
107154
func InitExportingProcess(input ExporterInput) (*ExportingProcess, error) {
155+
if input.CollectorProtocol != "tcp" && input.CollectorProtocol != "udp" {
156+
return nil, fmt.Errorf("unsupported collector protocol: %s", input.CollectorProtocol)
157+
}
108158
var conn net.Conn
109159
var err error
110160
if input.TLSClientConfig != nil {
@@ -151,6 +201,15 @@ func InitExportingProcess(input ExporterInput) (*ExportingProcess, error) {
151201
return nil, err
152202
}
153203
}
204+
var isIPv6 bool
205+
switch addr := conn.RemoteAddr().(type) {
206+
case *net.TCPAddr:
207+
isIPv6 = addr.IP.To4() == nil
208+
case *net.UDPAddr:
209+
isIPv6 = addr.IP.To4() == nil
210+
default:
211+
return nil, fmt.Errorf("unsupported net.Addr type %T", addr)
212+
}
154213
expProc := &ExportingProcess{
155214
connToCollector: conn,
156215
obsDomainID: input.ObservationDomainID,
@@ -169,13 +228,12 @@ func InitExportingProcess(input ExporterInput) (*ExportingProcess, error) {
169228
expProc.jsonBufferLen = input.JSONBufferLen
170229
}
171230
} else {
172-
if input.MaxMsgSize == 0 {
173-
expProc.maxMsgSize = entities.MaxSocketMsgSize
174-
} else if input.MaxMsgSize < entities.MinSupportedMsgSize {
175-
return nil, fmt.Errorf("maxMsgSize cannot be less than 512B")
176-
} else {
177-
expProc.maxMsgSize = input.MaxMsgSize
231+
maxMsgSize, err := calculateMaxMsgSize(input.CollectorProtocol, input.MaxMsgSize, input.PathMTU, isIPv6)
232+
if err != nil {
233+
return nil, err
178234
}
235+
klog.InfoS("Calculated max IPFIX message size", "size", maxMsgSize)
236+
expProc.maxMsgSize = maxMsgSize
179237
}
180238

181239
// Start a goroutine to check whether the collector has already closed the TCP connection.

pkg/exporter/process_test.go

Lines changed: 84 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -336,6 +336,7 @@ func TestExportingProcess_SendingDataRecordToLocalUDPServer(t *testing.T) {
336336
CollectorProtocol: conn.LocalAddr().Network(),
337337
ObservationDomainID: 1,
338338
TempRefTimeout: 0,
339+
PathMTU: 1500,
339340
}
340341
exporter, err := InitExportingProcess(input)
341342
if err != nil {
@@ -730,13 +731,14 @@ func TestExportingProcess_GetMsgSizeLimit(t *testing.T) {
730731
CollectorProtocol: conn.LocalAddr().Network(),
731732
ObservationDomainID: 1,
732733
TempRefTimeout: 1,
734+
MaxMsgSize: 1000,
733735
}
734736
exporter, err := InitExportingProcess(input)
735737
if err != nil {
736738
t.Fatalf("Got error when connecting to local server %s: %v", conn.LocalAddr().String(), err)
737739
}
738740
t.Logf("Created exporter connecting to local server with address: %s", conn.LocalAddr().String())
739-
assert.Equal(t, entities.MaxSocketMsgSize, exporter.GetMsgSizeLimit())
741+
assert.Equal(t, input.MaxMsgSize, exporter.GetMsgSizeLimit())
740742
}
741743

742744
func TestExportingProcess_CheckConnToCollector(t *testing.T) {
@@ -885,3 +887,84 @@ func TestSendDataRecords(t *testing.T) {
885887
}
886888
}
887889
}
890+
891+
func TestCalculateMaxMsgSize(t *testing.T) {
892+
testCases := []struct {
893+
proto string
894+
requestedSize int
895+
pathMTU int
896+
isIPv6 bool
897+
expectedErr string
898+
expectedSize int
899+
}{
900+
{
901+
proto: "tcp",
902+
requestedSize: entities.MinSupportedMsgSize - 1,
903+
expectedErr: "requested message size should be between",
904+
},
905+
{
906+
proto: "tcp",
907+
requestedSize: entities.MaxSocketMsgSize + 1,
908+
expectedErr: "requested message size should be between",
909+
},
910+
{
911+
proto: "tcp",
912+
requestedSize: 0,
913+
expectedSize: entities.MaxSocketMsgSize,
914+
},
915+
{
916+
proto: "tcp",
917+
requestedSize: 1000,
918+
expectedSize: 1000,
919+
},
920+
{
921+
proto: "udp",
922+
expectedSize: entities.MinSupportedMsgSize,
923+
},
924+
{
925+
proto: "udp",
926+
requestedSize: 1000,
927+
expectedSize: 1000,
928+
},
929+
{
930+
proto: "udp",
931+
pathMTU: 1500,
932+
expectedSize: 1472,
933+
},
934+
{
935+
proto: "udp",
936+
pathMTU: 1500,
937+
isIPv6: true,
938+
expectedSize: 1452,
939+
},
940+
{
941+
proto: "udp",
942+
pathMTU: 300,
943+
expectedErr: "is not large enough to accommodate min message size",
944+
},
945+
{
946+
proto: "udp",
947+
requestedSize: 2000,
948+
pathMTU: 1500,
949+
expectedErr: "exceeds max message size",
950+
},
951+
{
952+
proto: "udp",
953+
requestedSize: 1000,
954+
pathMTU: 1500,
955+
expectedSize: 1000,
956+
},
957+
}
958+
959+
for _, tc := range testCases {
960+
t.Run(fmt.Sprintf("%s-req=%d-pmtu=%d-ipv6=%t", tc.proto, tc.requestedSize, tc.pathMTU, tc.isIPv6), func(t *testing.T) {
961+
size, err := calculateMaxMsgSize(tc.proto, tc.requestedSize, tc.pathMTU, tc.isIPv6)
962+
if tc.expectedErr != "" {
963+
assert.ErrorContains(t, err, tc.expectedErr)
964+
} else {
965+
require.NoError(t, err)
966+
assert.Equal(t, tc.expectedSize, size)
967+
}
968+
})
969+
}
970+
}

0 commit comments

Comments
 (0)