Skip to content

Commit

Permalink
refactored ips_plugins/interface.py, added IPS parser plugin for stac…
Browse files Browse the repository at this point in the history
…ks files.
  • Loading branch information
rick-slin authored and joachimmetz committed Feb 18, 2024
1 parent 72ddaf1 commit cf67c35
Show file tree
Hide file tree
Showing 10 changed files with 14,107 additions and 80 deletions.
25 changes: 24 additions & 1 deletion data/formatters/macos.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ message:
- 'Application Name: {application_name}'
- 'Application Version: {application_version}'
- 'Bug Type: {bug_type}'
- 'Crash Reporter_key: {crash_reporter_key}'
- 'Device Model: {device_model}'
- 'Exception Type: {exception_type}'
- 'Incident Identifier: {incident_identifier}'
Expand All @@ -21,7 +22,29 @@ short_message:
- 'Incident Identifier :{incident_identifier}'
- 'OS version :{os_version}'
short_source: 'RecoveryLogd'
source: 'Apple Recovery Logd'
source: 'Apple Recovery IPS'
---
type: 'conditional'
data_type: 'apple:ips_stacks'
message:
- 'Bug Type: {bug_type}'
- 'Crash Reporter_key: {crash_reporter_key}'
- 'Device Model: {device_model}'
- 'Event Time: {event_time}'
- 'Kernel Version: {kernel_version}'
- 'Incident Identifier: {incident_identifier}'
- 'Process List: {process_list}'
- 'OS Version: {os_version}'
- 'Reason: {reason}'
short_message:
- 'Bug Type: {bug_type}'
- 'Crash Reporter_key: {crash_reporter_key}'
- 'Device Model: {device_model}'
- 'Incident Identifier: {incident_identifier}'
- 'OS Version: {os_version}'
- 'Reason: {reason}'
short_source: 'StacksIPS'
source: 'Apple Stacks IPS'
---
type: 'conditional'
data_type: 'imessage:event:chat'
Expand Down
6 changes: 6 additions & 0 deletions data/timeliner.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,12 @@ attributes_mapping:
description: 'Event Time'
place_holder_event: true
---
data_type: 'apple:ips_stacks'
attributes_mapping:
- name: 'event_time'
description: 'Event Time'
place_holder_event: true
---
data_type: 'av:defender:detection_history'
attribute_mappings:
- name: 'recorded_time'
Expand Down
1 change: 1 addition & 0 deletions plaso/parsers/ips_plugins/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,3 +2,4 @@
"""Imports for the ips log parser plugins."""

from plaso.parsers.ips_plugins import recovery_logd
from plaso.parsers.ips_plugins import stacks_ips
70 changes: 70 additions & 0 deletions plaso/parsers/ips_plugins/interface.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
# -*- coding: utf-8 -*-
"""Interface for IPS log file parser plugins."""

import abc
import pyparsing

from dfdatetime import time_elements as dfdatetime_time_elements

from plaso.parsers import plugins

Expand All @@ -11,9 +15,75 @@ class IPSPlugin(plugins.BasePlugin):
NAME = 'ips_plugin'
DATA_FORMAT = 'ips log file'

ENCODING = 'utf-8'

REQUIRED_HEADER_KEYS = []
REQUIRED_CONTENT_KEYS = []

_TWO_DIGITS = pyparsing.Word(pyparsing.nums, exact=2).setParseAction(
lambda tokens: int(tokens[0], 10))

_FOUR_DIGITS = pyparsing.Word(pyparsing.nums, exact=4).setParseAction(
lambda tokens: int(tokens[0], 10))

_VARYING_DIGITS = pyparsing.Word(pyparsing.nums)

TIMESTAMP_GRAMMAR = (
_FOUR_DIGITS.setResultsName('year') + pyparsing.Suppress('-') +
_TWO_DIGITS.setResultsName('month') + pyparsing.Suppress('-') +
_TWO_DIGITS.setResultsName('day') +
_TWO_DIGITS.setResultsName('hours') + pyparsing.Suppress(':') +
_TWO_DIGITS.setResultsName('minutes') + pyparsing.Suppress(':') +
_TWO_DIGITS.setResultsName('seconds') + pyparsing.Suppress('.') +
_VARYING_DIGITS.setResultsName('fraction') +
pyparsing.Word(
pyparsing.nums + '+' + '-').setResultsName('timezone_delta'))

def _ParseTimestampValue(self, parser_mediator, timestamp_text):
"""Parses a timestamp string.
Args:
parser_mediator (ParserMediator): parser mediator.
timestamp_text (str): the timestamp to parse.
Returns:
dfdatetime.TimeElements: date and time
or None if not available.
"""
# dfDateTime takes the time zone offset as number of minutes relative from
# UTC. So for Easter Standard Time (EST), which is UTC-5:00 the sign needs
# to be converted, to +300 minutes.

