|
26 | 26 | # Create client so its cached/frozen between invocations
|
27 | 27 | KMS_CLIENT = boto3.client("kms", region_name=REGION)
|
28 | 28 |
|
| 29 | +SECURITY_HUB_CLIENT = boto3.client('securityhub', region_name=REGION) |
| 30 | + |
29 | 31 |
|
30 | 32 | class AwsService(Enum):
|
31 | 33 | """AWS service supported by function"""
|
32 | 34 |
|
33 | 35 | cloudwatch = "cloudwatch"
|
34 | 36 | guardduty = "guardduty"
|
| 37 | + securityhub = "securityhub" |
35 | 38 |
|
36 | 39 |
|
37 | 40 | def decrypt_url(encrypted_url: str) -> str:
|
@@ -123,6 +126,148 @@ def format_cloudwatch_alarm(message: Dict[str, Any], region: str) -> Dict[str, A
|
123 | 126 | }
|
124 | 127 |
|
125 | 128 |
|
| 129 | +def format_aws_security_hub(message: Dict[str, Any], region: str) -> Dict[str, Any]: |
| 130 | + """ |
| 131 | + Format AWS Security Hub finding event into Slack message format |
| 132 | +
|
| 133 | + :params message: SNS message body containing SecurityHub finding event |
| 134 | + :params region: AWS region where the event originated from |
| 135 | + :returns: formatted Slack message payload |
| 136 | + """ |
| 137 | + service_url = get_service_url(region=region, service="securityhub") |
| 138 | + finding = message["detail"]["findings"][0] |
| 139 | + |
| 140 | + # Switch Status From New To Notified To Prevent Repeated Messages |
| 141 | + try: |
| 142 | + compliance_status = finding["Compliance"].get("Status", "UNKNOWN") |
| 143 | + workflow_status = finding["Workflow"].get("Status", "UNKNOWN") |
| 144 | + if compliance_status == "FAILED" and workflow_status == "NEW": |
| 145 | + notified = SECURITY_HUB_CLIENT.batch_update_findings( |
| 146 | + FindingIdentifiers=[{ |
| 147 | + 'Id': finding.get('Id'), |
| 148 | + 'ProductArn': finding.get("ProductArn") |
| 149 | + }], |
| 150 | + Workflow={"Status": "NOTIFIED"} |
| 151 | + ) |
| 152 | + logging.warning(f"Successfully updated finding status to NOTIFIED: {json.dumps(notified)}") |
| 153 | + except Exception as e: |
| 154 | + logging.error(f"Failed to update finding status: {str(e)}") |
| 155 | + pass |
| 156 | + |
| 157 | + if finding.get("ProductName") == "Inspector": |
| 158 | + severity = finding["Severity"].get("Label", "INFORMATIONAL") |
| 159 | + compliance_status = finding["Compliance"].get("Status", "UNKNOWN") |
| 160 | + |
| 161 | + Id = finding.get("Id", "No ID Provided") |
| 162 | + title = finding.get("Title", "No Title Provided") |
| 163 | + description = finding.get("Description", "No Description Provided") |
| 164 | + control_id = finding['ProductFields'].get('ControlId', 'N/A') |
| 165 | + control_url = service_url + f"#/controls/{control_id}" |
| 166 | + aws_account_id = finding.get('AwsAccountId', 'Unknown Account') |
| 167 | + first_observed = finding.get('FirstObservedAt', 'Unknown Date') |
| 168 | + last_updated = finding.get('UpdatedAt', 'Unknown Date') |
| 169 | + affected_resource = finding['Resources'][0].get('Id', 'Unknown Resource') |
| 170 | + remediation_url = finding.get("Remediation", {}).get("Recommendation", {}).get("Url", "#") |
| 171 | + |
| 172 | + finding_base_path = "#/findings?search=Id%3D%255Coperator%255C%253AEQUALS%255C%253A" |
| 173 | + double_encoded_id = urllib.parse.quote(urllib.parse.quote(Id, safe=''), safe='') |
| 174 | + finding_url = f"{service_url}{finding_base_path}{double_encoded_id}" |
| 175 | + generator_id = finding.get("GeneratorId", "Unknown Generator") |
| 176 | + |
| 177 | + color = SecurityHubSeverity.get(severity.upper(), SecurityHubSeverity.INFORMATIONAL).value |
| 178 | + if compliance_status == "PASSED": |
| 179 | + color = "#4BB543" |
| 180 | + |
| 181 | + slack_message = { |
| 182 | + "color": color, |
| 183 | + "fallback": f"Inspector Finding: {title}", |
| 184 | + "fields": [ |
| 185 | + {"title": "Title", "value": f"`{title}`", "short": False}, |
| 186 | + {"title": "Description", "value": f"`{description}`", "short": False}, |
| 187 | + {"title": "Compliance Status", "value": f"`{compliance_status}`", "short": True}, |
| 188 | + {"title": "Severity", "value": f"`{severity}`", "short": True}, |
| 189 | + {"title": "Control ID", "value": f"`{control_id}`", "short": True}, |
| 190 | + {"title": "Account ID", "value": f"`{aws_account_id}`", "short": True}, |
| 191 | + {"title": "First Observed", "value": f"`{first_observed}`", "short": True}, |
| 192 | + {"title": "Last Updated", "value": f"`{last_updated}`", "short": True}, |
| 193 | + {"title": "Affected Resource", "value": f"`{affected_resource}`", "short": False}, |
| 194 | + {"title": "Generator", "value": f"`{generator_id}`", "short": False}, |
| 195 | + {"title": "Control Url", "value": f"`{control_url}`", "short": False}, |
| 196 | + {"title": "Finding Url", "value": f"`{finding_url}`", "short": False}, |
| 197 | + {"title": "Remediation", "value": f"`{remediation_url}`", "short": False}, |
| 198 | + ], |
| 199 | + "text": f"AWS Inspector Finding - {title}", |
| 200 | + } |
| 201 | + |
| 202 | + return slack_message |
| 203 | + |
| 204 | + if finding.get("ProductName") == "Security Hub": |
| 205 | + severity = finding["Severity"].get("Label", "INFORMATIONAL") |
| 206 | + compliance_status = finding["Compliance"].get("Status", "UNKNOWN") |
| 207 | + |
| 208 | + Id = finding.get("Id", "No ID Provided") |
| 209 | + title = finding.get("Title", "No Title Provided") |
| 210 | + description = finding.get("Description", "No Description Provided") |
| 211 | + control_id = finding['ProductFields'].get('ControlId', 'N/A') |
| 212 | + control_url = service_url + f"#/controls/{control_id}" |
| 213 | + aws_account_id = finding.get('AwsAccountId', 'Unknown Account') |
| 214 | + first_observed = finding.get('FirstObservedAt', 'Unknown Date') |
| 215 | + last_updated = finding.get('UpdatedAt', 'Unknown Date') |
| 216 | + affected_resource = finding['Resources'][0].get('Id', 'Unknown Resource') |
| 217 | + remediation_url = finding.get("Remediation", {}).get("Recommendation", {}).get("Url", "#") |
| 218 | + generator_id = finding.get("GeneratorId", "Unknown Generator") |
| 219 | + |
| 220 | + finding_base_path = "#/findings?search=Id%3D%255Coperator%255C%253AEQUALS%255C%253A" |
| 221 | + double_encoded_id = urllib.parse.quote(urllib.parse.quote(Id, safe=''), safe='') |
| 222 | + finding_url = f"{service_url}{finding_base_path}{double_encoded_id}" |
| 223 | + |
| 224 | + color = SecurityHubSeverity.get(severity.upper(), SecurityHubSeverity.INFORMATIONAL).value |
| 225 | + if compliance_status == "PASSED": |
| 226 | + color = "#4BB543" |
| 227 | + |
| 228 | + slack_message = { |
| 229 | + "color": color, |
| 230 | + "fallback": f"Security Hub Finding: {title}", |
| 231 | + "fields": [ |
| 232 | + {"title": "Title", "value": f"`{title}`", "short": False}, |
| 233 | + {"title": "Description", "value": f"`{description}`", "short": False}, |
| 234 | + {"title": "Compliance Status", "value": f"`{compliance_status}`", "short": True}, |
| 235 | + {"title": "Severity", "value": f"`{severity}`", "short": True}, |
| 236 | + {"title": "Control ID", "value": f"`{control_id}`", "short": True}, |
| 237 | + {"title": "Account ID", "value": f"`{aws_account_id}`", "short": True}, |
| 238 | + {"title": "First Observed", "value": f"`{first_observed}`", "short": True}, |
| 239 | + {"title": "Last Updated", "value": f"`{last_updated}`", "short": True}, |
| 240 | + {"title": "Affected Resource", "value": f"`{affected_resource}`", "short": False}, |
| 241 | + {"title": "Generator", "value": f"`{generator_id}`", "short": False}, |
| 242 | + {"title": "Control Url", "value": f"`{control_url}`", "short": False}, |
| 243 | + {"title": "Finding Url", "value": f"`{finding_url}`", "short": False}, |
| 244 | + {"title": "Remediation", "value": f"`{remediation_url}`", "short": False}, |
| 245 | + ], |
| 246 | + "text": f"AWS Security Hub Finding - {title}", |
| 247 | + } |
| 248 | + |
| 249 | + return slack_message |
| 250 | + |
| 251 | + return format_default(message=message) |
| 252 | + |
| 253 | + |
| 254 | +class SecurityHubSeverity(Enum): |
| 255 | + """Maps Security Hub finding severity to Slack message format color""" |
| 256 | + |
| 257 | + CRITICAL = "danger" |
| 258 | + HIGH = "danger" |
| 259 | + MEDIUM = "warning" |
| 260 | + LOW = "#777777" |
| 261 | + INFORMATIONAL = "#439FE0" |
| 262 | + |
| 263 | + @staticmethod |
| 264 | + def get(name, default): |
| 265 | + try: |
| 266 | + return SecurityHubSeverity[name] |
| 267 | + except KeyError: |
| 268 | + return default |
| 269 | + |
| 270 | + |
126 | 271 | class GuardDutyFindingSeverity(Enum):
|
127 | 272 | """Maps GuardDuty finding severity to Slack message format color"""
|
128 | 273 |
|
@@ -358,6 +503,28 @@ def format_default(
|
358 | 503 | return attachments
|
359 | 504 |
|
360 | 505 |
|
| 506 | +def parse_notification(message: Dict[str, Any], subject: Optional[str], region: str) -> Optional[Dict]: |
| 507 | + """ |
| 508 | + Parse notification message and format into Slack message payload |
| 509 | +
|
| 510 | + :params message: SNS message body notification payload |
| 511 | + :params subject: Optional subject line for Slack notification |
| 512 | + :params region: AWS region where the event originated from |
| 513 | + :returns: Slack message payload |
| 514 | + """ |
| 515 | + if "AlarmName" in message: |
| 516 | + return format_cloudwatch_alarm(message=message, region=region) |
| 517 | + if isinstance(message, Dict) and message.get("detail-type") == "GuardDuty Finding": |
| 518 | + return format_guardduty_finding(message=message, region=message["region"]) |
| 519 | + if isinstance(message, Dict) and message.get("detail-type") == "Security Hub Findings - Imported": |
| 520 | + return format_aws_security_hub(message=message, region=message["region"]) |
| 521 | + if isinstance(message, Dict) and message.get("detail-type") == "AWS Health Event": |
| 522 | + return format_aws_health(message=message, region=message["region"]) |
| 523 | + if subject == "Notification from AWS Backup": |
| 524 | + return format_aws_backup(message=str(message)) |
| 525 | + return format_default(message=message, subject=subject) |
| 526 | + |
| 527 | + |
361 | 528 | def get_slack_message_payload(
|
362 | 529 | message: Union[str, Dict], region: str, subject: Optional[str] = None
|
363 | 530 | ) -> Dict:
|
@@ -389,31 +556,10 @@ def get_slack_message_payload(
|
389 | 556 |
|
390 | 557 | message = cast(Dict[str, Any], message)
|
391 | 558 |
|
392 |
| - if "AlarmName" in message: |
393 |
| - notification = format_cloudwatch_alarm(message=message, region=region) |
394 |
| - attachment = notification |
395 |
| - |
396 |
| - elif ( |
397 |
| - isinstance(message, Dict) and message.get("detail-type") == "GuardDuty Finding" |
398 |
| - ): |
399 |
| - notification = format_guardduty_finding( |
400 |
| - message=message, region=message["region"] |
401 |
| - ) |
402 |
| - attachment = notification |
403 |
| - |
404 |
| - elif isinstance(message, Dict) and message.get("detail-type") == "AWS Health Event": |
405 |
| - notification = format_aws_health(message=message, region=message["region"]) |
406 |
| - attachment = notification |
407 |
| - |
408 |
| - elif subject == "Notification from AWS Backup": |
409 |
| - notification = format_aws_backup(message=str(message)) |
410 |
| - attachment = notification |
411 |
| - |
412 |
| - elif "attachments" in message or "text" in message: |
| 559 | + if "attachments" in message or "text" in message: |
413 | 560 | payload = {**payload, **message}
|
414 |
| - |
415 | 561 | else:
|
416 |
| - attachment = format_default(message=message, subject=subject) |
| 562 | + attachment = parse_notification(message, subject, region) |
417 | 563 |
|
418 | 564 | if attachment:
|
419 | 565 | payload["attachments"] = [attachment] # type: ignore
|
@@ -453,6 +599,7 @@ def lambda_handler(event: Dict[str, Any], context: Dict[str, Any]) -> str:
|
453 | 599 | :param context: lambda expected context object
|
454 | 600 | :returns: none
|
455 | 601 | """
|
| 602 | + |
456 | 603 | if os.environ.get("LOG_EVENTS", "False") == "True":
|
457 | 604 | logging.info("Event logging enabled: %s", json.dumps(event))
|
458 | 605 |
|
|
0 commit comments