
Commit 3dee135

Merge branch 'props-master'
Merging pull request #9 by @pR0Ps. Thanks for the contribution! Resolves #9
2 parents: ce2be70 + 9f16d24

File tree

10 files changed: +610 additions, -329 deletions


README.md

Lines changed: 1 addition & 1 deletion
@@ -1,5 +1,5 @@
 # Python NetFlow v9 parser and UDP collector
-This script is able to parse incoming UDP NetFlow packets of **NetFlow version 9**.
+This script is able to collect and parse incoming UDP NetFlow packets of **NetFlow versions 1, 5 and 9**.
 
 Version 9 is the first NetFlow version using templates.
 Templates make dynamically sized and configured NetFlow data flowsets possible,
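For readers unfamiliar with the templates mentioned in the README context above: a NetFlow v9 exporter first sends template flowsets that describe the layout of the data records that follow. The snippet below is only an illustrative sketch based on RFC 3954 and is not code from this repository; the name parse_v9_template and the exact error handling are made up.

```python
import struct

def parse_v9_template(payload: bytes):
    """Sketch: unpack the first template record of a NetFlow v9 template flowset."""
    # Flowset header: flowset ID (0 marks a template flowset) and total length in bytes
    flowset_id, length = struct.unpack('!HH', payload[0:4])
    if flowset_id != 0:
        raise ValueError("not a template flowset")

    # Template header: template ID (>= 256) and the number of field specifiers
    template_id, field_count = struct.unpack('!HH', payload[4:8])

    # Each field specifier is a pair of 16-bit integers: field type and field length
    fields = [struct.unpack('!HH', payload[8 + 4 * i:12 + 4 * i])
              for i in range(field_count)]
    return template_id, fields
```

A data flowset that references the returned template_id can then be decoded field by field, which is what makes the dynamically sized records described in the README possible.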

analyze_json.py

File mode changed: 100644 → 100755
Lines changed: 82 additions & 66 deletions
@@ -8,67 +8,93 @@
 Licensed under MIT License. See LICENSE.
 """
 
+import argparse
+from collections import namedtuple
+import contextlib
 from datetime import datetime
+import functools
 import ipaddress
 import json
-import os.path
-import sys
 import socket
-from collections import namedtuple
+import sys
 
-Pair = namedtuple('Pair', 'src dest')
 
-def getIPs(flow):
-    use_ipv4 = False  # optimistic default case of IPv6
+Pair = namedtuple('Pair', ['src', 'dest'])
 
-    if 'IP_PROTOCOL_VERSION' in flow and flow['IP_PROTOCOL_VERSION'] == 4:
-        use_ipv4 = True
-    elif 'IPV4_SRC_ADDR' in flow or 'IPV4_DST_ADDR' in flow:
-        use_ipv4 = True
 
-    if use_ipv4:
-        return Pair(
-            ipaddress.ip_address(flow['IPV4_SRC_ADDR']),
-            ipaddress.ip_address(flow['IPV4_DST_ADDR']))
+@functools.lru_cache(maxsize=128)
+def resolve_hostname(ip):
+    return socket.getfqdn(ip)
 
-    # else: return IPv6 pair
-    return Pair(
-        ipaddress.ip_address(flow['IPV6_SRC_ADDR']),
-        ipaddress.ip_address(flow['IPV6_DST_ADDR']))
+
+def fallback(d, keys):
+    for k in keys:
+        if k in d:
+            return d[k]
+    raise KeyError(", ".join(keys))
 
 
 class Connection:
     """Connection model for two flows.
     The direction of the data flow can be seen by looking at the size.
 
     'src' describes the peer which sends more data towards the other. This
-    does NOT have to mean, that 'src' was the initiator of the connection.
+    does NOT have to mean that 'src' was the initiator of the connection.
     """
     def __init__(self, flow1, flow2):
-        if flow1['IN_BYTES'] >= flow2['IN_BYTES']:
+        if not flow1 or not flow2:
+            raise Exception("A connection requires two flows")
+
+        # Assume the size that sent the most data is the source
+        # TODO: this might not always be right, maybe use earlier timestamp?
+        size1 = fallback(flow1, ['IN_BYTES', 'IN_OCTETS'])
+        size2 = fallback(flow2, ['IN_BYTES', 'IN_OCTETS'])
+        if size1 >= size2:
             src = flow1
             dest = flow2
         else:
             src = flow2
             dest = flow1
 
-        ips = getIPs(src)
+        ips = self.get_ips(src)
         self.src = ips.src
         self.dest = ips.dest
-        self.src_port = src['L4_SRC_PORT']
-        self.dest_port = src['L4_DST_PORT']
-        self.size = src['IN_BYTES']
+        self.src_port = fallback(src, ['L4_SRC_PORT', 'SRC_PORT'])
+        self.dest_port = fallback(dest, ['L4_DST_PORT', 'DST_PORT'])
+        self.size = fallback(src, ['IN_BYTES', 'IN_OCTETS'])
 
         # Duration is given in milliseconds
        self.duration = src['LAST_SWITCHED'] - src['FIRST_SWITCHED']
         if self.duration < 0:
             # 32 bit int has its limits. Handling overflow here
+            # TODO: Should be handled in the collection phase
             self.duration = (2**32 - src['FIRST_SWITCHED']) + src['LAST_SWITCHED']
 
     def __repr__(self):
         return "<Connection from {} to {}, size {}>".format(
             self.src, self.dest, self.human_size)
 
+    @staticmethod
+    def get_ips(flow):
+        # TODO: These values should be parsed into strings in the collection phase.
+        #       The floating point representation of an IPv6 address in JSON
+        #       could lose precision.
+
+        # IPv4
+        if flow.get('IP_PROTOCOL_VERSION') == 4 \
+                or 'IPV4_SRC_ADDR' in flow \
+                or 'IPV4_DST_ADDR' in flow:
+            return Pair(
+                ipaddress.ip_address(flow['IPV4_SRC_ADDR']),
+                ipaddress.ip_address(flow['IPV4_DST_ADDR'])
+            )
+
+        # IPv6
+        return Pair(
+            ipaddress.ip_address(flow['IPV6_SRC_ADDR']),
+            ipaddress.ip_address(flow['IPV6_DST_ADDR'])
+        )
+
     @property
     def human_size(self):
         # Calculate a human readable size of the traffic
@@ -96,52 +122,42 @@ def human_duration(self):
     @property
     def hostnames(self):
         # Resolve the IPs of this flows to their hostname
-        src_hostname = socket.getfqdn(self.src.compressed)
-        dest_hostname = socket.getfqdn(self.dest.compressed)
-
+        src_hostname = resolve_hostname(self.src.compressed)
+        dest_hostname = resolve_hostname(self.dest.compressed)
         return Pair(src_hostname, dest_hostname)
 
     @property
     def service(self):
         # Resolve ports to their services, if known
-        service = "unknown"
-        try:
-            # Try service of sending peer first
-            service = socket.getservbyport(self.src_port)
-        except OSError:
-            # Resolving the sport did not work, trying dport
-            try:
-                service = socket.getservbyport(self.dest_port)
-            except OSError:
-                pass
-        return service
-
-
-# Handle CLI args and load the data dump
-if len(sys.argv) < 2:
-    exit("Use {} <filename>.json".format(sys.argv[0]))
-filename = sys.argv[1]
-if not os.path.exists(filename):
-    exit("File {} does not exist!".format(filename))
-with open(filename, 'r') as fh:
-    data = json.loads(fh.read())
-
-
-# Go through data and disect every flow saved inside the dump
-for export in sorted(data):
-    timestamp = datetime.fromtimestamp(float(export)).strftime("%Y-%m-%d %H:%M.%S")
-
-    flows = data[export]
-    pending = None  # Two flows normally appear together for duplex connection
-    for flow in flows:
-        if not pending:
-            pending = flow
-        else:
+        # Try source port, fallback to dest port, otherwise "unknown"
+        with contextlib.suppress(OSError):
+            return socket.getservbyport(self.src_port)
+        with contextlib.suppress(OSError):
+            return socket.getservbyport(self.dest_port)
+        return "unknown"
+
+
+if __name__ == "__main__":
+    parser = argparse.ArgumentParser(description="Output a basic analysis of NetFlow data")
+    parser.add_argument('filename', nargs='?', type=argparse.FileType('r'),
+                        default=sys.stdin,
+                        help="The file to analyze (defaults to stdin if not provided)")
+    args = parser.parse_args()
+
+    data = json.load(args.filename)
+
+    # Go through data and disect every flow saved inside the dump
+    for key in sorted(data):
+        timestamp = datetime.fromtimestamp(float(key)).strftime("%Y-%m-%d %H:%M.%S")
+
+        flows = data[key]
+        pending = None  # Two flows normally appear together for duplex connection
+        for flow in flows:
+            if not pending:
+                pending = flow
+                continue
             con = Connection(pending, flow)
-            print("{timestamp}: {service:7} | {size:8} | {duration:9} | {src_host} ({src}) to"\
-                " {dest_host} ({dest})".format(
-                    timestamp=timestamp, service=con.service.upper(),
-                    src_host=con.hostnames.src, src=con.src,
-                    dest_host=con.hostnames.dest, dest=con.dest,
-                    size=con.human_size, duration=con.human_duration))
+            print("{timestamp}: {service:7} | {size:8} | {duration:9} | {src_host} ({src}) to {dest_host} ({dest})" \
+                .format(timestamp=timestamp, service=con.service.upper(), src_host=con.hostnames.src, src=con.src,
+                    dest_host=con.hostnames.dest, dest=con.dest, size=con.human_size, duration=con.human_duration))
             pending = None
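The recurring helper in the diff above, fallback(), exists because field names differ between NetFlow versions: the v9 records in the dump use keys such as IN_BYTES and L4_SRC_PORT, while the other versions this commit adds support for apparently export IN_OCTETS and SRC_PORT. A minimal sketch of that behaviour, with sample records invented purely for illustration:

```python
def fallback(d, keys):
    # Same helper as in the diff: return the value of the first key present
    for k in keys:
        if k in d:
            return d[k]
    raise KeyError(", ".join(keys))

# Hypothetical flow records, invented for illustration only
v9_flow = {"IN_BYTES": 4096, "L4_SRC_PORT": 443}
v5_flow = {"IN_OCTETS": 512, "SRC_PORT": 53}

assert fallback(v9_flow, ["IN_BYTES", "IN_OCTETS"]) == 4096
assert fallback(v5_flow, ["IN_BYTES", "IN_OCTETS"]) == 512
```

The other changes follow the same goal of making the analyzer easier to reuse: socket.getfqdn() results are memoized with functools.lru_cache so repeated lookups of the same address are answered from the cache, contextlib.suppress replaces the nested try/except in service resolution, and argparse with a FileType argument defaulting to sys.stdin lets the JSON dump be piped in instead of being passed as a file path.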