parsed_timestamp = self.TIMESTAMP_GRAMMAR.parseString(timestamp_text)

try:
time_delta_hours = int(parsed_timestamp['timezone_delta'][:3], 10)
time_delta_minutes = int(parsed_timestamp['timezone_delta'][3:], 10)
except (TypeError, ValueError):
parser_mediator.ProduceExtractionWarning(
'unsupported timezone offset value')
return None

time_zone_offset = (time_delta_hours * 60) + time_delta_minutes

try:
fraction_float = float(f"0.{parsed_timestamp['fraction']}")
milliseconds = round(fraction_float * 1000)

time_element_object = dfdatetime_time_elements.TimeElementsInMilliseconds(
time_elements_tuple=(
parsed_timestamp['year'], parsed_timestamp['month'],
parsed_timestamp['day'], parsed_timestamp['hours'],
parsed_timestamp['minutes'], parsed_timestamp['seconds'],
milliseconds),
time_zone_offset=time_zone_offset)

except (TypeError, ValueError):
parser_mediator.ProduceExtractionWarning('unsupported date time value')
return None

return time_element_object

def CheckRequiredKeys(self, ips_file):
"""Checks if the ips file's header and content have the keys required by
the plugin.
Expand Down
84 changes: 5 additions & 79 deletions plaso/parsers/ips_plugins/recovery_logd.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,6 @@
# -*- coding: utf-8 -*-
"""IPS file parser plugin for Apple crash recovery report."""

import pyparsing

from dfdatetime import time_elements as dfdatetime_time_elements

from plaso.containers import events
from plaso.parsers import ips_parser
from plaso.parsers.ips_plugins import interface
Expand All @@ -17,7 +13,7 @@ class AppleRecoveryLogdEvent(events.EventData):
application_name (str): name of the application.
application_version (str): version of the application.
bug_type (str): type of bug.
crash_reporter_key (str):
crash_reporter_key (str): Key of the crash reporter.
device_model (str): model of the device.
event_time (dfdatetime.DateTimeValues): date and time of the crash report.
exception_type (str): type of the exception that caused the crash.
Expand Down Expand Up @@ -54,82 +50,14 @@ def __init__(self):
class AppleRecoveryLogdIPSPlugin(interface.IPSPlugin):
"""Parses Apple Crash logs from IPS file."""

NAME = 'apple_crash_log'
DATA_FORMAT = 'IPS application crash log'

ENCODING = 'utf-8'
NAME = 'apple_recovery_ips'
DATA_FORMAT = 'IPS recovery logd crash log'

REQUIRED_HEADER_KEYS = [
'app_name', 'app_version', 'bug_type', 'incident_id', 'os_version',
'timestamp']
REQUIRED_CONTENT_KEYS = [
'captureTime', 'modelCode', 'os_version', 'pid', 'procLaunch', ]

_TWO_DIGITS = pyparsing.Word(pyparsing.nums, exact=2).setParseAction(
lambda tokens: int(tokens[0], 10))

_FOUR_DIGITS = pyparsing.Word(pyparsing.nums, exact=4).setParseAction(
lambda tokens: int(tokens[0], 10))

TIMESTAMP_GRAMMAR = (
_FOUR_DIGITS.setResultsName('year') + pyparsing.Suppress('-') +
_TWO_DIGITS.setResultsName('month') + pyparsing.Suppress('-') +
_TWO_DIGITS.setResultsName('day') +
_TWO_DIGITS.setResultsName('hours') + pyparsing.Suppress(':') +
_TWO_DIGITS.setResultsName('minutes') + pyparsing.Suppress(':') +
_TWO_DIGITS.setResultsName('seconds') + pyparsing.Suppress('.') +
_FOUR_DIGITS.setResultsName('fraction') +
pyparsing.Word(
pyparsing.nums + '+' + '-').setResultsName('timezone_delta'))

def __init__(self):
"""Initializes a text parser plugin."""
super(AppleRecoveryLogdIPSPlugin, self).__init__()
self._event_data = None

def _ParseTimestampValue(self, parser_mediator, timestamp_text):
"""Parses a timestamp string.
Args:
parser_mediator (ParserMediator): parser mediator.
timestamp_text (str): the timestamp to parse.
Returns:
dfdatetime.TimeElements: date and time
or None if not available.
"""
# dfDateTime takes the time zone offset as number of minutes relative from
# UTC. So for Easter Standard Time (EST), which is UTC-5:00 the sign needs
# to be converted, to +300 minutes.

parsed_timestamp = self.TIMESTAMP_GRAMMAR.parseString(timestamp_text)

try:
time_delta_hours = int(parsed_timestamp['timezone_delta'][:3], 10)
time_delta_minutes = int(parsed_timestamp['timezone_delta'][3:], 10)
except (TypeError, ValueError):
parser_mediator.ProduceExtractionWarning(
'unsupported timezone offset value')
return None

time_zone_offset = (time_delta_hours * -60) + time_delta_minutes

try:
milliseconds = round(parsed_timestamp['fraction']/10)

time_element_object = dfdatetime_time_elements.TimeElementsInMilliseconds(
time_elements_tuple=(
parsed_timestamp['year'], parsed_timestamp['month'],
parsed_timestamp['day'], parsed_timestamp['hours'],
parsed_timestamp['minutes'], parsed_timestamp['seconds'],
milliseconds),
time_zone_offset=time_zone_offset)

except (TypeError, ValueError):
parser_mediator.ProduceExtractionWarning('unsupported date time value')
return None

return time_element_object
'captureTime', 'modelCode', 'pid', 'procLaunch']

# pylint: disable=unused-argument
def Process(self, parser_mediator, ips_file=None, **unused_kwargs):
Expand All @@ -144,13 +72,11 @@ def Process(self, parser_mediator, ips_file=None, **unused_kwargs):
if ips_file is None:
raise ValueError('Missing ips_file value')

# This will raise if unhandled keyword arguments are passed.
super(AppleRecoveryLogdIPSPlugin, self).Process(parser_mediator)

event_data = AppleRecoveryLogdEvent()
event_data.application_name = ips_file.header.get('app_name')
event_data.application_version = ips_file.header.get('app_version')
event_data.bug_type = ips_file.header.get('bug_type')
event_data.crash_reporter_key = ips_file.content.get('crashReporterKey')
event_data.device_model = ips_file.content.get('modelCode')
event_data.exception_type = ips_file.content.get(
'exception', {}).get('type')
Expand Down
87 changes: 87 additions & 0 deletions plaso/parsers/ips_plugins/stacks_ips.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
# -*- coding: utf-8 -*-
"""IPS file parser plugin for Apple stacks report."""

from plaso.containers import events
from plaso.parsers import ips_parser
from plaso.parsers.ips_plugins import interface


class AppleStacksIPSEvent(events.EventData):
"""Apple stacks crash report.
Attributes:
bug_type (str): type of bug.
crash_reporter_key (str): Key of the crash reporter.
device_model (str): model of the device.
event_time (dfdatetime.DateTimeValues): date and time of the crash report.
kernel_version (str): kernel version.
incident_identifier (str): uuid for crash.
process_list (str): list of process names running at the time of the crash.
os_version (str): version of the operating system.
reason (str): reason for the crash.
"""

DATA_TYPE = 'apple:ips_stacks'

def __init__(self):
"""Initializes event data."""
super(AppleStacksIPSEvent, self).__init__(data_type=self.DATA_TYPE)
self.bug_type = None
self.crash_reporter_key = None
self.device_model = None
self.event_time = None
self.kernel_version = None
self.incident_identifier = None
self.process_list = None
self.os_version = None
self.reason = None


class AppleStacksIPSPlugin(interface.IPSPlugin):
"""Parses Apple stacks crash IPS file."""

NAME = 'apple_stacks_ips'
DATA_FORMAT = 'IPS stacks crash log'

REQUIRED_HEADER_KEYS = ['bug_type', 'incident_id', 'os_version', 'timestamp']
REQUIRED_CONTENT_KEYS = [
'build', 'crashReporterKey', 'kernel', 'product', 'reason']

# pylint: disable=unused-argument
def Process(self, parser_mediator, ips_file=None, **unused_kwargs):
"""Extracts information from an IPS log file.
This is the main method that an IPS plugin needs to implement.
Args:
parser_mediator (ParserMediator): parser mediator.
ips_file (Optional[IpsFile]): database.
Raises:
ValueError: If the file value is missing.
"""
if ips_file is None:
raise ValueError('Missing ips_file value')

event_data = AppleStacksIPSEvent()
event_data.bug_type = ips_file.header.get('bug_type')
event_data.crash_reporter_key = ips_file.content.get('crashReporterKey')
event_data.device_model = ips_file.content.get('product')
event_data.kernel_version = ips_file.content.get('kernel')
event_data.incident_identifier = ips_file.header.get('incident_id')
event_data.os_version = ips_file.header.get('os_version')
event_data.reason = ips_file.content.get('reason')

process_list = [
i.get('procname') for i in ips_file.content.get(
'processByPid').values()]
process_list = list(set(process_list))
process_list.sort()

event_data.process_list = ', '.join(process_list)

event_timestamp = ips_file.header.get('timestamp')
event_data.event_time = self._ParseTimestampValue(
parser_mediator, event_timestamp)

parser_mediator.ProduceEventData(event_data)


ips_parser.IPSParser.RegisterPlugin(AppleStacksIPSPlugin)
Loading

0 comments on commit cf67c35

Please sign in to comment.