Skip to content

Commit

Permalink
silentpush-forward-padns-lookup
Browse files Browse the repository at this point in the history
  • Loading branch information
yash-metron committed Jan 31, 2025
1 parent c809809 commit 78dfb34
Show file tree
Hide file tree
Showing 2 changed files with 336 additions and 4 deletions.
191 changes: 190 additions & 1 deletion Packs/SilentPush/Integrations/SilentPush/SilentPush.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
ASN_REPUTATION = "explore/ipreputation/history/asn"
ASN_TAKEDOWN_REPUTATION = "explore/takedownreputation/asn"
IPV4_REPUTATION = "explore/ipreputation/history/ipv4"
FORWARD_PADNS = "explore/padns/lookup/query"

''' COMMANDS INPUTS '''

Expand Down Expand Up @@ -198,6 +199,47 @@
InputArgument(name='limit',
description='The maximum number of reputation history to retrieve')
]
FORWARD_PADNS_INPUTS = [
InputArgument(name='qtype',
description='DNS record type',
required=True),
InputArgument(name='qname',
description='The DNS record name to lookup',
required=True),
InputArgument(name='netmask',
description='The netmask to filter the lookup results.'),
InputArgument(name='subdomains',
description='Flag to include subdomains in the lookup results.'),
InputArgument(name='regex',
description='Regular expression to filter the DNS records.'),
InputArgument(name='match',
description='Type of match for the query (e.g., exact, partial).'),
InputArgument(name='first_seen_after',
description='Filter results to include only records first seen after this date.'),
InputArgument(name='first_seen_before',
description='Filter results to include only records first seen before this date.'),
InputArgument(name='last_seen_after',
description='Filter results to include only records last seen after this date.'),
InputArgument(name='last_seen_before',
description='Filter results to include only records last seen before this date.'),
InputArgument(name='as_of',
description='Date or time to get the DNS records as of a specific point in time.'),
InputArgument(name='sort',
description='Sort the results by the specified field (e.g., date, score).'),
InputArgument(name='output_format',
description='The format in which the results should be returned (e.g., JSON, XML).'),
InputArgument(name='prefer',
description='Preference for specific DNS servers or sources.'),
InputArgument(name='with_metadata',
description='Flag to include metadata in the DNS records.'),
InputArgument(name='max_wait',
description='Maximum number of seconds to wait for results before timing out.'),
InputArgument(name='skip',
description='Number of results to skip for pagination purposes.'),
InputArgument(name='limit',
description='Maximum number of results to return.')
]




Expand Down Expand Up @@ -432,6 +474,39 @@
OutputArgument(name='IP', output_type=str, description='IPv4 address for which the reputation is calculated.'),
OutputArgument(name='Reputation.Score', output_type=int, description='Reputation score for the given IP address.')
]
FORWARD_PADNS_OUTPUTS = [
OutputArgument(name='qname',
output_type=str,
description='The DNS record name that was looked up.'),
OutputArgument(name='qtype',
output_type=str,
description='The DNS record type queried (e.g., NS).'),
OutputArgument(name='records.answer',
output_type=str,
description='The answer (e.g., name server) for the DNS record.'),
OutputArgument(name='records.count',
output_type=int,
description='The number of occurrences for this DNS record.'),
OutputArgument(name='records.first_seen',
output_type=str,
description='The timestamp when this DNS record was first seen.'),
OutputArgument(name='records.last_seen',
output_type=str,
description='The timestamp when this DNS record was last seen.'),
OutputArgument(name='records.nshash',
output_type=str,
description='Unique hash for the DNS record.'),
OutputArgument(name='records.query',
output_type=str,
description='The DNS record query name (e.g., silentpush.com).'),
OutputArgument(name='records.ttl',
output_type=int,
description='Time to live (TTL) value for the DNS record.'),
OutputArgument(name='records.type',
output_type=str,
description='The type of the DNS record (e.g., NS).')
]




