Skip to content

Fixes for diff logic #57

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 5 commits into from
Feb 26, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -23,4 +23,5 @@ file_generator.py
.coverage
.env.local
Pipfile
test/
test/
logs
20 changes: 20 additions & 0 deletions Pipfile.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ socketcli [-h] [--api-token API_TOKEN] [--repo REPO] [--integration {api,github,
[--target-path TARGET_PATH] [--sbom-file SBOM_FILE] [--files FILES] [--default-branch] [--pending-head]
[--generate-license] [--enable-debug] [--enable-json] [--enable-sarif] [--disable-overview] [--disable-security-issue]
[--allow-unverified] [--ignore-commit-files] [--disable-blocking] [--scm SCM] [--timeout TIMEOUT]
[--exclude-license-details]
````

If you don't want to provide the Socket API Token every time then you can use the environment variable `SOCKET_SECURITY_API_KEY`
Expand Down Expand Up @@ -58,6 +59,7 @@ If you don't want to provide the Socket API Token every time then you can use th
| --enable-json | False | False | Output in JSON format |
| --enable-sarif | False | False | Enable SARIF output of results instead of table or JSON format|
| --disable-overview | False | False | Disable overview output |
| --exclude-license-details | False | False | Exclude license details from the diff report (boosts performance for large repos) |

#### Security Configuration
| Parameter | Required | Default | Description |
Expand Down
4 changes: 2 additions & 2 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,8 @@ dependencies = [
'prettytable',
'GitPython',
'packaging',
'python-dotenv',
'socket-sdk-python>=2.0.5'
'python-dotenv',
'socket-sdk-python>=2.0.7'
]
readme = "README.md"
description = "Socket Security CLI for CI/CD"
Expand Down
2 changes: 1 addition & 1 deletion socketsecurity/__init__.py
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
__author__ = 'socket.dev'
__version__ = '2.0.6'
__version__ = '2.0.7'
8 changes: 8 additions & 0 deletions socketsecurity/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ class CliConfig:
integration_org_slug: Optional[str] = None
pending_head: bool = False
timeout: Optional[int] = 1200
exclude_license_details: bool = False
@classmethod
def from_args(cls, args_list: Optional[List[str]] = None) -> 'CliConfig':
parser = create_argument_parser()
Expand Down Expand Up @@ -71,6 +72,7 @@ def from_args(cls, args_list: Optional[List[str]] = None) -> 'CliConfig':
'integration_type': args.integration,
'pending_head': args.pending_head,
'timeout': args.timeout,
'exclude_license_details': args.exclude_license_details,
}

if args.owner:
Expand Down Expand Up @@ -283,6 +285,12 @@ def create_argument_parser() -> argparse.ArgumentParser:
action="store_true",
help=argparse.SUPPRESS
)
output_group.add_argument(
"--exclude-license-details",
dest="exclude_license_details",
action="store_true",
help="Exclude license details from the diff report (boosts performance for large repos)"
)

# Security Configuration
security_group = parser.add_argument_group('Security Configuration')
Expand Down
148 changes: 90 additions & 58 deletions socketsecurity/core/__init__.py
Original file line number Diff line number Diff line change
@@ -1,17 +1,14 @@
import base64
import json
import logging
import time
import sys
from dataclasses import asdict
from glob import glob
from pathlib import PurePath
from typing import BinaryIO, Dict, List, Optional, Tuple

from typing import BinaryIO, Dict, List, Tuple
from socketdev import socketdev
from socketdev.fullscans import (
FullScanParams,
SocketArtifact,
DiffArtifact,
SocketArtifact
)
from socketdev.org import Organization
from socketdev.repos import RepositoryInfo
Expand All @@ -27,8 +24,9 @@
Purl,
)
from socketsecurity.core.exceptions import (
APIResourceNotFound,
APIResourceNotFound
)
from socketdev.exceptions import APIFailure
from socketsecurity.core.licenses import Licenses

from .socket_config import SocketConfig
Expand Down Expand Up @@ -148,21 +146,24 @@ def find_files(path: str) -> List[str]:
for file_name in patterns:
pattern = Core.to_case_insensitive_regex(patterns[file_name]["pattern"])
file_path = f"{path}/**/{pattern}"
log.debug(f"Globbing {file_path}")
#log.debug(f"Globbing {file_path}")
glob_start = time.time()
glob_files = glob(file_path, recursive=True)
for glob_file in glob_files:
if glob_file not in files:
files.add(glob_file)
glob_end = time.time()
glob_total_time = glob_end - glob_start
log.debug(f"Glob for pattern {file_path} took {glob_total_time:.2f} seconds")
#log.debug(f"Glob for pattern {file_path} took {glob_total_time:.2f} seconds")

