Skip to content

Commit

Permalink
Keep crypto-policies actors from el8toel9 in el9toel10
Browse files Browse the repository at this point in the history
There are two actors related to crypto-policies in el8toel9 and their
corresponing models. They are still relevant for el9toel10 upgrade
path but we cannot easily store them in common because they are not
relevant for el7toel8 upgrade path. Therefore I am copying mode and
both actors from el8toel9 to el9toel10 with a tiny documentation change
in cryptopoliciescheck actor.

Signed-off-by: Ondrej Moris <[email protected]>
  • Loading branch information
The-Mule committed Jan 29, 2025
1 parent c5accf4 commit 1a7977d
Show file tree
Hide file tree
Showing 9 changed files with 428 additions and 0 deletions.
25 changes: 25 additions & 0 deletions repos/system_upgrade/el9toel10/actors/cryptopoliciescheck/actor.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
from leapp.actors import Actor
from leapp.libraries.actor import cryptopoliciescheck
from leapp.models import CryptoPolicyInfo, Report, TargetUserSpacePreupgradeTasks
from leapp.tags import ChecksPhaseTag, IPUWorkflowTag


class CryptoPoliciesCheck(Actor):
"""
This actor consumes previously gathered information about crypto policies on the source
system and does two things:
* warns user if the custom/legacy policy is used and whether there is time to review it
* prepares the container by making sure it will have installed the tools for managing
crypto policies and the custom policies are copied over to the intermediate and target
systems
"""

name = 'crypto_policies_check'
consumes = (CryptoPolicyInfo,)
produces = (TargetUserSpacePreupgradeTasks, Report,)
tags = (IPUWorkflowTag, ChecksPhaseTag,)

def process(self):
cryptopoliciescheck.process(self.consume(CryptoPolicyInfo))

Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
from leapp import reporting
from leapp.exceptions import StopActorExecutionError
from leapp.libraries.stdlib import api
from leapp.models import CopyFile, TargetUserSpacePreupgradeTasks


def _get_files_to_copy(cpi):
return [f.path for f in cpi.custom_policies + cpi.custom_modules]


def process(cpi_messages):
cpi = next(cpi_messages, None)
if list(cpi_messages):
api.current_logger().warning('Unexpectedly received more than one CryptoPolicyInfo message.')
if not cpi:
raise StopActorExecutionError(
'Could not check crypto policies status',
details={'details': 'No CryptoPolicyInfo facts found.'}
)

if cpi.current_policy != 'DEFAULT':
# If we have to change the crypto policies inside the target userspace container,
# we need update-crypto-policies script inside as well as potential custom policies files
files = [CopyFile(src=f) for f in _get_files_to_copy(cpi)]
api.produce(TargetUserSpacePreupgradeTasks(install_rpms=['crypto-policies-scripts'],
copy_files=files))

# When non-default crypto policy is used, it might be outdated. Recommend user to revisit.
# exceptions are here the FIPS and FUTURE policies, which are more future-proof.
if cpi.current_policy in ('FIPS', 'FUTURE'):
return
reporting.create_report([
reporting.Title('System-wide crypto policy is set to non-DEFAULT policy'),
reporting.Summary((
"The system-wide crypto policies are set to `{}` value. This might be"
" outdated decision, the custom crypto policy might be outdated and no"
" longer meeting the security standards. Please, review the current crypto"
" policies settings. If this is intentional and up-to-date, you can ignore"
" this message. The custom crypto policy will be configured on the updated"
" system."
).format(cpi.current_policy)),
reporting.Severity(reporting.Severity.MEDIUM),
reporting.Groups([reporting.Groups.SECURITY, reporting.Groups.SANITY]),
reporting.Remediation(hint="Review the current policy in /etc/crypto-policies/state/CURRENT.pol"),
reporting.RelatedResource('package', 'crypto-policies'),
)
])
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
from leapp.models import (
CopyFile,
CryptoPolicyInfo,
CustomCryptoPolicy,
CustomCryptoPolicyModule,
Report,
TargetUserSpacePreupgradeTasks
)


def test_actor_execution_default(current_actor_context):
current_actor_context.feed(
CryptoPolicyInfo(
current_policy="DEFAULT",
custom_policies=[],
custom_modules=[],
)
)
current_actor_context.run()
assert not current_actor_context.consume(TargetUserSpacePreupgradeTasks)


def test_actor_execution_legacy(current_actor_context):
current_actor_context.feed(
CryptoPolicyInfo(
current_policy="LEGACY",
custom_policies=[],
custom_modules=[],
)
)
current_actor_context.run()

assert current_actor_context.consume(TargetUserSpacePreupgradeTasks)
u = current_actor_context.consume(TargetUserSpacePreupgradeTasks)[0]
assert u.install_rpms == ['crypto-policies-scripts']
assert u.copy_files == []