Expand Down Expand Up @@ -1061,6 +1136,28 @@ def get_ipv4_reputation(self, ipv4: str, explain: bool = False, limit: int = Non
ipv4_reputation = raw_response.get('response', {}).get('ip_reputation_history', [])
return ipv4_reputation

def forward_padns_lookup(self, qtype: str, qname: str, **kwargs) -> Dict[str, Any]:
"""
Perform a forward PADNS lookup using various filtering parameters.
Args:
qtype (str): Type of DNS record.
qname (str): The DNS record name to lookup.
**kwargs: Optional parameters for filtering and pagination.
Returns:
Dict[str, Any]: PADNS lookup results.
"""
url_suffix = f"{FORWARD_PADNS}/{qtype}/{qname}"

params = filter_none_values(kwargs)

return self._http_request(
method="GET",
url_suffix=url_suffix,
params=params
)


''' HELPER FUNCTIONS '''
def filter_none_values(params: Dict[str, Any]) -> Dict[str, Any]:
Expand Down Expand Up @@ -2056,7 +2153,7 @@ def get_asn_takedown_reputation_command(client: Client, args: dict) -> CommandRe
@metadata_collector.command(
command_name="silentpush-get-ipv4-reputation",
inputs_list=IPV4_REPUTATION_INPUTS,
outputs_prefix="SilentPush.",
outputs_prefix="SilentPush.IPv4Reputation",
outputs_list=IPV4_REPUTATION_OUTPUTS,
description="This command retrieve the reputation information for an IPv4."
)
Expand Down Expand Up @@ -2114,6 +2211,95 @@ def get_ipv4_reputation_command(client: Client, args: Dict[str, Any]) -> Command
raw_response=raw_response
)

@metadata_collector.command(
command_name="silentpush-forward-padns-lookup",
inputs_list=FORWARD_PADNS_INPUTS,
outputs_prefix="SilentPush.PADNSLookup",
outputs_list=FORWARD_PADNS_OUTPUTS,
description="This command Performs a forward PADNS lookup using various filtering parameters."
)
def forward_padns_lookup_command(client: Client, args: dict) -> CommandResults:
"""
Command function to perform a forward PADNS lookup.
Args:
client (Client): The SilentPush API client.
args (dict): The command arguments containing lookup parameters.
Returns:
CommandResults: The formatted results of the PADNS lookup or an error message if something goes wrong.
"""
qtype = args.get('qtype')
qname = args.get('qname')

# Validate required parameters
if not qtype or not qname:
raise DemistoException("Both 'qtype' and 'qname' are required parameters.")

# Optional parameters
netmask = args.get('netmask')
subdomains = argToBoolean(args.get('subdomains')) if 'subdomains' in args else None
regex = args.get('regex')
match = args.get('match')
first_seen_after = args.get('first_seen_after')
first_seen_before = args.get('first_seen_before')
last_seen_after = args.get('last_seen_after')
last_seen_before = args.get('last_seen_before')
as_of = args.get('as_of')
sort = args.get('sort')
output_format = args.get('output_format')
prefer = args.get('prefer')
with_metadata = argToBoolean(args.get('with_metadata')) if 'with_metadata' in args else None
max_wait = arg_to_number(args.get('max_wait'))
skip = arg_to_number(args.get('skip'))
limit = arg_to_number(args.get('limit'))

# Perform the PADNS lookup
raw_response = client.forward_padns_lookup(
qtype=qtype,
qname=qname,
netmask=netmask,
subdomains=subdomains,
regex=regex,
match=match,
first_seen_after=first_seen_after,
first_seen_before=first_seen_before,
last_seen_after=last_seen_after,
last_seen_before=last_seen_before,
as_of=as_of,
sort=sort,
output_format=output_format,
prefer=prefer,
with_metadata=with_metadata,
max_wait=max_wait,
skip=skip,
limit=limit
)

records = raw_response.get('response', {}).get('records', [])

if not records:
readable_output = f"No records found for {qtype} {qname}"
else:
readable_output = tableToMarkdown(
f"PADNS Lookup Results for {qtype} {qname}",
records,
removeNull=True
)

return CommandResults(
outputs_prefix='SilentPush.PADNSLookup',
outputs_key_field='qname',
outputs={
'qtype': qtype,
'qname': qname,
'records': records
},
readable_output=readable_output,
raw_response=raw_response
)



''' MAIN FUNCTION '''

Expand Down Expand Up @@ -2184,6 +2370,9 @@ def main() -> None:

elif demisto.command() == 'silentpush-get-ipv4-reputation':
return_results(get_ipv4_reputation_command(client, demisto.args()))

elif demisto.command() == 'silentpush-forward-padns-lookup':
return_results(forward_padns_lookup_command(client, demisto.args()))

except Exception as e:
demisto.error(traceback.format_exc()) # print the traceback
Expand Down
Loading

0 comments on commit 78dfb34

Please sign in to comment.