diff --git a/Packs/SilentPush/Integrations/SilentPush/SilentPush.py b/Packs/SilentPush/Integrations/SilentPush/SilentPush.py index fc56db7e38c2..6067119ea0b9 100644 --- a/Packs/SilentPush/Integrations/SilentPush/SilentPush.py +++ b/Packs/SilentPush/Integrations/SilentPush/SilentPush.py @@ -1,36 +1,458 @@ -import demistomock as demisto # noqa: F401 -from CommonServerPython import * # noqa: F401 -"""Base Integration for Cortex XSOAR (aka Demisto) - -This is an integration to interact with the SilentPush API and provide functionality within XSOAR. - -Developer Documentation: https://xsoar.pan.dev/docs/welcome -Code Conventions: https://xsoar.pan.dev/docs/integrations/code-conventions -Linting: https://xsoar.pan.dev/docs/integrations/linting -""" - -from CommonServerUserPython import * # noqa +import ipaddress +import demistomock as demisto +from CommonServerPython import * +from CommonServerUserPython import * +import enum +import json import urllib3 -from typing import Any +import dateparser +import traceback +from typing import Any, Dict, List, Optional, Union, Tuple # Disable insecure warnings urllib3.disable_warnings() +''' CONSTANTS ''' + +DATE_FORMAT = '%Y-%m-%dT%H:%M:%SZ' -def mock_debug(message): - """Print debug messages to the XSOAR logs""" - print(f"DEBUG: {message}") +# API ENDPOINTS +JOB_STATUS = "explore/job" +NAMESERVER_REPUTATION = "explore/nsreputation/nameserver" +SUBNET_REPUTATION = "explore/ipreputation/history/subnet" +ASNS_DOMAIN = "explore/padns/lookup/domain/asns" +DENSITY_LOOKUP = "explore/padns/lookup/density" +SEARCH_DOMAIN = "explore/domain/search" +DOMAIN_INFRATAGS = "explore/bulk/domain/infratags" +DOMAIN_INFO = "explore/bulk/domaininfo" +RISK_SCORE = "explore/bulk/domain/riskscore" +WHOIS = "explore/domain/whois" +DOMAIN_CERTIFICATE = "explore/domain/certificates" +ENRICHMENT = "explore/enrich" +LIST_IP = "explore/bulk/ip2asn" +ASN_REPUTATION = "explore/ipreputation/history/asn" +''' COMMANDS INPUTS ''' + +JOB_STATUS_INPUTS = [ + InputArgument(name='job_id', # option 1 + description='ID of the job returned by Silent Push actions.', + required=True), + InputArgument(name='max_wait', + description='Number of seconds to wait for results (0-25 seconds).'), + InputArgument(name='result_type', + description='Type of result to include in the response.') + ] +NAMESERVER_REPUTATION_INPUTS = [ + InputArgument(name='nameserver', + description='Nameserver name for which information needs to be retrieved', + required=True), + InputArgument(name='explain', + description='Show the information used to calculate the reputation score'), + InputArgument(name='limit', + description='The maximum number of reputation history to retrieve') + ] +SUBNET_REPUTATION_INPUTS = [ + InputArgument( + name='subnet', + description='IPv4 subnet for which reputation information needs to be retrieved.', + required=True + ), + InputArgument( + name='explain', + description='Show the detailed information used to calculate the reputation score.' + ), + InputArgument( + name='limit', + description='Maximum number of reputation history entries to retrieve.' + ) + ] +ASNS_DOMAIN_INPUTS = [ + InputArgument(name='domain', # option 1 + description='Domain name to search ASNs for. Retrieves ASNs associated with a records for the specified domain and its subdomains in the last 30 days.', + required=True) + ] +DENSITY_LOOKUP_INPUTS = [ + InputArgument(name='qtype', + description='Query type.', + required=True), + InputArgument(name='query', + description='Value to query.', + required=True), + InputArgument(name='scope', + description='Match level (optional).') + ] +SEARCH_DOMAIN_INPUTS = [ + InputArgument(name='domain', + description='Name or wildcard pattern of domain names to search for.'), + InputArgument(name='domain_regex', + description='A valid RE2 regex pattern to match domains. Overrides the domain argument.'), + InputArgument(name='name_server', + description='Name server name or wildcard pattern of the name server used by domains.'), + InputArgument(name='asnum', + description='Autonomous System (AS) number to filter domains.'), + InputArgument(name='asname', + description='Search for all AS numbers where the AS Name begins with the specified value.'), + InputArgument(name='min_ip_diversity', + description='Minimum IP diversity limit to filter domains.'), + InputArgument(name='registrar', + description='Name or partial name of the registrar used to register domains.'), + InputArgument(name='min_asn_diversity', + description='Minimum ASN diversity limit to filter domains.'), + InputArgument(name='certificate_issuer', + description='Filter domains that had SSL certificates issued by the specified certificate issuer. Wildcards supported.'), + InputArgument(name='whois_date_after', + description='Filter domains with a WHOIS creation date after this date (YYYY-MM-DD).'), + InputArgument(name='skip', + description='Number of results to skip in the search query.'), + InputArgument(name='limit', + description='Number of results to return. Defaults to the SilentPush API\'s behavior.') + ] +DOMAIN_INFRATAGS_INPUTS = [ + InputArgument(name='domains', + description='Comma-separated list of domains.', + required=True), + InputArgument(name='cluster', + description='Whether to cluster the results.'), + InputArgument(name='mode', + description='Mode for lookup (live/padns). Defaults to "live".', + default='live'), + InputArgument(name='match', + description='Handling of self-hosted infrastructure. Defaults to "self".', + default='self') + ] +LIST_DOMAIN_INPUTS = [ + InputArgument(name='domains', + description='Comma-separated list of domains to query.', + required=True), + InputArgument(name='fetch_risk_score', + description='Whether to fetch risk scores for the domains.', + required=False), + InputArgument(name='fetch_whois_info', + description='Whether to fetch WHOIS information for the domains.', + required=False) + ] +DOMAIN_CERTIFICATE_INPUTS = [ + InputArgument(name='domain', + description='The domain to query certificates for.', + required=True), + InputArgument(name='domain_regex', + description='Regular expression to match domains.'), + InputArgument(name='certificate_issuer', + description='Filter by certificate issuer.'), + InputArgument(name='date_min', + description='Filter certificates issued on or after this date.'), + InputArgument(name='date_max', + description='Filter certificates issued on or before this date.') + ] +ENRICHMENT_INPUTS = [ + InputArgument(name='resource', + description='The resource to query (domain/IP).', + required=True), + InputArgument(name='value', + description='Type of resource (domain/ipv4/ipv6).', + required=True), + InputArgument(name='explain', + description='Include explanation of data calculations.'), + InputArgument(name='scan_data', + description='Include scan data (IPv4 only).') + ] +LIST_IP_INPUTS = [ + InputArgument(name='ips', + description='Comma-separated list of IP addresses.', + required=True), + InputArgument(name='explain', + description='Include explanation of calculations.'), + InputArgument(name='scan_data', + description='Include scan data (IPv4 only).'), + InputArgument(name='sparse', + description='Specific data to return (asn/asname/sp_risk_score).') + ] +ASN_REPUTATION_INPUTS = [ + InputArgument(name='asn', + description='The ASN to lookup.', + required=True), + InputArgument(name='explain', + description='Show the information used to calculate the reputation score.'), + InputArgument(name='limit', + description='The maximum number of reputation history records to retrieve.') + ] -demisto.debug = mock_debug -''' CONSTANTS ''' -DATE_FORMAT = '%Y-%m-%dT%H:%M:%SZ' # ISO8601 format with UTC, default in XSOAR -''' CLIENT CLASS ''' +''' COMMANDS OUTPUTS ''' + +JOB_STATUS_OUTPUTS = [ + OutputArgument(name='get', output_type=str, description='URL to retrieve the job status.'), + OutputArgument(name='job_id', output_type=str, description='Unique identifier for the job.'), + OutputArgument(name='status', output_type=str, description='Current status of the job.') + ] +NAMESERVER_REPUTATION_OUTPUTS = [ + OutputArgument(name='date', output_type=int, description='Date of the reputation history entry (in YYYYMMDD format).'), + OutputArgument(name='ns_server', output_type=str, description='Name of the nameserver associated with the reputation history entry.'), + OutputArgument(name='ns_server_reputation', output_type=int, description='Reputation score of the nameserver on the specified date.'), + OutputArgument(name='ns_server_reputation_explain', output_type=dict, description='Explanation of the reputation score, including domain density and listed domains.'), + OutputArgument(name='ns_server_domain_density', output_type=int, description='Number of domains associated with the nameserver.'), + OutputArgument(name='ns_server_domains_listed', output_type=int, description='Number of domains listed in reputation databases.') + ] +SUBNET_REPUTATION_OUTPUTS = [ + OutputArgument(name='date', output_type=int, description='The date of the subnet reputation record.'), + OutputArgument(name='subnet', output_type=str, description='The subnet associated with the reputation record.'), + OutputArgument(name='subnet_reputation', output_type=int, description='The reputation score of the subnet.'), + OutputArgument(name='ips_in_subnet', output_type=int, description='Total number of IPs in the subnet.'), + OutputArgument(name='ips_num_active', output_type=int, description='Number of active IPs in the subnet.'), + OutputArgument(name='ips_num_listed', output_type=int, description='Number of listed IPs in the subnet.') + ] +ASNS_DOMAIN_OUTPUTS = [ + OutputArgument(name='domain', output_type=str, description='The domain name for which ASNs are retrieved.'), + OutputArgument(name='domain_asns', output_type=dict, description='Dictionary of Autonomous System Numbers (ASNs) associated with the domain.') + ] +DENSITY_LOOKUP_OUTPUTS = [ + OutputArgument(name='density', output_type=int, description='The density value associated with the query result.'), + OutputArgument(name='nssrv', output_type=str, description='The name server (NS) for the query result.') + ] +SEARCH_DOMAIN_OUTPUTS = [ + OutputArgument(name='asn_diversity', output_type=int, description='The diversity of Autonomous System Numbers (ASNs) associated with the domain.'), + OutputArgument(name='host', output_type=str, description='The domain name (host) associated with the record.'), + OutputArgument(name='ip_diversity_all', output_type=int, description='The total number of unique IPs associated with the domain.'), + OutputArgument(name='ip_diversity_groups', output_type=int, description='The number of unique IP groups associated with the domain.') + ] +DOMAIN_INFRATAGS_OUTPUTS = [ + OutputArgument(name='infratags.domain', + output_type=str, + description='The domain associated with the infratag.'), + OutputArgument(name='infratags.mode', + output_type=str, + description='The mode associated with the domain infratag.'), + OutputArgument(name='infratags.tag', + output_type=str, + description='The tag associated with the domain infratag.'), + + OutputArgument(name='tag_clusters.25.domains', + output_type=list, + description='List of domains in the tag cluster with score 25.'), + OutputArgument(name='tag_clusters.25.match', + output_type=str, + description='The match string associated with the domains in the tag cluster with score 25.'), + + OutputArgument(name='tag_clusters.50.domains', + output_type=list, + description='List of domains in the tag cluster with score 50.'), + OutputArgument(name='tag_clusters.50.match', + output_type=str, + description='The match string associated with the domains in the tag cluster with score 50.'), + + OutputArgument(name='tag_clusters.75.domains', + output_type=list, + description='List of domains in the tag cluster with score 75.'), + OutputArgument(name='tag_clusters.75.match', + output_type=str, + description='The match string associated with the domains in the tag cluster with score 75.'), + + OutputArgument(name='tag_clusters.100.domains', + output_type=list, + description='List of domains in the tag cluster with score 100.'), + OutputArgument(name='tag_clusters.100.match', + output_type=str, + description='The match string associated with the domains in the tag cluster with score 100.') + ] +LIST_DOMAIN_OUTPUTS = [ + OutputArgument(name='domain', output_type=str, description='The domain name queried.'), + OutputArgument(name='last_seen', output_type=int, description='The last seen date of the domain in YYYYMMDD format.'), + OutputArgument(name='query', output_type=str, description='The domain name used for the query.'), + OutputArgument(name='whois_age', output_type=int, description='The age of the domain in days based on WHOIS creation date.'), + OutputArgument(name='first_seen', output_type=int, description='The first seen date of the domain in YYYYMMDD format.'), + OutputArgument(name='is_new', output_type=bool, description='Indicates whether the domain is newly observed.'), + OutputArgument(name='zone', output_type=str, description='The top-level domain (TLD) or zone of the queried domain.'), + OutputArgument(name='registrar', output_type=str, description='The registrar responsible for the domain registration.'), + OutputArgument(name='age_score', output_type=int, description='A risk score based on the domain\'s age.'), + OutputArgument(name='whois_created_date', output_type=str, description='The WHOIS creation date of the domain in YYYY-MM-DD HH:MM:SS format.'), + OutputArgument(name='is_new_score', output_type=int, description='A risk score indicating how new the domain is.'), + OutputArgument(name='age', output_type=int, description='The age of the domain in days.') + ] +DOMAIN_CERTIFICATE_OUTPUTS = [ + OutputArgument(name='cert_index', output_type=int, description='Index of the certificate.'), + OutputArgument(name='chain', output_type=list, description='Certificate chain.'), + OutputArgument(name='date', output_type=int, description='Certificate issue date.'), + OutputArgument(name='domain', output_type=str, description='Primary domain of the certificate.'), + OutputArgument(name='domains', output_type=list, description='List of domains covered by the certificate.'), + OutputArgument(name='fingerprint', output_type=str, description='SHA-1 fingerprint of the certificate.'), + OutputArgument(name='fingerprint_md5', output_type=str, description='MD5 fingerprint of the certificate.'), + OutputArgument(name='fingerprint_sha1', output_type=str, description='SHA-1 fingerprint of the certificate.'), + OutputArgument(name='fingerprint_sha256', output_type=str, description='SHA-256 fingerprint of the certificate.'), + OutputArgument(name='host', output_type=str, description='Host associated with the certificate.'), + OutputArgument(name='issuer', output_type=str, description='Issuer of the certificate.'), + OutputArgument(name='not_after', output_type=str, description='Expiration date of the certificate.'), + OutputArgument(name='not_before', output_type=str, description='Start date of the certificate validity.'), + OutputArgument(name='serial_dec', output_type=str, description='Decimal representation of the serial number.'), + OutputArgument(name='serial_hex', output_type=str, description='Hexadecimal representation of the serial number.'), + OutputArgument(name='serial_number', output_type=str, description='Serial number of the certificate.'), + OutputArgument(name='source_name', output_type=str, description='Source log name of the certificate.'), + OutputArgument(name='source_url', output_type=str, description='URL of the certificate log source.'), + OutputArgument(name='subject', output_type=str, description='Subject details of the certificate.'), + OutputArgument(name='wildcard', output_type=int, description='Indicates if the certificate is a wildcard certificate.') + ] +ENRICHMENT_OUTPUTS = [ + OutputArgument(name='ip_is_dsl_dynamic', output_type=bool, description='Indicates if the IP is DSL dynamic.'), + OutputArgument(name='ip_has_expired_certificate', output_type=bool, description='Indicates if the IP has an expired certificate.'), + OutputArgument(name='subnet_allocation_age', output_type=str, description='Age of the subnet allocation.'), + OutputArgument(name='asn_rank_score', output_type=int, description='Score of the ASN rank.'), + OutputArgument(name='asn_allocation_age', output_type=int, description='Age of the ASN allocation.'), + OutputArgument(name='sp_risk_score', output_type=int, description='Risk score for the service provider.'), + OutputArgument(name='ip_reputation_score', output_type=int, description='Reputation score of the IP.'), + OutputArgument(name='ip', output_type=str, description='The IP address.'), + OutputArgument(name='density', output_type=int, description='Density value for the IP address.'), + OutputArgument(name='benign_info.actor', output_type=str, description='Actor associated with the benign information.'), + OutputArgument(name='benign_info.known_benign', output_type=bool, description='Indicates if the resource is known to be benign.'), + OutputArgument(name='benign_info.tags', output_type=object, description='Tags associated with the benign information.'), + OutputArgument(name='asn_allocation_date', output_type=int, description='Date of ASN allocation in YYYYMMDD format.'), + OutputArgument(name='subnet_allocation_date', output_type=str, description='Date of subnet allocation or UNKNOWN if unavailable.'), + OutputArgument(name='asn_takedown_reputation', output_type=int, description='Reputation score for ASN takedown.'), + OutputArgument(name='ip_location.continent_code', output_type=str, description='Continent code where the IP is located.'), + OutputArgument(name='ip_location.continent_name', output_type=str, description='Continent name where the IP is located.'), + OutputArgument(name='ip_location.country_code', output_type=str, description='Country code of the IP location.'), + OutputArgument(name='ip_location.country_is_in_european_union', output_type=bool, description='Indicates if the country is in the European Union.'), + OutputArgument(name='ip_location.country_name', output_type=str, description='Country name where the IP is located.'), + OutputArgument(name='date', output_type=int, description='Date of the record in YYYYMMDD format.'), + OutputArgument(name='subnet_reputation_score', output_type=int, description='Reputation score of the subnet.'), + OutputArgument(name='asn_rank', output_type=int, description='Rank of the ASN.'), + OutputArgument(name='asn_reputation_score', output_type=int, description='Reputation score of the ASN.'), + OutputArgument(name='ip_is_ipfs_node', output_type=bool, description='Indicates if the IP is an IPFS node.'), + OutputArgument(name='value', output_type=str, description='The value associated with the IP or subnet.'), + OutputArgument(name='ip_reputation', output_type=int, description='Reputation score of the IP address.'), + OutputArgument(name='ip_is_dsl_dynamic_score', output_type=int, description='Score indicating if the IP is DSL dynamic.'), + OutputArgument(name='ip_has_open_directory', output_type=bool, description='Indicates if the IP has an open directory.'), + OutputArgument(name='ip_ptr', output_type=str, description='Pointer record (PTR) of the IP address.'), + OutputArgument(name='listing_score', output_type=int, description='Listing score of the IP address or resource.'), + OutputArgument(name='malscore', output_type=int, description='Malware score for the IP address.'), + OutputArgument(name='sinkhole_info.known_sinkhole_ip', output_type=bool, description='Indicates if the IP is associated with a known sinkhole.'), + OutputArgument(name='sinkhole_info.tags', output_type=object, description='Tags associated with the sinkhole information.'), + OutputArgument(name='subnet_reputation', output_type=int, description='Reputation score of the subnet.'), + OutputArgument(name='asn_reputation', output_type=int, description='Reputation score of the ASN.'), + OutputArgument(name='asn', output_type=int, description='Autonomous System Number (ASN) of the IP or subnet.'), + OutputArgument(name='asname', output_type=str, description='Name of the ASN associated with the IP or subnet.'), + OutputArgument(name='subnet', output_type=str, description='Subnet associated with the IP address.'), + OutputArgument(name='ip_is_tor_exit_node', output_type=bool, description='Indicates if the IP is a TOR exit node.'), + OutputArgument(name='asn_takedown_reputation_score', output_type=int, description='Reputation score for ASN takedown.') + ] +LIST_IP_OUTPUTS = [ + OutputArgument(name='ip_is_dsl_dynamic', output_type=bool, description='Indicates if the IP is a DSL dynamic IP.'), + OutputArgument(name='ip_has_expired_certificate', output_type=bool, description='Indicates if the IP has an expired certificate.'), + OutputArgument(name='subnet_allocation_age', output_type=str, description='Age of the subnet allocation.'), + OutputArgument(name='asn_rank_score', output_type=int, description='Rank score of the ASN.'), + OutputArgument(name='asn_allocation_age', output_type=int, description='Age of the ASN allocation in days.'), + OutputArgument(name='sp_risk_score', output_type=int, description='Risk score of the service provider (SP).'), + OutputArgument(name='asn_takedown_reputation_explain.ips_active', output_type=int, description='Number of active IPs in the ASN takedown reputation.'), + OutputArgument(name='asn_takedown_reputation_explain.ips_in_asn', output_type=int, description='Total number of IPs in the ASN.'), + OutputArgument(name='asn_takedown_reputation_explain.ips_num_listed', output_type=int, description='Number of IPs listed in the ASN takedown reputation.'), + OutputArgument(name='asn_takedown_reputation_explain.items_num_listed', output_type=int, description='Number of items listed in the ASN takedown reputation.'), + OutputArgument(name='asn_takedown_reputation_explain.lifetime_avg', output_type=int, description='Average lifetime of items in the ASN takedown reputation.'), + OutputArgument(name='asn_takedown_reputation_explain.lifetime_max', output_type=int, description='Maximum lifetime of items in the ASN takedown reputation.'), + OutputArgument(name='asn_takedown_reputation_explain.lifetime_total', output_type=int, description='Total lifetime of items in the ASN takedown reputation.'), + OutputArgument(name='ip_reputation_score', output_type=int, description='Reputation score of the IP.'), + OutputArgument(name='listing_score_feeds_explain', output_type=str, description='Explanation of the listing score feeds.'), + OutputArgument(name='ip', output_type=str, description='The IP address being evaluated.'), + OutputArgument(name='density', output_type=int, description='Density score of the IP.'), + OutputArgument(name='benign_info.actor', output_type=str, description='Actor associated with the benign info.'), + OutputArgument(name='benign_info.known_benign', output_type=bool, description='Indicates if the IP is known benign.'), + OutputArgument(name='benign_info.tags', output_type=str, description='Tags associated with the benign info.'), + OutputArgument(name='ip_reputation_explain', output_type=str, description='Explanation of the IP reputation.'), + OutputArgument(name='asn_allocation_date', output_type=int, description='The ASN allocation date.'), + OutputArgument(name='subnet_allocation_date', output_type=str, description='The subnet allocation date.'), + OutputArgument(name='asn_takedown_reputation', output_type=int, description='Reputation score of ASN takedown.'), + OutputArgument(name='ip_location.continent_code', output_type=str, description='Continent code of the IP location.'), + OutputArgument(name='ip_location.continent_name', output_type=str, description='Continent name of the IP location.'), + OutputArgument(name='ip_location.country_code', output_type=str, description='Country code of the IP location.'), + OutputArgument(name='ip_location.country_is_in_european_union', output_type=bool, description='Indicates if the country is in the European Union.'), + OutputArgument(name='ip_location.country_name', output_type=str, description='Country name of the IP location.'), + OutputArgument(name='date', output_type=int, description='Date associated with the IP data.'), + OutputArgument(name='subnet_reputation_score', output_type=int, description='Reputation score of the subnet.'), + OutputArgument(name='asn_rank', output_type=int, description='Rank of the ASN.'), + OutputArgument(name='listing_score_explain', output_type=str, description='Explanation of the listing score.'), + OutputArgument(name='asn_reputation_score', output_type=int, description='Reputation score of the ASN.'), + OutputArgument(name='ip_is_ipfs_node', output_type=bool, description='Indicates if the IP is an IPFS node.'), + OutputArgument(name='ip_reputation', output_type=int, description='Reputation score of the IP.'), + OutputArgument(name='subnet_reputation_explain', output_type=str, description='Explanation of the subnet reputation.'), + OutputArgument(name='ip_is_dsl_dynamic_score', output_type=int, description='Score indicating if the IP is a DSL dynamic IP.'), + OutputArgument(name='asn_reputation_explain', output_type=str, description='Explanation of the ASN reputation.'), + OutputArgument(name='ip_has_open_directory', output_type=bool, description='Indicates if the IP has an open directory.'), + OutputArgument(name='ip_ptr', output_type=str, description='Pointer (PTR) record for the IP.'), + OutputArgument(name='listing_score', output_type=int, description='Listing score of the IP.'), + OutputArgument(name='malscore', output_type=int, description='Malware score associated with the IP.'), + OutputArgument(name='sinkhole_info.known_sinkhole_ip', output_type=bool, description='Indicates if the IP is a known sinkhole IP.'), + OutputArgument(name='sinkhole_info.tags', output_type=str, description='Tags associated with the sinkhole information.'), + OutputArgument(name='subnet_reputation', output_type=int, description='Reputation score of the subnet.'), + OutputArgument(name='asn_reputation', output_type=int, description='Reputation score of the ASN.'), + OutputArgument(name='asn', output_type=int, description='Autonomous System Number (ASN) of the IP.'), + OutputArgument(name='sp_risk_score_explain.sp_risk_score_decider', output_type=str, description='Decider for the service provider risk score.'), + OutputArgument(name='asname', output_type=str, description='Name of the ASN.'), + OutputArgument(name='subnet', output_type=str, description='The subnet the IP belongs to.'), + OutputArgument(name='ip_is_tor_exit_node', output_type=bool, description='Indicates if the IP is a TOR exit node.'), + OutputArgument(name='asn_takedown_reputation_score', output_type=int, description='Reputation score of ASN takedown.') + ] +ASN_REPUTATION_OUTPUTS = [ + OutputArgument(name='asn', output_type=str, description='The Autonomous System Number (ASN).'), + OutputArgument(name='reputation_data.asn_reputation', output_type=int, description='Reputation score of the ASN.'), + OutputArgument(name='reputation_data.asname', output_type=str, description='Name of the Autonomous System (AS).'), + OutputArgument(name='reputation_data.date', output_type=int, description='Date the reputation data was recorded (YYYYMMDD).') + ] + + + + +metadata_collector = YMLMetadataCollector( + integration_name="SilentPush", + description=( + "The Silent Push Platform uses first-party data and a proprietary scanning engine to enrich global DNS data " + "with risk and reputation scoring, giving security teams the ability to join the dots across the entire IPv4 and IPv6 range, " + "and identify adversary infrastructure before an attack is launched. The content pack integrates with the Silent Push system " + "to gain insights into domain/IP information, reputations, enrichment, and infratag-related details. It also provides " + "functionality to live-scan URLs and take screenshots of them. Additionally, it allows fetching future attack feeds " + "from the Silent Push system." + ), + display="SilentPush", + category="Data Enrichment & Threat Intelligence", + docker_image="demisto/python3:3.11.10.116949", + is_fetch=False, + long_running=False, + long_running_port=False, + is_runonce=False, + integration_subtype="python3", + integration_type="python", + fromversion="5.0.0", + conf=[ + ConfKey( + name="url", + display="Base URL", + required=True, + default_value="https://api.silentpush.com" + ), + ConfKey( + name="credentials", + display="API Key", + required=False, + key_type=ParameterTypes.AUTH, + ), + ConfKey( + name="insecure", + display="Trust any certificate (not secure)", + required=False, + key_type=ParameterTypes.BOOLEAN + ), + ConfKey( + name="proxy", + display="Use system proxy settings", + required=False, + key_type=ParameterTypes.BOOLEAN + ) + ] +) + + +''' CLIENT CLASS ''' class Client(BaseClient): """Client class to interact with the SilentPush API @@ -44,7 +466,7 @@ class Client(BaseClient): def __init__(self, base_url: str, api_key: str, verify: bool = True, proxy: bool = False): """ Initializes the client with the necessary parameters. - + Args: base_url (str): The base URL for the SilentPush API. api_key (str): The API key for authentication. @@ -59,32 +481,25 @@ def __init__(self, base_url: str, api_key: str, verify: bool = True, proxy: bool 'X-API-Key': api_key, 'Content-Type': 'application/json' } - demisto.debug(f'Initialized client with base URL: {self.base_url}') def _http_request(self, method: str, url_suffix: str, params: dict = None, data: dict = None) -> Any: """ - Handles the HTTP requests to the SilentPush API. - - This function builds the request URL, adds the necessary headers, and sends a request - to the API. It returns the response in JSON format. - + Perform an HTTP request to the SilentPush API. + Args: - method (str): The HTTP method (GET, POST, etc.). - url_suffix (str): The specific endpoint to be appended to the base URL. - params (dict, optional): The URL parameters to be sent with the request. - data (dict, optional): The data to be sent with the request. - + method (str): The HTTP method to use (e.g., 'GET', 'POST'). + url_suffix (str): The endpoint suffix to append to the base URL. + params (dict, optional): Query parameters to include in the request. Defaults to None. + data (dict, optional): JSON data to send in the request body. Defaults to None. + Returns: - Any: The JSON response from the API. - + Any: The JSON response from the API or text response if not JSON. + Raises: - DemistoException: If there is an error in the API response. + DemistoException: If there's an error during the API call. """ - full_url = f'{self.base_url}{url_suffix}' - masked_headers = {k: v if k != 'X-API-Key' else '****' for k, v in self._headers.items()} - demisto.debug(f'Headers: {masked_headers}') - demisto.debug(f'Params: {params}') - demisto.debug(f'Data: {data}') + base_url = demisto.params().get('url', 'https://api.silentpush.com') if url_suffix.startswith("/api/v2/") else self.base_url + full_url = f'{base_url}{url_suffix}' try: response = requests.request( @@ -95,104 +510,1395 @@ def _http_request(self, method: str, url_suffix: str, params: dict = None, data: params=params, json=data ) - demisto.debug(f'Response status code: {response.status_code}') - demisto.debug(f'Response body: {response.text}') - - if response.status_code not in {200, 201}: - raise DemistoException(f'Error in API call [{response.status_code}] - {response.text}') - return response.json() + if response.headers.get('Content-Type', '').startswith('application/json'): + return response.json() + else: + return response.text except Exception as e: - demisto.error(f'Error in API call: {str(e)}') - raise + raise DemistoException(f'Error in API call: {str(e)}') - def list_domain_information(self, domain: str) -> dict: + def get_job_status(self, job_id: str, max_wait: Optional[int] = None, result_type: Optional[str] = None) -> Dict[str, Any]: """ - Fetches domain information such as WHOIS data, domain age, and risk scores. + Retrieve the status of a specific job. + + Args: + job_id (str): The unique identifier of the job to check. + max_wait (int, optional): Maximum wait time in seconds. Must be between 0 and 25. Defaults to None. + result_type (str, optional): Type of result to retrieve. Defaults to None. + + Returns: + Dict[str, Any]: Job status information. + + Raises: + ValueError: If max_wait is invalid or result_type is not in allowed values. + """ + url_suffix = f"{JOB_STATUS}/{job_id}" + params = {} + + if max_wait is not None: + if not (0 <= max_wait <= 25): + raise ValueError("max_wait must be an integer between 0 and 25") + params['max_wait'] = max_wait + + valid_result_types = {'Status', 'Include Metadata', 'Exclude Metadata'} + if result_type and result_type not in valid_result_types: + raise ValueError(f"result_type must be one of {valid_result_types}") + if result_type: + params['result_type'] = result_type + + return self._http_request(method="GET", url_suffix=url_suffix, params=params) + + def get_nameserver_reputation(self, nameserver: str, explain: bool = False, limit: int = None): + """ + Retrieve historical reputation data for the specified nameserver. + Args: - domain (str): The domain to fetch information for. - + nameserver (str): The nameserver for which the reputation data is to be fetched. + explain (bool): Whether to include detailed calculation explanations. + limit (int): Maximum number of reputation entries to return. + Returns: - dict: A dictionary containing domain information fetched from the API. + dict: Reputation history for the given nameserver. """ - demisto.debug(f'Fetching domain information for domain: {domain}') - url_suffix = f'explore/domain/domaininfo/{domain}' - return self._http_request('GET', url_suffix) + url_suffix = f"{NAMESERVER_REPUTATION}/{nameserver}" -def test_module(client: Client) -> str: - """ - Tests connectivity to the SilentPush API and checks the authentication status. - - This function will validate the API key and ensure that the client can successfully connect - to the API. It is called when running the 'Test' button in XSOAR. - - Args: - client (Client): The client instance to use for the connection test. + params = filter_none_values({'explain': explain, 'limit': limit}) + + response = self._http_request(method="GET", url_suffix=url_suffix, params=params) + + # Return the reputation history, or an empty list if not found + return response.get('response', {}).get('ns_server_reputation', []) + + def get_subnet_reputation(self, subnet: str, explain: bool = False, limit: Optional[int] = None) -> Dict[str, Any]: + """ + Retrieve reputation history for a specific subnet. + + Args: + subnet (str): The subnet to query. + explain (bool, optional): Whether to include detailed explanations. Defaults to False. + limit (int, optional): Maximum number of results to return. Defaults to None. + + Returns: + Dict[str, Any]: Subnet reputation history information. + """ + url_suffix = f"{SUBNET_REPUTATION}/{subnet}" + + params = { + "explain": str(explain).lower() if explain else None, + "limit": limit + } + + params = filter_none_values(params) + + return self._http_request(method="GET", url_suffix=url_suffix, params=params) + + def get_asns_for_domain(self, domain: str) -> Dict[str, Any]: + """ + Retrieve Autonomous System Numbers (ASNs) associated with the specified domain. + + Args: + domain (str): The domain to retrieve ASNs for. + + Returns: + Dict[str, Any]: A dictionary containing the ASN information for the domain. + """ + url_suffix = f"{ASNS_DOMAIN}/{domain}" + + # Send the request and return the response directly + return self._http_request(method="GET", url_suffix=url_suffix) + + def density_lookup(self, qtype: str, query: str, **kwargs) -> Dict[str, Any]: + """ + Perform a density lookup based on various query types and optional parameters. + + Args: + qtype (str): Query type to perform the lookup. Options include: nssrv, mxsrv, nshash, mxhash, ipv4, ipv6, asn, chv. + query (str): The value to look up. + **kwargs: Optional parameters (e.g., filters) for scoping the lookup. + + Returns: + Dict[str, Any]: The results of the density lookup, containing relevant information based on the query. + """ + url_suffix = f"{DENSITY_LOOKUP}/{qtype}/{query}" + + params = filter_none_values(kwargs) + + return self._http_request( + method="GET", + url_suffix=url_suffix, + params=params + ) + + def search_domains(self, query: Optional[str] = None, start_date: Optional[str] = None, end_date: Optional[str] = None, + risk_score_min: Optional[int] = None, risk_score_max: Optional[int] = None, limit: int = 100, + domain_regex: Optional[str] = None, name_server: Optional[str] = None, asnum: Optional[int] = None, + asname: Optional[str] = None, min_ip_diversity: Optional[int] = None, registrar: Optional[str] = None, + min_asn_diversity: Optional[int] = None, certificate_issuer: Optional[str] = None, + whois_date_after: Optional[str] = None, skip: Optional[int] = None) -> dict: + """ + Search for domains based on various filtering criteria. + + Args: + query (str, optional): Domain search query. + start_date (str, optional): Start date for domain search (YYYY-MM-DD). + end_date (str, optional): End date for domain search (YYYY-MM-DD). + risk_score_min (int, optional): Minimum risk score filter. + risk_score_max (int, optional): Maximum risk score filter. + limit (int, optional): Maximum number of results to return (defaults to 100). + domain_regex (str, optional): Regular expression to filter domains. + name_server (str, optional): Name server filter. + asnum (int, optional): Autonomous System Number (ASN) filter. + asname (str, optional): ASN Name filter. + min_ip_diversity (int, optional): Minimum IP diversity filter. + registrar (str, optional): Domain registrar filter. + min_asn_diversity (int, optional): Minimum ASN diversity filter. + certificate_issuer (str, optional): Filter domains by certificate issuer. + whois_date_after (str, optional): Filter domains based on WHOIS date (YYYY-MM-DD). + skip (int, optional): Number of results to skip. + + Returns: + dict: Search results matching the specified criteria. + """ + url_suffix = SEARCH_DOMAIN + + # Prepare parameters and filter out None values using filter_none_values helper function + params = filter_none_values({ + 'domain': query, + 'start_date': start_date, + 'end_date': end_date, + 'risk_score_min': risk_score_min, + 'risk_score_max': risk_score_max, + 'limit': limit, + 'domain_regex': domain_regex, + 'name_server': name_server, + 'asnum': asnum, + 'asname': asname, + 'min_ip_diversity': min_ip_diversity, + 'registrar': registrar, + 'min_asn_diversity': min_asn_diversity, + 'certificate_issuer': certificate_issuer, + 'whois_date_after': whois_date_after, + 'skip': skip, + }) + + # Make the request with the filtered parameters + return self._http_request('GET', url_suffix, params=params) + + def list_domain_infratags( + self, + domains: list, + cluster: bool = False, + mode: str = 'live', + match: str = 'self', + as_of: Optional[str] = None, + origin_uid: Optional[str] = None, + use_get: bool = False + ) -> dict: + """ + Retrieve infrastructure tags for specified domains, supporting both GET and POST methods. + + Args: + domains (list): List of domains to fetch infrastructure tags for. + cluster (bool): Whether to include cluster information (default: False). + mode (str): Tag retrieval mode (default: 'live'). + match (str): Matching criteria (default: 'self'). + as_of (Optional[str]): Specific timestamp for tag retrieval. + origin_uid (Optional[str]): Unique identifier for the API user. + use_get (bool): Use GET method instead of POST (default: False). + + Returns: + dict: API response containing infratags and optional tag clusters. + """ + url_suffix = DOMAIN_INFRATAGS + + # Construct the params dictionary + params = { + 'mode': mode, + 'match': match, + 'clusters': int(cluster), + 'as_of': as_of, + 'origin_uid': origin_uid + } + + # Remove any None values from params using filter_none_values helper function + params = filter_none_values(params) + + if use_get: + # Use GET method + response = self._http_request( + method='GET', + url_suffix=url_suffix, + params=params + ) + else: + # Use POST method + payload = {'domains': domains} + response = self._http_request( + method='POST', + url_suffix=url_suffix, + params=params, + data=payload + ) + + return response + + def fetch_bulk_domain_info(self, domains: List[str]) -> Dict[str, Any]: + """Fetch basic domain information for a list of domains.""" + response = self._http_request( + method='POST', + url_suffix=DOMAIN_INFO, + data={'domains': domains} + ) + domain_info_list = response.get('response', {}).get('domaininfo', []) + return {item['domain']: item for item in domain_info_list} + + def fetch_risk_scores(self, domains: List[str]) -> Dict[str, Any]: + """Fetch risk scores for a list of domains.""" + response = self._http_request( + method='POST', + url_suffix=RISK_SCORE, + data={'domains': domains} + ) + risk_score_list = response.get('response', []) + return {item['domain']: item for item in risk_score_list} + + def fetch_whois_info(self, domain: str) -> Dict[str, Any]: + """Fetch WHOIS information for a single domain.""" + try: + response = self._http_request( + method='GET', + url_suffix=f'{WHOIS}/{domain}' + ) + whois_data = response.get('response', {}).get('whois', [{}])[0] + + return { + 'Registrant Name': whois_data.get('name', 'N/A'), + 'Registrant Organization': whois_data.get('org', 'N/A'), + 'Registrant Address': ', '.join(whois_data.get('address', [])) if isinstance(whois_data.get('address'), list) else whois_data.get('address', 'N/A'), + 'Registrant City': whois_data.get('city', 'N/A'), + 'Registrant State': whois_data.get('state', 'N/A'), + 'Registrant Country': whois_data.get('country', 'N/A'), + 'Registrant Zipcode': whois_data.get('zipcode', 'N/A'), + 'Creation Date': whois_data.get('created', 'N/A'), + 'Updated Date': whois_data.get('updated', 'N/A'), + 'Expiration Date': whois_data.get('expires', 'N/A'), + 'Registrar': whois_data.get('registrar', 'N/A'), + 'WHOIS Server': whois_data.get('whois_server', 'N/A'), + 'Nameservers': ', '.join(whois_data.get('nameservers', [])), + 'Emails': ', '.join(whois_data.get('emails', [])) + } + except Exception as e: + return {'error': str(e)} + + def list_domain_information(self, domains: List[str], fetch_risk_score: Optional[bool] = False, fetch_whois_info: Optional[bool] = False) -> Dict[str, Any]: + """ + Retrieve domain information along with optional risk scores and WHOIS data. + + Args: + http_request (function): HTTP request function for making API calls. + domains (List[str]): List of domains to get information for. + fetch_risk_score (bool, optional): Whether to fetch risk scores. Defaults to False. + fetch_whois_info (bool, optional): Whether to fetch WHOIS information. Defaults to False. + + Returns: + Dict[str, Any]: Dictionary containing domain information with optional risk scores and WHOIS data. + + Raises: + ValueError: If more than 100 domains are provided. + """ + if len(domains) > 100: + raise ValueError("Maximum of 100 domains can be submitted in a single request.") + + domain_info_dict = self.fetch_bulk_domain_info(domains) + + risk_score_dict = self.fetch_risk_scores(domains) if fetch_risk_score else {} + + whois_info_dict = {domain: self.fetch_whois_info(domain) for domain in domains} if fetch_whois_info else {} + + results = [] + for domain in domains: + domain_info = { + 'domain': domain, + **domain_info_dict.get(domain, {}), + } + + if fetch_risk_score: + risk_data = risk_score_dict.get(domain, {}) + domain_info.update({ + 'risk_score': risk_data.get('sp_risk_score', 'N/A'), + 'risk_score_explanation': risk_data.get('sp_risk_score_explain', 'N/A') + }) + + if fetch_whois_info: + domain_info['whois_info'] = whois_info_dict.get(domain, {}) + + results.append(domain_info) + + return {'domains': results} + + def get_domain_certificates(self, domain: str, **kwargs) -> Dict[str, Any]: + """ + Retrieve SSL certificate details associated with a given domain. + + Args: + domain (str): The domain for which SSL certificate details are retrieved. + **kwargs: Optional query parameters for filtering the results. + + Returns: + Dict[str, Any]: SSL certificate details for the specified domain. + """ + url_suffix = f"{DOMAIN_CERTIFICATE}/{domain}" + params = filter_none_values(kwargs) + return self._http_request( + method="GET", + url_suffix=url_suffix, + params=params + ) - Returns: - str: 'ok' if the connection is successful, otherwise returns an error message. + def parse_subject(self, subject: Any) -> Dict[str, Any]: + """ + Parse the subject of a certificate or domain record. + + Args: + subject (Any): The subject to parse, which can be a dictionary, string, or other type. + + Returns: + Dict[str, Any]: A dictionary representation of the subject, + with a fallback to {'CN': subject} or {'CN': 'N/A'} if parsing fails. + """ + if isinstance(subject, dict): + return subject + if isinstance(subject, str): + parsed_subject = json.loads(subject.replace("'", '"')) if subject else {'CN': 'N/A'} + return parsed_subject if isinstance(parsed_subject, dict) else {'CN': subject} + return {'CN': 'N/A'} + + def validate_ip_address(self, ip: str, allow_ipv6: bool = True) -> bool: + """ + Validate an IP address. + + Args: + self: The instance of the class. + ip (str): IP address to validate. + allow_ipv6 (bool, optional): Whether to allow IPv6 addresses. Defaults to True. + + Returns: + bool: True if valid IP address, False otherwise. + """ + try: + ip = ip.strip() + ip_obj = ipaddress.ip_address(ip) + + return not (not allow_ipv6 and ip_obj.version == 6) + except ValueError: + return False + + def get_enrichment_data(self, resource: str, value: str, explain: Optional[bool] = False, scan_data: Optional[bool] = False) -> dict: + """ + Retrieve enrichment data for a specific resource. + + Args: + resource (str): Type of resource (e.g., 'ip', 'domain'). + value (str): The specific value to enrich. + explain (bool, optional): Whether to include detailed explanations. Defaults to False. + scan_data (bool, optional): Whether to include scan data. Defaults to False. + + Returns: + dict: Enrichment data for the specified resource. + """ + endpoint = f"{ENRICHMENT}/{resource}/{value}" + + query_params = { + "explain": int(explain) if explain else 0, + "scan_data": int(scan_data) if scan_data else 0 + } + response = self._http_request( + method="GET", + url_suffix=endpoint, + params=query_params + ) + # Handle the response based on resource type + if resource in ["ip", "ipv4", "ipv6"]: + ip2asn_data = response.get("response", {}).get("ip2asn", []) + return ip2asn_data[0] if isinstance(ip2asn_data, list) and ip2asn_data else {} + return response.get("response", {}).get("domaininfo", {}) + + def validate_ips(self, ips: List[str]) -> None: + """Validates the number of IPs in the list.""" + if len(ips) > 100: + raise DemistoException("Maximum of 100 IPs can be submitted in a single request.") + + def list_ip_information(self, ips: List[str], resource: str) -> Dict: + """ + Retrieve information for multiple IP addresses. + + Args: + ips (List[str]): List of IPv4 or IPv6 addresses to fetch information for. + resource (str): The resource type ('ipv4' or 'ipv6'). + + Returns: + Dict: API response containing IP information. + """ + self.validate_ips(ips) + + ip_data = {"ips": ips} + url_suffix = f"{LIST_IP}/{resource}" + + return self._http_request("POST", url_suffix, data=ip_data) + + def get_asn_reputation(self, asn: int, limit: Optional[int] = None, explain: Optional[bool] = False) -> Dict[str, Any]: + """ + Retrieve reputation history for a specific Autonomous System Number (ASN). + + Args: + asn (int): The Autonomous System Number to query. + limit (int, optional): Maximum number of results to return. Defaults to None. + explain (bool, optional): Whether to include explanation for reputation score. Defaults to False. + + Returns: + Dict[str, Any]: ASN reputation history information. + """ + url_suffix = f"{ASN_REPUTATION}/{asn}" + query_params = {} + + if limit: + query_params['limit'] = limit + if explain: + query_params['explain'] = explain + + return self._http_request( + method="GET", + url_suffix=url_suffix, + params=query_params + ) + + + +''' HELPER FUNCTIONS ''' +def filter_none_values(params: Dict[str, Any]) -> Dict[str, Any]: + """Removes None values from a dictionary.""" + return {k: v for k, v in params.items() if v is not None} + + +''' COMMAND FUNCTIONS ''' + + +def test_module(client: Client, first_fetch_time: int) -> str: + """Tests API connectivity and authentication' + + Returning 'ok' indicates that the integration works like it is supposed to. + Connection to the service is successful. + Raises exceptions if something goes wrong. + + :type client: ``Client`` + :param Client: SilentPush client to use + + :type name: ``str`` + :param name: name to append to the 'Hello' string + + :return: 'ok' if test passed, anything else will fail the test. + :rtype: ``str`` """ - demisto.debug('Running test module...') + + # INTEGRATION DEVELOPER TIP + # Client class should raise the exceptions, but if the test fails + # the exception text is printed to the Cortex XSOAR UI. + # If you have some specific errors you want to capture (i.e., auth failure) + # you should catch the exception here and return a string with a more + # readable output (for example return 'Authentication Error, API Key + # invalid'). + # Cortex XSOAR will print everything you return that is different than 'ok' as + # an error. try: - client.list_domain_information('silentpush.com') - demisto.debug('Test module completed successfully') + resp = client.search_domains("job_id", "max_wait", "result_type") + if resp.get("status_code") != 200: + return f"Connection failed :- {resp.get('errors')}" return 'ok' except DemistoException as e: - demisto.debug(f'Test module failed: {str(e)}') if 'Forbidden' in str(e) or 'Authorization' in str(e): return 'Authorization Error: make sure API Key is correctly set' raise e + + +@metadata_collector.command( + command_name="silentpush-get-job-status", + inputs_list=JOB_STATUS_INPUTS, + outputs_prefix="SilentPush.JobStatus", + outputs_list=JOB_STATUS_OUTPUTS, + description="This command retrieve status of running job or results from completed job.", +) +def get_job_status_command(client: Client, args: dict) -> CommandResults: + """ + Retrieves the status of a job based on the provided job ID and other optional parameters. + + Args: + client (Client): The client instance that interacts with the service to fetch job status. + args (dict): A dictionary of arguments, which should include: + - 'job_id' (str): The unique identifier of the job for which status is being retrieved. + - 'max_wait' (Optional[int]): The maximum wait time in seconds (default is None). + - 'result_type' (Optional[str]): Type of result to retrieve. Valid options are 'Status', + 'Include Metadata', or 'Exclude Metadata' (default is None). + + Returns: + CommandResults: The command results containing: + - 'outputs_prefix' (str): The prefix for the output context. + - 'outputs_key_field' (str): The field used as the key in the outputs. + - 'outputs' (dict): A dictionary with job ID and job status information. + - 'readable_output' (str): A formatted string that represents the job status in a human-readable format. + - 'raw_response' (dict): The raw response received from the service. + + Raises: + DemistoException: If the 'job_id' parameter is missing or if no job status is found for the given job ID. + """ + job_id = args.get('job_id') + max_wait = arg_to_number(args.get('max_wait')) + result_type = args.get('result_type') + + if not job_id: + raise DemistoException("job_id is a required parameter") + + raw_response = client.get_job_status(job_id, max_wait, result_type) + job_status = raw_response.get('response', {}) + + if not job_status: + raise DemistoException(f"No job status found for Job ID: {job_id}") + + readable_output = tableToMarkdown( + f"Job Status for Job ID: {job_id}", + [job_status], + headers=list(job_status.keys()), + removeNull=True + ) + return CommandResults( + outputs_prefix='SilentPush.JobStatus', + outputs_key_field='job_id', + outputs={'job_id': job_id, **job_status}, + readable_output=readable_output, + raw_response=raw_response + ) + + +@metadata_collector.command( + command_name="silentpush-get-nameserver-reputation", + inputs_list=NAMESERVER_REPUTATION_INPUTS, + outputs_prefix="SilentPush.SubnetReputation", + outputs_list=NAMESERVER_REPUTATION_OUTPUTS, + description="This command retrieve historical reputation data for a specified nameserver, including reputation scores and optional detailed calculation information.", +) +def get_nameserver_reputation_command(client: Client, args: dict) -> CommandResults: + """ + Command handler for retrieving nameserver reputation. + + Args: + client (Client): The API client instance. + args (dict): Command arguments. + + Returns: + CommandResults: The command results containing nameserver reputation data. + """ + nameserver = args.get("nameserver") + explain = argToBoolean(args.get("explain", False)) + limit = arg_to_number(args.get("limit")) + + if not nameserver: + raise ValueError("Nameserver is required.") + + # Fetch reputation data + reputation_data = client.get_nameserver_reputation(nameserver, explain, limit) + + # Prepare the readable output + if reputation_data: + readable_output = tableToMarkdown( + f"Nameserver Reputation for {nameserver}", + reputation_data, + headers=list(reputation_data[0].keys()), + removeNull=True + ) + else: + readable_output = f"No reputation history found for nameserver: {nameserver}" + + # Return command results + return CommandResults( + outputs_prefix="SilentPush.NameserverReputation", + outputs_key_field="ns_server", + outputs={"nameserver": nameserver, "reputation_data": reputation_data}, + readable_output=readable_output, + raw_response=reputation_data + ) + +@metadata_collector.command( + command_name="silentpush-get-subnet-reputation", + inputs_list=SUBNET_REPUTATION_INPUTS, + outputs_prefix="SilentPush.SubnetReputation", + outputs_list=SUBNET_REPUTATION_OUTPUTS, + description="This command retrieves the reputation history for a specific subnet." +) +def get_subnet_reputation_command(client: Client, args: dict) -> CommandResults: + """ + Retrieves the reputation history of a given subnet. + + Args: + client (Client): The API client instance. + args (dict): Command arguments containing: + - subnet (str): The subnet to query. + - explain (bool, optional): Whether to include an explanation. + - limit (int, optional): Limit the number of reputation records. + + Returns: + CommandResults: The command result containing the subnet reputation data. + """ + subnet = args.get('subnet') + if not subnet: + raise DemistoException("Subnet is a required parameter.") + + explain = argToBoolean(args.get('explain', False)) + limit = arg_to_number(args.get('limit')) + + raw_response = client.get_subnet_reputation(subnet, explain, limit) + subnet_reputation = raw_response.get('response', {}).get('subnet_reputation_history', []) + + readable_output = ( + f"No reputation history found for subnet: {subnet}" + if not subnet_reputation + else tableToMarkdown(f"Subnet Reputation for {subnet}", subnet_reputation, removeNull=True) + ) + + return CommandResults( + outputs_prefix='SilentPush.SubnetReputation', + outputs_key_field='subnet', + outputs={'subnet': subnet, 'reputation_history': subnet_reputation}, + readable_output=readable_output, + raw_response=raw_response + ) + + +@metadata_collector.command( + command_name="silentpush-get-asns-for-domain", + inputs_list=ASNS_DOMAIN_INPUTS, + outputs_prefix="SilentPush.DomainASNs", + outputs_list=ASNS_DOMAIN_OUTPUTS, + description="This command retrieves Autonomous System Numbers (ASNs) associated with a domain." +) +def get_asns_for_domain_command(client: Client, args: dict) -> CommandResults: + """ + Retrieves Autonomous System Numbers (ASNs) for the specified domain. + + Args: + client (Client): The client object used to interact with the service. + args (dict): Arguments passed to the command, including the domain. + + Returns: + CommandResults: The results containing ASNs for the domain or an error message. + """ + domain = args.get('domain') + + if not domain: + raise DemistoException("Domain is a required parameter.") + + raw_response = client.get_asns_for_domain(domain) + records = raw_response.get('response', {}).get('records', []) + + if not records or 'domain_asns' not in records[0]: + readable_output = f"No ASNs found for domain: {domain}" + asns = [] + else: + domain_asns = records[0]['domain_asns'] + asns = [{'ASN': asn, 'Description': description} + for asn, description in domain_asns.items()] + + readable_output = tableToMarkdown( + f"ASNs for Domain: {domain}", + asns, + headers=['ASN', 'Description'] + ) + + return CommandResults( + outputs_prefix='SilentPush.DomainASNs', + outputs_key_field='domain', + outputs={ + 'domain': domain, + 'asns': asns + }, + readable_output=readable_output, + raw_response=raw_response + ) + +@metadata_collector.command( + command_name="silentpush-density-lookup", + inputs_list=DENSITY_LOOKUP_INPUTS, + outputs_prefix="SilentPush.DensityLookup", + outputs_list=DENSITY_LOOKUP_OUTPUTS, + description="This command queries granular DNS/IP parameters (e.g., NS servers, MX servers, IPaddresses, ASNs) for density information." +) +def density_lookup_command(client: Client, args: dict) -> CommandResults: + """ + Command function to perform a density lookup on the SilentPush API. + + Args: + client (Client): SilentPush API client. + args (dict): Command arguments containing 'qtype' and 'query', and optionally 'scope'. + + Returns: + CommandResults: Formatted results of the density lookup, including either the density records or an error message. + """ + qtype = args.get('qtype') + query = args.get('query') + + if not qtype or not query: + raise DemistoException("Both 'qtype' and 'query' are required parameters.") + + scope = args.get('scope') + + raw_response = client.density_lookup(qtype=qtype, query=query, scope=scope) + + records = raw_response.get('response', {}).get('records', []) + + readable_output = ( + f"No density records found for {qtype} {query}" + if not records + else tableToMarkdown(f"Density Lookup Results for {qtype} {query}", records, removeNull=True) + ) + + return CommandResults( + outputs_prefix='SilentPush.DensityLookup', + outputs_key_field='query', + outputs={'qtype': qtype, 'query': query, 'records': records}, + readable_output=readable_output, + raw_response=raw_response + ) + +@metadata_collector.command( + command_name="silentpush-search-domains", + inputs_list=SEARCH_DOMAIN_INPUTS, + outputs_prefix="SilentPush.Domain", + outputs_list=SEARCH_DOMAIN_OUTPUTS, + description="This command search for domains with optional filters." +) +def search_domains_command(client: Client, args: dict) -> CommandResults: + """ + Command to search for domains based on various filter parameters. + + Args: + client (Client): The client instance to interact with the external service. + args (dict): Arguments containing filter parameters for domain search. + + Returns: + CommandResults: The results of the domain search, including readable output and raw response. + """ + # Extract arguments + query = args.get('query') + start_date = args.get('start_date') + end_date = args.get('end_date') + risk_score_min = arg_to_number(args.get('risk_score_min')) + risk_score_max = arg_to_number(args.get('risk_score_max')) + limit = arg_to_number(args.get('limit', 100)) + domain_regex = args.get('domain_regex') + name_server = args.get('name_server') + asnum = arg_to_number(args.get('asnum')) + asname = args.get('asname') + min_ip_diversity = arg_to_number(args.get('min_ip_diversity')) + registrar = args.get('registrar') + min_asn_diversity = arg_to_number(args.get('min_asn_diversity')) + certificate_issuer = args.get('certificate_issuer') + whois_date_after = args.get('whois_date_after') + skip = arg_to_number(args.get('skip')) + + # Call the client method to search domains + raw_response = client.search_domains( + query=query, + start_date=start_date, + end_date=end_date, + risk_score_min=risk_score_min, + risk_score_max=risk_score_max, + limit=limit, + domain_regex=domain_regex, + name_server=name_server, + asnum=asnum, + asname=asname, + min_ip_diversity=min_ip_diversity, + registrar=registrar, + min_asn_diversity=min_asn_diversity, + certificate_issuer=certificate_issuer, + whois_date_after=whois_date_after, + skip=skip + ) + + records = raw_response.get('response', {}).get('records', []) + + if not records: + return CommandResults( + readable_output="No domains found.", + raw_response=raw_response, + outputs_prefix='SilentPush.Domain', + outputs_key_field='domain', + outputs=records + ) + + readable_output = tableToMarkdown('Domain Search Results', records) + + return CommandResults( + outputs_prefix='SilentPush.Domain', + outputs_key_field='domain', + outputs=records, + readable_output=readable_output, + raw_response=raw_response + ) + + +def format_tag_clusters(tag_clusters: list) -> str: + """ + Helper function to format the tag clusters output. + + Args: + tag_clusters (list): List of domain tag clusters. + + Returns: + str: Formatted table output for tag clusters. + """ + if not tag_clusters: + return "\n\n**No tag cluster data returned by the API.**" + + cluster_details = [{'Cluster Level': key, 'Details': value} for cluster in tag_clusters for key, value in cluster.items()] + return tableToMarkdown('Domain Tag Clusters', cluster_details) + +@metadata_collector.command( + command_name="silentpush-list-domain-infratags", + inputs_list=DOMAIN_INFRATAGS_INPUTS, + outputs_prefix="SilentPush.InfraTags", + outputs_list=DOMAIN_INFRATAGS_OUTPUTS, + description="This command get infratags for multiple domains with optional clustering." +) +def list_domain_infratags_command(client: Client, args: dict) -> CommandResults: + """ + Command function to retrieve domain infratags with optional cluster details. + + Args: + client (Client): SilentPush API client. + args (dict): Command arguments. + + Returns: + CommandResults: Formatted results of the infratags lookup. + """ + domains = argToList(args.get('domains', '')) + cluster = argToBoolean(args.get('cluster', False)) + mode = args.get('mode', 'live') + match = args.get('match', 'self') + as_of = args.get('as_of', None) + origin_uid = args.get('origin_uid', None) + use_get = argToBoolean(args.get('use_get', False)) + + if not domains and not use_get: + raise ValueError('"domains" argument is required when using POST.') + + + raw_response = client.list_domain_infratags(domains, cluster, mode, match, as_of, origin_uid, use_get) + infratags = raw_response.get('response', {}).get('infratags', []) + tag_clusters = raw_response.get('response', {}).get('tag_clusters', []) + + readable_output = tableToMarkdown('Domain Infratags', infratags) + if cluster: + readable_output += format_tag_clusters(tag_clusters) -''' COMMAND FUNCTIONS ''' + return CommandResults( + outputs_prefix='SilentPush.InfraTags', + outputs_key_field='domain', + outputs=raw_response, + readable_output=readable_output, + raw_response=raw_response + ) -def list_domain_information_command(client: Client, args: dict) -> CommandResults: +@metadata_collector.command( + command_name="silentpush-list-domain-information", + inputs_list=LIST_DOMAIN_INPUTS, + outputs_prefix="SilentPush.Domain", + outputs_list=LIST_DOMAIN_OUTPUTS, + description="This command get domain information along with Silent Push risk score and live whois information for multiple domains." +) +def list_domain_information_command(client: Client, args: Dict[str, Any]) -> CommandResults: """ - Command handler for fetching domain information. + Handle the list-domain-information command execution. + + Args: + client (Client): The client object for making API calls + args (Dict[str, Any]): Command arguments + + Returns: + CommandResults: Results for XSOAR + """ + domains, fetch_risk_score, fetch_whois_info = parse_arguments(args) + response = client.list_domain_information(domains, fetch_risk_score, fetch_whois_info) + markdown = format_domain_information(response, fetch_risk_score, fetch_whois_info) - This function processes the command for 'silentpush-list-domain-information', retrieves the - domain information using the client, and formats it for XSOAR output. + return CommandResults( + outputs_prefix='SilentPush.Domain', + outputs_key_field='domain', + outputs=response.get('domains', []), + readable_output=markdown, + raw_response=response + ) + +def parse_arguments(args: Dict[str, Any]) -> Tuple[List[str], bool, bool]: + """ + Parse and validate command arguments. + + Args: + args (Dict[str, Any]): Command arguments + + Returns: + Tuple[List[str], bool, bool]: Parsed domains, risk score flag, and WHOIS flag + """ + domains_arg = args.get('domains', '') + if not domains_arg: + raise DemistoException('No domains provided') + domains = [domain.strip() for domain in domains_arg.split(',') if domain.strip()] + fetch_risk_score = argToBoolean(args.get('fetch_risk_score', False)) + fetch_whois_info = argToBoolean(args.get('fetch_whois_info', False)) + + return domains, fetch_risk_score, fetch_whois_info + +def format_domain_information(response: Dict[str, Any], fetch_risk_score: bool, fetch_whois_info: bool) -> str: + """ + Format the response data into markdown format. + Args: - client (Client): The client instance to fetch the data. - args (dict): The arguments passed to the command, including the domain. + response (Dict[str, Any]): API response data + fetch_risk_score (bool): Whether to include risk score data + fetch_whois_info (bool): Whether to include WHOIS data + + Returns: + str: Markdown-formatted response + """ + markdown = ['# Domain Information Results\n'] + for domain_data in response.get('domains', []): + domain = domain_data.get('domain', 'N/A') + markdown.append(f'## Domain: {domain}') + + basic_info = { + 'Created Date': domain_data.get('whois_created_date', 'N/A'), + 'Updated Date': domain_data.get('whois_updated_date', 'N/A'), + 'Expiration Date': domain_data.get('whois_expiration_date', 'N/A'), + 'Registrar': domain_data.get('registrar', 'N/A'), + 'Status': domain_data.get('status', 'N/A'), + 'Name Servers': domain_data.get('nameservers', 'N/A') + } + markdown.append(tableToMarkdown('Domain Information', [basic_info])) + + if fetch_risk_score: + risk_info = { + 'Risk Score': domain_data.get('risk_score', 'N/A'), + 'Risk Score Explanation': domain_data.get('risk_score_explanation', 'N/A') + } + markdown.append(tableToMarkdown('Risk Assessment', [risk_info])) + + if fetch_whois_info: + whois_info = domain_data.get('whois_info', {}) + if whois_info and isinstance(whois_info, dict): + if 'error' in whois_info: + markdown.append(f'WHOIS Error: {whois_info["error"]}') + else: + markdown.append(tableToMarkdown('WHOIS Information', [whois_info])) + + markdown.append('\n---\n') + + return '\n'.join(markdown) + + +@metadata_collector.command( + command_name="silentpush-get-domain-certificates", + inputs_list=LIST_DOMAIN_INPUTS, + outputs_prefix="SilentPush.Certificate", + outputs_list=LIST_DOMAIN_OUTPUTS, + description="This command get certificate data collected from domain scanning." +) +def get_domain_certificates_command(client: Client, args: Dict[str, Any]) -> CommandResults: + """ + Retrieves SSL/TLS certificates for a given domain. + + Args: + client (Client): The API client to interact with SilentPush. + args (Dict[str, Any]): Command arguments including: + - domain (str, required): The domain name to search for certificates. + - domain_regex (str, optional): RE2 regex pattern to match domains. + - certificate_issuer (str, optional): Filter certificates by issuer. + - date_min (str, optional): Minimum issuance date (YYYY-MM-DD). + - date_max (str, optional): Maximum issuance date (YYYY-MM-DD). + - prefer (str, optional): Preference parameter for API filtering. + - max_wait (int, optional): Maximum time to wait for results. + - with_metadata (bool, optional): Whether to include metadata. + - skip (int, optional): Number of records to skip. + - limit (int, optional): Maximum number of results to return. + Returns: - CommandResults: The command results containing readable output and the raw response. + CommandResults: The results containing the retrieved certificates. """ - domain = args.get('domain', 'silentpush.com') - demisto.debug(f'Processing domain: {domain}') + domain = args.get('domain') + if not domain: + raise DemistoException("The 'domain' parameter is required.") + + params = filter_none_values({ + 'domain_regex': args.get('domain_regex'), + 'certificate_issuer': args.get('certificate_issuer'), + 'date_min': args.get('date_min'), + 'date_max': args.get('date_max'), + 'prefer': args.get('prefer'), + 'max_wait': arg_to_number(args.get('max_wait')), + 'with_metadata': argToBoolean(args.get('with_metadata')) if 'with_metadata' in args else None, + 'skip': arg_to_number(args.get('skip')), + 'limit': arg_to_number(args.get('limit')) + }) - raw_response = client.list_domain_information(domain) - demisto.debug(f'Response from API: {raw_response}') + raw_response = client.get_domain_certificates(domain, **params) + certificates = raw_response.get('response', {}).get('domain_certificates', []) + metadata = raw_response.get('response', {}).get('metadata', {}) + + if not certificates: + return CommandResults( + readable_output=f"No certificates found for domain: {domain}", + outputs_prefix='SilentPush.Certificate', + outputs_key_field='domain', + outputs={'domain': domain, 'certificates': [], 'metadata': metadata}, + raw_response=raw_response + ) - readable_output = tableToMarkdown('Domain Information', raw_response) + markdown = [f"# SSL/TLS Certificate Information for Domain: {domain}\n"] + for cert in certificates: + cert_info = format_certificate_info(cert, client) + markdown.append(tableToMarkdown('Certificate Information', [cert_info])) return CommandResults( - outputs_prefix='SilentPush.Domain', + outputs_prefix='SilentPush.Certificate', outputs_key_field='domain', - outputs=raw_response, + outputs={'domain': domain, 'certificates': certificates, 'metadata': metadata}, + readable_output='\n'.join(markdown), + raw_response=raw_response + ) + +def format_certificate_info(cert: Dict[str, Any], client: Client) -> Dict[str, str]: + """ + Formats certificate information into a structured dictionary. + + Args: + cert (Dict[str, Any]): Certificate details from the API response. + client (Client): API client used for parsing the subject. + + Returns: + Dict[str, str]: Formatted certificate details. + """ + subject = client.parse_subject(cert.get('subject', {})) + return { + 'Issuer': cert.get('issuer', 'N/A'), + 'Issued On': cert.get('not_before', 'N/A'), + 'Expires On': cert.get('not_after', 'N/A'), + 'Common Name': subject.get('CN', 'N/A'), + 'Subject Alternative Names': ', '.join(cert.get('domains', [])), + 'Serial Number': cert.get('serial_number', 'N/A'), + 'Fingerprint SHA256': cert.get('fingerprint_sha256', 'N/A'), + } + +@metadata_collector.command( + command_name="silentpush-get-enrichment-data", + inputs_list=ENRICHMENT_INPUTS, + outputs_prefix="SilentPush.Enrichment", + outputs_list=ENRICHMENT_OUTPUTS, + description="This command retrieves comprehensive enrichment information for a given resource (domain, IPv4, or IPv6)." +) +def get_enrichment_data_command(client: Client, args: dict) -> CommandResults: + """ + Retrieve enrichment data for a specific resource and value. + + Args: + client (Client): The client object to interact with the enrichment service. + args (dict): Arguments containing the resource type, value, explain flag, and scan_data flag. + + Returns: + CommandResults: The results of the enrichment data retrieval, including readable output and raw response. + """ + # Retrieve arguments + resource = args.get("resource") + value = args.get("value") + explain = argToBoolean(args.get("explain", False)) + scan_data = argToBoolean(args.get("scan_data", False)) + + if not resource or not value: + raise ValueError("Both 'resource' and 'value' arguments are required.") + + if resource in ["ipv4", "ipv6"]: + validate_ip(client, resource, value) + + # Retrieve enrichment data + enrichment_data = client.get_enrichment_data(resource, value, explain, scan_data) + + # Return results based on data availability + if not enrichment_data: + return CommandResults( + readable_output=f"No enrichment data found for resource: {value}", + outputs_prefix="SilentPush.Enrichment", + outputs_key_field="value", + outputs={"value": value, "data": enrichment_data}, + raw_response=enrichment_data + ) + + readable_output = tableToMarkdown(f"Enrichment Data for {value}", enrichment_data, removeNull=True) + + return CommandResults( + outputs_prefix="SilentPush.Enrichment", + outputs_key_field="value", + outputs={"value": value, **enrichment_data}, + readable_output=readable_output, + raw_response=enrichment_data + ) + +def validate_ip(client: Client, resource: str, value: str) -> None: + """ + Validate the IP address based on the resource type. + + Args: + client (Client): The client object to interact with the enrichment service. + resource (str): The resource type (ipv4 or ipv6). + value (str): The IP address to validate. + + Raises: + DemistoException: If the IP address is invalid for the given resource type. + """ + is_valid_ip = client.validate_ip_address(value, allow_ipv6=(resource == "ipv6")) + if not is_valid_ip: + raise DemistoException(f"Invalid {resource.upper()} address: {value}") + +@metadata_collector.command( + command_name="silentpush-list-ip-information", + inputs_list=LIST_IP_INPUTS, + outputs_prefix="SilentPush.IPInformation", + outputs_list=LIST_IP_OUTPUTS, + description="This command get IP information for multiple IPv4s and IPv6s." +) +def list_ip_information_command(client: Client, args: Dict[str, Any]) -> CommandResults: + """ + Command to list IP information for a given set of IP addresses, categorized by IPv4 and IPv6. + + Args: + client (Client): The client instance to interact with the IP data. + args (Dict[str, Any]): Dictionary of command arguments. + + Returns: + CommandResults: Command results containing the IP information. + """ + ips = argToList(args.get("ips", "")) + + if not ips: + return CommandResults( + readable_output="The 'ips' parameter is required.", + outputs_prefix="SilentPush.IPInformation", + outputs_key_field="ip", + outputs=[], + raw_response={"ips": ips}, + ) + + ipv4_addresses, ipv6_addresses = validate_ips(ips, client) + + results = [] + if ipv4_addresses: + results.extend(gather_ip_information(client, ipv4_addresses, resource="ipv4")) + + if ipv6_addresses: + results.extend(gather_ip_information(client, ipv6_addresses, resource="ipv6")) + + if not results: + return CommandResults( + readable_output=f"No information found for IPs: {', '.join(ips)}", + outputs_prefix="SilentPush.IPInformation", + outputs_key_field="ip", + outputs=[], + raw_response={"ips": ips, "results": results}, + ) + + readable_output = tableToMarkdown( + "Comprehensive IP Information", + results, + removeNull=True, + ) + + return CommandResults( + outputs_prefix="SilentPush.IPInformation", + outputs_key_field="ip", + outputs=results, + readable_output=readable_output, + raw_response={"ips": ips, "results": results}, + ) + +def validate_ips(ips: list, client: Client) -> tuple: + """ + Validates and categorizes the IPs into IPv4 and IPv6 addresses. + + Args: + ips (list): List of IPs to validate. + client (Client): The client instance to use for validation. + + Returns: + tuple: A tuple containing two lists: (ipv4_addresses, ipv6_addresses) + """ + ipv4_addresses = [] + ipv6_addresses = [] + + for ip in ips: + if client.validate_ip_address(ip, allow_ipv6=False): # IPv4 + ipv4_addresses.append(ip) + elif client.validate_ip_address(ip, allow_ipv6=True): # IPv6 + ipv6_addresses.append(ip) + + return ipv4_addresses, ipv6_addresses + +def gather_ip_information(client: Client, ip_addresses: list, resource: str) -> list: + """ + Gathers IP information for a given list of IP addresses. + + Args: + client (Client): The client instance to query IP information. + ip_addresses (list): The list of IPs to gather information for. + resource (str): The resource type ('ipv4' or 'ipv6'). + + Returns: + list: A list of IP to ASN information. + """ + ip_info = client.list_ip_information(ip_addresses, resource=resource) + return ip_info.get("response", {}).get("ip2asn", []) + +@metadata_collector.command( + command_name="silentpush-get-asn-reputation", + inputs_list=ASN_REPUTATION_INPUTS, + outputs_prefix="SilentPush.ASNReputation", + outputs_list=ASN_REPUTATION_OUTPUTS, + description="This command retrieve the reputation information for an IPv4." +) +def get_asn_reputation_command(client: Client, args: dict) -> CommandResults: + """ + Command handler for retrieving ASN reputation data. + + Args: + client (Client): The API client instance + args (dict): Command arguments containing: + - asn: ASN number + - limit (optional): Maximum results to return + - explain (optional): Whether to include explanation + + Returns: + CommandResults: Formatted command results for XSOAR + """ + asn = args.get("asn") + limit = arg_to_number(args.get("limit", None)) + explain = argToBoolean(args.get("explain", False)) + + if not asn: + raise ValueError("ASN is required.") + + raw_response = client.get_asn_reputation(asn, limit, explain) + asn_reputation = extract_and_sort_asn_reputation(raw_response, explain) + + if not asn_reputation: + return generate_no_reputation_response(asn, raw_response) + + data_for_table = prepare_asn_reputation_table(asn_reputation, explain) + readable_output = tableToMarkdown(f'ASN Reputation for {asn}', data_for_table, headers=get_table_headers(explain)) + + return CommandResults( + outputs_prefix="SilentPush.ASNReputation", + outputs_key_field="asn", + outputs={'asn': asn, 'reputation_data': asn_reputation}, readable_output=readable_output, raw_response=raw_response ) +def extract_and_sort_asn_reputation(raw_response: dict, explain: bool) -> list: + """ + Extract ASN reputation data and sort by date. -''' MAIN FUNCTION ''' + Args: + raw_response (dict): Raw response data from API. + explain (bool): Whether to include explanations. + + Returns: + list: Sorted ASN reputation data. + """ + response_data = raw_response.get('response', {}) + asn_reputation = response_data.get('asn_reputation') or response_data.get('asn_reputation_history', []) + # Sort by date in descending order + return sorted(asn_reputation, key=lambda x: x.get('date', ''), reverse=True) -def main(): +def generate_no_reputation_response(asn: str, raw_response: dict) -> CommandResults: """ - Main function to initialize the client and process the commands. - - This function parses the parameters, sets up the client, and routes the command to - the appropriate function. - - It handles the setup of authentication, base URL, SSL verification, and proxy configuration. - Also, it routes the `test-module` and `silentpush-list-domain-information` commands to the - corresponding functions. + Generate a response when no ASN reputation data is found. + + Args: + asn (str): The ASN for which data was searched. + raw_response (dict): Raw response data from the API. + + Returns: + CommandResults: The no data response. """ + return CommandResults( + readable_output=f"No reputation data found for ASN {asn}.", + outputs_prefix="SilentPush.ASNReputation", + outputs_key_field="asn", + outputs=[], + raw_response=raw_response + ) + +def prepare_asn_reputation_table(asn_reputation: list, explain: bool) -> list: + """ + Prepare the data for the ASN reputation table. + + Args: + asn_reputation (list): List of ASN reputation entries. + explain (bool): Whether to include explanations in the table. + + Returns: + list: Data formatted for the table. + """ + data_for_table = [] + for entry in asn_reputation: + row = { + 'ASN': entry.get('asn'), + 'Reputation': entry.get('asn_reputation'), + 'ASName': entry.get('asname'), + 'Date': entry.get('date') + } + if explain and entry.get('explanation'): + row['Explanation'] = entry.get('explanation') + data_for_table.append(row) + return data_for_table + +def get_table_headers(explain: bool) -> list: + """ + Get the table headers based on the explain flag. + + Args: + explain (bool): Whether to include explanations in the table. + + Returns: + list: List of table headers. + """ + headers = ['ASN', 'Reputation', 'ASName', 'Date'] + if explain: + headers.append('Explanation') + return headers + + +''' MAIN FUNCTION ''' + + +def main() -> None: + """main function, parses params and runs command functions + + :return: + :rtype: + """ + try: params = demisto.params() api_key = params.get('credentials', {}).get('password') @@ -200,9 +1906,6 @@ def main(): verify_ssl = not params.get('insecure', False) proxy = params.get('proxy', False) - demisto.debug(f'Base URL: {base_url}') - demisto.debug('Initializing client...') - client = Client( base_url=base_url, api_key=api_key, @@ -210,26 +1913,52 @@ def main(): proxy=proxy ) - command = demisto.command() - demisto.debug(f'Command being called is {command}') - - if command == 'test-module': - result = test_module(client) + if demisto.command() == 'test-module': + result = test_module(client, demisto.args()) return_results(result) + + elif demisto.command() == 'silentpush-get-job-status': + return_results(get_job_status_command(client, demisto.args())) + + elif demisto.command() == 'silentpush-get-nameserver-reputation': + return_results(get_nameserver_reputation_command(client, demisto.args())) + + elif demisto.command() == 'silentpush-get-subnet-reputation': + return_results(get_subnet_reputation_command(client, demisto.args())) + + elif demisto.command() == 'silentpush-get-asns-for-domain': + return_results(get_asns_for_domain_command(client, demisto.args())) - elif command == 'silentpush-list-domain-information': + elif demisto.command() == 'silentpush-density-lookup': + return_results(density_lookup_command(client, demisto.args())) + + elif demisto.command() == 'silentpush-search-domains': + return_results(search_domains_command(client, demisto.args())) + + elif demisto.command() == 'silentpush-list-domain-infratags': + return_results(list_domain_infratags_command(client, demisto.args())) + + elif demisto.command() == 'silentpush-list-domain-information': return_results(list_domain_information_command(client, demisto.args())) + + elif demisto.command() == 'silentpush-get-domain-certificates': + return_results(get_domain_certificates_command(client, demisto.args())) - else: - raise DemistoException(f'Unsupported command: {command}') + elif demisto.command() == 'silentpush-get-enrichment-data': + return_results(get_enrichment_data_command(client, demisto.args())) + + elif demisto.command() == 'silentpush-list-ip-information': + return_results(list_ip_information_command(client, demisto.args())) + elif demisto.command() == 'silentpush-get-asn-reputation ': + return_results(get_asn_reputation_command(client, demisto.args())) + except Exception as e: - demisto.error(f'Failed to execute {demisto.command()} command. Error: {str(e)}') - return_error(f'Failed to execute {demisto.command()} command. Error: {str(e)}') + demisto.error(traceback.format_exc()) # print the traceback + return_error(f'Failed to execute {demisto.command()} command.\nError:\n{str(e)}') ''' ENTRY POINT ''' - if __name__ in ('__main__', '__builtin__', 'builtins'): main() diff --git a/Packs/SilentPush/Integrations/SilentPush/SilentPush.yml b/Packs/SilentPush/Integrations/SilentPush/SilentPush.yml index 098cfeef1186..92de99ee19d3 100644 --- a/Packs/SilentPush/Integrations/SilentPush/SilentPush.yml +++ b/Packs/SilentPush/Integrations/SilentPush/SilentPush.yml @@ -1,109 +1,845 @@ +category: Data Enrichment & Threat Intelligence +description: The Silent Push Platform uses first-party data and a proprietary scanning engine to enrich global DNS data with risk and reputation scoring, giving security teams the ability to join the dots across the entire IPv4 and IPv6 range, and identify adversary infrastructure before an attack is launched. The content pack integrates with the Silent Push system to gain insights into domain/IP information, reputations, enrichment, and infratag-related details. It also provides functionality to live-scan URLs and take screenshots of them. Additionally, it allows fetching future attack feeds from the Silent Push system. commonfields: id: SilentPush version: -1 - name: SilentPush +name: SilentPush +display: SilentPush +configuration: +- display: Base URL + name: url + type: 0 + required: true + defaultvalue: https://api.silentpush.com +- display: API Key + name: credentials + type: 9 + required: false +- display: Trust any certificate (not secure) + name: insecure + type: 8 + required: false +- display: Use system proxy settings + name: proxy + type: 8 + required: false +script: + commands: + - deprecated: false + description: This command queries granular DNS/IP parameters (e.g., NS servers, MX servers, IPaddresses, ASNs) for density information. + name: silentpush-density-lookup + arguments: + - name: qtype + isArray: false + description: Query type. + required: true + secret: false + default: false + - name: query + isArray: false + description: Value to query. + required: true + secret: false + default: false + - name: scope + isArray: false + description: Match level (optional). + required: false + secret: false + default: false + outputs: + - contextPath: SilentPush.DensityLookup.density + description: The density value associated with the query result. + type: Number + - contextPath: SilentPush.DensityLookup.nssrv + description: The name server (NS) for the query result. + type: String + - deprecated: false + description: This command retrieve the reputation information for an IPv4. + name: silentpush-get-asn-reputation + arguments: + - name: asn + isArray: false + description: The ASN to lookup. + required: true + secret: false + default: false + - name: explain + isArray: false + description: Show the information used to calculate the reputation score. + required: false + secret: false + default: false + - name: limit + isArray: false + description: The maximum number of reputation history records to retrieve. + required: false + secret: false + default: false + outputs: + - contextPath: SilentPush.ASNReputation.asn + description: The Autonomous System Number (ASN). + type: String + - contextPath: SilentPush.ASNReputation.reputation_data.asn_reputation + description: Reputation score of the ASN. + type: Number + - contextPath: SilentPush.ASNReputation.reputation_data.asname + description: Name of the Autonomous System (AS). + type: String + - contextPath: SilentPush.ASNReputation.reputation_data.date + description: Date the reputation data was recorded (YYYYMMDD). + type: Number + - deprecated: false + description: This command retrieves Autonomous System Numbers (ASNs) associated with a domain. + name: silentpush-get-asns-for-domain + arguments: + - name: domain + isArray: false + description: Domain name to search ASNs for. Retrieves ASNs associated with a records for the specified domain and its subdomains in the last 30 days. + required: true + secret: false + default: false + outputs: + - contextPath: SilentPush.DomainASNs.domain + description: The domain name for which ASNs are retrieved. + type: String + - contextPath: SilentPush.DomainASNs.domain_asns + description: Dictionary of Autonomous System Numbers (ASNs) associated with the domain. + type: Unknown + - deprecated: false + description: This command get certificate data collected from domain scanning. + name: silentpush-get-domain-certificates + arguments: + - name: domains + isArray: false + description: Comma-separated list of domains to query. + required: true + secret: false + default: false + - name: fetch_risk_score + isArray: false + description: Whether to fetch risk scores for the domains. + required: false + secret: false + default: false + - name: fetch_whois_info + isArray: false + description: Whether to fetch WHOIS information for the domains. + required: false + secret: false + default: false + outputs: + - contextPath: SilentPush.Certificate.domain + description: The domain name queried. + type: String + - contextPath: SilentPush.Certificate.last_seen + description: The last seen date of the domain in YYYYMMDD format. + type: Number + - contextPath: SilentPush.Certificate.query + description: The domain name used for the query. + type: String + - contextPath: SilentPush.Certificate.whois_age + description: The age of the domain in days based on WHOIS creation date. + type: Number + - contextPath: SilentPush.Certificate.first_seen + description: The first seen date of the domain in YYYYMMDD format. + type: Number + - contextPath: SilentPush.Certificate.is_new + description: Indicates whether the domain is newly observed. + type: Boolean + - contextPath: SilentPush.Certificate.zone + description: The top-level domain (TLD) or zone of the queried domain. + type: String + - contextPath: SilentPush.Certificate.registrar + description: The registrar responsible for the domain registration. + type: String + - contextPath: SilentPush.Certificate.age_score + description: A risk score based on the domain's age. + type: Number + - contextPath: SilentPush.Certificate.whois_created_date + description: The WHOIS creation date of the domain in YYYY-MM-DD HH:MM:SS format. + type: String + - contextPath: SilentPush.Certificate.is_new_score + description: A risk score indicating how new the domain is. + type: Number + - contextPath: SilentPush.Certificate.age + description: The age of the domain in days. + type: Number + - deprecated: false + description: This command retrieves comprehensive enrichment information for a given resource (domain, IPv4, or IPv6). + name: silentpush-get-enrichment-data + arguments: + - name: resource + isArray: false + description: The resource to query (domain/IP). + required: true + secret: false + default: false + - name: value + isArray: false + description: Type of resource (domain/ipv4/ipv6). + required: true + secret: false + default: false + - name: explain + isArray: false + description: Include explanation of data calculations. + required: false + secret: false + default: false + - name: scan_data + isArray: false + description: Include scan data (IPv4 only). + required: false + secret: false + default: false + outputs: + - contextPath: SilentPush.Enrichment.ip_is_dsl_dynamic + description: Indicates if the IP is DSL dynamic. + type: Boolean + - contextPath: SilentPush.Enrichment.ip_has_expired_certificate + description: Indicates if the IP has an expired certificate. + type: Boolean + - contextPath: SilentPush.Enrichment.subnet_allocation_age + description: Age of the subnet allocation. + type: String + - contextPath: SilentPush.Enrichment.asn_rank_score + description: Score of the ASN rank. + type: Number + - contextPath: SilentPush.Enrichment.asn_allocation_age + description: Age of the ASN allocation. + type: Number + - contextPath: SilentPush.Enrichment.sp_risk_score + description: Risk score for the service provider. + type: Number + - contextPath: SilentPush.Enrichment.ip_reputation_score + description: Reputation score of the IP. + type: Number + - contextPath: SilentPush.Enrichment.ip + description: The IP address. + type: String + - contextPath: SilentPush.Enrichment.density + description: Density value for the IP address. + type: Number + - contextPath: SilentPush.Enrichment.benign_info.actor + description: Actor associated with the benign information. + type: String + - contextPath: SilentPush.Enrichment.benign_info.known_benign + description: Indicates if the resource is known to be benign. + type: Boolean + - contextPath: SilentPush.Enrichment.benign_info.tags + description: Tags associated with the benign information. + type: Unknown + - contextPath: SilentPush.Enrichment.asn_allocation_date + description: Date of ASN allocation in YYYYMMDD format. + type: Number + - contextPath: SilentPush.Enrichment.subnet_allocation_date + description: Date of subnet allocation or UNKNOWN if unavailable. + type: String + - contextPath: SilentPush.Enrichment.asn_takedown_reputation + description: Reputation score for ASN takedown. + type: Number + - contextPath: SilentPush.Enrichment.ip_location.continent_code + description: Continent code where the IP is located. + type: String + - contextPath: SilentPush.Enrichment.ip_location.continent_name + description: Continent name where the IP is located. + type: String + - contextPath: SilentPush.Enrichment.ip_location.country_code + description: Country code of the IP location. + type: String + - contextPath: SilentPush.Enrichment.ip_location.country_is_in_european_union + description: Indicates if the country is in the European Union. + type: Boolean + - contextPath: SilentPush.Enrichment.ip_location.country_name + description: Country name where the IP is located. + type: String + - contextPath: SilentPush.Enrichment.date + description: Date of the record in YYYYMMDD format. + type: Number + - contextPath: SilentPush.Enrichment.subnet_reputation_score + description: Reputation score of the subnet. + type: Number + - contextPath: SilentPush.Enrichment.asn_rank + description: Rank of the ASN. + type: Number + - contextPath: SilentPush.Enrichment.asn_reputation_score + description: Reputation score of the ASN. + type: Number + - contextPath: SilentPush.Enrichment.ip_is_ipfs_node + description: Indicates if the IP is an IPFS node. + type: Boolean + - contextPath: SilentPush.Enrichment.value + description: The value associated with the IP or subnet. + type: String + - contextPath: SilentPush.Enrichment.ip_reputation + description: Reputation score of the IP address. + type: Number + - contextPath: SilentPush.Enrichment.ip_is_dsl_dynamic_score + description: Score indicating if the IP is DSL dynamic. + type: Number + - contextPath: SilentPush.Enrichment.ip_has_open_directory + description: Indicates if the IP has an open directory. + type: Boolean + - contextPath: SilentPush.Enrichment.ip_ptr + description: Pointer record (PTR) of the IP address. + type: String + - contextPath: SilentPush.Enrichment.listing_score + description: Listing score of the IP address or resource. + type: Number + - contextPath: SilentPush.Enrichment.malscore + description: Malware score for the IP address. + type: Number + - contextPath: SilentPush.Enrichment.sinkhole_info.known_sinkhole_ip + description: Indicates if the IP is associated with a known sinkhole. + type: Boolean + - contextPath: SilentPush.Enrichment.sinkhole_info.tags + description: Tags associated with the sinkhole information. + type: Unknown + - contextPath: SilentPush.Enrichment.subnet_reputation + description: Reputation score of the subnet. + type: Number + - contextPath: SilentPush.Enrichment.asn_reputation + description: Reputation score of the ASN. + type: Number + - contextPath: SilentPush.Enrichment.asn + description: Autonomous System Number (ASN) of the IP or subnet. + type: Number + - contextPath: SilentPush.Enrichment.asname + description: Name of the ASN associated with the IP or subnet. + type: String + - contextPath: SilentPush.Enrichment.subnet + description: Subnet associated with the IP address. + type: String + - contextPath: SilentPush.Enrichment.ip_is_tor_exit_node + description: Indicates if the IP is a TOR exit node. + type: Boolean + - contextPath: SilentPush.Enrichment.asn_takedown_reputation_score + description: Reputation score for ASN takedown. + type: Number + - deprecated: false + description: This command retrieve status of running job or results from completed job. + name: silentpush-get-job-status + arguments: + - name: job_id + isArray: false + description: ID of the job returned by Silent Push actions. + required: true + secret: false + default: false + - name: max_wait + isArray: false + description: Number of seconds to wait for results (0-25 seconds). + required: false + secret: false + default: false + - name: result_type + isArray: false + description: Type of result to include in the response. + required: false + secret: false + default: false + outputs: + - contextPath: SilentPush.JobStatus.get + description: URL to retrieve the job status. + type: String + - contextPath: SilentPush.JobStatus.job_id + description: Unique identifier for the job. + type: String + - contextPath: SilentPush.JobStatus.status + description: Current status of the job. + type: String + - deprecated: false + description: This command retrieve historical reputation data for a specified nameserver, including reputation scores and optional detailed calculation information. + name: silentpush-get-nameserver-reputation + arguments: + - name: nameserver + isArray: false + description: Nameserver name for which information needs to be retrieved + required: true + secret: false + default: false + - name: explain + isArray: false + description: Show the information used to calculate the reputation score + required: false + secret: false + default: false + - name: limit + isArray: false + description: The maximum number of reputation history to retrieve + required: false + secret: false + default: false + outputs: + - contextPath: SilentPush.SubnetReputation.date + description: Date of the reputation history entry (in YYYYMMDD format). + type: Number + - contextPath: SilentPush.SubnetReputation.ns_server + description: Name of the nameserver associated with the reputation history entry. + type: String + - contextPath: SilentPush.SubnetReputation.ns_server_reputation + description: Reputation score of the nameserver on the specified date. + type: Number + - contextPath: SilentPush.SubnetReputation.ns_server_reputation_explain + description: Explanation of the reputation score, including domain density and listed domains. + type: Unknown + - contextPath: SilentPush.SubnetReputation.ns_server_domain_density + description: Number of domains associated with the nameserver. + type: Number + - contextPath: SilentPush.SubnetReputation.ns_server_domains_listed + description: Number of domains listed in reputation databases. + type: Number + - deprecated: false + description: This command retrieves the reputation history for a specific subnet. + name: silentpush-get-subnet-reputation + arguments: + - name: subnet + isArray: false + description: IPv4 subnet for which reputation information needs to be retrieved. + required: true + secret: false + default: false + - name: explain + isArray: false + description: Show the detailed information used to calculate the reputation score. + required: false + secret: false + default: false + - name: limit + isArray: false + description: Maximum number of reputation history entries to retrieve. + required: false + secret: false + default: false + outputs: + - contextPath: SilentPush.SubnetReputation.date + description: The date of the subnet reputation record. + type: Number + - contextPath: SilentPush.SubnetReputation.subnet + description: The subnet associated with the reputation record. + type: String + - contextPath: SilentPush.SubnetReputation.subnet_reputation + description: The reputation score of the subnet. + type: Number + - contextPath: SilentPush.SubnetReputation.ips_in_subnet + description: Total number of IPs in the subnet. + type: Number + - contextPath: SilentPush.SubnetReputation.ips_num_active + description: Number of active IPs in the subnet. + type: Number + - contextPath: SilentPush.SubnetReputation.ips_num_listed + description: Number of listed IPs in the subnet. + type: Number + - deprecated: false + description: This command get domain information along with Silent Push risk score and live whois information for multiple domains. + name: silentpush-list-domain-information + arguments: + - name: domains + isArray: false + description: Comma-separated list of domains to query. + required: true + secret: false + default: false + - name: fetch_risk_score + isArray: false + description: Whether to fetch risk scores for the domains. + required: false + secret: false + default: false + - name: fetch_whois_info + isArray: false + description: Whether to fetch WHOIS information for the domains. + required: false + secret: false + default: false + outputs: + - contextPath: SilentPush.Domain.domain + description: The domain name queried. + type: String + - contextPath: SilentPush.Domain.last_seen + description: The last seen date of the domain in YYYYMMDD format. + type: Number + - contextPath: SilentPush.Domain.query + description: The domain name used for the query. + type: String + - contextPath: SilentPush.Domain.whois_age + description: The age of the domain in days based on WHOIS creation date. + type: Number + - contextPath: SilentPush.Domain.first_seen + description: The first seen date of the domain in YYYYMMDD format. + type: Number + - contextPath: SilentPush.Domain.is_new + description: Indicates whether the domain is newly observed. + type: Boolean + - contextPath: SilentPush.Domain.zone + description: The top-level domain (TLD) or zone of the queried domain. + type: String + - contextPath: SilentPush.Domain.registrar + description: The registrar responsible for the domain registration. + type: String + - contextPath: SilentPush.Domain.age_score + description: A risk score based on the domain's age. + type: Number + - contextPath: SilentPush.Domain.whois_created_date + description: The WHOIS creation date of the domain in YYYY-MM-DD HH:MM:SS format. + type: String + - contextPath: SilentPush.Domain.is_new_score + description: A risk score indicating how new the domain is. + type: Number + - contextPath: SilentPush.Domain.age + description: The age of the domain in days. + type: Number + - deprecated: false + description: This command get infratags for multiple domains with optional clustering. + name: silentpush-list-domain-infratags + arguments: + - name: domains + isArray: false + description: Comma-separated list of domains. + required: true + secret: false + default: false + - name: cluster + isArray: false + description: Whether to cluster the results. + required: false + secret: false + default: false + - name: mode + isArray: false + description: Mode for lookup (live/padns). Defaults to "live". + required: false + secret: false + default: false + defaultValue: live + - name: match + isArray: false + description: Handling of self-hosted infrastructure. Defaults to "self". + required: false + secret: false + default: false + defaultValue: self + outputs: + - contextPath: SilentPush.InfraTags.infratags.domain + description: The domain associated with the infratag. + type: String + - contextPath: SilentPush.InfraTags.infratags.mode + description: The mode associated with the domain infratag. + type: String + - contextPath: SilentPush.InfraTags.infratags.tag + description: The tag associated with the domain infratag. + type: String + - contextPath: SilentPush.InfraTags.tag_clusters.25.domains + description: List of domains in the tag cluster with score 25. + type: Unknown + - contextPath: SilentPush.InfraTags.tag_clusters.25.match + description: The match string associated with the domains in the tag cluster with score 25. + type: String + - contextPath: SilentPush.InfraTags.tag_clusters.50.domains + description: List of domains in the tag cluster with score 50. + type: Unknown + - contextPath: SilentPush.InfraTags.tag_clusters.50.match + description: The match string associated with the domains in the tag cluster with score 50. + type: String + - contextPath: SilentPush.InfraTags.tag_clusters.75.domains + description: List of domains in the tag cluster with score 75. + type: Unknown + - contextPath: SilentPush.InfraTags.tag_clusters.75.match + description: The match string associated with the domains in the tag cluster with score 75. + type: String + - contextPath: SilentPush.InfraTags.tag_clusters.100.domains + description: List of domains in the tag cluster with score 100. + type: Unknown + - contextPath: SilentPush.InfraTags.tag_clusters.100.match + description: The match string associated with the domains in the tag cluster with score 100. + type: String + - deprecated: false + description: This command get IP information for multiple IPv4s and IPv6s. + name: silentpush-list-ip-information + arguments: + - name: ips + isArray: false + description: Comma-separated list of IP addresses. + required: true + secret: false + default: false + - name: explain + isArray: false + description: Include explanation of calculations. + required: false + secret: false + default: false + - name: scan_data + isArray: false + description: Include scan data (IPv4 only). + required: false + secret: false + default: false + - name: sparse + isArray: false + description: Specific data to return (asn/asname/sp_risk_score). + required: false + secret: false + default: false + outputs: + - contextPath: SilentPush.IPInformation.ip_is_dsl_dynamic + description: Indicates if the IP is a DSL dynamic IP. + type: Boolean + - contextPath: SilentPush.IPInformation.ip_has_expired_certificate + description: Indicates if the IP has an expired certificate. + type: Boolean + - contextPath: SilentPush.IPInformation.subnet_allocation_age + description: Age of the subnet allocation. + type: String + - contextPath: SilentPush.IPInformation.asn_rank_score + description: Rank score of the ASN. + type: Number + - contextPath: SilentPush.IPInformation.asn_allocation_age + description: Age of the ASN allocation in days. + type: Number + - contextPath: SilentPush.IPInformation.sp_risk_score + description: Risk score of the service provider (SP). + type: Number + - contextPath: SilentPush.IPInformation.asn_takedown_reputation_explain.ips_active + description: Number of active IPs in the ASN takedown reputation. + type: Number + - contextPath: SilentPush.IPInformation.asn_takedown_reputation_explain.ips_in_asn + description: Total number of IPs in the ASN. + type: Number + - contextPath: SilentPush.IPInformation.asn_takedown_reputation_explain.ips_num_listed + description: Number of IPs listed in the ASN takedown reputation. + type: Number + - contextPath: SilentPush.IPInformation.asn_takedown_reputation_explain.items_num_listed + description: Number of items listed in the ASN takedown reputation. + type: Number + - contextPath: SilentPush.IPInformation.asn_takedown_reputation_explain.lifetime_avg + description: Average lifetime of items in the ASN takedown reputation. + type: Number + - contextPath: SilentPush.IPInformation.asn_takedown_reputation_explain.lifetime_max + description: Maximum lifetime of items in the ASN takedown reputation. + type: Number + - contextPath: SilentPush.IPInformation.asn_takedown_reputation_explain.lifetime_total + description: Total lifetime of items in the ASN takedown reputation. + type: Number + - contextPath: SilentPush.IPInformation.ip_reputation_score + description: Reputation score of the IP. + type: Number + - contextPath: SilentPush.IPInformation.listing_score_feeds_explain + description: Explanation of the listing score feeds. + type: String + - contextPath: SilentPush.IPInformation.ip + description: The IP address being evaluated. + type: String + - contextPath: SilentPush.IPInformation.density + description: Density score of the IP. + type: Number + - contextPath: SilentPush.IPInformation.benign_info.actor + description: Actor associated with the benign info. + type: String + - contextPath: SilentPush.IPInformation.benign_info.known_benign + description: Indicates if the IP is known benign. + type: Boolean + - contextPath: SilentPush.IPInformation.benign_info.tags + description: Tags associated with the benign info. + type: String + - contextPath: SilentPush.IPInformation.ip_reputation_explain + description: Explanation of the IP reputation. + type: String + - contextPath: SilentPush.IPInformation.asn_allocation_date + description: The ASN allocation date. + type: Number + - contextPath: SilentPush.IPInformation.subnet_allocation_date + description: The subnet allocation date. + type: String + - contextPath: SilentPush.IPInformation.asn_takedown_reputation + description: Reputation score of ASN takedown. + type: Number + - contextPath: SilentPush.IPInformation.ip_location.continent_code + description: Continent code of the IP location. + type: String + - contextPath: SilentPush.IPInformation.ip_location.continent_name + description: Continent name of the IP location. + type: String + - contextPath: SilentPush.IPInformation.ip_location.country_code + description: Country code of the IP location. + type: String + - contextPath: SilentPush.IPInformation.ip_location.country_is_in_european_union + description: Indicates if the country is in the European Union. + type: Boolean + - contextPath: SilentPush.IPInformation.ip_location.country_name + description: Country name of the IP location. + type: String + - contextPath: SilentPush.IPInformation.date + description: Date associated with the IP data. + type: Number + - contextPath: SilentPush.IPInformation.subnet_reputation_score + description: Reputation score of the subnet. + type: Number + - contextPath: SilentPush.IPInformation.asn_rank + description: Rank of the ASN. + type: Number + - contextPath: SilentPush.IPInformation.listing_score_explain + description: Explanation of the listing score. + type: String + - contextPath: SilentPush.IPInformation.asn_reputation_score + description: Reputation score of the ASN. + type: Number + - contextPath: SilentPush.IPInformation.ip_is_ipfs_node + description: Indicates if the IP is an IPFS node. + type: Boolean + - contextPath: SilentPush.IPInformation.ip_reputation + description: Reputation score of the IP. + type: Number + - contextPath: SilentPush.IPInformation.subnet_reputation_explain + description: Explanation of the subnet reputation. + type: String + - contextPath: SilentPush.IPInformation.ip_is_dsl_dynamic_score + description: Score indicating if the IP is a DSL dynamic IP. + type: Number + - contextPath: SilentPush.IPInformation.asn_reputation_explain + description: Explanation of the ASN reputation. + type: String + - contextPath: SilentPush.IPInformation.ip_has_open_directory + description: Indicates if the IP has an open directory. + type: Boolean + - contextPath: SilentPush.IPInformation.ip_ptr + description: Pointer (PTR) record for the IP. + type: String + - contextPath: SilentPush.IPInformation.listing_score + description: Listing score of the IP. + type: Number + - contextPath: SilentPush.IPInformation.malscore + description: Malware score associated with the IP. + type: Number + - contextPath: SilentPush.IPInformation.sinkhole_info.known_sinkhole_ip + description: Indicates if the IP is a known sinkhole IP. + type: Boolean + - contextPath: SilentPush.IPInformation.sinkhole_info.tags + description: Tags associated with the sinkhole information. + type: String + - contextPath: SilentPush.IPInformation.subnet_reputation + description: Reputation score of the subnet. + type: Number + - contextPath: SilentPush.IPInformation.asn_reputation + description: Reputation score of the ASN. + type: Number + - contextPath: SilentPush.IPInformation.asn + description: Autonomous System Number (ASN) of the IP. + type: Number + - contextPath: SilentPush.IPInformation.sp_risk_score_explain.sp_risk_score_decider + description: Decider for the service provider risk score. + type: String + - contextPath: SilentPush.IPInformation.asname + description: Name of the ASN. + type: String + - contextPath: SilentPush.IPInformation.subnet + description: The subnet the IP belongs to. + type: String + - contextPath: SilentPush.IPInformation.ip_is_tor_exit_node + description: Indicates if the IP is a TOR exit node. + type: Boolean + - contextPath: SilentPush.IPInformation.asn_takedown_reputation_score + description: Reputation score of ASN takedown. + type: Number + - deprecated: false + description: This command search for domains with optional filters. + name: silentpush-search-domains + arguments: + - name: domain + isArray: false + description: Name or wildcard pattern of domain names to search for. + required: false + secret: false + default: false + - name: domain_regex + isArray: false + description: A valid RE2 regex pattern to match domains. Overrides the domain argument. + required: false + secret: false + default: false + - name: name_server + isArray: false + description: Name server name or wildcard pattern of the name server used by domains. + required: false + secret: false + default: false + - name: asnum + isArray: false + description: Autonomous System (AS) number to filter domains. + required: false + secret: false + default: false + - name: asname + isArray: false + description: Search for all AS numbers where the AS Name begins with the specified value. + required: false + secret: false + default: false + - name: min_ip_diversity + isArray: false + description: Minimum IP diversity limit to filter domains. + required: false + secret: false + default: false + - name: registrar + isArray: false + description: Name or partial name of the registrar used to register domains. + required: false + secret: false + default: false + - name: min_asn_diversity + isArray: false + description: Minimum ASN diversity limit to filter domains. + required: false + secret: false + default: false + - name: certificate_issuer + isArray: false + description: Filter domains that had SSL certificates issued by the specified certificate issuer. Wildcards supported. + required: false + secret: false + default: false + - name: whois_date_after + isArray: false + description: Filter domains with a WHOIS creation date after this date (YYYY-MM-DD). + required: false + secret: false + default: false + - name: skip + isArray: false + description: Number of results to skip in the search query. + required: false + secret: false + default: false + - name: limit + isArray: false + description: Number of results to return. Defaults to the SilentPush API's behavior. + required: false + secret: false + default: false + outputs: + - contextPath: SilentPush.Domain.asn_diversity + description: The diversity of Autonomous System Numbers (ASNs) associated with the domain. + type: Number + - contextPath: SilentPush.Domain.host + description: The domain name (host) associated with the record. + type: String + - contextPath: SilentPush.Domain.ip_diversity_all + description: The total number of unique IPs associated with the domain. + type: Number + - contextPath: SilentPush.Domain.ip_diversity_groups + description: The number of unique IP groups associated with the domain. + type: Number + script: '-' type: python - subType: python3 - description: | - This integration allows fetching domain information from the SilentPush API. It includes commands to get domain-related information such as WHOIS data, domain age, and risk scores. - tags: [] - enabled: true - manufacturer: SilentPush - comment: '' - minVersion: -1 - dependencies: - - CommonServerPython - - CommonServerUserPython - -scripts: - - path: SilentPush.py - comment: | - Integration for SilentPush that enables fetching domain information, including WHOIS data, domain age, and risk scores. - -commands: - - name: test-module - description: | - Tests the connectivity to the SilentPush API and checks the authentication status. - isArray: false - argContext: - - id: base_url - type: string - description: The base URL for the SilentPush API. - - id: api_key - type: string - description: The API key used to authenticate requests. - - id: verify_ssl - type: boolean - description: Flag to determine whether SSL verification is enabled. - examples: | - !test-module - - - name: silentpush-list-domain-information - description: | - Fetches domain information, such as WHOIS data, domain age, and risk scores. - isArray: false - argContext: - - id: domain - type: string - description: The domain name to fetch information for. - examples: | - !silentpush-list-domain-information domain=example.com - -args: - - id: domain - isArray: false - description: | - The domain to fetch information for. - type: string - -outputs: - - id: SilentPush.Domain - type: complex - description: | - The domain information fetched from SilentPush API, including WHOIS data, domain age, and risk scores. - contents: - - name: domain - type: string - - name: whois_data - type: string - - name: domain_age - type: integer - - name: risk_score - type: float - + subtype: python3 + dockerimage: demisto/python3:3.11.10.116949 + feed: false + isfetch: false + runonce: false + longRunning: false + longRunningPort: false +fromversion: 5.0.0 tests: - - name: Test SilentPush Integration - description: Test the integration with the SilentPush API. - steps: - - script: test-module - name: Test SilentPush API Connectivity - args: - base_url: https://api.silentpush.com - api_key: 'your_api_key' - -# Optional: Adding the configuration section for any configuration-related parameters -configurations: - - default: true - isArray: false - description: The configuration parameters required for connecting to SilentPush API. - context: - - id: base_url - type: string - description: The base URL for the SilentPush API. - - id: api_key - type: string - description: The API key used to authenticate requests. - - id: verify_ssl - type: boolean - description: Flag to determine whether SSL verification is enabled. - - id: proxy - type: boolean - description: Flag to determine whether to use a proxy. - -errorHandling: - - errorCode: 403 - description: | - If an authorization error is encountered, it could indicate an incorrect or expired API key. - - errorCode: 400 - description: | - Bad Request error, likely due to incorrect input format or invalid parameters in the request. +- No tests diff --git a/Packs/SilentPush/Integrations/SilentPush/SilentPush_test.py b/Packs/SilentPush/Integrations/SilentPush/SilentPush_test.py index d957c5edf2a0..c42f4e3a8b39 100644 --- a/Packs/SilentPush/Integrations/SilentPush/SilentPush_test.py +++ b/Packs/SilentPush/Integrations/SilentPush/SilentPush_test.py @@ -10,11 +10,13 @@ you are implementing with your integration """ +from demisto_sdk.commands.common.handlers import JSON_Handler + import json def util_load_json(path): - with open(path, encoding='utf-8') as f: + with open(path, encoding="utf-8") as f: return json.loads(f.read()) @@ -29,13 +31,11 @@ def test_baseintegration_dummy(): """ from BaseIntegration import Client, baseintegration_dummy_command - client = Client(base_url='some_mock_url', verify=False) - args = { - 'dummy': 'this is a dummy response' - } + client = Client(base_url="some_mock_url", verify=False) + args = {"dummy": "this is a dummy response", "dummy2": "a dummy value"} response = baseintegration_dummy_command(client, args) - mock_response = util_load_json('test_data/baseintegration-dummy.json') + assert response.outputs == args + - assert response.outputs == mock_response # TODO: ADD HERE unit tests for every command diff --git a/Packs/SilentPush/Integrations/SilentPush/test_data/baseintegration-dummy.json b/Packs/SilentPush/Integrations/SilentPush/test_data/baseintegration-dummy.json deleted file mode 100644 index 37fa47b18cd0..000000000000 --- a/Packs/SilentPush/Integrations/SilentPush/test_data/baseintegration-dummy.json +++ /dev/null @@ -1,3 +0,0 @@ -{ - "dummy": "this is a dummy response" -} \ No newline at end of file diff --git a/Packs/SilentPush/pack_metadata.json b/Packs/SilentPush/pack_metadata.json index b5cd72cb3e81..8cfd8027eea4 100644 --- a/Packs/SilentPush/pack_metadata.json +++ b/Packs/SilentPush/pack_metadata.json @@ -1,6 +1,6 @@ { "name": "Silent Push", - "description": "The Silent Push platform exposes Indicators of Future Attack (IOFA) by applying unique behavioral fingerprints to attacker activity. By searching our dataset, security teams can identify new impending attacks, rather than relying on known IOCs.", + "description": "SilentPush integration for domain and IP intelligence\u001b[C", "support": "partner", "currentVersion": "1.0.0", "author": "Silent Push", @@ -9,10 +9,7 @@ "categories": [ "Data Enrichment & Threat Intelligence" ], - "tags": [ - "IoC", - "IoFA" - ], + "tags": [], "useCases": [], "keywords": [], "marketplaces": [