Skip to content

Commit f43fbfb

Browse files
[FR] Update utility path computation to use pathlib (#3699)
* update * Updated to pathlib * Linting * Add string cast where needed * Add additional string conversion as needed * Str conversions to support eql lib * Attack typo * Typo in test script * Updated for more pathlib * Linting * Update to convert string to path object * Fix typo
1 parent f73022b commit f43fbfb

15 files changed

+67
-69
lines changed

detection_rules/__main__.py

+5-4
Original file line numberDiff line numberDiff line change
@@ -5,18 +5,19 @@
55

66
# coding=utf-8
77
"""Shell for detection-rules."""
8-
import os
98
import sys
9+
from pathlib import Path
1010

1111
import click
1212

1313
assert (3, 12) <= sys.version_info < (4, 0), "Only Python 3.12+ supported"
1414

15+
1516
from .main import root # noqa: E402
1617

17-
CURR_DIR = os.path.dirname(os.path.abspath(__file__))
18-
CLI_DIR = os.path.dirname(CURR_DIR)
19-
ROOT_DIR = os.path.dirname(CLI_DIR)
18+
CURR_DIR = Path(__file__).resolve().parent
19+
CLI_DIR = CURR_DIR.parent
20+
ROOT_DIR = CLI_DIR.parent
2021

2122
BANNER = r"""
2223
█▀▀▄ ▄▄▄ ▄▄▄ ▄▄▄ ▄▄▄ ▄▄▄ ▄▄▄ ▄▄▄ ▄ ▄ █▀▀▄ ▄ ▄ ▄ ▄▄▄ ▄▄▄

detection_rules/attack.py

+7-7
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,8 @@
1717
from .utils import cached, clear_caches, get_etc_path, get_etc_glob_path, read_gzip, gzip_compress
1818

1919
PLATFORMS = ['Windows', 'macOS', 'Linux']
20-
CROSSWALK_FILE = Path(get_etc_path('attack-crosswalk.json'))
21-
TECHNIQUES_REDIRECT_FILE = Path(get_etc_path('attack-technique-redirects.json'))
20+
CROSSWALK_FILE = get_etc_path('attack-crosswalk.json')
21+
TECHNIQUES_REDIRECT_FILE = get_etc_path('attack-technique-redirects.json')
2222

2323
tactics_map = {}
2424

@@ -28,17 +28,17 @@ def load_techniques_redirect() -> dict:
2828
return json.loads(TECHNIQUES_REDIRECT_FILE.read_text())['mapping']
2929

3030

31-
def get_attack_file_path() -> str:
31+
def get_attack_file_path() -> Path:
3232
pattern = 'attack-v*.json.gz'
3333
attack_file = get_etc_glob_path(pattern)
3434
if len(attack_file) < 1:
3535
raise FileNotFoundError(f'Missing required {pattern} file')
3636
elif len(attack_file) != 1:
3737
raise FileExistsError(f'Multiple files found with {pattern} pattern. Only one is allowed')
38-
return attack_file[0]
38+
return Path(attack_file[0])
3939

4040

41-
_, _attack_path_base = get_attack_file_path().split('-v')
41+
_, _attack_path_base = str(get_attack_file_path()).split('-v')
4242
_ext_length = len('.json.gz')
4343
CURRENT_ATTACK_VERSION = _attack_path_base[:-_ext_length]
4444

@@ -98,7 +98,7 @@ def load_attack_gz() -> dict:
9898

9999
def refresh_attack_data(save=True) -> (Optional[dict], Optional[bytes]):
100100
"""Refresh ATT&CK data from Mitre."""
101-
attack_path = Path(get_attack_file_path())
101+
attack_path = get_attack_file_path()
102102
filename, _, _ = attack_path.name.rsplit('.', 2)
103103

104104
def get_version_from_tag(name, pattern='att&ck-v'):
@@ -126,7 +126,7 @@ def get_version_from_tag(name, pattern='att&ck-v'):
126126
compressed = gzip_compress(json.dumps(attack_data, sort_keys=True))
127127

128128
if save:
129-
new_path = Path(get_etc_path(f'attack-v{latest_version}.json.gz'))
129+
new_path = get_etc_path(f'attack-v{latest_version}.json.gz')
130130
new_path.write_bytes(compressed)
131131
attack_path.unlink()
132132
print(f'Replaced file: {attack_path} with {new_path}')

detection_rules/devtools.py

+6-6
Original file line numberDiff line numberDiff line change
@@ -90,7 +90,7 @@ def dev_group():
9090
def build_release(config_file, update_version_lock: bool, generate_navigator: bool, generate_docs: str,
9191
update_message: str, release=None, verbose=True):
9292
"""Assemble all the rules into Kibana-ready release files."""
93-
config = load_dump(config_file)['package']
93+
config = load_dump(str(config_file))['package']
9494
registry_data = config['registry_data']
9595

9696
if generate_navigator:
@@ -313,7 +313,7 @@ def prune_staging_area(target_stack_version: str, dry_run: bool, exception_list:
313313
continue
314314

315315
# it's a change to a rule file, load it and check the version
316-
if str(change.path.absolute()).startswith(RULES_DIR) and change.path.suffix == ".toml":
316+
if str(change.path.absolute()).startswith(str(RULES_DIR)) and change.path.suffix == ".toml":
317317
# bypass TOML validation in case there were schema changes
318318
dict_contents = RuleCollection.deserialize_toml_string(change.read())
319319
min_stack_version: Optional[str] = dict_contents.get("metadata", {}).get("min_stack_version")
@@ -441,7 +441,7 @@ def integrations_pr(ctx: click.Context, local_repo: str, token: str, draft: bool
441441
stack_version = Package.load_configs()["name"]
442442
package_version = Package.load_configs()["registry_data"]["version"]
443443

444-
release_dir = Path(RELEASE_DIR) / stack_version / "fleet" / package_version
444+
release_dir = RELEASE_DIR / stack_version / "fleet" / package_version
445445
message = f"[Security Rules] Update security rules package to v{package_version}"
446446

447447
if not release_dir.exists():
@@ -581,7 +581,7 @@ def license_check(ctx, ignore_directory):
581581
"""Check that all code files contain a valid license."""
582582
ignore_directory += ("env",)
583583
failed = False
584-
base_path = Path(get_path())
584+
base_path = get_path()
585585

586586
for path in base_path.rglob('*.py'):
587587
relative_path = path.relative_to(base_path)
@@ -622,7 +622,7 @@ def test_version_lock(branches: tuple, remote: str):
622622

623623
finally:
624624
diff = git('--no-pager', 'diff', get_etc_path('version.lock.json'))
625-
outfile = Path(get_path()).joinpath('lock-diff.txt')
625+
outfile = get_path() / 'lock-diff.txt'
626626
outfile.write_text(diff)
627627
click.echo(f'diff saved to {outfile}')
628628

@@ -740,7 +740,7 @@ def deprecate_rule(ctx: click.Context, rule_file: Path):
740740
deprecation_date=today,
741741
maturity='deprecated')
742742
contents = dataclasses.replace(rule.contents, metadata=new_meta)
743-
new_rule = TOMLRule(contents=contents, path=Path(deprecated_path))
743+
new_rule = TOMLRule(contents=contents, path=deprecated_path)
744744
new_rule.save_toml()
745745

746746
# remove the old rule

detection_rules/ecs.py

+3-4
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,6 @@
99
import json
1010
import os
1111
import shutil
12-
from pathlib import Path
1312

1413
import eql
1514
import eql.types
@@ -88,7 +87,7 @@ def get_max_version(include_master=False):
8887
versions = get_schema_map().keys()
8988

9089
if include_master and any([v.startswith('master') for v in versions]):
91-
return list(Path(ECS_SCHEMAS_DIR).glob('master*'))[0].name
90+
return list(ECS_SCHEMAS_DIR.glob('master*'))[0].name
9291

9392
return str(max([Version.parse(v) for v in versions if not v.startswith('master')]))
9493

@@ -303,9 +302,9 @@ def download_endpoint_schemas(target: str, overwrite: bool = True) -> None:
303302
flattened[f"{root_name}.{f['name']}"] = f['type']
304303

305304
# save schema to disk
306-
Path(ENDPOINT_SCHEMAS_DIR).mkdir(parents=True, exist_ok=True)
305+
ENDPOINT_SCHEMAS_DIR.mkdir(parents=True, exist_ok=True)
307306
compressed = gzip_compress(json.dumps(flattened, sort_keys=True, cls=DateTimeEncoder))
308-
new_path = Path(ENDPOINT_SCHEMAS_DIR) / f"endpoint_{target}.json.gz"
307+
new_path = ENDPOINT_SCHEMAS_DIR / f"endpoint_{target}.json.gz"
309308
if overwrite:
310309
shutil.rmtree(new_path, ignore_errors=True)
311310
with open(new_path, 'wb') as f:

detection_rules/endgame.py

+1-2
Original file line numberDiff line numberDiff line change
@@ -7,13 +7,12 @@
77
import json
88
import shutil
99
import sys
10-
from pathlib import Path
1110

1211
import eql
1312

1413
from .utils import ETC_DIR, DateTimeEncoder, cached, gzip_compress, read_gzip
1514

16-
ENDGAME_SCHEMA_DIR = Path(ETC_DIR) / "endgame_schemas"
15+
ENDGAME_SCHEMA_DIR = ETC_DIR / "endgame_schemas"
1716

1817

1918
class EndgameSchemaManager:

detection_rules/integrations.py

+2-2
Original file line numberDiff line numberDiff line change
@@ -25,8 +25,8 @@
2525
from .utils import cached, get_etc_path, read_gzip, unzip
2626
from .schemas import definitions
2727

28-
MANIFEST_FILE_PATH = Path(get_etc_path('integration-manifests.json.gz'))
29-
SCHEMA_FILE_PATH = Path(get_etc_path('integration-schemas.json.gz'))
28+
MANIFEST_FILE_PATH = get_etc_path('integration-manifests.json.gz')
29+
SCHEMA_FILE_PATH = get_etc_path('integration-schemas.json.gz')
3030
_notified_integrations = set()
3131

3232

detection_rules/main.py

+4-4
Original file line numberDiff line numberDiff line change
@@ -79,7 +79,7 @@ def generate_rules_index(ctx: click.Context, query, overwrite, save_files=True):
7979
bulk_upload_docs, importable_rules_docs = package.create_bulk_index_body()
8080

8181
if save_files:
82-
path = Path(get_path('enriched-rule-indexes', package_hash))
82+
path = get_path('enriched-rule-indexes', package_hash)
8383
path.mkdir(parents=True, exist_ok=overwrite)
8484
bulk_upload_docs.dump(path.joinpath('enriched-rules-index-uploadable.ndjson'), sort_keys=True)
8585
importable_rules_docs.dump(path.joinpath('enriched-rules-index-importable.ndjson'), sort_keys=True)
@@ -431,7 +431,7 @@ def create_dnstwist_index(ctx: click.Context, input_file: click.Path):
431431
es_client: Elasticsearch = ctx.obj['es']
432432

433433
click.echo(f'Attempting to load dnstwist data from {input_file}')
434-
dnstwist_data: dict = load_dump(input_file)
434+
dnstwist_data: dict = load_dump(str(input_file))
435435
click.echo(f'{len(dnstwist_data)} records loaded')
436436

437437
original_domain = next(r['domain-name'] for r in dnstwist_data if r.get('fuzzer', '') == 'original*')
@@ -496,10 +496,10 @@ def create_dnstwist_index(ctx: click.Context, input_file: click.Path):
496496
@click.argument('author')
497497
def prep_rule(author: str):
498498
"""Prep the detection threat match rule for dnstwist data with a rule_id and author."""
499-
rule_template_file = Path(get_etc_path('rule_template_typosquatting_domain.json'))
499+
rule_template_file = get_etc_path('rule_template_typosquatting_domain.json')
500500
template_rule = json.loads(rule_template_file.read_text())
501501
template_rule.update(author=[author], rule_id=str(uuid4()))
502-
updated_rule = Path(get_path('rule_typosquatting_domain.ndjson'))
502+
updated_rule = get_path('rule_typosquatting_domain.ndjson')
503503
updated_rule.write_text(json.dumps(template_rule, sort_keys=True))
504504
click.echo(f'Rule saved to: {updated_rule}. Import this to Kibana to create alerts on all dnstwist-* indexes')
505505
click.echo('Note: you only need to import and enable this rule one time for all dnstwist-* indexes')

detection_rules/mappings.py

+4-4
Original file line numberDiff line numberDiff line change
@@ -4,8 +4,8 @@
44
# 2.0.
55

66
"""RTA to rule mappings."""
7-
import os
87
from collections import defaultdict
8+
from pathlib import Path
99

1010
from rta import get_available_tests
1111

@@ -71,9 +71,9 @@ def get_rta_files(self, rta_list=None, rule_ids=None):
7171

7272
for rta_name in rta_list:
7373
# rip off the extension and add .py
74-
rta_name, _ = os.path.splitext(os.path.basename(rta_name))
75-
rta_path = os.path.abspath(os.path.join(RTA_DIR, rta_name + ".py"))
76-
if os.path.exists(rta_path):
74+
rta_name = Path(rta_name).stem
75+
rta_path = (RTA_DIR / rta_name).with_suffix(".py").resolve()
76+
if rta_path.exists():
7777
rta_files.add(rta_path)
7878

7979
return list(sorted(rta_files))

detection_rules/misc.py

+1-1
Original file line numberDiff line numberDiff line change
@@ -271,7 +271,7 @@ def load_current_package_version() -> str:
271271

272272

273273
def get_default_config() -> Optional[Path]:
274-
return next(Path(get_path()).glob('.detection-rules-cfg.*'), None)
274+
return next(get_path().glob('.detection-rules-cfg.*'), None)
275275

276276

277277
@cached

detection_rules/ml.py

+1-1
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@
2727
from .utils import get_path, unzip_to_dict
2828

2929

30-
ML_PATH = Path(get_path('machine-learning'))
30+
ML_PATH = get_path('machine-learning')
3131

3232

3333
def info_from_tag(tag: str) -> (Literal['ml'], definitions.MachineLearningType, str, int):

detection_rules/packaging.py

+15-15
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,7 @@ def filter_rule(rule: TOMLRule, config_filter: dict, exclude_fields: Optional[di
6969
return True
7070

7171

72-
CURRENT_RELEASE_PATH = Path(RELEASE_DIR) / load_current_package_version()
72+
CURRENT_RELEASE_PATH = RELEASE_DIR / load_current_package_version()
7373

7474

7575
class Package(object):
@@ -99,8 +99,8 @@ def __init__(self, rules: RuleCollection, name: str, release: Optional[bool] = F
9999

100100
@classmethod
101101
def load_configs(cls):
102-
"""Load configs from packages.yaml."""
103-
return load_etc_dump(PACKAGE_FILE)['package']
102+
"""Load configs from packages.yml."""
103+
return load_etc_dump(str(PACKAGE_FILE))['package']
104104

105105
@staticmethod
106106
def _package_kibana_notice_file(save_dir):
@@ -175,17 +175,17 @@ def get_consolidated(self, as_api=True):
175175

176176
def save(self, verbose=True):
177177
"""Save a package and all artifacts."""
178-
save_dir = os.path.join(RELEASE_DIR, self.name)
179-
rules_dir = os.path.join(save_dir, 'rules')
180-
extras_dir = os.path.join(save_dir, 'extras')
178+
save_dir = RELEASE_DIR / self.name
179+
rules_dir = save_dir / 'rules'
180+
extras_dir = save_dir / 'extras'
181181

182182
# remove anything that existed before
183183
shutil.rmtree(save_dir, ignore_errors=True)
184-
os.makedirs(rules_dir, exist_ok=True)
185-
os.makedirs(extras_dir, exist_ok=True)
184+
rules_dir.mkdir(parents=True, exist_ok=True)
185+
extras_dir.mkdir(parents=True, exist_ok=True)
186186

187187
for rule in self.rules:
188-
rule.save_json(Path(rules_dir).joinpath(rule.path.name).with_suffix('.json'))
188+
rule.save_json(rules_dir / Path(rule.path.name).with_suffix('.json'))
189189

190190
self._package_kibana_notice_file(rules_dir)
191191
self._package_kibana_index_file(rules_dir)
@@ -195,15 +195,15 @@ def save(self, verbose=True):
195195
self.save_release_files(extras_dir, self.changed_ids, self.new_ids, self.removed_ids)
196196

197197
# zip all rules only and place in extras
198-
shutil.make_archive(os.path.join(extras_dir, self.name), 'zip', root_dir=os.path.dirname(rules_dir),
199-
base_dir=os.path.basename(rules_dir))
198+
shutil.make_archive(extras_dir / self.name, 'zip', root_dir=rules_dir.parent, base_dir=rules_dir.name)
200199

201200
# zip everything and place in release root
202-
shutil.make_archive(os.path.join(save_dir, '{}-all'.format(self.name)), 'zip',
203-
root_dir=os.path.dirname(extras_dir), base_dir=os.path.basename(extras_dir))
201+
shutil.make_archive(
202+
save_dir / f"{self.name}-all", "zip", root_dir=extras_dir.parent, base_dir=extras_dir.name
203+
)
204204

205205
if verbose:
206-
click.echo('Package saved to: {}'.format(save_dir))
206+
click.echo(f'Package saved to: {save_dir}')
207207

208208
def export(self, outfile, downgrade_version=None, verbose=True, skip_unsupported=False):
209209
"""Export rules into a consolidated ndjson file."""
@@ -419,7 +419,7 @@ def _generate_registry_package(self, save_dir):
419419
asset_path = rules_dir / f'{asset["id"]}.json'
420420
asset_path.write_text(json.dumps(asset, indent=4, sort_keys=True), encoding="utf-8")
421421

422-
notice_contents = Path(NOTICE_FILE).read_text()
422+
notice_contents = NOTICE_FILE.read_text()
423423
readme_text = textwrap.dedent("""
424424
# Prebuilt Security Detection Rules
425425

detection_rules/rule_loader.py

+2-2
Original file line numberDiff line numberDiff line change
@@ -24,8 +24,8 @@
2424
from .schemas import definitions
2525
from .utils import cached, get_path
2626

27-
DEFAULT_RULES_DIR = Path(get_path("rules"))
28-
DEFAULT_BBR_DIR = Path(get_path("rules_building_block"))
27+
DEFAULT_RULES_DIR = get_path("rules")
28+
DEFAULT_BBR_DIR = get_path("rules_building_block")
2929
DEFAULT_DEPRECATED_DIR = DEFAULT_RULES_DIR / '_deprecated'
3030
RTA_DIR = get_path("rta")
3131
FILE_PATTERN = r'^([a-z0-9_])+\.(json|toml)$'

detection_rules/schemas/__init__.py

+2-3
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@
44
# 2.0.
55
import json
66
from collections import OrderedDict
7-
from pathlib import Path
87
from typing import List, Optional
98
from typing import OrderedDict as OrderedDictType
109

@@ -29,7 +28,7 @@
2928
"all_versions",
3029
)
3130

32-
SCHEMA_DIR = Path(get_etc_path("api_schemas"))
31+
SCHEMA_DIR = get_etc_path("api_schemas")
3332
migrations = {}
3433

3534

@@ -54,7 +53,7 @@ def wrapper(f):
5453

5554
@cached
5655
def get_schema_file(version: Version, rule_type: str) -> dict:
57-
path = Path(SCHEMA_DIR) / str(version) / f"{version}.{rule_type}.json"
56+
path = SCHEMA_DIR / str(version) / f"{version}.{rule_type}.json"
5857

5958
if not path.exists():
6059
raise ValueError(f"Unsupported rule type {rule_type}. Unable to downgrade to {version}")

0 commit comments

Comments
 (0)