diff --git a/CHANGELOG.md b/CHANGELOG.md index 12c2461..cf9a752 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,8 +1,23 @@ # Changelog All notable changes to this project will be documented in this file. + ## [Unreleased] - Add `movsq` opcode support (@ddash-ct) +- Added utility functions for analyzing strings: + - `find_user_strings()` + - `find_api_resolve_strings()` + - `is_code_string()` + - `is_library_string()` + - `detect_encoding()` + - `force_to_string()` +- Added better support for operands with segment registers (fs/gs) +- Fixed default data type to be an 8-byte qword when forcing extra arguments on a 64-bit sample. +- Added `default_data_type` argument on `get_function_args()`/`get_function_arg_values()`/`get_function_signature()` to change the data type used when forcing extra arguments. (Data type given should be valid for the underlying disassembler.) +- Added support for `wsprintfW` call hook. +- Added `FunctionArgument.location` property, which provides the location of the argument. (stack offset, register, etc.) +- Added `disable_all()` and `enable()` to emulator instances, which together simulate a whitelist for opcode/function hooks. +- Fixed bug in `idiv` opcode emulation. ## [0.6.1] - 2022-12-20 diff --git a/README.md b/README.md index bc58949..f0c3d24 100644 --- a/README.md +++ b/README.md @@ -23,7 +23,7 @@ The following utilities are included with Rugosa: - [Emulation](./docs/CPUEmulation.md) - [Extra Disssembly Interfaces](./rugosa/disassembly.py) - [Regex](./docs/Regex.md) -- String Management - *TODO* +- [Strings](./rugosa/strings.py) - [YARA](./docs/YARA.md) diff --git a/rugosa/emulation/call_hooks/stdlib/libc.py b/rugosa/emulation/call_hooks/stdlib/libc.py index 67f1a32..4e2c1cc 100644 --- a/rugosa/emulation/call_hooks/stdlib/libc.py +++ b/rugosa/emulation/call_hooks/stdlib/libc.py @@ -2,12 +2,17 @@ Common standard C library builtin functions.
""" +from __future__ import annotations import logging import re +from typing import TYPE_CHECKING from ... import constants from ...call_hooks import builtin_func +if TYPE_CHECKING: + from rugosa.emulation.cpu_context import ProcessorContext + logger = logging.getLogger(__name__) @@ -440,18 +445,22 @@ def strstr(cpu_context, func_name, func_args): return str1_ptr + offset -def _format_string(ctx, fmt, required_args, string_type=constants.STRING): +def _format_string(ctx: ProcessorContext, fmt: str, required_args: int, wide: bool = False) -> str: """ Handles formatting the string with the function arguments based on the format. :param ctx: cpu_context object :param fmt: format string :param required_args: num of required arguments for the particular format function to skip + :param wide: Whether the strings are wide + + :return: The formatted string """ + # TODO: parse string instead of bytes # Format using best attempt here. Basically, locate all the format specifiers, and convert them to a python # supported format string. For each format string, extract the appropriate data from the context, and append it to # the values list. - fmt_val_re = re.compile(br""" + fmt_val_re = re.compile(r""" % # start with percent character [-+ #0]{0,1} # optional flag character (\*|[0-9]{1,}){0,} # optional width specifier, though mutually exclusive (either a number or *, not both) @@ -464,68 +473,62 @@ def _format_string(ctx, fmt, required_args, string_type=constants.STRING): logger.debug("Format vals: %r", fmt_vals) # Re-pull function arguments with correct number of arguments. 
- func_sig = ctx.get_function_signature() - for _ in range(len(func_sig.arguments) - required_args): - func_sig.remove_argument(-1) - # For an unknown reason, int is not always being read as a QWORD on 64-bit, so this line - # forces the issue to ensure pointer addresses aren't being truncated to 32 bits - data_type = "qword" if ctx.bitness == 64 else "dword" - for _ in range(len(fmt_vals)): - func_sig.add_argument(data_type) - func_args = [arg.value for arg in func_sig.arguments] + func_args = ctx.get_function_arg_values(num_args=required_args + len(fmt_vals)) format_vals = [] arg_pos = required_args # skip destination and format string for match in fmt_vals: - if b"*" in match: + value = func_args[arg_pos] + + if "*" in match: # Indicates that one of the parameters is a width, which must be pulled and added to the list first - format_vals.append(func_args[arg_pos]) arg_pos += 1 - if match.endswith(b"c"): # character (will this be the value or a read from the context??? - arg_val = func_args[arg_pos] - if arg_val <= 0xFF: # assume that the argument contains the character - format_vals.append(arg_val) + if match.endswith("c"): # character (will this be the value or a read from the context??? 
+ if value <= 0xFF: # assume that the argument contains the character + value = chr(value) else: # assume it's a pointer that must be dereferenced - format_vals.append(ctx.memory.read_data(arg_val, size=1)) + value = chr(ctx.memory.read_data(value, size=1)) - elif match.endswith(b"s"): # string value, should be a pointer - _arg = ctx.memory.read_data(func_args[arg_pos], data_type=string_type) - if not len(_arg): # If the argument isn't set during parsing, preserve the formatting + elif match.endswith("s"): # string value, should be a pointer + value = ctx.memory.read_string(value, wide=wide) + if not value: # If the argument isn't set during parsing, preserve the formatting logger.debug("Pulled 0 byte format string, reverting") - _arg = b"%s" - format_vals.append(_arg) + value = "%s" - else: # all other numerical types??? - format_vals.append(func_args[arg_pos]) + # all other numerical types??? + format_vals.append(value) arg_pos += 1 - result = fmt % tuple(format_vals) + format_vals = tuple(format_vals) + result = fmt % format_vals + logger.debug(f"Formatted string: {fmt!r} % {format_vals!r} -> {result!r}") return result -@builtin_func +@builtin_func("sprintf") +@builtin_func("wsprintfW") # TODO: technically from winuser.h def sprintf(ctx, func_name, func_args): """ Format a string based on provided format string and parameters. + int sprintf (char *s, const char *format, ...); + For sprintf, there's no way to know up front how many args are needed, but there should always be at least 2 (destination and format). We can use the format string to determine how many arguments we need by counting the format specifiers. """ - # Almost guaranteed to get the incorrect number of args. 
So obtain the format string and count the number of - # format specifiers to determine how many args we need, not including the first 2 + wide = func_name.endswith("W") if len(func_args) < 2: # Ensure that there are at least 2 arguments, dest and format - # Need to try to get at least 2 arguments... func_args = ctx.get_function_arg_values(num_args=2) - dest = func_args[0] - fmt = ctx.memory.read_data(func_args[1]) + dest, fmt_ptr, *_ = func_args + fmt = ctx.memory.read_string(fmt_ptr, wide=wide) logger.debug("Format string: %s", fmt) - result = _format_string(ctx, fmt, 2) + result = _format_string(ctx, fmt, 2, wide) logger.debug("Writing formatted value %s to 0x%X", result, dest) - ctx.memory.write(dest, result + b"\0") + ctx.memory.write_string(dest, result + "\0", wide=wide) return len(result) @@ -539,16 +542,15 @@ def snprintf(ctx, func_name, func_args): Format a string using the provided format string and values, truncated if necessary to length n. """ wide = func_name.startswith("sw") - string_type = constants.WIDE_STRING if wide else constants.STRING if len(func_args) < 3: func_args = ctx.get_function_arg_values(num_args=3) - dest, n = func_args[:2] - fmt = ctx.memory.read_data(func_args[2], data_type=string_type) + dest, n, fmt_ptr, *_ = func_args + fmt = ctx.memory.read_string(fmt_ptr, wide=wide) logger.debug("Format string: %s", fmt) - result = _format_string(ctx, fmt, 3, string_type) - logger.debug("Writing formatted value %s to 0x%X", result[:n - 1], dest) - ctx.memory.write(dest, result[:n - 1] + b"\0") + result = _format_string(ctx, fmt, 3, wide)[:n - 1] + logger.debug("Writing formatted value %s to 0x%X", result, dest) + ctx.memory.write_string(dest, result + "\0", wide=wide) return len(result) @@ -565,9 +567,10 @@ def printf(ctx, func_name, func_args): if len(func_args) < 1: func_args = ctx.get_function_arg_values(num_args=1) - fmt = ctx.memory.read_data(func_args[0]) + fmt_ptr, *_ = func_args + fmt = ctx.memory.read_string(fmt_ptr) 
logger.debug("Format string: %s", fmt) result = _format_string(ctx, fmt, 1) logger.debug("Writing formatted value %s to stdout", result) - ctx.stdout += result.decode() + ctx.stdout += result return len(result) diff --git a/rugosa/emulation/cpu_context.py b/rugosa/emulation/cpu_context.py index 348e2b7..62aaa6c 100644 --- a/rugosa/emulation/cpu_context.py +++ b/rugosa/emulation/cpu_context.py @@ -484,7 +484,9 @@ def get_original_location(self, addr): else: return ip, None - def get_function_signature(self, func_ea=None, num_args=None) -> Optional[FunctionSignature]: + def get_function_signature( + self, func_ea: int = None, num_args: int = None, default_data_type: str = None + ) -> Optional[FunctionSignature]: """ Returns the function signature of the given func_ea with argument values pulled from this context. @@ -493,13 +495,14 @@ def get_function_signature(self, func_ea=None, num_args=None) -> Optional[Functi The first operand is used if not provided. (helpful for a "call" instruction) :param int num_args: Force a specific number of arguments in the signature. If not provided, number of arguments is determined by the disassembler. - Extra arguments not defined by the disassembler are assumed to be 'int' type. + Extra arguments not defined by the disassembler are assumed to be the default_data_type. Avoid using num_args and adjust the returned FunctionSignature manually if more customization is needed. - (NOTE: The function signature will be forced on failure if this is set.) WARNING: Setting the number of arguments will permanently change the signature on the backend disassembler. + :param str default_data_type: The default data type to use when forcing extra arguments. 
+ (Defaults to "dword" for 32-bit or "qword" for 64-bit) :return: FunctionSignature object or None if not applicable @@ -521,21 +524,23 @@ def get_function_signature(self, func_ea=None, num_args=None) -> Optional[Functi if num_args is not None: if num_args < 0: raise ValueError("num_args is negative") + arguments = signature.arguments if len(arguments) > num_args: - # TODO: Instead of removing arugments, can we just not pull them all? for _ in range(len(arguments) - num_args): signature.remove_argument(-1) elif len(arguments) < num_args: - # TODO: Is there a way to just see what the argument location would be - # without having to modify the function signature? + if not default_data_type: + default_data_type = "qword" if self.bitness == 64 else "dword" for _ in range(num_args - len(arguments)): - signature.add_argument("int") + signature.add_argument(default_data_type) return signature - def get_function_args(self, func_ea=None, num_args=None) -> List[FunctionArgument]: + def get_function_args( + self, func_ea: int = None, num_args: int = None, default_data_type: str = None + ) -> List[FunctionArgument]: """ Returns the FunctionArg objects for this context based on the given function. @@ -550,20 +555,24 @@ def get_function_args(self, func_ea=None, num_args=None) -> List[FunctionArgumen Use get_function_signature() and adjust the FunctionSignature manually if more customization is needed. (NOTE: The function signature will be forced on failure if this is set.) + :param str default_data_type: The default data type to use when forcing extra arguments. 
+ (Defaults to "dword" for 32-bit or "qword" for 64-bit) :returns: list of FunctionArg objects """ - func_sig = self.get_function_signature(func_ea, num_args=num_args) + func_sig = self.get_function_signature(func_ea, num_args=num_args, default_data_type=default_data_type) if not func_sig: return [] return func_sig.arguments - def get_function_arg_values(self, func_ea=None, num_args=None) -> List[int]: + def get_function_arg_values( + self, func_ea: int = None, num_args: int = None, default_data_type: str = None + ) -> List[int]: """ Returns the FunctionArg values for this context based on the given function. """ - return [arg.value for arg in self.get_function_args(func_ea=func_ea, num_args=num_args)] + return [arg.value for arg in self.get_function_args(func_ea=func_ea, num_args=num_args, default_data_type=default_data_type)] @property def function_args(self) -> List[FunctionArgument]: diff --git a/rugosa/emulation/emulator.py b/rugosa/emulation/emulator.py index e8bc2ed..74ffbbd 100644 --- a/rugosa/emulation/emulator.py +++ b/rugosa/emulation/emulator.py @@ -102,6 +102,28 @@ def clear_cache(self): self._flowchart_cache.clear() self._memory_cache.clear() + def enable(self, *names: str): + """ + Enables the use of a specific opcode or function hook. + + The hooks enabled are pulled from the default implementation + of opcodes/functions and will overwrite any custom hook currently in place. + + :param names: Name(s) of opcode/function hook(s) to enable. + NOTE: All the "rep*" opcodes will be enabled if the name is "rep".
+ """ + for name in names: + name = name.lower() + + if name in self._context_class.OPCODES: + self._opcode_hooks[name] = self._context_class.OPCODES[name] + elif name in call_hooks.BUILTINS: + self._call_hooks[name] = call_hooks.BUILTINS[name] + elif name.startswith("rep"): + self.disabled_rep = False + else: + raise ValueError(f'Opcode/function hook named "{name}" not found.') + def disable(self, name: str): """ Disables the use of a specific opcode or function hook. @@ -125,6 +147,30 @@ def disable(self, name: str): elif name.startswith("rep"): self.disabled_rep = True + def disable_all(self, disable_function_hooks: bool = False): + """ + Disables all opcode hooks for the current emulator instance. + This is meant to be used when only a small number of opcodes need to + be emulated and can greatly help speed up emulation. + + This removes all opcode hooks currently in place, only function hooks + will be emulated. + Function hooks can be disabled as well if desired. + + Enabling specific hooks can be done with :func:`~emulator.Emulator.enable`. + + The simplest way to have all the disabled hooks enabled again is to + create a new emulator instance. + """ + self._opcode_hooks = {} + self._instruction_hooks = collections.defaultdict(list) + self.disabled_rep = True + logger.debug("All opcode/instruction hooks disabled") + + if disable_function_hooks: + self._call_hooks = {} + logger.debug("All function hooks disabled") + def new_context(self) -> ProcessorContext: return self._context_class(self) @@ -204,7 +250,7 @@ def emulate_call(self, name_or_start_ea, call_depth: int = 0): if isinstance(name_or_start_ea, str): name = name_or_start_ea # NOTE: Using from_name because we need to be sure there is actual instructions to emulate. 
- func = func_utils.from_name(self.disassembler, name) + func = self.disassembler.get_function_by_name(name) func_address = func.start else: func_address = name_or_start_ea @@ -309,7 +355,7 @@ def execute_function(self, address_or_name: Union[int, str], call_depth: int = 0 """ if isinstance(address_or_name, str): name = address_or_name - func = func_utils.from_name(self.disassembler, name, ignore_underscore=True) + func = self.disassembler.get_function_by_name(name) else: address = address_or_name func = self.disassembler.get_function(address) diff --git a/rugosa/emulation/functions.py b/rugosa/emulation/functions.py index 93dc784..5226580 100644 --- a/rugosa/emulation/functions.py +++ b/rugosa/emulation/functions.py @@ -11,7 +11,7 @@ from dragodis import Disassembler, NotExistError from dragodis.interface import ( StackLocation, RelativeRegisterLocation, StaticLocation, RegisterLocation, - RegisterPairLocation + RegisterPairLocation, ArgumentLocation ) from . import utils @@ -142,6 +142,10 @@ def name(self) -> str: def name(self, value: str): self._parameter.name = value + @property + def location(self) -> ArgumentLocation: + return self._parameter.location + @property def type(self) -> str: """User friendly type name.""" @@ -163,13 +167,13 @@ def declaration(self): @property def is_stack(self): """True if argument is on the stack.""" - return isinstance(self._parameter.location, StackLocation) + return isinstance(self.location, StackLocation) # TODO: Refactor to be more processor agnostic @property def addr(self): """Retrieves the address of the argument (if a memory/stack address)""" - location = self._parameter.location + location = self.location disassembler: Disassembler = self._cpu_context.emulator.disassembler if isinstance(location, StackLocation): @@ -208,7 +212,7 @@ def addr(self): def value(self): """Retrieves the value of the argument based on the cpu context.""" # TODO: Pull value data based on type. 
- location = self._parameter.location + location = self.location # On Stack if isinstance(location, StackLocation): @@ -253,7 +257,7 @@ def value(self): def value(self, value): """Sets the value of the argument to the cpu context.""" # TODO: Pull value data based on type. - location = self._parameter.location + location = self.location # On Stack if isinstance(location, StackLocation): diff --git a/rugosa/emulation/x86_64/opcodes.py b/rugosa/emulation/x86_64/opcodes.py index b6bd98c..774ca05 100644 --- a/rugosa/emulation/x86_64/opcodes.py +++ b/rugosa/emulation/x86_64/opcodes.py @@ -434,27 +434,37 @@ def IDIV(cpu_context: ProcessorContext, instruction: Instruction): RDX_REG_SIZE_MAP = {8: "rdx", 4: "edx", 2: "dx"} operands = instruction.operands - width = operands[0].width - divisor = utils.signed(operands[0].value, width) + # Need to obtain the width of the divisor, to determine the width of the dividend + width = operands[-1].width + divisor = utils.signed(operands[-1].value, width) if divisor == 0: logger.debug("DIV / 0") return - rax_str = RAX_REG_SIZE_MAP[width] - dividend = utils.signed(cpu_context.registers[rax_str], width) + if width == 1: + # When dividing by a 8-bit value, use AX + dividend = utils.signed(cpu_context.registers.ax, 2) + result_reg = "al" + remainder_reg = "ah" + + else: + # When dividing by 16-bits -> combine DX:AX + # When dividing by 32-bits -> combine EDX:EAX + # When dividing by 64-bits -> combine RDX:RAX + rax_str = RAX_REG_SIZE_MAP[width] + rdx_str = RDX_REG_SIZE_MAP[width] + dividend = utils.signed( + (cpu_context.registers[rdx_str] << (width * 8)) | cpu_context.registers[rax_str], + width * 2 + ) + result_reg = rax_str + remainder_reg = rdx_str result = int(dividend / divisor) & utils.get_mask(width) - # TODO: Ideally we would be able to just use result here instead of recalculating. We need to test if - # we can do that without introducing errors and make the change if so. 
remainder = (dividend - (int(dividend / divisor) * divisor)) & utils.get_mask(width) logger.debug("0x%X / 0x%X = 0x%X", dividend, divisor, result) - if width == 1: - cpu_context.registers.al = result - cpu_context.registers.ah = remainder - else: - rdx_str = RDX_REG_SIZE_MAP[width] - cpu_context.registers[rax_str] = result - cpu_context.registers[rdx_str] = remainder + cpu_context.registers[result_reg] = result + cpu_context.registers[remainder_reg] = remainder @opcode diff --git a/rugosa/emulation/x86_64/operands.py b/rugosa/emulation/x86_64/operands.py index c53fb32..9fb2190 100644 --- a/rugosa/emulation/x86_64/operands.py +++ b/rugosa/emulation/x86_64/operands.py @@ -6,7 +6,7 @@ from typing import Optional from dragodis import OperandType -from dragodis.interface import Phrase, MemoryReference, Immediate +from dragodis.interface import Phrase, MemoryReference, Immediate, Register from .. import utils from ..operands import Operand @@ -80,15 +80,14 @@ def offset(self) -> Optional[int]: e.g. [ebp+ecx*2+8] -> 8 + fs:[eax] -> eax """ phrase = self._operand.value if not isinstance(phrase, Phrase): return None - # NOTE: Offset shouldn't be a register for x86. - # If we get a register, we have a bug. So let it explode. offset = phrase.offset - if not isinstance(offset, int): - raise ValueError(f"Expected an integer offset. Got {type(offset)} for {offset}") + if isinstance(offset, Register): + offset = self._cpu_context.registers[offset.name] return offset @property @@ -136,15 +135,6 @@ def base_addr(self): @property def value(self): - # TODO: Determine if this is still necessary. - # FS, GS (at least) registers are identified as memory addresses. 
We need to identify them as registers - # and handle them as such - if self.type == OperandType.memory: - if "fs" in self.text: - return self._cpu_context.registers.fs - elif "gs" in self.text: - return self._cpu_context.registers.gs - return super().value @value.setter @@ -154,17 +144,6 @@ def value(self, value): except TypeError: logger.debug("%r -> %s", value, self.text) - # TODO: Determine if this is still necessary. - # FS, GS (at least) registers are identified as memory addresses. We need to identify them as registers - # and handle them as such - if self.type == OperandType.memory: - if "fs" in self.text: - self._cpu_context.registers.fs = value - return - elif "gs" in self.text: - self._cpu_context.registers.gs = value - return - # On 64-bit, the destination register must be set to 0 first (per documentation) # TODO: Check if this happens regardless of the source size if ( diff --git a/rugosa/func_utils.py b/rugosa/func_utils.py index e0ee1de..eab61c9 100644 --- a/rugosa/func_utils.py +++ b/rugosa/func_utils.py @@ -2,6 +2,7 @@ Helper utilities for functions. """ import collections +import warnings import dragodis from dragodis.interface import Function @@ -13,28 +14,14 @@ def from_name(dis: dragodis.Disassembler, func_name: str, ignore_underscore: bool = False) -> Function: - """ - Factory method for obtaining a Function by name. - - e.g. - with dragodis.open_program("input.exe") as dis: - func = functions.from_name(dis, "WriteFile") - - :param dis: Dragodis disassembler - :param str func_name: Name of function to obtain - :param bool ignore_underscore: Whether to ignore underscores in function name. - (Will return the first found function if enabled.) - - :return: Function object - :raises ValueError: If function name was not found. 
- """ - for func in dis.functions(): - _func_name = func.name - if ignore_underscore: - _func_name = _func_name.strip("_") - if func_name == _func_name: - return func - raise ValueError(f"Unable to find function with name: {func_name}") + warnings.warn( + "func_utils.from_name() is deprecated. Please use Disassembler.get_function_by_name() instead.", + DeprecationWarning + ) + try: + return dis.get_function_by_name(func_name, ignore_underscore=ignore_underscore) + except dragodis.NotExistError as e: + raise ValueError(str(e)) @property diff --git a/rugosa/ghidra_plugin/components/function_arguments.py b/rugosa/ghidra_plugin/components/function_arguments.py index 5c61db1..970405f 100644 --- a/rugosa/ghidra_plugin/components/function_arguments.py +++ b/rugosa/ghidra_plugin/components/function_arguments.py @@ -92,7 +92,7 @@ def _populate_table(self, arguments): for arg in arguments: row = [""] * len(self.HEADERS) row[0] = str(arg.ordinal) - row[1] = str(arg._parameter.location) + row[1] = str(arg.location) row[2] = arg.type row[3] = str(arg.width) row[4] = arg.name diff --git a/rugosa/ida_plugin/components/function_arguments.py b/rugosa/ida_plugin/components/function_arguments.py index 0d8bd0b..6906809 100644 --- a/rugosa/ida_plugin/components/function_arguments.py +++ b/rugosa/ida_plugin/components/function_arguments.py @@ -59,7 +59,7 @@ def _populate_table(self, arguments): table.setRowCount(len(arguments)) for index, arg in enumerate(arguments): table.setItem(index, 0, QtWidgets.QTableWidgetItem(str(arg.ordinal))) - table.setItem(index, 1, QtWidgets.QTableWidgetItem(str(arg._parameter.location))) + table.setItem(index, 1, QtWidgets.QTableWidgetItem(str(arg.location))) table.setItem(index, 2, QtWidgets.QTableWidgetItem(arg.type)) table.setItem(index, 3, QtWidgets.QTableWidgetItem(str(arg.width))) table.setItem(index, 4, QtWidgets.QTableWidgetItem(arg.name)) diff --git a/rugosa/strings.py b/rugosa/strings.py index 951290f..aa72b83 100644 --- a/rugosa/strings.py +++ 
b/rugosa/strings.py @@ -4,10 +4,13 @@ from __future__ import annotations import logging import re +from string import printable as printable_chars import sys from typing import Iterable, Tuple, Union import dragodis +from rugosa.emulation import Emulator + logger = logging.getLogger(__name__) @@ -22,7 +25,7 @@ "koi8-r", "iso8859-5", "cp1251", "mac-cyrillic", # Cyrillic (cp866, cp855 omitted) "cp949", # Korean (johab, iso2022-kr omitted) "iso8859-6", "cp1256", # Arabic (cp864, cp720 omitted) - "latin1", # If all else fails, latin1 is always is successful. + "latin1", # If all else fails, latin1 is always successful. ] # fmt: on @@ -119,6 +122,220 @@ def get_terminated_bytes(dis: dragodis.Disassembler, addr: int, unit_width: int return dis.get_bytes(addr, terminator_address - addr) +_api_names = [ + ("GetModuleHandleA", 0), + ("GetModuleHandleW", 0), + ("LoadLibraryA", 0), + ("LoadLibraryW", 0), + ("GetProcAddress", 1), +] + + +def find_api_resolve_strings(dis: dragodis.Disassembler) -> Iterable[Tuple[int, str]]: + """ + Finds strings used in API resolution functions (e.g. GetProcAddress) + + :param dis: Dragodis disassembler + + :yields: (address, string) for API string. + """ + seen = set() + emulator = Emulator(dis) + for api_name, arg_index in _api_names: + try: + imp = dis.get_import(api_name) + except dragodis.NotExistError: + continue + for address in imp.calls_to: + try: + ctx = emulator.context_at(address) + args = ctx.function_args + except dragodis.NotExistError as e: + logger.warning(f"Failed to emulate at 0x{address:08x}: {e}") + continue + + if len(args) <= arg_index: + continue + + ptr = args[arg_index].value + if ptr in seen: + continue + + # Only include strings actually found in sample statically. 
+ if not dis.is_loaded(ptr): + continue + + try: + string = ctx.memory.read_string(ptr, wide=api_name.endswith("W")) + yield ptr, string + seen.add(ptr) + except UnicodeDecodeError: + continue + + +def is_library_string(dis: dragodis.Disassembler, address: int) -> bool: + """ + Attempts to determine whether the string at the given address is only used in library functions. + + :param dis: Dragodis disassembler + :param address: Address pointing to string. + :return: True if the string is referenced from at least one function and every referencing function is a library function. + """ + found_function = False + for ref in dis.references_to(address): + try: + func = dis.get_function(ref.from_address) + except dragodis.NotExistError: + continue + found_function = True + if not func.is_library: + return False + return found_function + + +def is_code_string(dis: dragodis.Disassembler, address: int, *, code_segment=None): + """ + Determines whether the string has a reference to an instruction in the code segment. + + :param dis: Dragodis disassembler + :param address: Address of the string + :param code_segment: Segment containing instruction code. + (Determined using entry point if not provided) + :return: True if any reference to the string originates from within the code segment. + """ + if not code_segment: + code_segment = dis.get_segment(dis.entry_point) + return any(ref.from_address in code_segment for ref in dis.references_to(address)) + + +def find_user_strings( + dis: dragodis.Disassembler, min_length=3, ignore_api=True, ignore_library=True, printable=True, unique=False, + in_code=True, +) -> Iterable[Tuple[int, str]]: + """ + Finds user strings that are used within the code segment. + + :param dis: Dragodis disassembler + :param min_length: The minimum length to count as a string. + :param ignore_api: Whether to attempt to ignore strings used for API resolution (e.g. GetProcAddress parameters) + NOTE: This can be slow. Disable this option if performance is a concern. + :param ignore_library: Whether to ignore strings only used in library functions. + :param printable: Whether to only include strings printable as ASCII.
+ :param unique: Whether to only include the first instance of a string. + (i.e. ignore the same string found at a different address) + :param in_code: Whether to only include strings referenced in the main user code. + + :yields: (address, string) + """ + seen = set() + code_segment = dis.get_segment(dis.entry_point) + api_strings = None + + for entry in dis.strings(min_length): + string = str(entry) + + if unique: + if string in seen: + continue + seen.add(string) + + # NOTE: Using string.printable set over str.isprintable() since the latter doesn't count whitespace characters like \n + if printable and not all(c in printable_chars for c in string): + continue + + if in_code and not is_code_string(dis, entry.address, code_segment=code_segment): + continue + + if ignore_library and is_library_string(dis, entry.address): + continue + + if ignore_api: + if api_strings is None: + api_strings = list(find_api_resolve_strings(dis)) + if any(address == entry.address for address, _ in api_strings): + continue + + yield entry.address, string + + +def _num_raw_bytes(string: str) -> int: + """ + Returns the number of raw bytes found in the given unicode string + """ + count = 0 + for char in string: + char = char.encode("unicode-escape") + count += char.startswith(b"\\x") + char.startswith(b"\\u") * 2 + return count + + +def detect_encoding(data: bytes, code_pages=None) -> str: + """ + Detects the best guess string encoding for the given data. + NOTE: This will default to "latin1" as a fallback. + + :param data: Data to detect the encoding of. + :param code_pages: List of possible codecs to try. + There is a default, but feel free to provide your own. + + :returns: Name of the best-guess encoding (code page) for the data.
+ """ + if code_pages is None: + code_pages = CODE_PAGES + best_score = len(data) # lowest score is best + best_code_page = None + best_output = None + for code_page in code_pages: + try: + output = data.decode(code_page).rstrip(u"\x00") + except UnicodeDecodeError: + # If it's UTF we may need to strip away some null characters before decoding. + if code_page in ("utf-16-le", "utf-16-be", "utf-32-le", "utf-32-be"): + data_copy = data + while data_copy and data_copy[-1] == 0: + try: + data_copy = data_copy[:-1] + output = data_copy.decode(code_page).rstrip(u"\x00") + except UnicodeDecodeError: + continue + break # successfully decoded + else: + continue + # otherwise the code page isn't correct. + else: + continue + + score = _num_raw_bytes(output) + if not best_output or score < best_score: + best_score = score + best_output = output + best_code_page = code_page + + if best_output: + return best_code_page + + # We shouldn't hit here since "latin1" should at least hit, but just in case... + return "unicode_escape" + + +def force_to_string(data: bytes, code_pages=None) -> str: + """ + Forces given bytes into a string using best guess encoding. + + :param data: Bytes to convert to string. + :param code_pages: List of possible codecs to try. + There is a default, but feel free to provide your own. + + :return: Decoded string. + """ + if code_pages is None: + code_pages = CODE_PAGES + try: + return data.decode(detect_encoding(data, code_pages=code_pages)) + except UnicodeDecodeError: + return data.decode("latin1") + + +class DecodedString: """ Holds information about a decoded/decrypted string.
@@ -144,7 +361,7 @@ def __init__( ): self.data = dec_data self.enc_data = enc_data - self.encoding = encoding or self.detect_encoding(dec_data) + self.encoding = encoding or detect_encoding(dec_data) self.enc_source = enc_source self.dec_source = dec_source @@ -157,57 +374,6 @@ def __str__(self): def __bytes__(self): return self.data - def _num_raw_bytes(self, string: str) -> int: - """ - Returns the number of raw bytes found in the given unicode string - """ - count = 0 - for char in string: - char = char.encode("unicode-escape") - count += char.startswith(b"\\x") + char.startswith(b"\\u") * 2 - return count - - def detect_encoding(self, data: bytes) -> str: - """ - Detects and decodes data using best guess encoding. - - :returns: Decoded string and encoding used. - """ - best_score = len(data) # lowest score is best - best_code_page = None - best_output = None - for code_page in CODE_PAGES: - try: - output = data.decode(code_page).rstrip(u"\x00") - except UnicodeDecodeError: - # If it's UTF we may need to strip away some null characters before decoding. - if code_page in ("utf-16-le", "utf-16-be", "utf-32-le", "utf-32-be"): - data_copy = data - while data_copy and data_copy[-1] == 0: - try: - data_copy = data_copy[:-1] - output = data_copy.decode(code_page).rstrip(u"\x00") - except UnicodeDecodeError: - continue - break # successfully decoded - else: - continue - # otherwise the code page isn't correct. - else: - continue - - score = self._num_raw_bytes(output) - if not best_output or score < best_score: - best_score = score - best_output = output - best_code_page = code_page - - if best_output: - return best_code_page - - # We shouldn't hit here since "latin1" should at least hit, but just incase... 
- return "unicode_escape" - @property def display_name(self) -> str: """Returns a disassembler friendly, printable name for the decoded string.""" diff --git a/setup.cfg b/setup.cfg index 8670e99..1d577e1 100644 --- a/setup.cfg +++ b/setup.cfg @@ -20,7 +20,7 @@ include_package_data = True packages = find: python_requires = >=3.8 install_requires = - dragodis>=0.5.2 + dragodis>=0.7.0 click hexdump pyhidra>=0.3.0 diff --git a/tests/test_emulation/test_cpu_context.py b/tests/test_emulation/test_cpu_context.py index ed54f7a..402d71f 100644 --- a/tests/test_emulation/test_cpu_context.py +++ b/tests/test_emulation/test_cpu_context.py @@ -76,12 +76,12 @@ def test_cpu_context_x86(disassembler): # Test variables data_ptr = operands[0].addr assert sorted(context.variables.names) in ( - ["arg_0", "arg_4"], + ["a1", "a2"], ["param_1", "param_2"], ) assert data_ptr in context.variables var = context.variables[data_ptr] - assert var.name in ("arg_0", "param_1") + assert var.name in ("a1", "param_1") assert not var.history assert var.size == 4 assert var.data_type in ("int", "byte *") diff --git a/tests/test_strings.py b/tests/test_strings.py index 178d53b..59b10fc 100644 --- a/tests/test_strings.py +++ b/tests/test_strings.py @@ -1,3 +1,7 @@ +import pytest + +import rugosa + def test_find_string_data(): from rugosa.strings import find_string_data @@ -32,3 +36,76 @@ def test_find_string_data(): (4, b"hello\x00", "utf-8"), (10, b"world\x00", "utf-8"), ] + + +def test_force_to_string(): + assert rugosa.force_to_string(b"hello") == "hello" + assert rugosa.force_to_string("hello".encode("utf-16-be")) == "hello" + assert rugosa.force_to_string(b"\x4f\xdf\xc6\x4a\xbe\x0a\xff\x76") == "OßÆJ¾\nÿv" + + +USER_STRINGS = { + (0x40c000, 'Idmmn!Vnsme '), + (0x40c010, 'Vgqv"qvpkle"ukvj"ig{"2z20'), + (0x40c02c, 'Wkf#rvj`h#aqltm#el{#ivnsp#lufq#wkf#obyz#gld-'), + (0x40c05c, 'Keo$mw$wpvkjc$ej`$ehwk$cmraw$wle`a*'), + (0x40c080, 'Dfla%gpwkv%mji`v%lk%rjji%fijqm+'), + (0x40c0a0, 
'Egru&ghb&biau&cgen&ngrc&rnc&irnct('), + # (0x40c0c4, '\cv}3g{v3pargv3qfg3w|}4g3qavrx3g{v3t'), # TODO: Ghidra fails to find this one. + (0x40c114, '+()./,-"#*'), + (0x40c120, '`QFBWFsQL@FPPb'), + (0x40c130, 'tSUdFS'), + # (0x40c140, '-",5 , v,tr4v,trv4t,v'), # TODO: Ghidra fails to find this one. + (0x40c15c, '@AKJDGBA@KJGDBJKAGDC'), + (0x40c1f8, 'LMFOGHKNLMGFOHKFGNLKHNMLOKGNKGHFGLHKGLMHKGOFNMLHKGFNLMJNMLIJFGNMLOJIMLNGFJHNM'), +} + + +API_RESOLVE_STRINGS = { + (0x40a838, 'KERNEL32.DLL'), + (0x40a1d4, 'mscoree.dll'), + (0x40a9d0, 'USER32.DLL'), + (0x40a1c4, 'CorExitProcess'), + (0x40a828, 'EncodePointer'), + (0x40a854, 'DecodePointer'), + (0x40a884, 'FlsAlloc'), + (0x40a878, 'FlsGetValue'), + (0x40a86c, 'FlsSetValue'), + (0x40a864, 'FlsFree'), + (0x40a9c4, 'MessageBoxA'), + (0x40a9b4, 'GetActiveWindow'), + (0x40a9a0, 'GetLastActivePopup'), + (0x40a984, 'GetUserObjectInformationA'), + (0x40a96c, 'GetProcessWindowStation'), +} + + +def test_find_user_strings(disassembler): + strings = list(rugosa.find_user_strings(disassembler, unique=True)) + print("\n".join(f"{hex(address)}: '{string}'" for address, string in strings)) + assert set(strings) >= USER_STRINGS + # While we could have extras based on disassembler, make sure we are somewhat on target. 
+ assert len(strings) == pytest.approx(len(USER_STRINGS), abs=3) + assert set(strings).isdisjoint(API_RESOLVE_STRINGS) + + strings = list(rugosa.find_user_strings(disassembler, ignore_api=False, ignore_library=False)) + print("\n".join(f"{hex(address)}: '{string}'" for address, string in strings)) + assert set(strings) >= (USER_STRINGS | API_RESOLVE_STRINGS) + + +def test_find_api_resolve_strings(disassembler): + strings = list(rugosa.find_api_resolve_strings(disassembler)) + print("\n".join(f"{hex(address)}: '{string}'" for address, string in strings)) + assert set(strings) == API_RESOLVE_STRINGS + + +def test_is_library_string(disassembler): + assert rugosa.is_library_string(disassembler, 0x40A838) + assert rugosa.is_library_string(disassembler, 0x40A884) + assert not rugosa.is_library_string(disassembler, 0x40C000) + + +def test_is_code_string(disassembler): + assert rugosa.is_code_string(disassembler, 0x40A838) + assert rugosa.is_code_string(disassembler, 0x40C000) + assert not rugosa.is_code_string(disassembler, 0x40B2D4)