assert current_actor_context.consume(Report)


def test_actor_execution_custom(current_actor_context):
current_actor_context.feed(
CryptoPolicyInfo(
current_policy="CUSTOM:SHA2",
custom_policies=[
CustomCryptoPolicy(name='CUSTOM', path='/etc/crypto-policies/policies/CUSTOM.pol'),
],
custom_modules=[
CustomCryptoPolicyModule(name='SHA2', path='/etc/crypto-policies/policies/modules/SHA2.pmod'),
],
)
)
current_actor_context.run()

assert current_actor_context.consume(TargetUserSpacePreupgradeTasks)
u = current_actor_context.consume(TargetUserSpacePreupgradeTasks)[0]
assert u.install_rpms == ['crypto-policies-scripts']
assert u.copy_files == [
CopyFile(src='/etc/crypto-policies/policies/CUSTOM.pol'),
CopyFile(src='/etc/crypto-policies/policies/modules/SHA2.pmod'),
]

assert current_actor_context.consume(Report)
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
from leapp.libraries.actor.cryptopoliciescheck import _get_files_to_copy
from leapp.models import CryptoPolicyInfo, CustomCryptoPolicy, CustomCryptoPolicyModule


def test_get_files_to_copy():
cpi = CryptoPolicyInfo(current_policy="DEFAULT", custom_policies=[], custom_modules=[])
assert _get_files_to_copy(cpi) == []

cpi.custom_policies.append(CustomCryptoPolicy(name="CUSTOM", path="/path/to/CUSTOM.pol"))
assert _get_files_to_copy(cpi) == ["/path/to/CUSTOM.pol"]

cpi.custom_modules.append(CustomCryptoPolicyModule(name="FIX", path="/path/to/FIX.mpol"))
assert _get_files_to_copy(cpi) == ["/path/to/CUSTOM.pol", "/path/to/FIX.mpol"]
26 changes: 26 additions & 0 deletions repos/system_upgrade/el9toel10/actors/scancryptopolicies/actor.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
from leapp.actors import Actor
from leapp.libraries.actor import scancryptopolicies
from leapp.models import CryptoPolicyInfo
from leapp.tags import FactsPhaseTag, IPUWorkflowTag


class ScanCryptoPolicies(Actor):
"""
Scan information about system wide set crypto policies including:
* current crypto policy
* installed custom crypto policies
This information is, later in the process useful for the following:
* copy the custom crypto policies files
* notify user about the current setting and to review whether the policy still makes sense
* it might be outdated and no longer meet the best security practices
* if it is based on system policy such as DEFAULT, it might cause unexpected changes
"""

name = 'scancryptopolicies'
consumes = ()
produces = (CryptoPolicyInfo,)
tags = (IPUWorkflowTag, FactsPhaseTag)

def process(self):
scancryptopolicies.process()
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
import os

from leapp.exceptions import StopActorExecutionError
from leapp.libraries.stdlib import api, run
from leapp.models import CryptoPolicyInfo, CustomCryptoPolicy, CustomCryptoPolicyModule

CRYPTO_CURRENT_STATE_FILE = '/etc/crypto-policies/state/current'
CRYPTO_POLICIES_POLICY_DIRS = ('/etc/crypto-policies/policies',
'/usr/share/crypto-policies/policies',)
CRYPTO_POLICIES_MODULES_DIRS = ('/etc/crypto-policies/policies/modules',
'/usr/share/crypto-policies/policies/modules',)


def read_current_policy(file):
if not os.path.exists(file):
# NOTE(pstodulk) just seatbelt, I do not expect the file is not present
# skipping tests
raise StopActorExecutionError(
'File not found: {}'.format(file),
details={'details:': 'Cannot check the current set crypto policies.'}
)
current = 'DEFAULT'
with open(file) as fp:
current = fp.read().strip()
return current


def _get_name_from_file(file):
"""This is just stripping the path and the extension"""
base = os.path.basename(file)
return os.path.splitext(base)[0]


def find_rpm_untracked(files):
"""Check if the list of files is tracked by RPM"""
if not files:
return []
try:
res = run(['rpm', '-Vf', ] + files, split=True, checked=False)
except OSError as err:
error = 'Failed to invoke rpm to check untracked files: {}'.format(str(err))
api.current_logger().error(error)
return []

# return only untracked files from the list
out = []
for file in files:
exp = "file {} is not owned by any package".format(file)
if exp in res['stdout']:
out.append(file)
return out


def read_policy_dirs(dirs, obj, extension):
"""List files with given extension in given directories. Returns only the ones that are not tracked by RPM"""
files = []
# find all policy files
for d in dirs:
for file in os.listdir(d):
file = os.path.join(d, file)
if not os.path.isfile(file) or not file.endswith(extension):
continue
files.append(file)
# now, check which are not tracked by RPM:
files = find_rpm_untracked(files)
out = []
for file in files:
name = _get_name_from_file(file)
out.append(obj(name=name, path=file))

