From 78809ab358a8feb5efe5670fe9a2c56eaa632f31 Mon Sep 17 00:00:00 2001 From: Guillaume Chinal Date: Wed, 19 Feb 2025 23:44:07 +0100 Subject: [PATCH] custom-persist: add support of metadata --- qubes/ext/custom_persist.py | 56 +++++++++++--- qubes/tests/ext.py | 147 ++++++++++++++++++++++++++++++------ 2 files changed, 169 insertions(+), 34 deletions(-) diff --git a/qubes/ext/custom_persist.py b/qubes/ext/custom_persist.py index ede50cfec..5f27fb7cf 100644 --- a/qubes/ext/custom_persist.py +++ b/qubes/ext/custom_persist.py @@ -17,6 +17,7 @@ # You should have received a copy of the GNU Lesser General Public # License along with this library; if not, see . +import os import qubes.ext import qubes.config @@ -39,20 +40,53 @@ def _is_expected_feature(feature) -> bool: return feature.startswith(FEATURE_PREFIX) @staticmethod - def _is_valid_key(key, vm) -> bool: + def _check_key(key): if not key: - vm.log.warning("Got empty custom-persist key, ignoring") - return False + raise qubes.exc.QubesValueError( + "custom-persist key cannot be empty" + ) # QubesDB key length limit key_maxlen = QDB_KEY_LIMIT - len(QDB_PREFIX) if len(key) > key_maxlen: - vm.log.warning( + raise qubes.exc.QubesValueError( "custom-persist key is too long (max {}), ignoring: " "{}".format(key_maxlen, key) ) - return False - return True + + @staticmethod + def _check_value_path(value): + if not os.path.isabs(value): + raise qubes.exc.QubesValueError(f"invalid path '{value}'") + + def _check_value(self, value): + if value.startswith("/"): + self._check_value_path(value) + else: + options = value.split(":") + if len(options) < 5 or not options[4].startswith("/"): + raise qubes.exc.QubesValueError( + f"invalid value format: '{value}'" + ) + + resource_type = options[0] + mode = options[3] + if resource_type not in ("file", "dir"): + raise qubes.exc.QubesValueError( + f"invalid resource type option '{resource_type}' " + f"in value '{value}'" + ) + try: + if not 0 <= int(mode, 8) <= 0o7777: + raise qubes.exc.QubesValueError( + f"invalid mode option '{mode}' in value '{value}'" + ) + except ValueError: + raise qubes.exc.QubesValueError( + f"invalid mode option '{mode}' in value '{value}'" + ) + + self._check_value_path(":".join(options[4:])) def _write_db_value(self, feature, value, vm): vm.untrusted_qdb.write( @@ -65,9 +99,9 @@ def on_domain_qdb_create(self, vm, event): """Actually export features""" # pylint: disable=unused-argument for feature, value in vm.features.items(): - if self._is_expected_feature(feature) and self._is_valid_key( - self._extract_key_from_feature(feature), vm - ): + if self._is_expected_feature(feature): + self._check_key(self._extract_key_from_feature(feature)) + self._check_value(value) self._write_db_value(feature, value, vm) @qubes.ext.handler("domain-feature-set:*") @@ -78,8 +112,8 @@ def on_domain_feature_set(self, vm, event, feature, value, oldvalue=None): if not self._is_expected_feature(feature): return - if not self._is_valid_key(self._extract_key_from_feature(feature), vm): - return + self._check_key(self._extract_key_from_feature(feature)) + self._check_value(value) if not vm.is_running(): return diff --git a/qubes/tests/ext.py b/qubes/tests/ext.py index 4bf451965..7bf07ae19 100644 --- a/qubes/tests/ext.py +++ b/qubes/tests/ext.py @@ -1100,6 +1100,7 @@ def test_001_feature_set(self): "/newvalue", "", ) + self.assertEqual( sorted(self.vm.untrusted_qdb.mock_calls), [ @@ -1114,49 +1115,149 @@ def test_002_feature_delete(self): self.ext.on_domain_feature_delete( self.vm, "feature-delete:custom-persist.test", "custom-persist.test" ) + self.vm.untrusted_qdb.rm.assert_called_with("/persist/test") + + def test_003_empty_key(self): + with self.assertRaises(qubes.exc.QubesValueError) as e: + self.ext.on_domain_feature_set( + self.vm, + "feature-set:custom-persist.", + "custom-persist.", + "/test", + "", + ) + self.assertEqual(str(e.exception), "custom-persist key cannot be empty") + self.vm.untrusted_qdb.write.assert_not_called() + + def test_004_key_too_long(self): + with self.assertRaises(qubes.exc.QubesValueError) as e: + self.ext.on_domain_feature_set( + self.vm, + "feature-set:custom-persist." + "X" * 55, + "custom-persist." + "X" * 55, + "/test", + "", + ) + self.assertEqual( - self.vm.untrusted_qdb.mock_calls, - [mock.call.rm("/persist/test")], + str(e.exception), + "custom-persist key is too long (max 54), ignoring: " + "X" * 55, ) + self.vm.untrusted_qdb.assert_not_called() - def test_003_empty_key(self): + def test_005_other_feature_deletion(self): + self.ext.on_domain_feature_delete( + self.vm, "feature-delete:otherfeature.test", "otherfeature.test" + ) + self.vm.untrusted_qdb.assert_not_called() + + def test_006_feature_set_while_vm_is_not_running(self): + self.vm.is_running.return_value = False self.ext.on_domain_feature_set( self.vm, - "feature-set:custom-persist.", - "custom-persist.", + "feature-set:custom-persist.test", + "custom-persist.test", "/test", + ) + self.vm.untrusted_qdb.write.assert_not_called() + + def test_007_feature_set_value_with_option(self): + self.ext.on_domain_feature_set( + self.vm, + "feature-set:custom-persist.test", + "custom-persist.test", + "dir:root:root:0755:/var/test", "", ) - self.vm.untrusted_qdb.assert_not_called() - self.vm.log.warning.assert_called_once_with( - "Got empty custom-persist key, ignoring" + self.vm.untrusted_qdb.write.assert_called_with( + "/persist/test", "dir:root:root:0755:/var/test" ) - def test_004_key_too_long(self): + def test_008_feature_set_invalid_path(self): + with self.assertRaises(qubes.exc.QubesValueError): + self.ext.on_domain_feature_set( + self.vm, + "feature-set:custom-persist.test", + "custom-persist.test", + "test", + "", + ) + self.vm.untrusted_qdb.write.assert_not_called() + + def test_009_feature_set_invalid_option_type(self): + with self.assertRaises(qubes.exc.QubesValueError): + self.ext.on_domain_feature_set( + self.vm, + "feature-set:custom-persist.test", + "custom-persist.test", + "bad:root:root:0755:/var/test", + "", + ) + self.vm.untrusted_qdb.write.assert_not_called() + + def test_010_feature_set_invalid_option_mode_too_high(self): + with self.assertRaises(qubes.exc.QubesValueError): + self.ext.on_domain_feature_set( + self.vm, + "feature-set:custom-persist.test", + "custom-persist.test", + "file:root:root:9750:/var/test", + "", + ) + self.vm.untrusted_qdb.write.assert_not_called() + + def test_011_feature_set_invalid_option_mode_negative_high(self): + with self.assertRaises(qubes.exc.QubesValueError): + self.ext.on_domain_feature_set( + self.vm, + "feature-set:custom-persist.test", + "custom-persist.test", + "file:root:root:-755:/var/test", + "", + ) + self.vm.untrusted_qdb.write.assert_not_called() + + def test_012_feature_set_option_mode_without_leading_zero(self): self.ext.on_domain_feature_set( self.vm, - "feature-set:custom-persist." + "X" * 55, - "custom-persist." + "X" * 55, - "/test", + "feature-set:custom-persist.test", + "custom-persist.test", + "file:root:root:755:/var/test", "", ) - self.vm.untrusted_qdb.assert_not_called() - self.vm.log.warning.assert_called_once_with( - "custom-persist key is too long (max 54), ignoring: " + "X" * 55 + self.vm.untrusted_qdb.write.assert_called_with( + "/persist/test", "file:root:root:755:/var/test" ) - def test_005_other_feature_deletion(self): - self.ext.on_domain_feature_delete( - self.vm, "feature-delete:otherfeature.test", "otherfeature.test" + def test_013_feature_set_invalid_path_with_option(self): + with self.assertRaises(qubes.exc.QubesValueError): + self.ext.on_domain_feature_set( + self.vm, + "feature-set:custom-persist.test", + "custom-persist.test", + "dir:root:root:0755:var/test", + "", + ) + self.vm.untrusted_qdb.write.assert_not_called() + + def test_014_feature_set_path_with_colon_with_options(self): + self.ext.on_domain_feature_set( + self.vm, + "feature-set:custom-persist.test", + "custom-persist.test", + "file:root:root:755:/var/test:dir:with:colon", + "", ) - self.vm.untrusted_qdb.assert_not_called() + self.vm.untrusted_qdb.write.assert_called() - def test_006_feature_set_while_vm_is_not_running(self): - self.vm.is_running.return_value = False + def test_015_feature_set_path_with_colon_without_options(self): self.ext.on_domain_feature_set( self.vm, "feature-set:custom-persist.test", "custom-persist.test", - "/test", + "/var/test:dir:with:colon", + "", + ) + self.vm.untrusted_qdb.write.assert_called_with( + "/persist/test", "/var/test:dir:with:colon" ) - self.vm.untrusted_qdb.assert_not_called()