Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Move crypto-policies actors to common to execute it also in el9toel10 workflow #1337

Open
wants to merge 3 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from leapp.actors import Actor
from leapp.libraries.actor import cryptopoliciescheck
from leapp.libraries.common.config import version
from leapp.models import CryptoPolicyInfo, Report, TargetUserSpacePreupgradeTasks
from leapp.tags import ChecksPhaseTag, IPUWorkflowTag

Expand All @@ -21,4 +22,8 @@ class CryptoPoliciesCheck(Actor):
tags = (IPUWorkflowTag, ChecksPhaseTag,)

def process(self):
# there are no crypto policies in EL 7
if int(version.get_target_major_version()) < 9:
return

cryptopoliciescheck.process(self.consume(CryptoPolicyInfo))
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
import pytest

from leapp.libraries.common.config import version
from leapp.models import (
CopyFile,
CryptoPolicyInfo,
CustomCryptoPolicy,
CustomCryptoPolicyModule,
Report,
TargetUserSpacePreupgradeTasks
)


@pytest.mark.parametrize(('target_version'), [
('8'),
('9'),
('10'),
])
def test_actor_execution_default(monkeypatch, current_actor_context, target_version):
monkeypatch.setattr(version, 'get_target_major_version', lambda: target_version)
current_actor_context.feed(
CryptoPolicyInfo(
current_policy="DEFAULT",
custom_policies=[],
custom_modules=[],
)
)
current_actor_context.run()
assert not current_actor_context.consume(TargetUserSpacePreupgradeTasks)


@pytest.mark.parametrize(('target_version', 'should_run'), [
('8', False),
('9', True),
('10', True),
])
def test_actor_execution_legacy(monkeypatch, current_actor_context, target_version, should_run):
monkeypatch.setattr(version, 'get_target_major_version', lambda: target_version)
current_actor_context.feed(
CryptoPolicyInfo(
current_policy="LEGACY",
custom_policies=[],
custom_modules=[],
)
)
current_actor_context.run()

if should_run:
assert current_actor_context.consume(TargetUserSpacePreupgradeTasks)
u = current_actor_context.consume(TargetUserSpacePreupgradeTasks)[0]
assert u.install_rpms == ['crypto-policies-scripts']
assert u.copy_files == []

assert current_actor_context.consume(Report)
else:
assert not current_actor_context.consume(TargetUserSpacePreupgradeTasks)


@pytest.mark.parametrize(('target_version', 'should_run'), [
('8', False),
('9', True),
('10', True),
])
def test_actor_execution_custom(monkeypatch, current_actor_context, target_version, should_run):
monkeypatch.setattr(version, 'get_target_major_version', lambda: target_version)
current_actor_context.feed(
CryptoPolicyInfo(
current_policy="CUSTOM:SHA2",
custom_policies=[
CustomCryptoPolicy(name='CUSTOM', path='/etc/crypto-policies/policies/CUSTOM.pol'),
],
custom_modules=[
CustomCryptoPolicyModule(name='SHA2', path='/etc/crypto-policies/policies/modules/SHA2.pmod'),
],
)
)
current_actor_context.run()

if should_run:
assert current_actor_context.consume(TargetUserSpacePreupgradeTasks)
u = current_actor_context.consume(TargetUserSpacePreupgradeTasks)[0]
assert u.install_rpms == ['crypto-policies-scripts']
assert u.copy_files == [
CopyFile(src='/etc/crypto-policies/policies/CUSTOM.pol'),
CopyFile(src='/etc/crypto-policies/policies/modules/SHA2.pmod'),
]

assert current_actor_context.consume(Report)
else:
assert not current_actor_context.consume(TargetUserSpacePreupgradeTasks)
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from leapp.actors import Actor
from leapp.libraries.actor import scancryptopolicies
from leapp.libraries.common.config import version
from leapp.models import CryptoPolicyInfo
from leapp.tags import FactsPhaseTag, IPUWorkflowTag

Expand All @@ -23,4 +24,8 @@ class ScanCryptoPolicies(Actor):
tags = (IPUWorkflowTag, FactsPhaseTag)

def process(self):
# there are no crypto policies in EL 7
if int(version.get_target_major_version()) < 9:
return

scancryptopolicies.process()
Original file line number Diff line number Diff line change
Expand Up @@ -11,23 +11,23 @@
'/usr/share/crypto-policies/policies/modules',)


def read_current_policy(file):
if not os.path.exists(file):
def read_current_policy(filename):
if not os.path.exists(filename):
# NOTE(pstodulk) just seatbelt, I do not expect the file is not present
# skipping tests
raise StopActorExecutionError(
'File not found: {}'.format(file),
'File not found: {}'.format(filename),
details={'details:': 'Cannot check the current set crypto policies.'}
)
current = 'DEFAULT'
with open(file) as fp:
with open(filename) as fp:
current = fp.read().strip()
return current


def _get_name_from_file(file):
def _get_name_from_file(filename):
"""This is just stripping the path and the extension"""
base = os.path.basename(file)
base = os.path.basename(filename)
return os.path.splitext(base)[0]


Expand All @@ -44,10 +44,10 @@ def find_rpm_untracked(files):

# return only untracked files from the list
out = []
for file in files:
exp = "file {} is not owned by any package".format(file)
for filename in files:
exp = "file {} is not owned by any package".format(filename)
if exp in res['stdout']:
out.append(file)
out.append(filename)
return out


