diff --git a/functions/notify_slack.py b/functions/notify_slack.py index 66929a9..1ad4fb3 100644 --- a/functions/notify_slack.py +++ b/functions/notify_slack.py @@ -28,6 +28,7 @@ SECURITY_HUB_CLIENT = boto3.client('securityhub', region_name=REGION) + class AwsService(Enum): """AWS service supported by function""" @@ -141,21 +142,18 @@ def format_aws_security_hub(message: Dict[str, Any], region: str) -> Dict[str, A compliance_status = finding["Compliance"].get("Status", "UNKNOWN") workflow_status = finding["Workflow"].get("Status", "UNKNOWN") if compliance_status == "FAILED" and workflow_status == "NEW": - notified = SECURITY_HUB_CLIENT.batch_update_findings( - FindingIdentifiers=[{ - 'Id': finding.get('Id'), - 'ProductArn': finding.get("ProductArn") - }], - Workflow={"Status": "NOTIFIED"} - ) - logging.warning(f"Successfully updated finding status to NOTIFIED: {json.dumps(notified)}") + notified = SECURITY_HUB_CLIENT.batch_update_findings( + FindingIdentifiers=[{ + 'Id': finding.get('Id'), + 'ProductArn': finding.get("ProductArn") + }], + Workflow={"Status": "NOTIFIED"} + ) + logging.warning(f"Successfully updated finding status to NOTIFIED: {json.dumps(notified)}") except Exception as e: logging.error(f"Failed to update finding status: {str(e)}") pass - - - if finding.get("ProductName") == "Inspector": severity = finding["Severity"].get("Label", "INFORMATIONAL") compliance_status = finding["Compliance"].get("Status", "UNKNOWN") @@ -178,7 +176,7 @@ def format_aws_security_hub(message: Dict[str, Any], region: str) -> Dict[str, A color = SecurityHubSeverity.get(severity.upper(), SecurityHubSeverity.INFORMATIONAL).value if compliance_status == "PASSED": - color = "#4BB543" + color = "#4BB543" slack_message = { "color": color, @@ -225,7 +223,7 @@ def format_aws_security_hub(message: Dict[str, Any], region: str) -> Dict[str, A color = SecurityHubSeverity.get(severity.upper(), SecurityHubSeverity.INFORMATIONAL).value if compliance_status == "PASSED": - color = "#4BB543" + color = "#4BB543" slack_message = { "color": color, @@ -250,9 +248,9 @@ def format_aws_security_hub(message: Dict[str, Any], region: str) -> Dict[str, A return slack_message - return format_default(message=message) + class SecurityHubSeverity(Enum): """Maps Security Hub finding severity to Slack message format color""" @@ -269,6 +267,7 @@ def get(name, default): except KeyError: return default + class GuardDutyFindingSeverity(Enum): """Maps GuardDuty finding severity to Slack message format color""" @@ -276,6 +275,7 @@ class GuardDutyFindingSeverity(Enum): Medium = "warning" High = "danger" + def format_guardduty_finding(message: Dict[str, Any], region: str) -> Dict[str, Any]: """ Format GuardDuty finding event into Slack message format