Skip to content

Commit

Permalink
Wire up some more
Browse files Browse the repository at this point in the history
  • Loading branch information
oleavr committed Jul 17, 2024
1 parent 040aeb6 commit 4b76cd0
Showing 1 changed file with 62 additions and 26 deletions.
88 changes: 62 additions & 26 deletions frida_tools/lsd.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ def main() -> None:

import frida
from prompt_toolkit.application import Application
from prompt_toolkit.eventloop import call_soon_threadsafe
from prompt_toolkit.key_binding import KeyBindings
from prompt_toolkit.layout.containers import HSplit, VSplit
from prompt_toolkit.layout.layout import Layout
Expand All @@ -13,21 +12,21 @@ def main() -> None:
from frida_tools.application import ConsoleApplication
from frida_tools.reactor import Reactor


class LSDApplication(ConsoleApplication):
def __init__(self) -> None:
super().__init__(self._process_input, self._on_stop)
self._ui_app = None
self._pending_labels = set()
self._spinner_frames = ["v", "<", "^", ">"]
self._spinner_offset = 0
self._lock = threading.Lock()

def _usage(self) -> str:
return "%(prog)s [options]"

def _needs_device(self) -> bool:
return False

def _start(self) -> None:
pass

def _process_input(self, reactor: Reactor) -> None:
try:
devices = frida.enumerate_devices()
Expand All @@ -36,8 +35,13 @@ def _process_input(self, reactor: Reactor) -> None:
self._exit(1)
return

self._ui_app = Application(full_screen=False)
self._ui_app.output.show_cursor = lambda: None
bindings = KeyBindings()

@bindings.add("<any>")
def _(event):
self._reactor.io_cancellable.cancel()

self._ui_app = Application(key_bindings=bindings, full_screen=False)

id_rows = []
type_rows = []
Expand All @@ -49,41 +53,73 @@ def _process_input(self, reactor: Reactor) -> None:
name_rows.append(Label(device.name, dont_extend_width=True))
os_label = Label("(loading)", dont_extend_width=True)
os_rows.append(os_label)
with self._lock:
self._pending_labels.add(os_label)
worker = threading.Thread(target=self._fetch_parameters, args=(device, os_label))
worker.start()

def simulate_work():
time.sleep(1)
os_rows[0].text = "BADGER"

body = VSplit(
status_label = Label(" ")
body = HSplit(
[
HSplit([Label("Id", dont_extend_width=True), HSplit(id_rows)], padding_char="-", padding=1),
HSplit([Label("Type", dont_extend_width=True), HSplit(type_rows)], padding_char="-", padding=1),
HSplit([Label("Name", dont_extend_width=True), HSplit(name_rows)], padding_char="-", padding=1),
HSplit([Label("OS", dont_extend_width=True), HSplit(os_rows)], padding_char="-", padding=1),
],
padding=2,
VSplit(
[
HSplit([Label("Id", dont_extend_width=True), HSplit(id_rows)], padding_char="-", padding=1),
HSplit(
[Label("Type", dont_extend_width=True), HSplit(type_rows)], padding_char="-", padding=1
),
HSplit(
[Label("Name", dont_extend_width=True), HSplit(name_rows)], padding_char="-", padding=1
),
HSplit([Label("OS", dont_extend_width=True), HSplit(os_rows)], padding_char="-", padding=1),
],
padding=2,
),
status_label,
]
)

self._ui_app.layout = Layout(body)
self._ui_app.layout = Layout(body, focused_element=status_label)

self._reactor.schedule(self._update_progress)
self._ui_app.run()
self._ui_app._redraw()

def _on_stop(self):
if self._ui_app is not None:
self._ui_app.exit()

def _update_progress(self):
with self._lock:
if not self._pending_labels:
self._exit(0)
return

glyph = self._spinner_frames[self._spinner_offset % len(self._spinner_frames)]
self._spinner_offset += 1
for label in self._pending_labels:
label.text = glyph
self._ui_app.invalidate()

self._reactor.schedule(self._update_progress, delay=0.1)

def _fetch_parameters(self, device, os_label):
try:
params = device.query_system_parameters()
with self._reactor.io_cancellable:
params = device.query_system_parameters()
os = params["os"]
version = os.get("version")
if version is not None:
os_label.text = os["name"] + " " + version
text = os["name"] + " " + version
else:
os_label.text = os["name"]
text = os["name"]
except:
os_label.text = ""
self._ui_app.invalidate()
text = ""

def _on_stop(self) -> None:
pass # TODO
with self._lock:
os_label.text = text
self._pending_labels.remove(os_label)

self._ui_app.invalidate()

def compare_devices(a: frida.core.Device, b: frida.core.Device) -> int:
a_score = score(a)
Expand Down

0 comments on commit 4b76cd0

Please sign in to comment.