
Commit 61439ec

Improve analyzer (handling of pairs, dropping noise)
Previously, the analyzer assumed that two consecutive flows form a pair. This proved unreliable, so a new comparison algorithm is used: it matches the IP addresses and the 'first_switched' parameter to identify two flows belonging to the same connection. Further improvements are possible, especially in filtering and in identifying the initiating peer. Tests still fail and have to be adapted to the new dicts and gzip handling.
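As a rough illustration of the matching idea (a minimal sketch, simplified to IPv4 only; match_flow is a made-up name, and the real Connection class is omitted): two flows that share a FIRST_SWITCHED timestamp and mirror each other's addresses are treated as the two directions of one connection.

    pending = {}

    def match_flow(flow):
        """Return (flow, peer_flow) once both directions were seen, else None."""
        ts = flow["FIRST_SWITCHED"]
        local, remote = flow["IPV4_SRC_ADDR"], flow["IPV4_DST_ADDR"]
        peers = pending.setdefault(ts, {})
        if remote in peers:
            # The reverse direction is already waiting - pair them up
            return flow, peers.pop(remote)
        peers[local] = flow  # park this flow until its peer arrives
        return None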
1 parent eff99fc commit 61439ec

File tree

3 files changed: +90 -13 lines

analyze_json.py renamed to analyzer.py (+88 -11)
@@ -60,6 +60,18 @@ def __init__(self, flow1, flow2):
             src = flow2
             dest = flow1
 
+        # TODO: this next approach uses the lower port as the service identifier
+        # port1 = fallback(flow1, ['L4_SRC_PORT', 'SRC_PORT'])
+        # port2 = fallback(flow2, ['L4_SRC_PORT', 'SRC_PORT'])
+        #
+        # src = flow1
+        # dest = flow2
+        # if port1 > port2:
+        #     src = flow2
+        #     dest = flow1
+
+        self.src_flow = src
+        self.dest_flow = dest
         ips = self.get_ips(src)
         self.src = ips.src
         self.dest = ips.dest
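The commented-out TODO sketches an alternative heuristic: treat the peer with the lower port as the server, since clients usually bind high ephemeral ports. The fallback helper it references is presumably a first-matching-key lookup, roughly along these lines (a sketch under that assumption, not the module's actual definition):

    def fallback(flow, keys):
        # Return the value of the first key that is present in the flow dict
        for key in keys:
            if key in flow:
                return flow[key]
        raise KeyError(str(keys))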
@@ -128,18 +140,26 @@ def hostnames(self):
     @property
     def service(self):
         # Resolve ports to their services, if known
-        # Try source port, fallback to dest port, otherwise "unknown"
+        default = "({} {})".format(self.src_port, self.dest_port)
+        if self.src_port > 10000:
+            return default
         with contextlib.suppress(OSError):
             return socket.getservbyport(self.src_port)
         with contextlib.suppress(OSError):
             return socket.getservbyport(self.dest_port)
-        return "unknown"
+        return default
+
+    @property
+    def total_packets(self):
+        return self.src_flow["IN_PKTS"] + self.dest_flow["IN_PKTS"]
 
 
 if __name__ == "__main__":
     parser = argparse.ArgumentParser(description="Output a basic analysis of NetFlow data")
     parser.add_argument('-f', '--file', dest='file', type=str, default=sys.stdin,
                         help="The file to analyze (defaults to stdin if not provided)")
+    parser.add_argument('-p', '--packets', dest='packets_threshold', type=int, default=10,
+                        help="Number of packets representing the lower bound in connections to be processed")
     args = parser.parse_args()
 
     # Using a file and using stdin differ in their further usage for gzip.open
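For context: socket.getservbyport from the standard library maps well-known ports to service names and raises OSError for unregistered ones, which is why both lookups sit under contextlib.suppress(OSError). On most systems:

    import socket

    print(socket.getservbyport(22))   # 'ssh'
    print(socket.getservbyport(443))  # 'https'
    # An unregistered (typically high, ephemeral) port raises OSError,
    # so the property falls back to the "(src_port dest_port)" default.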
@@ -170,17 +190,74 @@ def service(self):
         data[ts] = entry[ts]
 
     # Go through data and dissect every flow saved inside the dump
+
+    # The following dict holds flows which are looking for a peer, to analyze a duplex 'Connection'.
+    # For each flow, the destination address is looked up. If the peer is not in the list of pending peers,
+    # insert this flow, waiting for its peer. If found, take the waiting peer and create a Connection object.
+    pending = {}
+    skipped = 0
+    skipped_threshold = args.packets_threshold
+
     for key in sorted(data):
         timestamp = datetime.fromtimestamp(float(key)).strftime("%Y-%m-%d %H:%M.%S")
         client = data[key]["client"]
         flows = data[key]["flows"]
-        pending = None  # Two flows normally appear together for duplex connection
-        for flow in flows:
-            if not pending:
-                pending = flow
+
+        for flow in sorted(flows, key=lambda x: x["FIRST_SWITCHED"]):
+            first_switched = flow["FIRST_SWITCHED"]
+
+            if first_switched - 1 in pending:
+                # TODO: handle fitting, yet mismatching (here: 1 second) pairs
+                pass
+
+            if first_switched not in pending:
+                pending[first_switched] = {}
+
+            # Find the peer for this connection
+            if flow["IP_PROTOCOL_VERSION"] == 4:
+                local_peer = flow["IPV4_SRC_ADDR"]
+                remote_peer = flow["IPV4_DST_ADDR"]
+            else:
+                local_peer = flow["IPV6_SRC_ADDR"]
+                remote_peer = flow["IPV6_DST_ADDR"]
+
+            if remote_peer in pending[first_switched]:
+                # The destination peer put itself into the pending dict, getting and removing entry
+                peer_flow = pending[first_switched].pop(remote_peer)
+                if len(pending[first_switched]) == 0:
+                    del pending[first_switched]
+            else:
+                # Flow did not find a matching, pending peer - inserting itself
+                pending[first_switched][local_peer] = flow
+                continue
+
+            con = Connection(flow, peer_flow)
+            if con.total_packets < skipped_threshold:
+                skipped += 1
                 continue
-            con = Connection(pending, flow)
-            print("{timestamp}: {service:<10} | {size:8} | {duration:9} | {src_host} ({src}) to {dest_host} ({dest})" \
-                  .format(timestamp=timestamp, service=con.service.upper(), src_host=con.hostnames.src, src=con.src,
-                          dest_host=con.hostnames.dest, dest=con.dest, size=con.human_size, duration=con.human_duration))
-            pending = None
+
+            print("{timestamp}: {service:<14} | {size:8} | {duration:9} | {packets:5} | Between {src_host} ({src}) and {dest_host} ({dest})" \
+                  .format(timestamp=timestamp, service=con.service.upper(), src_host=con.hostnames.src, src=con.src,
+                          dest_host=con.hostnames.dest, dest=con.dest, size=con.human_size, duration=con.human_duration,
+                          packets=con.total_packets))
+
+    if skipped > 0:
+        print(f"{skipped} connections skipped, because they had less than {skipped_threshold} packets.")
+
+    if len(pending) > 0:
+        print(f"There are {len(pending)} first_switched entries left in the pending dict!")
+        all_noise = True
+        for first_switched, flows in sorted(pending.items(), key=lambda x: x[0]):
+            for peer, flow in flows.items():
+                # Ignore all pings, SYN scans and other noise to find only those peers left over which need a fix
+                if flow["IN_PKTS"] < skipped_threshold:
+                    continue
+                all_noise = False
+
+                if flow["IP_PROTOCOL_VERSION"] == 4:
+                    print(first_switched, peer, flow["IPV4_DST_ADDR"], flow["IN_PKTS"])
+                else:
+                    print(first_switched, peer, flow["IPV6_DST_ADDR"], flow["IN_PKTS"])
+
+        if all_noise:
+            print("They were all noise!")

netflow/v9.py (+1 -1)
@@ -284,7 +284,7 @@ def __init__(self, data):
             field = TemplateField(field_type, field_length)
             fields.append(field)
 
-        # Create a tempalte object with all collected data
+        # Create a template object with all collected data
         template = TemplateRecord(template_id, field_count, fields)
 
         # Append the new template to the global templates list

tests.py (+1 -1)
@@ -132,7 +132,7 @@ def test_analyzer(self):
         pkts, _, _ = send_recv_packets([TEMPLATE_PACKET, *PACKETS])
         data = {p[0]: [f.data for f in p[1].flows] for p in pkts}
         analyzer = subprocess.run(
-            [sys.executable, 'analyze_json.py'],
+            [sys.executable, 'analyzer.py'],
             input=json.dumps(data),
             encoding='utf-8',
             capture_output=True
