Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
193 changes: 180 additions & 13 deletions Packs/AzureSentinel/Integrations/AzureSentinel/AzureSentinel.py
Original file line number Diff line number Diff line change
Expand Up @@ -1475,12 +1475,109 @@ def fetch_incidents_additional_info(client: AzureSentinelClient, incidents: List
incident[info_type] = client.http_request(method, f"incidents/{incident_id}/{info_type}").get(results_key)


def fetch_incidents_lookback(
    client: AzureSentinelClient,
    lookback_start_time: str,
    min_severity: str,
    statuses_to_fetch: list,
) -> list:
    """Query incidents whose lastModifiedTimeUtc falls inside the lookback window.

    Incidents whose severity was raised after creation are missed by the
    regular createdTimeUtc-based fetch; querying by modification time
    picks them up.

    Args:
        client: The Azure Sentinel client.
        lookback_start_time: Start of the lookback window (UTC time string).
        min_severity: Minimum severity of incidents to include.
        statuses_to_fetch: Incident statuses to include.

    Returns:
        A list of incidents modified within the lookback window.
    """
    demisto.debug(f"Lookback: querying incidents modified since {lookback_start_time}")

    # Severity/status filters may be empty, hence the trailing strip().
    command_args = {
        "filter": (
            f"properties/lastModifiedTimeUtc ge {lookback_start_time}"
            f" {severity_filter(min_severity)}"
            f" {status_filter(statuses_to_fetch)}".strip()
        ),
        "orderby": "properties/lastModifiedTimeUtc asc",
    }
    demisto.debug(f"Lookback filter query: {command_args['filter']}")

    results = list_incidents_command(client, command_args, is_fetch_incidents=True).outputs or []
    # A single incident may come back as a bare dict; normalize to a list.
    if isinstance(results, dict):
        results = [results]

    demisto.debug(f"Lookback: found {len(results)} incidents")
    return results


def dedup_lookback_incidents(
    lookback_incidents: list,
    previous_lookback_ids: dict,
    incidents_ids_from_fetch: list,
    look_back: int,
) -> tuple[list, dict]:
    """Filter lookback incidents down to those never ingested before.

    Two dedup passes are applied:
    1. Drop incidents already ingested by a previous lookback cycle
       (their IDs are remembered in ``previous_lookback_ids``).
    2. Drop incidents already ingested by the regular fetch of this cycle.

    Args:
        lookback_incidents: Incidents returned by the lookback query.
        previous_lookback_ids: {id: lastModifiedTimeUtc} persisted from
            previous cycles.
        incidents_ids_from_fetch: IDs ingested by the regular fetch this cycle.
        look_back: Lookback window in minutes; stored IDs older than twice
            this window are expired.

    Returns:
        (incidents to ingest, {id: lastModifiedTimeUtc} to persist in next_run).
    """
    # Anything older than twice the lookback window can no longer reappear
    # in the lookback query, so its ID does not need to be remembered.
    cutoff = (datetime.now(tz=timezone.utc) - timedelta(minutes=look_back * 2)).strftime(DATE_FORMAT)
    demisto.debug(f"Lookback dedup: expiry threshold is {cutoff} (2x {look_back} minutes)")

    # Keep only still-relevant IDs; string comparison works because the
    # timestamps share the same sortable DATE_FORMAT.
    active_ids = {}
    for inc_id, modified_time in previous_lookback_ids.items():
        if modified_time >= cutoff:
            active_ids[inc_id] = modified_time
    demisto.debug(f"Lookback dedup: removed {len(previous_lookback_ids) - len(active_ids)} expired IDs")

    # Pass 1: drop incidents ingested by a previous lookback cycle.
    not_previously_seen = [inc for inc in lookback_incidents if inc.get("ID") not in active_ids]
    demisto.debug(f"Lookback dedup - after previous-cycle dedup: {len(lookback_incidents)} → {len(not_previously_seen)}")

    # Pass 2: drop incidents the regular fetch already ingested this cycle.
    fetched_ids = set(incidents_ids_from_fetch)
    to_ingest = [inc for inc in not_previously_seen if inc.get("ID") not in fetched_ids]
    demisto.debug(f"Lookback dedup - after regular-fetch dedup: {len(not_previously_seen)} → {len(to_ingest)}")

    # Persist the surviving previous IDs plus everything new this cycle —
    # including incidents skipped in pass 2, so they are not re-ingested
    # later once they drop out of the regular fetch.
    updated_lookback_ids = dict(active_ids)
    for incident in not_previously_seen:
        updated_lookback_ids[incident.get("ID")] = incident.get("LastModifiedTimeUTC")

    return to_ingest, updated_lookback_ids


def fetch_incidents(
client: AzureSentinelClient,
last_run: dict,
first_fetch_time: str,
min_severity: str,
statuses_to_fetch: list = [],
look_back: int = 0,
) -> tuple:
"""Fetching incidents.
Args:
Expand All @@ -1489,6 +1586,8 @@ def fetch_incidents(
last_run: An dictionary of the last run.
min_severity: A minimum severity of incidents to fetch.
statuses_to_fetch: A list of statuses to fetch.
look_back: Lookback time in minutes. When > 0, also fetches incidents
modified within this window to catch severity escalations.

Returns:
(tuple): 1. The LastRun object updated with the last run details.
Expand Down Expand Up @@ -1547,16 +1646,64 @@ def fetch_incidents(
raw_incidents = list(filter(lambda incident: incident["ID"] not in last_fetch_ids, raw_incidents))
demisto.debug(f"raw incidents id after dedup: {[incident['ID'] for incident in raw_incidents]}")

# Lookback mechanism based on fetching incidents by their modified time within the lookback window
lookback_deduped_incidents: list = []
current_lookback_ids: dict = {}
if look_back > 0:
demisto.debug(f"Lookback enabled with {look_back} minutes")

# Calculate the lookback start time
lookback_start_time, _ = get_fetch_run_time_range(
last_run={"time": last_run.get("last_fetch_time")},
first_fetch=first_fetch_time,
look_back=look_back,
date_format=DATE_FORMAT,
)

lookback_incidents = fetch_incidents_lookback(
client=client,
lookback_start_time=lookback_start_time,
min_severity=min_severity,
statuses_to_fetch=statuses_to_fetch,
)

# Dedup lookback incidents using the lookback incidents from loop before and the fetched incidents
incidents_ids_from_fetch = [incident["ID"] for incident in raw_incidents]
previous_lookback_ids = last_run.get("lookback_fetch_ids", {})

lookback_deduped_incidents, current_lookback_ids = dedup_lookback_incidents(
lookback_incidents=lookback_incidents,
previous_lookback_ids=previous_lookback_ids,
incidents_ids_from_fetch=incidents_ids_from_fetch,
look_back=look_back,
)

demisto.debug(f"Lookback: {len(lookback_deduped_incidents)} new incidents to ingest")

fetch_incidents_additional_info(client, raw_incidents)

return process_incidents(raw_incidents, latest_created_time, last_incident_number) # type: ignore[attr-defined]
next_run, incidents = process_incidents(
raw_incidents,
latest_created_time,
last_incident_number,
current_lookback_ids,
)

# Fetch additional info for lookback incidents
fetch_incidents_additional_info(client, lookback_deduped_incidents)

for incident in lookback_deduped_incidents:
incidents.append(convert_incident_to_fetch_format(incident))

return next_run, incidents


def fetch_incidents_command(client, params):
# How much time before the first fetch to retrieve incidents
first_fetch_time = params.get("fetch_time", "3 days").strip()
min_severity = params.get("min_severity", "Informational")
statuses_to_fetch = argToList(params.get("statuses_to_fetch", []))
look_back = arg_to_number(params.get("look_back")) or 0
# Set and define the fetch incidents command to run after activated via integration settings.
last_run = demisto.getLastRun()
demisto.debug(f"Current last run is {last_run}")
Expand All @@ -1566,18 +1713,43 @@ def fetch_incidents_command(client, params):
first_fetch_time=first_fetch_time,
min_severity=min_severity,
statuses_to_fetch=statuses_to_fetch,
look_back=look_back,
)
demisto.debug(f"New last run is {last_run}")
demisto.debug(f"New last run is {next_run}")
demisto.setLastRun(next_run)
demisto.incidents(incidents)


def process_incidents(raw_incidents: list, latest_created_time: datetime, last_incident_number):
def convert_incident_to_fetch_format(incident: dict) -> dict:
    """Build an XSOAR fetch-incident entry from a raw Sentinel incident.

    Args:
        incident: Incident dict in the incident_data_to_xsoar_format shape.
            Mutated in place: mirroring fields are added before serialization.

    Returns:
        A dict with the name, occurred, severity, and rawJSON fields
        expected by the fetch mechanism.
    """
    add_mirroring_fields(incident)
    title = incident.get("Title") or ""
    return {
        "name": f"[Azure Sentinel] {title}",
        "occurred": incident.get("CreatedTimeUTC"),
        "severity": severity_to_level(incident.get("Severity")),
        "rawJSON": json.dumps(incident),
    }


def process_incidents(
raw_incidents: list,
latest_created_time: datetime,
last_incident_number,
lookback_fetch_ids: dict,
):
"""Processing the raw incidents
Args:
raw_incidents: The incidents that were fetched from the API.
last_incident_number: The last incident number that was fetched.
latest_created_time: The latest created time.
last_incident_number: The last incident number that was fetched.
lookback_fetch_ids: IDs from the current lookback cycle to store in next_run.

Returns:
A next_run dictionary, and an array of incidents.
Expand All @@ -1594,29 +1766,24 @@ def process_incidents(raw_incidents: list, latest_created_time: datetime, last_i

incident_created_time = dateparser.parse(incident.get("CreatedTimeUTC"))
current_fetch_ids.append(incident.get("ID"))
add_mirroring_fields(incident)
xsoar_incident = {
"name": "[Azure Sentinel] " + incident.get("Title"),
"occurred": incident.get("CreatedTimeUTC"),
"severity": incident_severity,
"rawJSON": json.dumps(incident),
}

# Update last run to the latest fetch time
if incident_created_time is None:
raise DemistoException(f"{incident.get('CreatedTimeUTC')=} couldn't be parsed")

incidents.append(xsoar_incident)
incidents.append(convert_incident_to_fetch_format(incident))

if incident_created_time > latest_created_time:
latest_created_time = incident_created_time
if incident.get("IncidentNumber") > last_incident_number:
last_incident_number = incident.get("IncidentNumber")
next_run = {
next_run: dict = {
"last_fetch_time": latest_created_time.strftime(DATE_FORMAT),
"last_fetch_ids": current_fetch_ids,
"last_incident_number": last_incident_number,
"lookback_fetch_ids": lookback_fetch_ids or {},
}

return next_run, incidents


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -226,6 +226,14 @@ configuration:
type: 19
section: Collect
advanced: true
- defaultvalue: 0
display: Minutes to look back when fetching
additionalinfo: Use this parameter to determine how far back to look when searching for incidents that were created before the last run time and did not match the query when they were created. It is recommended to use a small value (e.g., 1-5 minutes) to avoid performance issues.
name: look_back
type: 0
advanced: true
required: false
section: Collect
description: "Microsoft Sentinel is a scalable, cloud-native solution that provides: Security information and event management (SIEM) Security orchestration, automation, and response (SOAR)."
display: Microsoft Sentinel
name: Azure Sentinel
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,4 +34,8 @@ If you have a dedicated server URL, enter it in the *Server Url* parameter.

## Get the Additional Instance Parameters

To get the *Subscription ID*, *Workspace Name*, and *Resource Group* parameters, in the Azure Portal navigate to **Azure Sentinel** > your workspace > **Settings** and click the **Workspace Settings** tab.
To get the *Subscription ID*, *Workspace Name*, and *Resource Group* parameters, in the Azure Portal navigate to **Azure Sentinel** > your workspace > **Settings** and click the **Workspace Settings** tab.

## Lookback Parameter Notes
* Increasing the **look-back** parameter value after incidents have already been fetched may cause duplicate incidents during the next fetch. To avoid duplicates, do not increase the look-back value once set.
* Large look-back values can increase memory usage and API calls. We recommend using a small value (e.g., 1-5 minutes).
Loading
Loading