
Commit eff99fc

Add client info to stored data
Until now, packets arriving at the collector's interface were stored by timestamp, with the exported flows in the payload. This format is now extended to also store the client's IP address and port, allowing multiple clients to export flows to the same collector instance.
1 parent 1646a52 commit eff99fc
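
With this change, each stored entry carries the exporting client next to its flows. A rough sketch of the new on-disk shape, using made-up values (the client tuple is whatever the UDP server reports as the sender's address and becomes a JSON list when dumped):

import json
import time

# Hypothetical values, for illustration only
ts = time.time()
client = ("192.0.2.10", 51234)  # (host, port) of the exporting client
flows = [{"bytes": 1337}]       # stand-in for the decoded flow dicts

# New format: client and flows sit side by side under the timestamp key
entry = {ts: {"client": client, "flows": flows}}
print(json.dumps(entry))  # main.py appends one such JSON object per line to a gzip file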

File tree

2 files changed: +14 -12 lines

analyze_json.py

Lines changed: 2 additions & 2 deletions
@@ -172,8 +172,8 @@ def service(self):
         # Go through data and dissect every flow saved inside the dump
         for key in sorted(data):
             timestamp = datetime.fromtimestamp(float(key)).strftime("%Y-%m-%d %H:%M.%S")
-
-            flows = data[key]
+            client = data[key]["client"]
+            flows = data[key]["flows"]
             pending = None  # Two flows normally appear together for duplex connection
             for flow in flows:
                 if not pending:

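The dissection loop above walks a `data` dict keyed by timestamp; a minimal sketch, under the assumption that the dict is rebuilt from the gzipped, line-delimited dump main.py writes (the file name is a placeholder):

import gzip
import json

data = {}
# The dump is not one big JSON document but many small ones, one per line,
# so it is read line by line and merged into a single dict.
with gzip.open("flows.dump.gz", "rb") as fh:  # placeholder file name
    for line in fh:
        data.update(json.loads(line))         # {timestamp: {"client": [...], "flows": [...]}}
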
main.py

Lines changed: 12 additions & 10 deletions
@@ -26,16 +26,15 @@
 # Amount of time to wait before dropping an undecodable ExportPacket
 PACKET_TIMEOUT = 60 * 60
 
-# TODO: Add source IP
-RawPacket = namedtuple('RawPacket', ['ts', 'data'])
+RawPacket = namedtuple('RawPacket', ['ts', 'client', 'data'])
 
 
 class QueuingRequestHandler(socketserver.BaseRequestHandler):
     def handle(self):
-        data = self.request[0]
-        self.server.queue.put(RawPacket(time.time(), data))
+        data = self.request[0]  # get content, [1] would be the socket
+        self.server.queue.put(RawPacket(time.time(), self.client_address, data))
         logger.debug(
-            "Received %d bytes of data from %s", len(data), self.client_address[0]
+            "Received %d bytes of data from %s", len(data), self.client_address
         )
 
 
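The handler change builds on socketserver's UDP semantics: for a UDPServer, self.request is a (data, socket) pair and self.client_address is the sender's (host, port) tuple. A self-contained sketch of the queueing path, assuming the queue is attached to the server object as in main.py (listen address and port are assumptions):

import queue
import socketserver
import time
from collections import namedtuple

RawPacket = namedtuple('RawPacket', ['ts', 'client', 'data'])

class QueuingRequestHandler(socketserver.BaseRequestHandler):
    def handle(self):
        data = self.request[0]  # raw UDP payload; self.request[1] is the socket
        # The sender's (host, port) now travels with every queued packet
        self.server.queue.put(RawPacket(time.time(), self.client_address, data))

server = socketserver.UDPServer(("0.0.0.0", 2055), QueuingRequestHandler)  # port is an assumption
server.queue = queue.Queue()  # the handler expects the queue on the server object
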
@@ -107,7 +106,7 @@ def run(self):
             while not self._shutdown.is_set():
                 try:
                     # 0.5s delay to limit CPU usage while waiting for new packets
-                    pkt = self.input.get(block=True, timeout=0.5)
+                    pkt: RawPacket = self.input.get(block=True, timeout=0.5)
                 except queue.Empty:
                     continue
 
@@ -130,15 +129,15 @@
 
                 # If any new templates were discovered, dump the unprocessable
                 # data back into the queue and try to decode them again
-                if (export.header.version == 9 and export.contains_new_templates and to_retry):
+                if export.header.version == 9 and export.contains_new_templates and to_retry:
                     logger.debug("Received new template(s)")
                     logger.debug("Will re-attempt to decode %d old v9 ExportPackets",
                                  len(to_retry))
                     for p in to_retry:
                         self.input.put(p)
                     to_retry.clear()
 
-                self.output.put((pkt.ts, export))
+                self.output.put((pkt.ts, pkt.client, export))
         finally:
             self.server.shutdown()
             self.server.server_close()
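
Only part of the retry logic is visible in this hunk; a hedged, standalone sketch of the pattern, where parse_packet and TemplateNotRecognized are hypothetical stand-ins for the real parsing step and its error:

import queue

def drain(input_q, output_q, templates):
    to_retry = []
    while True:
        try:
            pkt = input_q.get(block=True, timeout=0.5)
        except queue.Empty:
            continue
        try:
            export = parse_packet(pkt.data, templates)  # hypothetical parse step
        except TemplateNotRecognized:                   # hypothetical exception
            to_retry.append(pkt)                        # keep it until templates arrive
            continue
        if export.header.version == 9 and export.contains_new_templates and to_retry:
            for p in to_retry:
                input_q.put(p)                          # re-queue and decode again
            to_retry.clear()
        output_q.put((pkt.ts, pkt.client, export))      # client rides along with the export
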
@@ -198,8 +197,11 @@ def get_export_packets(host, port):
     # 3. the disk usage of files with JSON and its full strings as keys is reduced by using gzipped files
     # This also means that the files have to be handled differently, because they are gzipped and not formatted as
     # one single big JSON dump, but rather many little JSON dumps, separated by line breaks.
-    for ts, export in get_export_packets(args.host, args.port):
-        entry = {ts: [flow.data for flow in export.flows]}
+    for ts, client, export in get_export_packets(args.host, args.port):
+        entry = {ts: {
+            "client": client,
+            "flows": [flow.data for flow in export.flows]}
+        }
         line = json.dumps(entry).encode() + b"\n"  # byte encoded line
         with gzip.open(args.output_file, "ab") as fh:  # open as append, not reading the whole file
             fh.write(line)
