@@ -2,10 +2,11 @@ package flow
2
2
3
3
import (
4
4
"context"
5
- "encoding/json "
5
+ "errors "
6
6
"flag"
7
7
"fmt"
8
8
"io"
9
+ "net"
9
10
"net/http"
10
11
"os"
11
12
@@ -17,9 +18,13 @@ import (
17
18
"github.com/kentik/ktranslate/pkg/kt"
18
19
"github.com/kentik/ktranslate/pkg/util/resolv"
19
20
20
- "github.com/netsampler/goflow2/producer"
21
- "github.com/netsampler/goflow2/utils"
21
+ "github.com/netsampler/goflow2/v2/decoders/netflow"
22
+ "github.com/netsampler/goflow2/v2/format"
23
+ "github.com/netsampler/goflow2/v2/metrics"
24
+ protoproducer "github.com/netsampler/goflow2/v2/producer/proto"
25
+ "github.com/netsampler/goflow2/v2/utils"
22
26
"github.com/prometheus/client_golang/prometheus/promhttp"
27
+ "gopkg.in/yaml.v2"
23
28
)
24
29
25
30
type FlowSource string
@@ -66,7 +71,7 @@ func NewFlowSource(ctx context.Context, proto FlowSource, maxBatchSize int, log
66
71
}()
67
72
68
73
// Allow processing of custom ipfix templates here.
69
- var config EntConfig
74
+ var config * protoproducer. ProducerConfig
70
75
71
76
// Load a special config if there's a known enriched flow.
72
77
switch proto {
@@ -76,6 +81,8 @@ func NewFlowSource(ctx context.Context, proto FlowSource, maxBatchSize int, log
76
81
config = loadNBar (cfg )
77
82
case PAN :
78
83
config = loadPAN (cfg )
84
+ default :
85
+ config = loadDefault (cfg )
79
86
}
80
87
81
88
kt := NewKentikDriver (ctx , proto , maxBatchSize , log , registry , jchfChan , apic , cfg .MessageFields , resolv , cfg )
@@ -87,108 +94,104 @@ func NewFlowSource(ctx context.Context, proto FlowSource, maxBatchSize int, log
87
94
kt .Errorf ("Cannot load netflow mapping file: %v" , err )
88
95
return nil , err
89
96
}
90
- config , err = loadMapping (f , proto )
97
+ pc , err : = loadMapping (f )
91
98
f .Close ()
92
99
if err != nil {
93
100
kt .Errorf ("Invalid yaml for netflow mapping file: %v" , err )
94
101
return nil , err
95
102
}
103
+ config = pc
96
104
}
97
105
98
- kt .Infof ("Netflow listener running on %s:%d for format %s and a batch size of %d" , cfg .ListenIP , cfg .ListenPort , proto , maxBatchSize )
99
- kt .Infof ("Netflow listener sending fields %s" , cfg .MessageFields )
100
106
kt .SetConfig (config )
101
107
108
+ flowProducer , err := protoproducer .CreateProtoProducer (config , protoproducer .CreateSamplingSystem )
109
+ if err != nil {
110
+ return nil , err
111
+ }
112
+
113
+ flowProducer = metrics .WrapPromProducer (flowProducer )
114
+ udpCfg := & utils.UDPReceiverConfig {
115
+ Sockets : cfg .Workers ,
116
+ Workers : cfg .Workers ,
117
+ }
118
+ recv , err := utils .NewUDPReceiver (udpCfg )
119
+ if err != nil {
120
+ return nil , err
121
+ }
122
+
123
+ format .RegisterFormatDriver ("chf" , kt ) // Let goflow know about kt.
124
+ formatter , err := format .FindFormat ("chf" )
125
+ if err != nil {
126
+ return nil , err
127
+ }
128
+
129
+ cfgPipe := & utils.PipeConfig {
130
+ Format : formatter ,
131
+ Transport : nil ,
132
+ Producer : flowProducer ,
133
+ NetFlowTemplater : metrics .NewDefaultPromTemplateSystem , // wrap template system to get Prometheus info
134
+ }
135
+
136
+ var decodeFunc utils.DecoderFunc
137
+
102
138
switch proto {
103
- case Ipfix , Netflow9 , ASA , NBar , PAN :
104
- sNF := & utils.StateNetFlow {
105
- Format : kt ,
106
- Logger : & KentikLog {l : kt },
107
- Config : & config .FlowConfig ,
108
- }
139
+ case Ipfix , Netflow9 , ASA , NBar , PAN , Netflow5 , JFlow , CFlow :
109
140
switch proto {
110
141
case Ipfix , ASA , NBar , PAN :
111
- for _ , v := range config .FlowConfig . IPFIX .Mapping {
112
- kt .Infof ("Custom IPFIX Field Mapping: Field=%v, Pen=%v -> %v" , v .Type , v .Pen , config . NameMap [ v .Destination ] )
142
+ for _ , v := range config .IPFIX .Mapping {
143
+ kt .Infof ("Custom IPFIX Field Mapping: Field=%v, Pen=%v -> %v" , v .Type , v .Pen , v .Destination )
113
144
}
114
145
case Netflow9 :
115
- for _ , v := range config .FlowConfig . NetFlowV9 .Mapping {
116
- kt .Infof ("Custom Netflow9 Field Mapping: Field=%v -> %v" , v .Type , config . NameMap [ v .Destination ] )
146
+ for _ , v := range config .NetFlowV9 .Mapping {
147
+ kt .Infof ("Custom Netflow9 Field Mapping: Field=%v -> %v" , v .Type , v .Destination )
117
148
}
118
149
}
119
- go func () { // Let this run, returning flow into the kentik transport struct
120
- err := sNF .FlowRoutine (cfg .Workers , cfg .ListenIP , cfg .ListenPort , cfg .EnableReusePort )
121
- if err != nil {
122
- sNF .Logger .Fatalf ("Fatal error: could not listen to UDP (%v)" , err )
123
- }
124
- }()
125
- return kt , nil
150
+ kt .pipe = utils .NewNetFlowPipe (cfgPipe )
151
+ decodeFunc = metrics .PromDecoderWrapper (kt .pipe .DecodeFlow , string (proto ))
126
152
case Sflow :
127
- sSF := & utils.StateSFlow {
128
- Format : kt ,
129
- Logger : & KentikLog {l : kt },
130
- Config : & config .FlowConfig ,
131
- }
132
- for _ , v := range config .FlowConfig .SFlow .Mapping {
133
- kt .Infof ("Custom SFlow Field Mapping: Layer=%d, Offset=%d, Length=%d -> %v" , v .Layer , v .Offset , v .Length , config .NameMap [v .Destination ])
134
- }
135
- go func () { // Let this run, returning flow into the kentik transport struct
136
- err := sSF .FlowRoutine (cfg .Workers , cfg .ListenIP , cfg .ListenPort , cfg .EnableReusePort )
137
- if err != nil {
138
- sSF .Logger .Fatalf ("Fatal error: could not listen to UDP (%v)" , err )
139
- }
140
- }()
141
- return kt , nil
142
- case Netflow5 , JFlow , CFlow :
143
- sNFL := & utils.StateNFLegacy {
144
- Format : kt ,
145
- Logger : & KentikLog {l : kt },
153
+ for _ , v := range config .SFlow .Mapping {
154
+ kt .Infof ("Custom SFlow Field Mapping: Layer=%s, Offset=%d, Length=%d -> %v" , v .Layer , v .Offset , v .Length , v .Destination )
146
155
}
147
- go func () { // Let this run, returning flow into the kentik transport struct
148
- err := sNFL .FlowRoutine (cfg .Workers , cfg .ListenIP , cfg .ListenPort , cfg .EnableReusePort )
149
- if err != nil {
150
- sNFL .Logger .Fatalf ("Fatal error: could not listen to UDP (%v)" , err )
156
+ kt .pipe = utils .NewSFlowPipe (cfgPipe )
157
+ decodeFunc = metrics .PromDecoderWrapper (kt .pipe .DecodeFlow , string (proto ))
158
+ default :
159
+ return nil , fmt .Errorf ("Unknown flow format %v" , proto )
160
+ }
161
+
162
+ kt .producer = flowProducer
163
+ kt .receiver = recv
164
+ if err := kt .receiver .Start (cfg .ListenIP , cfg .ListenPort , decodeFunc ); err != nil {
165
+ return nil , err
166
+ } else {
167
+ go func () {
168
+ for {
169
+ select {
170
+ case <- ctx .Done ():
171
+ return
172
+ case err := <- recv .Errors ():
173
+ if errors .Is (err , netflow .ErrorTemplateNotFound ) {
174
+ kt .Warnf ("template error: %v" , err )
175
+ } else if errors .Is (err , net .ErrClosed ) {
176
+ kt .Infof ("closed receiver" )
177
+ } else {
178
+ kt .Warnf ("error: %v" , err )
179
+ }
180
+
181
+ }
151
182
}
152
183
}()
153
- return kt , nil
154
184
}
155
- return nil , fmt .Errorf ("Unknown flow format %v" , proto )
156
- }
157
-
158
- type EntConfig struct {
159
- FlowConfig producer.ProducerConfig `json:"flow_config"`
160
- NameMap map [string ]string `json:"name_map"`
161
- }
162
185
163
- func loadMapping (f io.Reader , proto FlowSource ) (EntConfig , error ) {
164
- config := EntConfig {}
165
- dec := json .NewDecoder (f )
166
- err := dec .Decode (& config )
186
+ kt .Infof ("Netflow listener running on %s:%d for format %s and a batch size of %d" , cfg .ListenIP , cfg .ListenPort , proto , maxBatchSize )
187
+ kt .Infof ("Netflow listener sending fields %s" , cfg .MessageFields )
167
188
168
- // Update any non filled in name maps to the default.
169
- if config .NameMap == nil {
170
- config .NameMap = map [string ]string {}
171
- }
172
- switch proto {
173
- case Ipfix , ASA , NBar , PAN :
174
- for _ , v := range config .FlowConfig .IPFIX .Mapping {
175
- if _ , ok := config .NameMap [v .Destination ]; ! ok {
176
- config .NameMap [v .Destination ] = v .Destination
177
- }
178
- }
179
- case Netflow9 :
180
- for _ , v := range config .FlowConfig .NetFlowV9 .Mapping {
181
- if _ , ok := config .NameMap [v .Destination ]; ! ok {
182
- config .NameMap [v .Destination ] = v .Destination
183
- }
184
- }
185
- case Sflow :
186
- for _ , v := range config .FlowConfig .SFlow .Mapping {
187
- if _ , ok := config .NameMap [v .Destination ]; ! ok {
188
- config .NameMap [v .Destination ] = v .Destination
189
- }
190
- }
191
- }
189
+ return kt , nil
190
+ }
192
191
192
+ func loadMapping (f io.Reader ) (* protoproducer.ProducerConfig , error ) {
193
+ config := & protoproducer.ProducerConfig {}
194
+ dec := yaml .NewDecoder (f )
195
+ err := dec .Decode (config )
193
196
return config , err
194
197
}
0 commit comments