log.debug("Finished Find Files")
end_time = time.time()
total_time = end_time - start_time
log.info(f"Found {len(files)} in {total_time:.2f} seconds")
log.debug(f"Files found: {list(files)}")
files_list = list(files)
if len(files_list) > 5:
log.debug(f"{len(files_list)} Files found ({total_time:.2f}s): {', '.join(files_list[:5])}, ...")
else:
log.debug(f"{len(files_list)} Files found ({total_time:.2f}s): {', '.join(files_list)}")
return list(files)

@staticmethod
Expand Down Expand Up @@ -216,7 +217,7 @@ def load_files_for_sending(files: List[str], workspace: str) -> List[Tuple[str,

return send_files

def create_full_scan(self, files: List[str], params: FullScanParams) -> FullScan:
def create_full_scan(self, files: List[str], params: FullScanParams, has_head_scan: bool = False) -> FullScan:
"""
Creates a new full scan via the Socket API.

Expand All @@ -236,10 +237,10 @@ def create_full_scan(self, files: List[str], params: FullScanParams) -> FullScan
raise Exception(f"Error creating full scan: {res.message}, status: {res.status}")

full_scan = FullScan(**asdict(res.data))

full_scan_artifacts_dict = self.get_sbom_data(full_scan.id)
full_scan.sbom_artifacts = self.get_sbom_data_list(full_scan_artifacts_dict)
full_scan.packages = self.create_packages_dict(full_scan.sbom_artifacts)
if not has_head_scan:
full_scan_artifacts_dict = self.get_sbom_data(full_scan.id)
full_scan.sbom_artifacts = self.get_sbom_data_list(full_scan_artifacts_dict)
full_scan.packages = self.create_packages_dict(full_scan.sbom_artifacts)

create_full_end = time.time()
total_time = create_full_end - create_full_start
Expand Down Expand Up @@ -317,24 +318,37 @@ def get_package_license_text(self, package: Package) -> str:

return ""

def get_repo_info(self, repo_slug: str) -> RepositoryInfo:
def get_repo_info(self, repo_slug: str, default_branch: str = "socket-default-branch") -> RepositoryInfo:
"""
Gets repository information from the Socket API.

Args:
repo_slug: Repository slug to get info for
default_branch: Default branch string to use if the repo doesn't exist

Returns:
RepositoryInfo object

Raises:
Exception: If API request fails
"""
response = self.sdk.repos.repo(self.config.org_slug, repo_slug)
if not response.success:
log.error(f"Failed to get repository: {response.status}")
log.error(response.message)
raise Exception(f"Failed to get repository info: {response.status}, message: {response.message}")
try:
response = self.sdk.repos.repo(self.config.org_slug, repo_slug)
if not response.success:
log.error(f"Failed to get repository: {response.status}")
log.error(response.message)
# raise Exception(f"Failed to get repository info: {response.status}, message: {response.message}")
except APIFailure:
log.warning(f"Failed to get repository {repo_slug}, attempting to create it")
create_response = self.sdk.repos.post(self.config.org_slug, name=repo_slug, default_branch=default_branch)
if not create_response.success:
log.error(f"Failed to create repository: {create_response.status}")
log.error(create_response.message)
raise Exception(
f"Failed to create repository: {create_response.status}, message: {create_response.message}"
)
else:
return create_response.data
return response.data

def get_head_scan_for_repo(self, repo_slug: str) -> str:
Expand All @@ -350,24 +364,36 @@ def get_head_scan_for_repo(self, repo_slug: str) -> str:
repo_info = self.get_repo_info(repo_slug)
return repo_info.head_full_scan_id if repo_info.head_full_scan_id else None

def get_added_and_removed_packages(self, head_full_scan: Optional[FullScan], new_full_scan: FullScan) -> Tuple[Dict[str, Package], Dict[str, Package]]:
@staticmethod
def update_package_values(pkg: Package) -> Package:
pkg.purl = f"{pkg.name}@{pkg.version}"
pkg.url = f"https://socket.dev/{pkg.type}/package"
if pkg.namespace:
pkg.purl = f"{pkg.namespace}/{pkg.purl}"
pkg.url += f"/{pkg.namespace}"
pkg.url += f"/{pkg.name}/overview/{pkg.version}"
return pkg

def get_added_and_removed_packages(self, head_full_scan_id: str, new_full_scan: FullScan) -> Tuple[Dict[str, Package], Dict[str, Package]]:
"""
Get packages that were added and removed between scans.

Args:
head_full_scan: Previous scan (may be None if first scan)
new_full_scan: New scan just created
head_full_scan_id: New scan just created

Returns:
Tuple of (added_packages, removed_packages) dictionaries
"""
if head_full_scan is None:
if head_full_scan_id is None:
log.info(f"No head scan found. New scan ID: {new_full_scan.id}")
return new_full_scan.packages, {}

log.info(f"Comparing scans - Head scan ID: {head_full_scan.id}, New scan ID: {new_full_scan.id}")
diff_report = self.sdk.fullscans.stream_diff(self.config.org_slug, head_full_scan.id, new_full_scan.id).data

log.info(f"Comparing scans - Head scan ID: {head_full_scan_id}, New scan ID: {new_full_scan.id}")
diff_start = time.time()
diff_report = self.sdk.fullscans.stream_diff(self.config.org_slug, head_full_scan_id, new_full_scan.id).data
diff_end = time.time()
log.info(f"Diff Report Gathered in {diff_end - diff_start:.2f} seconds")
log.info(f"Diff report artifact counts:")
log.info(f"Added: {len(diff_report.artifacts.added)}")
log.info(f"Removed: {len(diff_report.artifacts.removed)}")
Expand All @@ -384,32 +410,24 @@ def get_added_and_removed_packages(self, head_full_scan: Optional[FullScan], new
for artifact in added_artifacts:
try:
pkg = Package.from_diff_artifact(asdict(artifact))
pkg = Core.update_package_values(pkg)
added_packages[artifact.id] = pkg
except KeyError:
log.error(f"KeyError: Could not create package from added artifact {artifact.id}")
log.error(f"Artifact details - name: {artifact.name}, version: {artifact.version}")
matches = [p for p in new_full_scan.packages.values() if p.name == artifact.name and p.version == artifact.version]
if matches:
log.error(f"Found {len(matches)} packages with matching name/version:")
for m in matches:
log.error(f" ID: {m.id}, name: {m.name}, version: {m.version}")
else:
log.error("No matching packages found in new_full_scan")
log.error("No matching packages found in new_full_scan")

for artifact in removed_artifacts:
try:
pkg = Package.from_diff_artifact(asdict(artifact))
pkg = Core.update_package_values(pkg)
if pkg.namespace:
pkg.purl += f"{pkg.namespace}/{pkg.purl}"
removed_packages[artifact.id] = pkg
except KeyError:
log.error(f"KeyError: Could not create package from removed artifact {artifact.id}")
log.error(f"Artifact details - name: {artifact.name}, version: {artifact.version}")
matches = [p for p in head_full_scan.packages.values() if p.name == artifact.name and p.version == artifact.version]
if matches:
log.error(f"Found {len(matches)} packages with matching name/version:")
for m in matches:
log.error(f" ID: {m.id}, name: {m.name}, version: {m.version}")
else:
log.error("No matching packages found in head_full_scan")
log.error("No matching packages found in head_full_scan")

return added_packages, removed_packages

Expand All @@ -435,36 +453,49 @@ def create_new_diff(
files = self.find_files(path)
files_for_sending = self.load_files_for_sending(files, path)

log.debug(f"files: {files} found at path {path}")
if not files:
return Diff(id="no_diff_id")

head_full_scan_id = None

try:
# Get head scan ID
head_full_scan_id = self.get_head_scan_for_repo(params.repo)
has_head_scan = True
except APIResourceNotFound:
head_full_scan_id = None
has_head_scan = False

# Create new scan
new_scan_start = time.time()
new_full_scan = self.create_full_scan(files_for_sending, params)
new_scan_end = time.time()
log.info(f"Total time to create new full scan: {new_scan_end - new_scan_start:.2f}")


head_full_scan = None
if head_full_scan_id:
head_full_scan = self.get_full_scan(head_full_scan_id)
# Create new scan
try:
new_scan_start = time.time()
new_full_scan = self.create_full_scan(files_for_sending, params, has_head_scan)
new_scan_end = time.time()
log.info(f"Total time to create new full scan: {new_scan_end - new_scan_start:.2f}")
except APIFailure as e:
log.error(f"API Error: {e}")
sys.exit(1)
except Exception as e:
log.error(f"Unexpected error while creating new scan: {e}")
sys.exit(1)

added_packages, removed_packages = self.get_added_and_removed_packages(head_full_scan, new_full_scan)
try:
added_packages, removed_packages = self.get_added_and_removed_packages(head_full_scan_id, new_full_scan)
except APIFailure as e:
log.error(f"API Error: {e}")
sys.exit(1)
except Exception as e:
log.error(f"Unexpected error while comparing packages: {e}")
sys.exit(1)

diff = self.create_diff_report(added_packages, removed_packages)

base_socket = "https://socket.dev/dashboard/org"
diff.id = new_full_scan.id
diff.report_url = f"{base_socket}/{self.config.org_slug}/sbom/{diff.id}"

report_url = f"{base_socket}/{self.config.org_slug}/sbom/{diff.id}"
if not params.include_license_details:
report_url += "?include_license_details=false"
diff.report_url = report_url

if head_full_scan_id is not None:
diff.diff_url = f"{base_socket}/{self.config.org_slug}/diff/{diff.id}/{head_full_scan_id}"
else:
Expand Down Expand Up @@ -609,7 +640,8 @@ def get_source_data(package: Package, packages: dict) -> list:
source = (top_purl, manifests)
introduced_by.append(source)
else:
log.debug(f"Unable to get top level package info for {top_id}")
pass
# log.debug(f"Unable to get top level package info for {top_id}")
return introduced_by

@staticmethod
Expand Down
Loading
Loading