Skip to content

Commit

Permalink
OpenAI Token (#530)
Browse files Browse the repository at this point in the history
* renamed provider classes

* docs

* import optimization

* extra files removed

* style

* severity

* OpenAI token rule

* fix
  • Loading branch information
babenek authored Apr 10, 2024
1 parent 8ce8524 commit 85bcbe2
Show file tree
Hide file tree
Showing 28 changed files with 325 additions and 213 deletions.
1 change: 1 addition & 0 deletions credsweeper/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,4 +20,5 @@
'__version__'
]

# todo: raise to 1.7.0 for release
__version__ = "1.6.2"
12 changes: 6 additions & 6 deletions credsweeper/__main__.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,9 @@
from credsweeper import __version__
from credsweeper.app import APP_PATH, CredSweeper
from credsweeper.common.constants import ThresholdPreset, Severity, RuleType, DiffRowType
from credsweeper.file_handler.abstract_provider import AbstractProvider
from credsweeper.file_handler.files_provider import FilesProvider
from credsweeper.file_handler.patch_provider import PatchProvider
from credsweeper.file_handler.text_provider import TextProvider
from credsweeper.file_handler.patches_provider import PatchesProvider
from credsweeper.logger.logger import Logger
from credsweeper.utils import Util

Expand Down Expand Up @@ -257,7 +257,7 @@ def get_json_filenames(json_filename: str):
return added_json_filename, deleted_json_filename


