Skip to content

Commit

Permalink
[WIP] tracer: Optimize manpage lookups
Browse files Browse the repository at this point in the history
Co-authored-by: Håvard Sørbø <[email protected]>
  • Loading branch information
oleavr and hsorbo committed Sep 9, 2024
1 parent 704131c commit 219fddf
Showing 1 changed file with 177 additions and 87 deletions.
264 changes: 177 additions & 87 deletions frida_tools/tracer.py
Original file line number Diff line number Diff line change
@@ -1,16 +1,26 @@
import argparse

Check failure on line 1 in frida_tools/tracer.py

View workflow job for this annotation

GitHub Actions / isort

Imports are incorrectly sorted and/or formatted.
import binascii
import codecs
import gzip
import os
from pathlib import Path
import platform
import re
import subprocess
import time
from typing import Callable, List, Optional, Union

import frida

from frida_tools.reactor import Reactor

# Module-wide instrumentation counters for the manpage-based argument lookup.
# total_time accumulates time.time() deltas (seconds) spent reading and parsing
# manpages; total_successes / total_failures count parse attempts that finished
# normally vs. ones that raised. They are printed after handlers are created.
total_time = 0
total_successes = 0
total_failures = 0

# Stripped from the SYNOPSIS text before prototype matching: a "." followed by
# optional letters and then whitespace or end-of-string, or a double quote with
# an optional whitespace character in front of it.
MANPAGE_CONTROL_CHARS = re.compile(r"\.[a-zA-Z]*(\s|$)|\s?\"")
# Captures an identifier directly followed by "(" (group 1) and everything up to
# the next ")" (group 2).
# NOTE(review): `\w+` after the first character means single-character
# identifiers never match -- confirm that is intended.
MANPAGE_FUNCTION_PROTOTYPE = re.compile(r"([a-zA-Z_]\w+)\(([^\)]+)")


def main() -> None:
import json
Expand Down Expand Up @@ -226,12 +236,12 @@ def on_trace_events(self, events) -> None:
def on_trace_handler_create(self, target, handler, source) -> None:
    """Report that a handler was auto-generated for *target*.

    Nothing is printed in quiet mode. Every backslash in *source* is doubled
    before the location is embedded between double quotes in the message.
    *handler* is accepted but not used here.
    """
    if not self._quiet:
        location = source.replace("\\", "\\\\")
        self._print('%s: Auto-generated handler at "%s"' % (target, location))

def on_trace_handler_load(self, target, handler, source) -> None:
    """Report that an existing handler for *target* was loaded.

    Nothing is printed in quiet mode. Every backslash in *source* is doubled
    before the location is embedded between double quotes in the message.
    *handler* is accepted but not used here.
    """
    if not self._quiet:
        location = source.replace("\\", "\\\\")
        self._print('%s: Loaded handler at "%s"' % (target, location))

def _get_attributes(self, thread_id):
attributes = self._attributes_by_thread_id.get(thread_id, None)
Expand Down Expand Up @@ -422,13 +432,23 @@ def _try_handle_message(self, mtype, params, data, ui) -> False:

repo = self._repository
next_id = base_id
print("")
print(">>>")
print("")
for scope in params["scopes"]:
scope_name = scope["name"]
for member_name in scope["members"]:
target = TraceTarget(next_id, flavor, scope_name, member_name)
next_id += 1
handler = repo.ensure_handler(target)
scripts.append(handler)
print("")
print("Total time:", total_time)
print("Total successes:", total_successes)
print("Total failures:", total_failures)
print("")
print("<<<")
print("")

self._script.post(response)

Expand Down Expand Up @@ -476,6 +496,7 @@ def __init__(self) -> None:
self._on_load_callback: Optional[Callable[[TraceTarget, str, str], None]] = None
self._on_update_callback: Optional[Callable[[TraceTarget, str, str], None]] = None
self._decorate = False
self._manpages = None

def ensure_handler(self, target: TraceTarget):
    """Return the handler for *target*; this base version is only a placeholder.

    Raises:
        NotImplementedError: always.
    """
    raise NotImplementedError("not implemented")
Expand Down Expand Up @@ -512,93 +533,11 @@ def _create_stub_handler(self, target: TraceTarget, decorate: bool) -> str:

def _create_stub_native_handler(self, target: TraceTarget, decorate: bool) -> str:
if target.flavor == "objc":
state = {"index": 2}

def objc_arg(m):
index = state["index"]
r = ":${args[%d]} " % index
state["index"] = index + 1
return r

