Skip to content

Commit c7c71fa

Browse files
committed
Fixed analyzer.py to display collected flows whose version is not 9
1 parent 71fb316 commit c7c71fa

File tree

1 file changed

+46
-22
lines changed

1 file changed

+46
-22
lines changed

netflow/analyzer.py

+46-22
Original file line numberDiff line numberDiff line change
@@ -96,8 +96,8 @@ def __init__(self, flow1, flow2):
9696

9797
# Assume the size that sent the most data is the source
9898
# TODO: this might not always be right, maybe use earlier timestamp?
99-
size1 = fallback(flow1, ['IN_BYTES', 'IN_OCTETS'])
100-
size2 = fallback(flow2, ['IN_BYTES', 'IN_OCTETS'])
99+
size1 = fallback(flow1, ['IN_BYTES', 'IN_OCTETS', "octetDeltaCount"])
100+
size2 = fallback(flow2, ['IN_BYTES', 'IN_OCTETS', "octetDeltaCount"])
101101
if size1 >= size2:
102102
src = flow1
103103
dest = flow2
@@ -120,12 +120,24 @@ def __init__(self, flow1, flow2):
120120
ips = self.get_ips(src)
121121
self.src = ips.src
122122
self.dest = ips.dest
123-
self.src_port = fallback(src, ['L4_SRC_PORT', 'SRC_PORT'])
124-
self.dest_port = fallback(dest, ['L4_DST_PORT', 'DST_PORT'])
125-
self.size = fallback(src, ['IN_BYTES', 'IN_OCTETS'])
123+
self.proto = fallback(src, ['PROTOCOL', 'PROTO', 'protocolIdentifier'])
124+
# ICMP and ICMPv6 does not include port fields
125+
if self.proto == 1 or self.proto == 58:
126+
self.src_port = 0
127+
# ICMP field is treated as destination port
128+
try:
129+
self.dest_port = fallback(dest, ['ICMP_TYPE', 'icmpTypeCodeIPv4', 'icmpTypeCodeIPv6'])
130+
except:
131+
self.dest_port = fallback(dest, ['L4_DST_PORT', 'DST_PORT', 'destinationTransportPort'])
132+
else:
133+
self.src_port = fallback(src, ['L4_SRC_PORT', 'SRC_PORT', 'sourceTransportPort'])
134+
self.dest_port = fallback(dest, ['L4_DST_PORT', 'DST_PORT', 'destinationTransportPort'])
135+
self.size = fallback(src, ['IN_BYTES', 'IN_OCTETS', 'octetDeltaCount'])
126136

127137
# Duration is given in milliseconds
128-
self.duration = src['LAST_SWITCHED'] - src['FIRST_SWITCHED']
138+
lastSwitched = fallback(src, ['LAST_SWITCHED', 'flowEndSysUpTime'])
139+
firstSwitched = fallback(src, ['FIRST_SWITCHED', 'flowStartSysUpTime'])
140+
self.duration = lastSwitched - firstSwitched
129141
if self.duration < 0:
130142
# 32 bit int has its limits. Handling overflow here
131143
# TODO: Should be handled in the collection phase
@@ -139,16 +151,17 @@ def __repr__(self):
139151
def get_ips(flow):
140152
# IPv4
141153
if flow.get('IP_PROTOCOL_VERSION') == 4 or \
154+
'sourceIPv4Address' in flow or 'destinationIPv4Address' in flow or\
142155
'IPV4_SRC_ADDR' in flow or 'IPV4_DST_ADDR' in flow:
143156
return Pair(
144-
ipaddress.ip_address(flow['IPV4_SRC_ADDR']),
145-
ipaddress.ip_address(flow['IPV4_DST_ADDR'])
157+
ipaddress.ip_address(fallback(flow, ['IPV4_SRC_ADDR', 'sourceIPv4Address'])),
158+
ipaddress.ip_address(fallback(flow, ['IPV4_DST_ADDR', 'destinationIPv4Address']))
146159
)
147160