Expand All @@ -56,17 +56,17 @@ def read_policy_dirs(dirs, obj, extension):
files = []
# find all policy files
for d in dirs:
for file in os.listdir(d):
file = os.path.join(d, file)
if not os.path.isfile(file) or not file.endswith(extension):
for filename in os.listdir(d):
filepath = os.path.join(d, filename)
if not os.path.isfile(filepath) or not filepath.endswith(extension):
continue
files.append(file)
files.append(filepath)
# now, check which are not tracked by RPM:
files = find_rpm_untracked(files)
out = []
for file in files:
name = _get_name_from_file(file)
out.append(obj(name=name, path=file))
for filename in files:
name = _get_name_from_file(filename)
out.append(obj(name=name, path=filename))

return out

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
import distro

from leapp.libraries.common.config import version
from leapp.models import CryptoPolicyInfo


def test_actor_execution(monkeypatch, current_actor_context):
target_version = int(distro.major_version()) + 1
monkeypatch.setattr(version, 'get_target_major_version', lambda: "{}".format(target_version))
current_actor_context.run()
if target_version > 8:
assert current_actor_context.consume(CryptoPolicyInfo)
else:
assert not current_actor_context.consume(CryptoPolicyInfo)
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import os
import shutil
import tempfile

import pytest
Expand Down Expand Up @@ -30,21 +31,26 @@ def test_find_rpm_untracked(current_actor_context):
files = ["/etc/crypto-policies/config"]
assert find_rpm_untracked(files) == []

# the tempfile is not tracked by RPM
with tempfile.NamedTemporaryFile(delete=False) as f:
files = [f.name]
assert find_rpm_untracked(files) == [f.name]
# python2 compatibility :/
dirpath = tempfile.mkdtemp()

try:
# the tempfile is not tracked by RPM
files = [dirpath]
assert find_rpm_untracked(files) == [dirpath]

# not existing files are ignored
files = [NOFILE]
assert find_rpm_untracked(files) == []

# combinations should yield expected results too
files = ["/tmp", f.name, NOFILE]
assert find_rpm_untracked(files) == [f.name]
files = ["/tmp", dirpath, NOFILE]
assert find_rpm_untracked(files) == [dirpath]
# regardless the order
files = [NOFILE, f.name, "/tmp"]
assert find_rpm_untracked(files) == [f.name]
files = [NOFILE, dirpath, "/tmp"]
assert find_rpm_untracked(files) == [dirpath]
finally:
shutil.rmtree(dirpath)


def test_read_current_policy():
Expand All @@ -63,43 +69,53 @@ def test_read_current_policy():


def test_read_policy_dirs(current_actor_context):
with tempfile.TemporaryDirectory() as dir1:
# python2 compatibility :/
dirpath = tempfile.mkdtemp()

try:
# empty
files = read_policy_dirs([dir1], CustomCryptoPolicy, ".pol")
files = read_policy_dirs([dirpath], CustomCryptoPolicy, ".pol")
assert files == []

# first policy module
path1 = os.path.join(dir1, "policy.mpol")
with open(path1, "x") as f:
path1 = os.path.join(dirpath, "policy.mpol")
with open(path1, "w") as f:
f.write('test')
files = read_policy_dirs([dir1], CustomCryptoPolicy, ".pol")
files = read_policy_dirs([dirpath], CustomCryptoPolicy, ".pol")
assert files == []
files = read_policy_dirs([dir1], CustomCryptoPolicyModule, ".mpol")
files = read_policy_dirs([dirpath], CustomCryptoPolicyModule, ".mpol")
assert files == [CustomCryptoPolicyModule(name="policy", path=path1)]

with tempfile.TemporaryDirectory() as dir2:
files = read_policy_dirs([dir1], CustomCryptoPolicy, ".pol")
# python2 compatibility :/
dirpath2 = tempfile.mkdtemp()

try:
files = read_policy_dirs([dirpath], CustomCryptoPolicy, ".pol")
assert files == []
files = read_policy_dirs([dir1, dir2], CustomCryptoPolicyModule, ".mpol")
files = read_policy_dirs([dirpath, dirpath2], CustomCryptoPolicyModule, ".mpol")
assert files == [CustomCryptoPolicyModule(name="policy", path=path1)]

# first policy file
path2 = os.path.join(dir2, "mypolicy.pol")
with open(path2, "x") as f:
path2 = os.path.join(dirpath2, "mypolicy.pol")
with open(path2, "w") as f:
f.write('test2')
# second policy file
path3 = os.path.join(dir2, "other.pol")
with open(path3, "x") as f:
path3 = os.path.join(dirpath2, "other.pol")
with open(path3, "w") as f:
f.write('test3')

files = read_policy_dirs([dir1, dir2], dict, ".pol")
files = read_policy_dirs([dirpath, dirpath2], dict, ".pol")
assert len(files) == 2
assert dict(name="mypolicy", path=path2) in files
assert dict(name="other", path=path3) in files
files = read_policy_dirs([dir1, dir2], CustomCryptoPolicyModule, ".mpol")
files = read_policy_dirs([dirpath, dirpath2], CustomCryptoPolicyModule, ".mpol")
assert files == [CustomCryptoPolicyModule(name="policy", path=path1)]
finally:
shutil.rmtree(dirpath2)

files = read_policy_dirs([dir1], CustomCryptoPolicy, ".pol")
files = read_policy_dirs([dirpath], CustomCryptoPolicy, ".pol")
assert files == []
files = read_policy_dirs([dir1], CustomCryptoPolicyModule, ".mpol")
files = read_policy_dirs([dirpath], CustomCryptoPolicyModule, ".mpol")
assert files == [CustomCryptoPolicyModule(name="policy", path=path1)]
finally:
shutil.rmtree(dirpath)

This file was deleted.

This file was deleted.