log_str = "`" + re.sub(r":", objc_arg, target.display_name) + "`"
if log_str.endswith("} ]`"):
log_str = log_str[:-3] + "]`"
log_str = self._create_objc_logging_code(target)
elif target.flavor == "swift":
if decorate:
module_string = " [%s]" % os.path.basename(target.scope)
else:
module_string = ""
log_str = "'%(name)s()%(module_string)s'" % {"name": target.name, "module_string": module_string}
log_str = self._create_swift_logging_code(target, decorate)
else:
for man_section in (2, 3):
args = []
try:
with open(os.devnull, "w") as devnull:
man_argv = ["man"]
if platform.system() != "Darwin":
man_argv.extend(["-E", "UTF-8"])
man_argv.extend(["-P", "col -b", str(man_section), target.name])
output = subprocess.check_output(man_argv, stderr=devnull)
match = re.search(
r"^SYNOPSIS(?:.|\n)*?((?:^.+$\n)* {5}\w+[ \*\n]*"
+ target.name
+ r"\((?:.+\,\s*?$\n)*?(?:.+\;$\n))(?:.|\n)*^DESCRIPTION",
output.decode("UTF-8", errors="replace"),
re.MULTILINE,
)
if match:
decl = match.group(1)

for argm in re.finditer(r"[\(,]\s*(.+?)\s*\b(\w+)(?=[,\)])", decl):
typ = argm.group(1)
arg = argm.group(2)
if arg == "void":
continue
if arg == "...":
args.append('", ..." +')
continue

read_ops = ""
annotate_pre = ""
annotate_post = ""

normalized_type = re.sub(r"\s+", "", typ)
if normalized_type.endswith("*restrict"):
normalized_type = normalized_type[:-8]
if normalized_type in ("char*", "constchar*"):
read_ops = ".readUtf8String()"
annotate_pre = '"'
annotate_post = '"'

arg_index = len(args)

args.append(
"%(arg_name)s=%(annotate_pre)s${args[%(arg_index)s]%(read_ops)s}%(annotate_post)s"
% {
"arg_name": arg,
"arg_index": arg_index,
"read_ops": read_ops,
"annotate_pre": annotate_pre,
"annotate_post": annotate_post,
}
)
break
except Exception:
pass

if decorate:
module_string = " [%s]" % os.path.basename(target.scope)
else:
module_string = ""

if len(args) == 0:
log_str = "'%(name)s()%(module_string)s'" % {"name": target.name, "module_string": module_string}
else:
log_str = "`%(name)s(%(args)s)%(module_string)s`" % {
"name": target.name,
"args": ", ".join(args),
"module_string": module_string,
}
log_str = self._create_cstyle_logging_code(target, decorate)