148161
# IPv6
149162
return Pair(
150-
ipaddress.ip_address(flow['IPV6_SRC_ADDR']),
151-
ipaddress.ip_address(flow['IPV6_DST_ADDR'])
163+
ipaddress.ip_address(fallback(flow, ['IPV6_SRC_ADDR', 'sourceIPv6Address'])),
164+
ipaddress.ip_address(fallback(flow, ['IPV6_DST_ADDR', 'destinationIPv6Address']))
152165
)
153166

154167
@property
@@ -179,7 +192,13 @@ def service(self):
179192

180193
@property
181194
def total_packets(self):
182-
return self.src_flow["IN_PKTS"] + self.dest_flow["IN_PKTS"]
195+
src_flow_packets = fallback(
196+
self.src_flow, ["IN_PKTS", "IN_PACKETS", "packetDeltaCount"]
197+
)
198+
dest_flow_packets = fallback(
199+
self.dest_flow, ["IN_PKTS", "IN_PACKETS", "packetDeltaCount"]
200+
)
201+
return src_flow_packets + dest_flow_packets
183202

184203

185204
if __name__ == "netflow.analyzer":
@@ -236,10 +255,6 @@ def total_packets(self):
236255
logger.error("No header dict in entry {}".format(ts))
237256
raise ValueError
238257

239-
if entry[ts]["header"]["version"] == 10:
240-
logger.warning("Skipped IPFIX entry, because analysis of IPFIX is not yet implemented")
241-
continue
242-
243258
data[ts] = entry[ts]
244259

245260
# Go through data and dissect every flow saved inside the dump
@@ -258,20 +273,29 @@ def total_packets(self):
258273
client = data[key]["client"]
259274
flows = data[key]["flows"]
260275

261-
for flow in sorted(flows, key=lambda x: x["FIRST_SWITCHED"]):
262-
first_switched = flow["FIRST_SWITCHED"]
276+
for flow in sorted(flows,
277+
key=lambda x:fallback(x,
278+
["FIRST_SWITCHED", "flowStartSysUpTime", "systemInitTimeMilliseconds"],
279+
),
280+
):
281+
if "systemInitTimeMilliseconds" in flow:
282+
# systemInitTimeMilliseconds exists in only option data record
283+
continue
284+
first_switched = fallback(flow, ["FIRST_SWITCHED", "flowStartSysUpTime"])
263285

264286
if first_switched - 1 in pending:
265287
# TODO: handle fitting, yet mismatching (here: 1 second) pairs
266288
pass
267289

268290
# Find the peer for this connection
269-
if "IPV4_SRC_ADDR" in flow or flow.get("IP_PROTOCOL_VERSION") == 4:
270-
local_peer = flow["IPV4_SRC_ADDR"]
271-
remote_peer = flow["IPV4_DST_ADDR"]
291+
if ("IPV4_SRC_ADDR" in flow or "sourceIPv4Address" in flow
292+
or fallback(flow, ["IP_PROTOCOL_VERSION", "ipVersion"]) == 4
293+
):
294+
local_peer = fallback(flow, ["IPV4_SRC_ADDR", "sourceIPv4Address"])
295+
remote_peer = fallback(flow, ["IPV4_DST_ADDR", "destinationIPv4Address"])
272296
else:
273-
local_peer = flow["IPV6_SRC_ADDR"]
274-
remote_peer = flow["IPV6_DST_ADDR"]
297+
local_peer = fallback(flow, ["IPV6_SRC_ADDR", "sourceIPv6Address"])
298+
remote_peer = fallback(flow, ["IPV6_DST_ADDR", "destinationIPv6Address"])
275299

276300
# Match on host filter passed in as argument
277301
if args.match_host and not any([local_peer == args.match_host, remote_peer == args.match_host]):

0 commit comments

Comments
 (0)