-
Notifications
You must be signed in to change notification settings - Fork 1
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Silentpush-final-code #20
base: get-domain-command
Are you sure you want to change the base?
Conversation
@@ -67,7 +67,10 @@ def _http_request(self, method: str, url_suffix: str, params: dict = None, data: | |||
Raises: | |||
DemistoException: If there's an error during the API call. | |||
""" | |||
full_url = f'{self.base_url}{url_suffix}' | |||
if url_suffix == "/api/v2/iocs/threat-ranking": |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Instead of checking like this let's add :-
base_url = demisto.params().get('url', 'https://api.silentpush.com') if url_suffix.startswith("/api/v2/") else self.base_url
full_url = f'{base_url}{url_suffix}'
full_url = demisto.params().get('url', 'https://api.silentpush.com') + url_suffix | ||
else: | ||
full_url = f'{self.base_url}{url_suffix}' |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Implement the above logic and remove these lines.
@@ -83,7 +86,7 @@ def _http_request(self, method: str, url_suffix: str, params: dict = None, data: | |||
return response.text | |||
except Exception as e: | |||
raise DemistoException(f'Error in API call: {str(e)}') | |||
|
|||
def parse_subject(self,subject: Any) -> Dict[str, Any]: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In this parse_subject() function remove redundant elif, first if returns early, the next check doesn’t need elif; a simple if is cleaner.
|
||
def validate_ip_address(ip: str, allow_ipv6: bool = True) -> bool: | ||
def validate_ip_address(self, ip: str, allow_ipv6: bool = True) -> bool: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Create a seperate class for these type of functions. Name it as a Helper class and add this function in that.
Also create a single function for both ipv4 and ipv6 to validate them. Use is_ipv6_valid() and is_ip_valid() for the validation check. Refer to this doc
|
||
def test_module(client: Client) -> str: | ||
try: | ||
client.list_domain_information('silentpush.com') | ||
resp = client.search_domains() | ||
if resp.get("status_code") != 200: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ensure response is a dictionary and check status code
if resp.get("status_code") != 200: | |
if not isinstance(resp, dict) or resp.get("status_code") != 200: | |
for domain_data in response.get('domains', []): | ||
domain = domain_data.get('domain', 'N/A') | ||
markdown.append(f'## Domain: {domain}') | ||
|
||
basic_info = { | ||
'Created Date': domain_info.get('whois_created_date', 'N/A'), | ||
'Updated Date': domain_info.get('whois_updated_date', 'N/A'), | ||
'Expiration Date': domain_info.get('whois_expiration_date', 'N/A'), | ||
'Registrar': domain_info.get('registrar', 'N/A'), | ||
'Status': domain_info.get('status', 'N/A'), | ||
'Name Servers': domain_info.get('nameservers', 'N/A') | ||
'Created Date': domain_data.get('whois_created_date', 'N/A'), | ||
'Updated Date': domain_data.get('whois_updated_date', 'N/A'), | ||
'Expiration Date': domain_data.get('whois_expiration_date', 'N/A'), | ||
'Registrar': domain_data.get('registrar', 'N/A'), | ||
'Status': domain_data.get('status', 'N/A'), | ||
'Name Servers': domain_data.get('nameservers', 'N/A') | ||
} | ||
markdown.append(tableToMarkdown('Domain Information', [basic_info])) | ||
|
||
if fetch_risk_score: | ||
risk_info = { | ||
'Risk Score': domain_info.get('sp_risk_score', 'N/A'), | ||
'Risk Score Explanation': domain_info.get('sp_risk_score_explain', 'N/A') | ||
'Risk Score': domain_data.get('risk_score', 'N/A'), | ||
'Risk Score Explanation': domain_data.get('risk_score_explanation', 'N/A') | ||
} | ||
markdown.append(tableToMarkdown('Risk Assessment', [risk_info])) | ||
|
||
if fetch_whois_info and domain_info.get('whois_info') != 'N/A': | ||
whois_info = domain_info.get('whois_info', {}) | ||
if isinstance(whois_info, dict): | ||
whois_data = { | ||
'Registrant Name': whois_info.get('registrant_name', 'N/A'), | ||
'Registrant Organization': whois_info.get('registrant_organization', 'N/A'), | ||
'Registrant Email': whois_info.get('registrant_email', 'N/A'), | ||
'Admin Email': whois_info.get('admin_email', 'N/A'), | ||
'Tech Email': whois_info.get('tech_email', 'N/A') | ||
} | ||
markdown.append(tableToMarkdown('WHOIS Information', [whois_data])) | ||
if fetch_whois_info: | ||
whois_info = domain_data.get('whois_info', {}) | ||
if whois_info and isinstance(whois_info, dict): | ||
if 'error' in whois_info: | ||
markdown.append(f'WHOIS Error: {whois_info["error"]}') | ||
else: | ||
markdown.append(tableToMarkdown('WHOIS Information', [whois_info])) | ||
|
||
markdown.append('\n---\n') |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Create a separate function to format domain information into markdown tables.
def _format_domain_info(domains_data: List[Dict[str, Any]], fetch_risk_score: bool, fetch_whois_info: bool) -> str:
"""Formats domain information into markdown tables."""
markdown = ["# Domain Information Results\n"]
for domain_data in domains_data:
domain = domain_data.get("domain", "N/A")
markdown.append(f"## Domain: {domain}")
basic_info = {
"Created Date": domain_data.get("whois_created_date", "N/A"),
"Updated Date": domain_data.get("whois_updated_date", "N/A"),
"Expiration Date": domain_data.get("whois_expiration_date", "N/A"),
"Registrar": domain_data.get("registrar", "N/A"),
"Status": domain_data.get("status", "N/A"),
"Name Servers": domain_data.get("nameservers", "N/A"),
}
markdown.append(tableToMarkdown("Domain Information", [basic_info]))
if fetch_risk_score:
risk_info = {
"Risk Score": domain_data.get("risk_score", "N/A"),
"Risk Score Explanation": domain_data.get("risk_score_explanation", "N/A"),
}
markdown.append(tableToMarkdown("Risk Assessment", [risk_info]))
if fetch_whois_info:
whois_info = domain_data.get("whois_info", {})
if whois_info:
if "error" in whois_info:
markdown.append(f"**WHOIS Error:** {whois_info['error']}")
else:
markdown.append(tableToMarkdown("WHOIS Information", [whois_info]))
markdown.append("\n---\n")
return "\n".join(markdown)
if not domains_arg: | ||
raise DemistoException('No domains provided') | ||
|
||
domains = [domain.strip() for domain in domains_arg.split(',') if domain.strip()] | ||
fetch_risk_score = argToBoolean(args.get('fetch_risk_score', False)) | ||
fetch_whois_info = argToBoolean(args.get('fetch_whois_info', False)) | ||
|
||
raw_response = client.list_domain_information(domains, fetch_risk_score, fetch_whois_info) | ||
response = client.list_domain_information(domains, fetch_risk_score, fetch_whois_info) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
response = client.list_domain_information(domains, fetch_risk_score, fetch_whois_info) | |
response = client.list_domain_information(domains, fetch_risk_score, fetch_whois_info) | |
markdown = _format_domain_info(response.get("domains", []), fetch_risk_score, fetch_whois_info) |
infratags = raw_response.get('response', {}).get('infratags', []) | ||
tag_clusters = raw_response.get('response', {}).get('tag_clusters', []) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
infratags = raw_response.get('response', {}).get('infratags', []) | |
tag_clusters = raw_response.get('response', {}).get('tag_clusters', []) | |
iresponse_data = raw_response.get('response', {}) | |
infratags = response_data.get('infratags', []) | |
tag_clusters = response_data.get('tag_clusters', []) |
if cluster and tag_clusters: | ||
cluster_details = [] | ||
for cluster in tag_clusters: | ||
for key, value in cluster.items(): | ||
cluster_details.append({'Cluster Level': key, 'Details': value}) | ||
|
||
readable_output += tableToMarkdown('Domain Tag Clusters', cluster_details) | ||
|
||
if cluster and not tag_clusters: | ||
readable_output += "\n\n**No tag cluster data returned by the API.**" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
if cluster and tag_clusters: | |
cluster_details = [] | |
for cluster in tag_clusters: | |
for key, value in cluster.items(): | |
cluster_details.append({'Cluster Level': key, 'Details': value}) | |
readable_output += tableToMarkdown('Domain Tag Clusters', cluster_details) | |
if cluster and not tag_clusters: | |
readable_output += "\n\n**No tag cluster data returned by the API.**" | |
if cluster: | |
if tag_clusters: | |
cluster_details = [ | |
{'Cluster Level': key, 'Details': value} | |
for cluster in tag_clusters | |
for key, value in cluster.items() | |
] | |
readable_output += tableToMarkdown('Domain Tag Clusters', cluster_details) | |
else: | |
readable_output += "\n\n**No tag cluster data returned by the API.**" |
Silentpush-final-code