def scan(args: Namespace, content_provider: FilesProvider, json_filename: Optional[str],
def scan(args: Namespace, content_provider: AbstractProvider, json_filename: Optional[str],
xlsx_filename: Optional[str]) -> int:
"""Scan content_provider data, print results or save them to json_filename is not None
Expand Down Expand Up @@ -315,7 +315,7 @@ def main() -> int:
summary: Dict[str, int] = {}
if args.path:
logger.info(f"Run analyzer on path: {args.path}")
content_provider: FilesProvider = TextProvider(args.path, skip_ignored=args.skip_ignored)
content_provider: AbstractProvider = FilesProvider(args.path, skip_ignored=args.skip_ignored)
credentials_number = scan(args, content_provider, args.json_filename, args.xlsx_filename)
summary["Detected Credentials"] = credentials_number
if 0 <= credentials_number:
Expand All @@ -324,12 +324,12 @@ def main() -> int:
added_json_filename, deleted_json_filename = get_json_filenames(args.json_filename)
# Analyze added data
logger.info(f"Run analyzer on added rows from patch files: {args.diff_path}")
content_provider = PatchProvider(args.diff_path, change_type=DiffRowType.ADDED)
content_provider = PatchesProvider(args.diff_path, change_type=DiffRowType.ADDED)
add_credentials_number = scan(args, content_provider, added_json_filename, args.xlsx_filename)
summary["Added File Credentials"] = add_credentials_number
# Analyze deleted data
logger.info(f"Run analyzer on deleted rows from patch files: {args.diff_path}")
content_provider = PatchProvider(args.diff_path, change_type=DiffRowType.DELETED)
content_provider = PatchesProvider(args.diff_path, change_type=DiffRowType.DELETED)
del_credentials_number = scan(args, content_provider, deleted_json_filename, args.xlsx_filename)
summary["Deleted File Credentials"] = del_credentials_number
if 0 <= add_credentials_number and 0 <= del_credentials_number:
Expand Down
23 changes: 13 additions & 10 deletions credsweeper/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
import signal
import sys
from pathlib import Path
from typing import Any, List, Optional, Union, Dict
from typing import Any, List, Optional, Union, Dict, Sequence

import pandas as pd

Expand All @@ -17,7 +17,7 @@
from credsweeper.deep_scanner.deep_scanner import DeepScanner
from credsweeper.file_handler.diff_content_provider import DiffContentProvider
from credsweeper.file_handler.file_path_extractor import FilePathExtractor
from credsweeper.file_handler.files_provider import FilesProvider
from credsweeper.file_handler.abstract_provider import AbstractProvider
from credsweeper.file_handler.text_content_provider import TextContentProvider
from credsweeper.scanner import Scanner
from credsweeper.utils import Util
Expand Down Expand Up @@ -54,7 +54,7 @@ def __init__(self,
find_by_ext: bool = False,
depth: int = 0,
doc: bool = False,
severity: Severity = Severity.INFO,
severity: Union[Severity, str] = Severity.INFO,
size_limit: Optional[str] = None,
exclude_lines: Optional[List[str]] = None,
exclude_values: Optional[List[str]] = None,
Expand Down Expand Up @@ -87,13 +87,16 @@ def __init__(self,
"""
self.pool_count: int = int(pool_count) if int(pool_count) > 1 else 1
if not (_severity := Severity.get(severity)):
raise RuntimeError(f"Severity level provided: {severity}"
f" -- must be one of: {' | '.join([i.value for i in Severity])}")
config_dict = self._get_config_dict(config_path=config_path,
api_validation=api_validation,
use_filters=use_filters,
find_by_ext=find_by_ext,
depth=depth,
doc=doc,
severity=severity,
severity=_severity,
size_limit=size_limit,
exclude_lines=exclude_lines,
exclude_values=exclude_values)
Expand Down Expand Up @@ -215,15 +218,15 @@ def config(self, config: Config) -> None:

# # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # #

def run(self, content_provider: FilesProvider) -> int:
def run(self, content_provider: AbstractProvider) -> int:
"""Run an analysis of 'content_provider' object.
Args:
content_provider: path objects to scan
"""
_empty_list: List[Union[DiffContentProvider, TextContentProvider]] = []
file_extractors: List[Union[DiffContentProvider, TextContentProvider]] = \
_empty_list: Sequence[Union[DiffContentProvider, TextContentProvider]] = []
file_extractors: Sequence[Union[DiffContentProvider, TextContentProvider]] = \
content_provider.get_scannable_files(self.config) if content_provider else _empty_list
logger.info(f"Start Scanner for {len(file_extractors)} providers")
self.scan(file_extractors)
Expand All @@ -234,7 +237,7 @@ def run(self, content_provider: FilesProvider) -> int:

# # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # #

def scan(self, content_providers: List[Union[DiffContentProvider, TextContentProvider]]) -> None:
def scan(self, content_providers: Sequence[Union[DiffContentProvider, TextContentProvider]]) -> None:
"""Run scanning of files from an argument "content_providers".
Args:
Expand All @@ -248,7 +251,7 @@ def scan(self, content_providers: List[Union[DiffContentProvider, TextContentPro

# # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # #

def __single_job_scan(self, content_providers: List[Union[DiffContentProvider, TextContentProvider]]) -> None:
def __single_job_scan(self, content_providers: Sequence[Union[DiffContentProvider, TextContentProvider]]) -> None:
"""Performs scan in main thread"""
all_cred: List[Candidate] = []
for i in content_providers:
Expand All @@ -265,7 +268,7 @@ def __single_job_scan(self, content_providers: List[Union[DiffContentProvider, T

# # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # #

def __multi_jobs_scan(self, content_providers: List[Union[DiffContentProvider, TextContentProvider]]) -> None:
def __multi_jobs_scan(self, content_providers: Sequence[Union[DiffContentProvider, TextContentProvider]]) -> None:
"""Performs scan with multiple jobs"""
# use this separation to satisfy YAPF formatter
yapfix = "%(asctime)s | %(levelname)s | %(processName)s:%(threadName)s | %(filename)s:%(lineno)s | %(message)s"
Expand Down
2 changes: 1 addition & 1 deletion credsweeper/common/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ def __lt__(self, other) -> bool:

@staticmethod
def get(confidence: Union[str, "Confidence"]) -> Optional["Confidence"]:
"""returns Severity value from string or None"""
"""returns Confidence value from string or None"""
if isinstance(confidence, Confidence):
return confidence
if isinstance(confidence, str):
Expand Down
44 changes: 44 additions & 0 deletions credsweeper/file_handler/abstract_provider.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
import io
from abc import ABC, abstractmethod
from pathlib import Path
from typing import Union, Tuple, Sequence

from credsweeper.config import Config
from credsweeper.file_handler.diff_content_provider import DiffContentProvider
from credsweeper.file_handler.text_content_provider import TextContentProvider


class AbstractProvider(ABC):
    """Common interface of every provider that supplies content for scanning."""

    def __init__(self, paths: Sequence[Union[str, Path, io.BytesIO, Tuple[Union[str, Path], io.BytesIO]]]) -> None:
        """Remember the scan targets given in 'paths'.

        Args:
            paths: sequence of scan targets; each item is a file path,
                an io.BytesIO, or a tuple of a path and an io.BytesIO

        """
        # the assignment goes through the ``paths`` property setter below
        self.paths = paths

    @property
    def paths(self) -> Sequence[Union[str, Path, io.BytesIO, Tuple[Union[str, Path], io.BytesIO]]]:
        """Scan targets currently stored in the provider"""
        return self.__paths

    @paths.setter
    def paths(self, paths: Sequence[Union[str, Path, io.BytesIO, Tuple[Union[str, Path], io.BytesIO]]]) -> None:
        """Replace the scan targets stored in the provider"""
        self.__paths = paths

    @abstractmethod
    def get_scannable_files(self, config: Config) -> Sequence[Union[DiffContentProvider, TextContentProvider]]:
        """Get the file objects for analysis based on the attribute "paths".

        Concrete providers must override this method; the base implementation only raises.

        Args:
            config: credsweeper configuration

        Return:
            file objects to analyse

        """
        raise NotImplementedError()
65 changes: 41 additions & 24 deletions credsweeper/file_handler/files_provider.py
Original file line number Diff line number Diff line change
@@ -1,44 +1,61 @@
import io
from abc import ABC, abstractmethod
import logging
from pathlib import Path
from typing import List, Union, Tuple
from typing import List, Optional, Union, Tuple, Sequence

from credsweeper import DiffContentProvider
from credsweeper.config import Config
from credsweeper.file_handler.diff_content_provider import DiffContentProvider
from credsweeper.file_handler.abstract_provider import AbstractProvider
from credsweeper.file_handler.file_path_extractor import FilePathExtractor
from credsweeper.file_handler.text_content_provider import TextContentProvider

logger = logging.getLogger(__name__)

class FilesProvider(ABC):
"""Base class for all files provider objects."""

def __init__(self, paths: List[Union[str, Path, io.BytesIO, Tuple[Union[str, Path], io.BytesIO]]]) -> None:
"""Initialize Files Provider object for 'paths'.
class FilesProvider(AbstractProvider):
"""Provider of plain os files to be analysed."""

def __init__(self,
paths: Sequence[Union[str, Path, io.BytesIO, Tuple[Union[str, Path], io.BytesIO]]],
skip_ignored: Optional[bool] = None) -> None:
"""Initialize Files Text Provider for files from 'paths'.
Args:
paths: file paths list to scan or io.BytesIO or tuple with both
paths: list of parent paths of files to scan
OR tuple of path (info purpose) and io.BytesIO (reads the data from current pos)
skip_ignored: boolean, when true FilePathExtractor.apply_gitignore
is applied to the files collected from each path
"""
self.paths = paths

@property
def paths(self) -> List[Union[str, Path, io.BytesIO, Tuple[Union[str, Path], io.BytesIO]]]:
"""paths getter"""
return self.__paths

@paths.setter
def paths(self, paths: List[Union[str, Path, io.BytesIO, Tuple[Union[str, Path], io.BytesIO]]]) -> None:
"""paths setter"""
self.__paths = paths
super().__init__(paths)
self.skip_ignored = skip_ignored

@abstractmethod
def get_scannable_files(self, config: Config) -> List[Union[DiffContentProvider, TextContentProvider]]:
"""Get list of file object for analysis based on attribute "paths".
def get_scannable_files(self, config: Config) -> Sequence[Union[DiffContentProvider, TextContentProvider]]:
"""Get list of full text file object for analysis of files with parent paths from "paths".
Args:
config: dict of credsweeper configuration
Return:
file objects to analyse
preprocessed file objects for analysis
"""
raise NotImplementedError()
text_content_provider_list: List[Union[DiffContentProvider, TextContentProvider]] = []
for path in self.paths:
if isinstance(path, str) or isinstance(path, Path):
new_files = FilePathExtractor.get_file_paths(config, path)
if self.skip_ignored:
new_files = FilePathExtractor.apply_gitignore(new_files)
for _file in new_files:
text_content_provider_list.append(TextContentProvider(_file))
elif isinstance(path, io.BytesIO):
text_content_provider_list.append(TextContentProvider((":memory:", path)))
elif isinstance(path, tuple) \
and (isinstance(path[0], str) or isinstance(path[0], Path)) \
and isinstance(path[1], io.BytesIO):
# suppose, all the files must be scanned
text_content_provider_list.append(TextContentProvider(path))
else:
logger.error(f"Unknown path type: {path}")

return text_content_provider_list
Original file line number Diff line number Diff line change
@@ -1,40 +1,30 @@
import io
import logging
from pathlib import Path
from typing import List, Union, Tuple
from typing import List, Union, Tuple, Sequence

from credsweeper import TextContentProvider
from credsweeper.common.constants import DiffRowType
from credsweeper.config import Config
from credsweeper.file_handler.abstract_provider import AbstractProvider
from credsweeper.file_handler.diff_content_provider import DiffContentProvider
from credsweeper.file_handler.file_path_extractor import FilePathExtractor
from credsweeper.file_handler.files_provider import FilesProvider
from credsweeper.utils import Util

logger = logging.getLogger(__name__)


class PatchProvider(FilesProvider):
class PatchesProvider(AbstractProvider):
"""Provide data from a list of `.patch` files.
Allows to scan for data that has changed between git commits, rather than the entire project.
Parameters:
paths: file paths list to scan. All files should be in `.patch` format
change_type: string, type of analyses changes in patch (added or deleted)
skip_ignored: boolean variable, Checking the directory to the list
of ignored directories from the gitignore file
"""

def __init__(self, paths: List[Union[str, Path, io.BytesIO, Tuple[Union[str, Path], io.BytesIO]]],
def __init__(self, paths: Sequence[Union[str, Path, io.BytesIO, Tuple[Union[str, Path], io.BytesIO]]],
change_type: DiffRowType) -> None:
"""Initialize Files Patch Provider for patch files from 'paths'.
Args:
paths: file paths list to scan. All files should be in `.patch` format
change_type: string, type of analyses changes in patch (added or deleted)
skip_ignored: boolean variable, Checking the directory to the list
change_type: DiffRowType, type of analyses changes in patch (added or deleted)
of ignored directories from the gitignore file
"""
Expand All @@ -57,7 +47,8 @@ def load_patch_data(self, config: Config) -> List[List[str]]:

return raw_patches

def get_files_sequence(self, raw_patches: List[List[str]]) -> List[Union[DiffContentProvider, TextContentProvider]]:
def get_files_sequence(self,
raw_patches: List[List[str]]) -> Sequence[Union[DiffContentProvider, TextContentProvider]]:
"""Returns sequence of files"""
files: List[Union[DiffContentProvider, TextContentProvider]] = []
for raw_patch in raw_patches:
Expand All @@ -66,7 +57,7 @@ def get_files_sequence(self, raw_patches: List[List[str]]) -> List[Union[DiffCon
files.append(DiffContentProvider(file_path=file_path, change_type=self.change_type, diff=file_diff))
return files

def get_scannable_files(self, config: Config) -> List[Union[DiffContentProvider, TextContentProvider]]:
def get_scannable_files(self, config: Config) -> Sequence[Union[DiffContentProvider, TextContentProvider]]:
"""Get files to scan. Output based on the `paths` field.
Args:
Expand Down
Loading

0 comments on commit 85bcbe2

Please sign in to comment.