From cc1667fd91c9f70515461959231a4fc716986764 Mon Sep 17 00:00:00 2001 From: Stefan Tatschner Date: Tue, 2 Jan 2024 13:30:03 +0100 Subject: [PATCH] fix(dumpcap): Handle dumpcap invocation for unix transports Currently there is a flaw in the plugin interface. Plugins cannot specify how dumpcap should be invoked to capture traffic. The default is ethernet traffic. There are Unix transports available which are neither CAN traffic nor ethernet traffic. In this case the wrong method `_eth_cmd()` is called which cannot create a valid CLI for dumpcap. --- src/gallia/dumpcap.py | 54 +++++++++++++++++++++++++++++-------------- 1 file changed, 37 insertions(+), 17 deletions(-) diff --git a/src/gallia/dumpcap.py b/src/gallia/dumpcap.py index 8e3cf2d52..b4d3337c1 100644 --- a/src/gallia/dumpcap.py +++ b/src/gallia/dumpcap.py @@ -18,7 +18,13 @@ from urllib.parse import urlparse from gallia.log import get_logger -from gallia.transports import ISOTPTransport, RawCANTransport, TargetURI +from gallia.transports import ( + ISOTPTransport, + RawCANTransport, + TargetURI, + UnixLinesTransport, + UnixTransport, +) from gallia.utils import auto_int, split_host_port logger = get_logger("gallia.dumpcap") @@ -48,25 +54,39 @@ async def start( artifacts_dir: Path, ) -> Dumpcap | None: ts = int(datetime.now().timestamp()) - if target.scheme in [ISOTPTransport.SCHEME, RawCANTransport.SCHEME]: - outfile = artifacts_dir.joinpath(f"candump-{ts}.pcap.gz") - src_addr = ( - auto_int(target.qs["src_addr"][0]) if "src_addr" in target.qs else None - ) - dst_addr = ( - auto_int(target.qs["dst_addr"][0]) if "dst_addr" in target.qs else None - ) - cmd = cls._can_cmd( - target.netloc, - src_addr, - dst_addr, - ) - else: - outfile = artifacts_dir.joinpath(f"eth-{ts}.pcap.gz") - cmd = await cls._eth_cmd(target.netloc) + + match target.scheme: + case ISOTPTransport.SCHEME | RawCANTransport.SCHEME: + outfile = artifacts_dir.joinpath(f"candump-{ts}.pcap.gz") + src_addr = ( + auto_int(target.qs["src_addr"][0]) + if "src_addr" in target.qs + else None + ) + dst_addr = ( + auto_int(target.qs["dst_addr"][0]) + if "dst_addr" in target.qs + else None + ) + cmd = cls._can_cmd( + target.netloc, + src_addr, + dst_addr, + ) + # Unix domain sockets are not supported by dumpcap. + case UnixTransport.SCHEME | UnixLinesTransport.SCHEME: + return None + # There is currently no API for transport plugins to + # register a scheme and a corresponding invocation + # for dumpcap. So this match…case is best effort, + # since it defaults to ethernet. + case _: + outfile = artifacts_dir.joinpath(f"eth-{ts}.pcap.gz") + cmd = await cls._eth_cmd(target.netloc) if cmd is None: return None + cmd_str = shlex.join(cmd) try: proc = await asyncio.create_subprocess_exec(