return out


def process():
current = read_current_policy(CRYPTO_CURRENT_STATE_FILE)

policies = read_policy_dirs(CRYPTO_POLICIES_POLICY_DIRS, CustomCryptoPolicy, ".pol")
modules = read_policy_dirs(CRYPTO_POLICIES_MODULES_DIRS, CustomCryptoPolicyModule, ".pmod")

api.produce(CryptoPolicyInfo(current_policy=current,
custom_policies=policies,
custom_modules=modules))
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
from leapp.models import CryptoPolicyInfo


def test_actor_execution(current_actor_context):
current_actor_context.run()
assert current_actor_context.consume(CryptoPolicyInfo)
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
import os
import tempfile

import pytest

from leapp.exceptions import StopActorExecutionError
from leapp.libraries.actor.scancryptopolicies import (
_get_name_from_file,
find_rpm_untracked,
read_current_policy,
read_policy_dirs
)
from leapp.models import CustomCryptoPolicy, CustomCryptoPolicyModule

NOFILE = "/tmp/non-existing-file-should-not-really-be-here"


def test_get_name_from_file():
assert _get_name_from_file("/path/name.extension") == "name"
assert _get_name_from_file("/othername.e") == "othername"
assert _get_name_from_file("/other.name.e") == "other.name"
assert _get_name_from_file("/some/long/path/other.name") == "other"
assert _get_name_from_file("/some/long/path/no_extension") == "no_extension"


def test_find_rpm_untracked(current_actor_context):
# this is tracked
files = ["/tmp/"]
assert find_rpm_untracked(files) == []
files = ["/etc/crypto-policies/config"]
assert find_rpm_untracked(files) == []

# the tempfile is not tracked by RPM
with tempfile.NamedTemporaryFile(delete=False) as f:
files = [f.name]
assert find_rpm_untracked(files) == [f.name]

# not existing files are ignored
files = [NOFILE]
assert find_rpm_untracked(files) == []

# combinations should yield expected results too
files = ["/tmp", f.name, NOFILE]
assert find_rpm_untracked(files) == [f.name]
# regardless the order
files = [NOFILE, f.name, "/tmp"]
assert find_rpm_untracked(files) == [f.name]


def test_read_current_policy():
with pytest.raises(StopActorExecutionError):
assert read_current_policy(NOFILE)

with tempfile.NamedTemporaryFile(delete=False) as f:
f.write(b'DEFAULT:SHA1')
f.flush()
assert read_current_policy(f.name) == "DEFAULT:SHA1"

f.seek(0)
f.write(b' DEFAULT:SHA1 \n\n ')
f.flush()
assert read_current_policy(f.name) == "DEFAULT:SHA1"


def test_read_policy_dirs(current_actor_context):
with tempfile.TemporaryDirectory() as dir1:
# empty
files = read_policy_dirs([dir1], CustomCryptoPolicy, ".pol")
assert files == []

# first policy module
path1 = os.path.join(dir1, "policy.mpol")
with open(path1, "x") as f:
f.write('test')
files = read_policy_dirs([dir1], CustomCryptoPolicy, ".pol")
assert files == []
files = read_policy_dirs([dir1], CustomCryptoPolicyModule, ".mpol")
assert files == [CustomCryptoPolicyModule(name="policy", path=path1)]

with tempfile.TemporaryDirectory() as dir2:
files = read_policy_dirs([dir1], CustomCryptoPolicy, ".pol")
assert files == []
files = read_policy_dirs([dir1, dir2], CustomCryptoPolicyModule, ".mpol")
assert files == [CustomCryptoPolicyModule(name="policy", path=path1)]

# first policy file
path2 = os.path.join(dir2, "mypolicy.pol")
with open(path2, "x") as f:
f.write('test2')
# second policy file
path3 = os.path.join(dir2, "other.pol")
with open(path3, "x") as f:
f.write('test3')

files = read_policy_dirs([dir1, dir2], dict, ".pol")
assert len(files) == 2
assert dict(name="mypolicy", path=path2) in files
assert dict(name="other", path=path3) in files
files = read_policy_dirs([dir1, dir2], CustomCryptoPolicyModule, ".mpol")
assert files == [CustomCryptoPolicyModule(name="policy", path=path1)]

files = read_policy_dirs([dir1], CustomCryptoPolicy, ".pol")
assert files == []
files = read_policy_dirs([dir1], CustomCryptoPolicyModule, ".mpol")
assert files == [CustomCryptoPolicyModule(name="policy", path=path1)]
Loading

0 comments on commit 1a7977d

Please sign in to comment.