Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
151 changes: 151 additions & 0 deletions Packs/Pwned/Integrations/PwnedV2/PwnedV2.py
Original file line number Diff line number Diff line change
Expand Up @@ -319,6 +319,153 @@ def pwned_username(username_list):
return api_res_list


def pwned_breaches_for_domain_list_command(args_dict: dict) -> tuple[list, list, list]:
"""Get all breached email addresses for one or more domains.

API endpoint: GET /breachedDomain/{domain}
The API returns a JSON object where keys are email aliases and values are
lists of breach names, e.g. {"alias1": ["Adobe"], "alias2": ["Adobe", "Gawker"]}.

On HTTP 404 the domain has no breached email addresses.

Args:
args_dict: demisto.args() dictionary. Expected key: ``domain`` (comma-separated list).

Returns:
Tuple of (markdown_list, entry_context_list, api_response_list).
"""
domain_list: list[str] = argToList(args_dict.get("domain", ""))

markdown_list: list[str] = []
entry_context_list: list[dict] = []
api_response_list: list = []

for domain in domain_list:
api_res = http_request("GET", f"breachedDomain/{domain}")

if api_res is None:
markdown_list.append(f"### Breaches for domain: *{domain}*\nThe domain does not have any email addresses")
entry_context_list.append({})
api_response_list.append(None)
continue

table_data = [
{"Domain": domain, "Account": alias, "Breaches": ", ".join(breaches)} for alias, breaches in api_res.items()
]
md = tableToMarkdown(f"Breaches for domain: {domain}", table_data, ["Domain", "Account", "Breaches"])

ec: dict = {"Domain.Pwned-V2.Breaches": api_res}

markdown_list.append(md)
entry_context_list.append(ec)
api_response_list.append(api_res)

return markdown_list, entry_context_list, api_response_list


def pwned_subscribed_domains_list_command(args_dict: dict) -> tuple[list, list, list]:
"""Get the list of subscribed (verified) domains for the API key owner.

API endpoint: GET /subscribedDomains
No input arguments required.

Returns:
Tuple of (markdown_list, entry_context_list, api_response_list).
"""
api_res = http_request("GET", "subscribedDomains")

if api_res is None:
return ["No subscribed domains found."], [{}], [None]

hr_headers = [
"DomainName",
"PwnCount",
"PwnCountExcludingSpamLists",
"PwnCountExcludingSpamListsAtLastSubscriptionRenewal",
"NextSubscriptionRenewal",
]
md = tableToMarkdown("Subscribed Domains", api_res, hr_headers)

ec: dict = {"Pwned-V2.SubscribedDomain": api_res}

return [md], [ec], [api_res]


def pwned_latest_breach_get_command(args_dict: dict) -> tuple[list, list, list]:
"""Get the most recently added breach.

API endpoint: GET /latestBreach
No input arguments required.

The response is a single breach object (same schema as domain/pwned-domain commands).
Should reuse ``domain_to_entry_context()`` for context and dBotScore handling.

Returns:
Tuple of (markdown_list, entry_context_list, api_response_list).
"""
api_res = http_request("GET", "latestBreach")

if api_res is None:
return ["No latest breach found."], [{}], [None]

table_data = [
{
"Latest breach domain name": api_res.get("Domain", ""),
"Breach Date": api_res.get("BreachDate", ""),
"Added Date": api_res.get("AddedDate", ""),
"Pwn Count": api_res.get("PwnCount", ""),
}
]
md = tableToMarkdown("Latest Breach", table_data, ["Latest breach domain name", "Breach Date", "Added Date", "Pwn Count"])

domain = api_res.get("Domain", "")
ec = domain_to_entry_context(domain, [api_res])
Comment thread
TheL0L marked this conversation as resolved.
# append api response to populated context template
ec[outputPaths["domain"]]["Pwned-V2"].update(api_res)

return [md], [ec], [api_res]


def pwned_breach_get_command(args_dict: dict) -> tuple[list, list, list]:
"""Get a single breached site by its breach name.

API endpoint: GET /breach/{breach_name}
The response is a single breach object (same schema as domain/pwned-domain commands).
Should reuse ``domain_to_entry_context()`` for context and dBotScore handling.

Args:
args_dict: demisto.args() dictionary. Expected key: ``breach_name``.

Returns:
Tuple of (markdown_list, entry_context_list, api_response_list).
"""
breach_name: str = args_dict.get("breach_name", "")

api_res = http_request("GET", f"breach/{breach_name}")

if api_res is None:
return [f"No breach found for name: {breach_name}"], [{}], [None]

table_data = [
{
"Latest breach domain name": api_res.get("Domain", ""),
"Breach Date": api_res.get("BreachDate", ""),
"Added Date": api_res.get("AddedDate", ""),
"Pwn Count": api_res.get("PwnCount", ""),
}
]
md = tableToMarkdown(
f"Breach: {breach_name}", table_data, ["Latest breach domain name", "Breach Date", "Added Date", "Pwn Count"]
)

domain = api_res.get("Domain", "")
ec = domain_to_entry_context(domain, [api_res])
Comment thread
TheL0L marked this conversation as resolved.
# append api response to populated context template
ec[outputPaths["domain"]]["Pwned-V2"].update(api_res)

return [md], [ec], [api_res]


def main(): # pragma: no cover
if not API_KEY:
raise DemistoException("API key must be provided.")
Expand All @@ -334,6 +481,10 @@ def main(): # pragma: no cover
"domain": pwned_domain_command,
"pwned-domain": pwned_domain_command,
"pwned-username": pwned_username_command,
"pwned-breaches-for-domain-list": pwned_breaches_for_domain_list_command,
"pwned-subscribed-domains-list": pwned_subscribed_domains_list_command,
"pwned-latest-breach-get": pwned_latest_breach_get_command,
"pwned-breach-get": pwned_breach_get_command,
}
if command in commands:
md_list, ec_list, api_email_res_list = commands[command](demisto.args())
Expand Down
Loading
Loading