return """\
/*
Expand Down Expand Up @@ -644,6 +583,46 @@ def objc_arg(m):
"log_str": log_str,
}

def _create_cstyle_logging_code(self, target: TraceTarget, decorate: bool) -> str:
    """Build the log expression for a C-style function.

    When argument snippets are available the result is a backtick-delimited
    template listing them; otherwise it is a single-quoted ``name()`` string.
    With *decorate* set, the basename of ``target.scope`` is appended in
    square brackets.
    """
    suffix = (" [%s]" % os.path.basename(target.scope)) if decorate else ""

    arg_snippets = self._generate_cstyle_argument_logging_code(target)
    if not arg_snippets:
        return "'%(name)s()%(module_string)s'" % {"name": target.name, "module_string": suffix}

    return "`%(name)s(%(args)s)%(module_string)s`" % {
        "name": target.name,
        "args": ", ".join(arg_snippets),
        "module_string": suffix,
    }

def _create_objc_logging_code(self, target: TraceTarget) -> str:
    """Build the backtick-delimited log template for an Objective-C target.

    Each ``:`` in ``target.display_name`` becomes ``:${args[N]} `` where N
    starts at 2 and grows by one per colon.
    NOTE(review): starting at 2 presumably skips the implicit receiver and
    selector arguments -- confirm.
    """
    next_index = 2

    def with_placeholder(_match):
        nonlocal next_index
        replacement = ":${args[%d]} " % next_index
        next_index += 1
        return replacement

    template = "`" + re.sub(r":", with_placeholder, target.display_name) + "`"
    # Drop the space the last placeholder leaves in front of the closing bracket.
    if template.endswith("} ]`"):
        template = template[:-3] + "]`"
    return template

def _create_swift_logging_code(self, target: TraceTarget, decorate: bool) -> str:
    """Build the single-quoted ``name()`` log string for a Swift target.

    With *decorate* set, the basename of ``target.scope`` is appended in
    square brackets.
    """
    module_string = ""
    if decorate:
        module_string = " [%s]" % os.path.basename(target.scope)
    values = {"name": target.name, "module_string": module_string}
    return "'%(name)s()%(module_string)s'" % values

def _create_stub_java_handler(self, target: TraceTarget, decorate) -> str:
return """\
/*
Expand Down Expand Up @@ -685,6 +664,117 @@ def _create_stub_java_handler(self, target: TraceTarget, decorate) -> str:
"display_name": target.display_name
}

def _generate_cstyle_argument_logging_code(self, target: TraceTarget) -> List[str]:
    """Return one logging snippet per argument of the C function *target*.

    The prototype is looked up in the section 2/3 manpage whose file name
    starts with ``target.name``. The manpage index is built lazily on first
    use and cached in ``self._manpages``.

    Returns:
        A list of ``name=${args[i]}`` style snippets. The list is empty when
        no manpage is known for the function, when the prototype cannot be
        found or parsed, or when the function takes no arguments.

    Side effects:
        Updates the module-level ``total_time``, ``total_successes`` and
        ``total_failures`` counters for every attempted parse.
    """
    global total_time, total_successes, total_failures

    if self._manpages is None:
        self._manpages = self._build_manpage_index()

    man_entry = self._manpages.get(target.name)
    if man_entry is None:
        return []
    man_location, _man_section = man_entry

    started = time.time()
    try:
        synopsis = self._read_manpage_synopsis(man_location)
        args = self._parse_cstyle_prototype(synopsis, target.name)
    except Exception:
        # Argument names are a best-effort nicety: an unreadable or oddly
        # formatted manpage must never prevent the handler from being generated.
        args = None
    finally:
        total_time += time.time() - started

    if args is None:
        total_failures += 1
        return []
    total_successes += 1
    return args

def _build_manpage_index(self) -> dict:
    """Map function names to ``(path, section)`` for section 2 and 3 manpages."""
    index = {}
    try:
        manpath = subprocess.run(
            ["manpath"], stdout=subprocess.PIPE, encoding="utf-8", check=True
        ).stdout
    except (OSError, subprocess.SubprocessError, ValueError):
        # No usable `manpath` (missing tool, non-zero exit, undecodable output).
        return index

    # Empty components would otherwise turn into Path("."), i.e. the current directory.
    for root in filter(None, manpath.strip().split(":")):
        # A tuple keeps the section priority explicit: section 2 wins over section 3.
        for section in (2, 3):
            raw_section = str(section)
            mandir = Path(root) / f"man{section}"
            try:
                entries = list(mandir.iterdir()) if mandir.is_dir() else []
            except OSError:
                # An unreadable directory should not discard the other roots.
                continue
            for entry in entries:
                tokens = entry.name.split(".")
                if len(tokens) < 2 or tokens[1] != raw_section:
                    continue
                # The first root on the manpath that provides a name wins.
                index.setdefault(tokens[0], (entry, section))
    return index

def _read_manpage_synopsis(self, man_location: Path) -> str:
    """Return the cleaned-up, single-line SYNOPSIS text of a manpage source file."""
    opener = gzip.open if man_location.suffix == ".gz" else open
    synopsis_lines = []
    in_synopsis = False
    with opener(man_location, "rt", encoding="utf-8", errors="replace") as man_file:
        for raw_line in man_file:
            line = raw_line.strip()
            if not in_synopsis:
                in_synopsis = line.endswith("SYNOPSIS")
                continue
            if line.endswith("DESCRIPTION"):
                break
            synopsis_lines.append(line)

    raw_synopsis = "\n".join(synopsis_lines)
    without_markup = MANPAGE_CONTROL_CHARS.sub("", raw_synopsis)
    # Join the lines and glue array brackets onto the preceding parameter name.
    return without_markup.replace("\n", " ").replace(" [", "[").replace(" ]", "]")

def _parse_cstyle_prototype(self, synopsis: str, function_name: str) -> Optional[List[str]]:
    """Turn the prototype of *function_name* found in *synopsis* into logging snippets.

    Returns None when the synopsis contains no prototype for that function.
    """
    prototype = next(
        (m for m in MANPAGE_FUNCTION_PROTOTYPE.finditer(synopsis) if m.group(1) == function_name),
        None,
    )
    if prototype is None:
        return None

    args: List[str] = []
    for raw_arg in prototype.group(2).split(","):
        arg = raw_arg.strip()
        if not arg or arg == "void":
            continue
        if arg.startswith("..."):
            args.append("...")
            continue

        tokens = arg.split(" ")
        arg_type = "".join(tokens[:-1])
        arg_name = tokens[-1]

        # "const char *path" keeps the star on the name; move it over to the
        # type so pointer detection works and the logged name is clean.
        star_count = len(arg_name) - len(arg_name.lstrip("*"))
        if star_count:
            arg_type += "*" * star_count
            arg_name = arg_name[star_count:]

        # "char buf[16]" is passed as a pointer.
        if arg_name.endswith("]"):
            arg_type += "*"
            arg_name = arg_name.partition("[")[0]

        if arg_type.endswith("*restrict"):
            arg_type = arg_type[: -len("restrict")]

        read_ops = ""
        annotate_pre = ""
        annotate_post = ""
        if arg_type in ("char*", "constchar*"):
            # Log C strings by value, wrapped in double quotes.
            read_ops = ".readUtf8String()"
            annotate_pre = '"'
            annotate_post = '"'

        args.append(
            "%(arg_name)s=%(annotate_pre)s${args[%(arg_index)s]%(read_ops)s}%(annotate_post)s"
            % {
                "arg_name": arg_name,
                "arg_index": len(args),
                "read_ops": read_ops,
                "annotate_pre": annotate_pre,
                "annotate_post": annotate_post,
            }
        )
    return args


class MemoryRepository(Repository):
def __init__(self) -> None:
Expand Down

0 comments on commit 219fddf

Please sign in to comment.