-
Notifications
You must be signed in to change notification settings - Fork 5
/
Copy pathoutput.py
129 lines (106 loc) · 5.1 KB
/
output.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
import json
import logging
from pathlib import Path
from typing import Any, Dict, Optional
from .core.messages import Messages
from .core.classes import Diff, Issue
from .config import CliConfig
from socketsecurity.plugins.manager import PluginManager
from socketdev import socketdev
class OutputHandler:
    """Render a diff report to the console and forward it to plugins.

    The handler picks one console format (JSON, SARIF, or a plain-text
    table) from the CLI configuration, optionally forwards the report to the
    Jira and Slack plugins, writes the SBOM to disk when a file name is
    configured, and maps the report to a process exit code.
    """

    # CLI configuration driving format selection, plugins and exit codes.
    config: CliConfig
    # Logger named "socketcli"; used for every output path except SARIF,
    # which is printed directly.
    logger: logging.Logger

    def __init__(self, config: CliConfig, sdk: socketdev):
        """Store the configuration and bind the ``socketcli`` logger.

        Args:
            config: CLI configuration object.
            sdk: Socket SDK client. It is part of the constructor signature
                but is neither stored nor used by this class.
        """
        self.config = config
        self.logger = logging.getLogger("socketcli")

    def handle_output(self, diff_report: Diff) -> None:
        """Emit the report in the configured format, notify plugins, save SBOM.

        Format precedence is JSON (``enable_json``), then SARIF
        (``enable_sarif``), then the plain console table. Jira is notified
        before Slack. The SBOM is saved last; in JSON and SARIF modes the
        output method may already have written it, in which case this final
        call simply rewrites the same content.

        Args:
            diff_report: The diff report to output.
        """
        sbom_file = self.config.sbom_file

        if self.config.enable_json:
            self.output_console_json(diff_report, sbom_file)
        elif self.config.enable_sarif:
            self.output_console_sarif(diff_report, sbom_file)
        else:
            self.output_console_comments(diff_report, sbom_file)

        # Plugin configs are read one at a time, in this order, so a problem
        # with the Slack configuration cannot prevent the Jira notification.
        self._send_to_plugin("jira", self.config.jira_plugin, diff_report)
        self._send_to_plugin("slack", self.config.slack_plugin, diff_report)

        self.save_sbom_file(diff_report, self.config.sbom_file)

    def _send_to_plugin(self, name: str, plugin_config: Any, diff_report: Diff) -> None:
        """Send the report to a single plugin if that plugin is enabled.

        Args:
            name: Key under which the plugin is registered with the
                ``PluginManager`` (``"jira"`` or ``"slack"`` here).
            plugin_config: Object exposing ``enabled``, ``levels`` and
                ``config`` attributes.
            diff_report: The diff report to forward.
        """
        if not plugin_config.enabled:
            return

        # Extra plugin settings are merged last, so they may override the
        # "enabled" / "levels" keys.
        settings = {
            "enabled": plugin_config.enabled,
            "levels": plugin_config.levels or [],
            **(plugin_config.config or {}),
        }
        plugin_mgr = PluginManager({name: settings})
        plugin_mgr.send(diff_report, config=self.config)

    def return_exit_code(self, diff_report: Diff) -> int:
        """Translate the report into a process exit code.

        Returns:
            ``0`` when blocking is disabled or there are no new alerts,
            ``1`` when at least one new alert is blocking, and ``5`` when
            there are new alerts but none of them is blocking.
        """
        if self.config.disable_blocking:
            return 0

        if not self.report_pass(diff_report):
            return 1

        # Truthiness (rather than len()) keeps this consistent with
        # report_pass and tolerates a missing (None) alert list.
        if diff_report.new_alerts:
            # 5 means warning alerts but no blocking alerts
            return 5

        return 0

    def output_console_comments(self, diff_report: Diff, sbom_file_name: Optional[str] = None) -> None:
        """Log the new alerts as a console table, or a "no issues" notice.

        Args:
            diff_report: The diff report to output.
            sbom_file_name: Unused here; present so all ``output_console_*``
                methods share the same signature.
        """
        if not diff_report.new_alerts:
            self.logger.info("No issues found")
            return

        console_security_comment = Messages.create_console_security_alert_table(diff_report)
        self.logger.info("Security issues detected by Socket Security:")
        self.logger.info(console_security_comment)

    def output_console_json(self, diff_report: Diff, sbom_file_name: Optional[str] = None) -> None:
        """Log the report as a single JSON document and save the SBOM.

        Args:
            diff_report: The diff report to output.
            sbom_file_name: Optional path the SBOM is written to.
        """
        console_security_comment = Messages.create_security_comment_json(diff_report)
        self.save_sbom_file(diff_report, sbom_file_name)
        self.logger.info(json.dumps(console_security_comment))

    def output_console_sarif(self, diff_report: Diff, sbom_file_name: Optional[str] = None) -> None:
        """Print the report as SARIF JSON to stdout and save the SBOM.

        Nothing is printed or saved when the report id is ``"NO_DIFF_RAN"``.
        Unlike the other formats, SARIF goes through ``print`` rather than
        the logger.

        Args:
            diff_report: The diff report to output.
            sbom_file_name: Optional path the SBOM is written to.
        """
        if diff_report.id == "NO_DIFF_RAN":
            return

        # Generate the SARIF structure using Messages
        console_security_comment = Messages.create_security_comment_sarif(diff_report)
        self.save_sbom_file(diff_report, sbom_file_name)
        # Print the SARIF output to the console in JSON format
        print(json.dumps(console_security_comment, indent=2))

    def report_pass(self, diff_report: Diff) -> bool:
        """Return whether the report passes the security checks.

        The report passes when there are no new alerts, when blocking is
        disabled in the configuration, or when no new alert has a truthy
        ``error`` attribute.
        """
        if not diff_report.new_alerts:
            return True

        if self.config.disable_blocking:
            return True

        return not any(issue.error for issue in diff_report.new_alerts)

    def save_sbom_file(self, diff_report: Diff, sbom_file_name: Optional[str] = None) -> None:
        """Write ``diff_report.sbom`` as indented JSON to ``sbom_file_name``.

        Does nothing when no file name is given or the report has no SBOM.
        Missing parent directories are created.

        Args:
            diff_report: Report whose ``sbom`` attribute is serialized.
            sbom_file_name: Destination path, or ``None``/empty to skip.
        """
        if not sbom_file_name or not diff_report.sbom:
            return

        sbom_path = Path(sbom_file_name)
        sbom_path.parent.mkdir(parents=True, exist_ok=True)

        # Explicit encoding so the file does not depend on the platform's
        # default locale encoding.
        with open(sbom_path, "w", encoding="utf-8") as f:
            json.dump(diff_report.sbom, f, indent=2)

    def _output_issue(self, issue: Issue) -> None:
        """Log a single issue at WARNING level.

        Logs a status/severity header and the title; the description and
        suggestion lines are logged only when those fields are truthy.
        """
        severity = issue.severity.upper() if issue.severity else "UNKNOWN"
        status = "🚫 Blocking" if issue.error else "⚠️ Warning"

        self.logger.warning(f"\n{status} - Severity: {severity}")
        self.logger.warning(f"Title: {issue.title}")
        if issue.description:
            self.logger.warning(f"Description: {issue.description}")
        if issue.suggestion:
            self.logger.warning(f"suggestion: {issue.suggestion}")

    def _format_issue(self, issue: Issue) -> Dict[str, Any]:
        """Return the issue as a plain dict suitable for JSON output.

        The ``"blocking"`` key carries the issue's ``error`` attribute.
        """
        return {
            "purl": issue.purl,
            "title": issue.title,
            "description": issue.description,
            "severity": issue.severity,
            "blocking": issue.error,
        }