diff --git a/qubes/app.py b/qubes/app.py index df0b98071..2a00355b7 100644 --- a/qubes/app.py +++ b/qubes/app.py @@ -427,6 +427,10 @@ class VMCollection: def __init__(self, app): self.app = app self._dict = {} + # Recently used disposable IDs: dispid -> destroy seconds since epoch + self._recent_dispids = {} + # Avoid reuse of disposable IDs for one week + self._no_dispid_reuse_period = 7 * 24 * 60 * 60 def close(self): del self.app @@ -538,6 +542,8 @@ def __delitem__(self, key): pass del self._dict[vm.qid] self.app.fire_event('domain-delete', vm=vm) + if getattr(vm, 'dispid', None): + self._recent_dispids[getattr(vm, 'dispid')] = int(time.monotonic()) def __contains__(self, key): return any((key in (vm, vm.qid, vm.name)) @@ -583,6 +589,15 @@ def get_new_unused_dispid(self): for _ in range(int(qubes.config.max_dispid ** 0.5)): dispid = random.SystemRandom().randrange(qubes.config.max_dispid) if not any(getattr(vm, 'dispid', None) == dispid for vm in self): + if dispid in self._recent_dispids: + delta_secs = int(time.monotonic()) - \ + self._recent_dispids[dispid] + if delta_secs < self._no_dispid_reuse_period: + continue + del self._recent_dispids[dispid] + self.app.log.debug( + 'Reused dispID {} after {} hours'.format( + dispid, delta_secs / 60 / 60)) return dispid raise LookupError(( 'https://xkcd.com/221/', diff --git a/qubes/tests/app.py b/qubes/tests/app.py index 1fd1dcf24..d9779e541 100644 --- a/qubes/tests/app.py +++ b/qubes/tests/app.py @@ -32,6 +32,8 @@ import qubes.tests.init import qubes.tests.storage_reflink +import logging +import time class TestApp(qubes.tests.TestEmitter): pass @@ -211,6 +213,7 @@ def setUp(self): super().setUp() self.app = TestApp() self.vms = qubes.app.VMCollection(self.app) + self.app.log = logging.getLogger() self.testvm1 = qubes.tests.init.TestVM( None, None, qid=1, name='testvm1') @@ -315,6 +318,21 @@ def test_100_get_new_unused_qid(self): self.vms.get_new_unused_qid() + def test_999999_get_new_unused_dispid(self): + with mock.patch('random.SystemRandom') as random: + random.return_value.randrange.side_effect = [11, 22, 33, 44, 55, 66] + # Testing overal functionality + self.assertEqual(self.vms.get_new_unused_dispid(), 11) + self.assertEqual(self.vms.get_new_unused_dispid(), 22) + self.assertEqual(self.vms.get_new_unused_dispid(), 33) + # Testing no reuse + self.vms._recent_dispids[44] = time.monotonic() + self.assertNotEqual(self.vms.get_new_unused_dispid(), 44) + # Testing reuse after safe period + self.vms._recent_dispids[66] = time.monotonic() \ + - self.vms._no_dispid_reuse_period - 1 + self.assertEqual(self.vms.get_new_unused_dispid(), 66) + self.assertFalse(66 in self.vms._recent_dispids) # def test_200_get_vms_based_on(self): # pass