
Commit 6b9d20c

Refactor storing data and writing to disk - using gzip and lines
In previous versions, the collector stored all collected flows (parsed data) in memory and, at regular intervals or at shutdown, dumped this single dict as JSON onto disk. With this commit, each flow set is written as its own JSON line and appended to a gzipped file on disk for storage efficiency. analyze_json.py is updated as well to handle the gzipped files in the new format. See the comments in main.py for more details. Fixes #10
1 parent 3dee135 commit 6b9d20c
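
For illustration, a minimal sketch of the new on-disk format described above (not part of the commit; the file name "flows.gz" and the flow fields are made-up placeholders): each appended line is one gzip-compressed JSON object whose single key is the export timestamp and whose value is the list of flow dicts from that export.

import gzip
import json

# Hypothetical example entry: one export, keyed by its timestamp, with two flow dicts
# (field names and values are made up for illustration).
entry = {"1577482000": [{"IPV4_SRC_ADDR": "10.0.0.1", "IN_BYTES": 512},
                        {"IPV4_SRC_ADDR": "10.0.0.2", "IN_BYTES": 64}]}

# One byte-encoded JSON line per export, appended to a gzipped file.
with gzip.open("flows.gz", "ab") as fh:
    fh.write(json.dumps(entry).encode() + b"\n")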

File tree: 2 files changed, +56 -21 lines (analyze_json.py, main.py)

analyze_json.py

Lines changed: 34 additions & 6 deletions

@@ -13,16 +13,20 @@
 import contextlib
 from datetime import datetime
 import functools
+import gzip
 import ipaddress
 import json
+import logging
+import os.path
 import socket
 import sys


 Pair = namedtuple('Pair', ['src', 'dest'])
+logger = logging.getLogger(__name__)


-@functools.lru_cache(maxsize=128)
+@functools.lru_cache(maxsize=None)
 def resolve_hostname(ip):
     return socket.getfqdn(ip)

@@ -139,14 +143,38 @@ def service(self):

 if __name__ == "__main__":
     parser = argparse.ArgumentParser(description="Output a basic analysis of NetFlow data")
-    parser.add_argument('filename', nargs='?', type=argparse.FileType('r'),
-                        default=sys.stdin,
+    parser.add_argument('-f', '--file', dest='file', type=str, default=sys.stdin,
                         help="The file to analyze (defaults to stdin if not provided)")
     args = parser.parse_args()

-    data = json.load(args.filename)
+    # Using a file and using stdin differ in their further usage for gzip.open
+    file = args.file
+    mode = "rb"  # reading files
+    if file != sys.stdin and not os.path.exists(file):
+        exit("File {} does not exist!".format(file))
+
+    if file == sys.stdin:
+        file = sys.stdin.buffer
+        mode = "rt"  # reading from stdin
+
+    data = {}
+
+    with gzip.open(file, mode) as gzipped:
+        # "for line in" lazy-loads all lines in the file
+        for line in gzipped:
+            entry = json.loads(line)
+            if len(entry.keys()) != 1:
+                logger.warning("Line \"{}\" does not have exactly one timestamp key.".format(line))
+
+            try:
+                ts = list(entry)[0]  # timestamp from key
+            except KeyError:
+                logger.error("Saved line \"{}\" has no timestamp key!".format(line))
+                continue
+
+            data[ts] = entry[ts]

-    # Go through data and disect every flow saved inside the dump
+    # Go through data and dissect every flow saved inside the dump
     for key in sorted(data):
         timestamp = datetime.fromtimestamp(float(key)).strftime("%Y-%m-%d %H:%M.%S")

@@ -157,7 +185,7 @@ def service(self):
                 pending = flow
                 continue
             con = Connection(pending, flow)
-            print("{timestamp}: {service:7} | {size:8} | {duration:9} | {src_host} ({src}) to {dest_host} ({dest})" \
+            print("{timestamp}: {service:<10} | {size:8} | {duration:9} | {src_host} ({src}) to {dest_host} ({dest})" \
                 .format(timestamp=timestamp, service=con.service.upper(), src_host=con.hostnames.src, src=con.src,
                         dest_host=con.hostnames.dest, dest=con.dest, size=con.human_size, duration=con.human_duration))
             pending = None
main.py

Lines changed: 22 additions & 15 deletions

@@ -11,6 +11,7 @@
 import argparse
 from collections import namedtuple
 import queue
+import gzip
 import json
 import logging
 import sys
@@ -20,7 +21,6 @@

 from netflow import parse_packet, TemplateNotRecognized, UnknownNetFlowVersion

-
 logger = logging.getLogger(__name__)

 # Amount of time to wait before dropping an undecodable ExportPacket
@@ -122,7 +122,7 @@ def run(self):
             else:
                 to_retry.append(pkt)
                 logger.debug("Failed to decode a v9 ExportPacket - will "
-                    "re-attempt when a new template is discovered")
+                             "re-attempt when a new template is discovered")
                 continue

             logger.debug("Processed a v%d ExportPacket with %d flows.",
@@ -172,8 +172,8 @@ def get_export_packets(host, port):
     parser.add_argument("--port", "-p", type=int, default=2055,
                         help="collector listener port")
     parser.add_argument("--file", "-o", type=str, dest="output_file",
-                        default="{}.json".format(int(time.time())),
-                        help="collector export JSON file")
+                        default="{}.gz".format(int(time.time())),
+                        help="collector export multiline JSON file")
     parser.add_argument("--debug", "-D", action="store_true",
                         help="Enable debug output")
     args = parser.parse_args()
@@ -183,19 +183,26 @@ def get_export_packets(host, port):
     if args.debug:
         logger.setLevel(logging.DEBUG)

-    data = {}
     try:
-        # TODO: For a long-running processes, this will consume loads of memory
+        # With every parsed flow a new line is appended to the output file. In previous versions, this was implemented
+        # by storing the whole data dict in memory and dumping it regularly onto disk. This was extremely fragile, as
+        # it a) consumed a lot of memory and CPU (dropping packets since storing one flow took longer than the arrival
+        # of the next flow) and b) broke the exported JSON file if the collector crashed during the write process,
+        # rendering all flows collected during the runtime of the collector useless (the file contained one large JSON
+        # dict which represented the 'data' dict).
+
+        # In this new approach, each received flow is parsed as usual, but it gets appended to a gzipped file each time.
+        # All in all, this improves three aspects:
+        # 1. collected flow data is not stored in memory any more
+        # 2. received and parsed flows are persisted reliably
+        # 3. the disk usage of files with JSON and its full strings as keys is reduced by using gzipped files
+        # This also means that the files have to be handled differently, because they are gzipped and not formatted as
+        # one single big JSON dump, but rather many little JSON dumps, separated by line breaks.
         for ts, export in get_export_packets(args.host, args.port):
-            data[ts] = [flow.data for flow in export.flows]
+            entry = {ts: [flow.data for flow in export.flows]}
+            line = json.dumps(entry).encode() + b"\n"  # byte encoded line
+            with gzip.open(args.output_file, "ab") as fh:  # open as append, not reading the whole file
+                fh.write(line)
     except KeyboardInterrupt:
         logger.info("Received KeyboardInterrupt, passing through")
         pass
-
-    if data:
-        # TODO: this should be done periodically to not lose any data (only saved in memory)
-        logger.info("Outputting collected data to '%s'", args.output_file)
-        with open(args.output_file, 'w') as f:
-            json.dump(data, f)
-    else:
-        logger.info("No data collected")
