Skip to content

Commit b0c7ae7

Browse files
committed
Support domain parameters in get_roles() and get_users().
1 parent a7b0ccd commit b0c7ae7

4 files changed

Lines changed: 125 additions & 116 deletions

File tree

casbin/enforcer.py

Lines changed: 5 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -21,30 +21,23 @@ def get_roles_for_user(self, user):
2121
"""gets the roles that a user has."""
2222
return self.model.model['g']['g'].rm.get_roles(user)
2323

24-
def get_roles_for_user_in_domain(self, user, domain):
24+
def get_roles_for_user_in_domain(self, name, domain):
2525
"""gets the roles that a user has inside a domain."""
26-
res = self.model.model['g']['g'].rm.get_roles(user, domain)
27-
return [] if isinstance(res, RuntimeError) else [r.replace(domain + '::', '') for r in res]
26+
return self.model.model['g']['g'].rm.get_roles(name, domain)
2827

2928
def get_users_for_role(self, role):
3029
"""gets the users that has a role."""
3130
return self.model.model['g']['g'].rm.get_users(role)
3231

33-
def get_users_for_role_in_domain(self, role, domain):
32+
def get_users_for_role_in_domain(self, name, domain):
3433
"""gets the users that has a role inside a domain."""
35-
_role = domain + '::' + role
36-
res = self.model.model['g']['g'].rm.get_users(_role, domain)
37-
return [] if isinstance(res, RuntimeError) else [r.replace(domain + '::', '') for r in res]
34+
return self.model.model['g']['g'].rm.get_users(name, domain)
3835

3936
def has_role_for_user(self, user, role):
4037
"""determines whether a user has a role."""
4138
roles = self.get_roles_for_user(user)
4239

43-
for r in roles:
44-
if r == role:
45-
return True
46-
47-
return False
40+
return role in roles
4841

4942
def add_role_for_user(self, user, role):
5043
"""adds a role for a user."""

casbin/rbac/default_role_manager/role_manager.py

Lines changed: 20 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,10 @@ def has_link(self, name1, name2, *domain):
6767
return role1.has_role(name2, self.max_hierarchy_level)
6868

6969
def get_roles(self, name, *domain):
70+
"""
71+
gets the roles that a subject inherits.
72+
domain is a prefix to the roles.
73+
"""
7074
if len(domain) == 1:
7175
name = domain[0] + "::" + name
7276
elif len(domain) > 1:
@@ -77,19 +81,31 @@ def get_roles(self, name, *domain):
7781

7882
roles = self.create_role(name).get_roles()
7983
if len(domain) == 1:
80-
for value in roles:
81-
value = value[len(domain[0]) + 2:]
84+
for key, value in enumerate(roles):
85+
roles[key] = value[len(domain[0]) + 2:]
8286

8387
return roles
8488

8589
def get_users(self, name, *domain):
90+
"""
91+
gets the users that inherits a subject.
92+
domain is an unreferenced parameter here, may be used in other implementations.
93+
"""
94+
if len(domain) == 1:
95+
name = domain[0] + "::" + name
96+
elif len(domain) > 1:
97+
return RuntimeError("error: domain should be 1 parameter")
98+
8699
if not self.has_role(name):
87-
return RuntimeError("error: name does not exist")
100+
return []
88101

89102
names = []
90103
for role in self.all_roles.values():
91104
if role.has_direct_role(name):
92-
names.append(role.name)
105+
if len(domain) == 1:
106+
names.append(role.name[len(domain[0]) + 2:])
107+
else:
108+
names.append(role.name)
93109

94110
return names
95111

tests/test_enforcer.py

Lines changed: 0 additions & 100 deletions
Original file line numberDiff line numberDiff line change
@@ -146,103 +146,3 @@ def test_enforce_abac_log_enabled(self):
146146
sub = 'alice'
147147
obj = {'Owner': 'alice', 'id': 'data1'}
148148
self.assertTrue(e.enforce(sub, obj, 'write'))
149-
150-
def test_enforce_implicit_roles_api(self):
151-
e = get_enforcer(get_examples("rbac_model.conf"),
152-
get_examples("rbac_with_hierarchy_policy.csv"))
153-
154-
self.assertTrue(e.get_permissions_for_user('alice') == [["alice", "data1", "read"]])
155-
self.assertTrue(e.get_permissions_for_user('bob') == [["bob", "data2", "write"]])
156-
157-
self.assertTrue(e.get_implicit_roles_for_user('alice') == ['admin', 'data1_admin', 'data2_admin'])
158-
self.assertTrue(e.get_implicit_roles_for_user('bob') == [])
159-
160-
def test_enforce_implicit_roles_with_domain(self):
161-
e = get_enforcer(get_examples("rbac_with_domains_model.conf"),
162-
get_examples("rbac_with_hierarchy_with_domains_policy.csv"))
163-
164-
self.assertTrue(e.get_roles_for_user_in_domain('alice', 'domain1') == ['role:global_admin'])
165-
self.assertTrue(
166-
e.get_implicit_roles_for_user('alice', 'domain1') == ["role:global_admin", "role:reader", "role:writer"])
167-
168-
def test_enforce_implicit_permissions_api(self):
169-
e = get_enforcer(get_examples("rbac_model.conf"),
170-
get_examples("rbac_with_hierarchy_policy.csv"))
171-
self.assertTrue(e.get_permissions_for_user('alice') == [["alice", "data1", "read"]])
172-
self.assertTrue(e.get_permissions_for_user('bob') == [["bob", "data2", "write"]])
173-
self.assertTrue(e.get_implicit_permissions_for_user('alice') == [
174-
['alice', 'data1', 'read'],
175-
['data1_admin', 'data1', 'read'],
176-
['data1_admin', 'data1', 'write'],
177-
['data2_admin', 'data2', 'read'],
178-
['data2_admin', 'data2', 'write']])
179-
self.assertTrue(e.get_implicit_permissions_for_user('bob') == [["bob", "data2", "write"]])
180-
181-
def test_enforce_implicit_permissions_api_with_domain(self):
182-
e = get_enforcer(get_examples("rbac_with_domains_model.conf"),
183-
get_examples("rbac_with_hierarchy_with_domains_policy.csv"))
184-
185-
self.assertTrue(e.get_roles_for_user_in_domain('alice', 'domain1') == ['role:global_admin'])
186-
self.assertTrue(e.get_implicit_roles_for_user('alice', 'domain1') ==
187-
['role:global_admin', 'role:reader', 'role:writer'])
188-
self.assertTrue(e.get_implicit_permissions_for_user('alice', 'domain1') == [
189-
['alice', 'domain1', 'data2', 'read'],
190-
["role:reader", "domain1", "data1", "read"],
191-
["role:writer", "domain1", "data1", "write"]])
192-
self.assertTrue(e.get_implicit_permissions_for_user('bob', 'domain1') == [])
193-
194-
def test_enforce_get_users_in_domain(self):
195-
e = get_enforcer(get_examples("rbac_with_domains_model.conf"),
196-
get_examples("rbac_with_domains_policy.csv"))
197-
self.assertTrue(e.get_users_for_role_in_domain('admin', 'domain1') == ['alice'])
198-
self.assertTrue(e.get_users_for_role_in_domain('non_exist', 'domain1') == [])
199-
self.assertTrue(e.get_users_for_role_in_domain('admin', 'domain2') == ['bob'])
200-
self.assertTrue(e.get_users_for_role_in_domain('non_exist', 'domain2') == [])
201-
e.delete_roles_for_user_in_domain('alice', 'admin', 'domain1')
202-
e.add_role_for_user_in_domain('bob', 'admin', 'domain1')
203-
self.assertTrue(e.get_users_for_role_in_domain('admin', 'domain1') == ['bob'])
204-
self.assertTrue(e.get_users_for_role_in_domain('non_exist', 'domain1') == [])
205-
self.assertTrue(e.get_users_for_role_in_domain('admin', 'domain2') == ['bob'])
206-
self.assertTrue(e.get_users_for_role_in_domain('non_exist', 'domain2') == [])
207-
208-
def test_enforce_user_api_with_domain(self):
209-
e = get_enforcer(get_examples("rbac_with_domains_model.conf"),
210-
get_examples("rbac_with_domains_policy.csv"))
211-
self.assertTrue(e.get_users_for_role_in_domain('admin', 'domain1') == ['alice'])
212-
self.assertTrue(e.get_users_for_role_in_domain('non_exist', 'domain1') == [])
213-
self.assertTrue(e.get_users_for_role_in_domain('admin', 'domain2') == ['bob'])
214-
self.assertTrue(e.get_users_for_role_in_domain('non_exist', 'domain2') == [])
215-
216-
e.delete_roles_for_user_in_domain('alice', 'admin', 'domain1')
217-
e.add_role_for_user_in_domain('bob', 'admin', 'domain1')
218-
219-
self.assertTrue(e.get_users_for_role_in_domain('admin', 'domain1') == ['bob'])
220-
self.assertTrue(e.get_users_for_role_in_domain('non_exist', 'domain1') == [])
221-
self.assertTrue(e.get_users_for_role_in_domain('admin', 'domain2') == ['bob'])
222-
self.assertTrue(e.get_users_for_role_in_domain('non_exist', 'domain2') == [])
223-
224-
def test_enforce_get_roles_with_domain(self):
225-
e = get_enforcer(get_examples("rbac_with_domains_model.conf"),
226-
get_examples("rbac_with_domains_policy.csv"))
227-
self.assertTrue(e.get_roles_for_user_in_domain('alice', 'domain1') == ['admin'])
228-
self.assertTrue(e.get_roles_for_user_in_domain('bob', 'domain1') == [])
229-
self.assertTrue(e.get_roles_for_user_in_domain('admin', 'domain1') == [])
230-
self.assertTrue(e.get_roles_for_user_in_domain('non_exist', 'domain1') == [])
231-
232-
self.assertTrue(e.get_roles_for_user_in_domain('alice', 'domain2') == [])
233-
self.assertTrue(e.get_roles_for_user_in_domain('bob', 'domain2') == ['admin'])
234-
self.assertTrue(e.get_roles_for_user_in_domain('admin', 'domain2') == [])
235-
self.assertTrue(e.get_roles_for_user_in_domain('non_exist', 'domain2') == [])
236-
237-
e.delete_roles_for_user_in_domain('alice', 'admin', 'domain1')
238-
e.add_role_for_user_in_domain('bob', 'admin', 'domain1')
239-
240-
self.assertTrue(e.get_roles_for_user_in_domain('alice', 'domain1') == [])
241-
self.assertTrue(e.get_roles_for_user_in_domain('bob', 'domain1') == ['admin'])
242-
self.assertTrue(e.get_roles_for_user_in_domain('admin', 'domain1') == [])
243-
self.assertTrue(e.get_roles_for_user_in_domain('non_exist', 'domain1') == [])
244-
245-
self.assertTrue(e.get_roles_for_user_in_domain('alice', 'domain2') == [])
246-
self.assertTrue(e.get_roles_for_user_in_domain('bob', 'domain2') == ['admin'])
247-
self.assertTrue(e.get_roles_for_user_in_domain('admin', 'domain2') == [])
248-
self.assertTrue(e.get_roles_for_user_in_domain('non_exist', 'domain2') == [])

tests/test_rbac_api.py

Lines changed: 100 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -105,3 +105,103 @@ def test_has_permission_for_user(self):
105105
self.assertFalse(e.has_permission_for_user('alice', *['write']))
106106
self.assertFalse(e.has_permission_for_user('bob', *['read']))
107107
self.assertTrue(e.has_permission_for_user('bob', *['write']))
108+
109+
def test_enforce_implicit_roles_api(self):
110+
e = get_enforcer(get_examples("rbac_model.conf"),
111+
get_examples("rbac_with_hierarchy_policy.csv"))
112+
113+
self.assertTrue(e.get_permissions_for_user('alice') == [["alice", "data1", "read"]])
114+
self.assertTrue(e.get_permissions_for_user('bob') == [["bob", "data2", "write"]])
115+
116+
self.assertTrue(e.get_implicit_roles_for_user('alice') == ['admin', 'data1_admin', 'data2_admin'])
117+
self.assertTrue(e.get_implicit_roles_for_user('bob') == [])
118+
119+
def test_enforce_implicit_roles_with_domain(self):
120+
e = get_enforcer(get_examples("rbac_with_domains_model.conf"),
121+
get_examples("rbac_with_hierarchy_with_domains_policy.csv"))
122+
123+
self.assertTrue(e.get_roles_for_user_in_domain('alice', 'domain1') == ['role:global_admin'])
124+
self.assertTrue(
125+
e.get_implicit_roles_for_user('alice', 'domain1') == ["role:global_admin", "role:reader", "role:writer"])
126+
127+
def test_enforce_implicit_permissions_api(self):
128+
e = get_enforcer(get_examples("rbac_model.conf"),
129+
get_examples("rbac_with_hierarchy_policy.csv"))
130+
self.assertTrue(e.get_permissions_for_user('alice') == [["alice", "data1", "read"]])
131+
self.assertTrue(e.get_permissions_for_user('bob') == [["bob", "data2", "write"]])
132+
self.assertTrue(e.get_implicit_permissions_for_user('alice') == [
133+
['alice', 'data1', 'read'],
134+
['data1_admin', 'data1', 'read'],
135+
['data1_admin', 'data1', 'write'],
136+
['data2_admin', 'data2', 'read'],
137+
['data2_admin', 'data2', 'write']])
138+
self.assertTrue(e.get_implicit_permissions_for_user('bob') == [["bob", "data2", "write"]])
139+
140+
def test_enforce_implicit_permissions_api_with_domain(self):
141+
e = get_enforcer(get_examples("rbac_with_domains_model.conf"),
142+
get_examples("rbac_with_hierarchy_with_domains_policy.csv"))
143+
144+
self.assertTrue(e.get_roles_for_user_in_domain('alice', 'domain1') == ['role:global_admin'])
145+
self.assertTrue(e.get_implicit_roles_for_user('alice', 'domain1') ==
146+
['role:global_admin', 'role:reader', 'role:writer'])
147+
self.assertTrue(e.get_implicit_permissions_for_user('alice', 'domain1') == [
148+
['alice', 'domain1', 'data2', 'read'],
149+
["role:reader", "domain1", "data1", "read"],
150+
["role:writer", "domain1", "data1", "write"]])
151+
self.assertTrue(e.get_implicit_permissions_for_user('bob', 'domain1') == [])
152+
153+
def test_enforce_get_users_in_domain(self):
154+
e = get_enforcer(get_examples("rbac_with_domains_model.conf"),
155+
get_examples("rbac_with_domains_policy.csv"))
156+
self.assertTrue(e.get_users_for_role_in_domain('admin', 'domain1') == ['alice'])
157+
self.assertTrue(e.get_users_for_role_in_domain('non_exist', 'domain1') == [])
158+
self.assertTrue(e.get_users_for_role_in_domain('admin', 'domain2') == ['bob'])
159+
self.assertTrue(e.get_users_for_role_in_domain('non_exist', 'domain2') == [])
160+
e.delete_roles_for_user_in_domain('alice', 'admin', 'domain1')
161+
e.add_role_for_user_in_domain('bob', 'admin', 'domain1')
162+
self.assertTrue(e.get_users_for_role_in_domain('admin', 'domain1') == ['bob'])
163+
self.assertTrue(e.get_users_for_role_in_domain('non_exist', 'domain1') == [])
164+
self.assertTrue(e.get_users_for_role_in_domain('admin', 'domain2') == ['bob'])
165+
self.assertTrue(e.get_users_for_role_in_domain('non_exist', 'domain2') == [])
166+
167+
def test_enforce_user_api_with_domain(self):
168+
e = get_enforcer(get_examples("rbac_with_domains_model.conf"),
169+
get_examples("rbac_with_domains_policy.csv"))
170+
self.assertEqual(e.get_users_for_role_in_domain('admin', 'domain1'), ['alice'])
171+
self.assertEqual(e.get_users_for_role_in_domain('non_exist', 'domain1'), [])
172+
self.assertEqual(e.get_users_for_role_in_domain('admin', 'domain2'), ['bob'])
173+
self.assertEqual(e.get_users_for_role_in_domain('non_exist', 'domain2'), [])
174+
175+
e.delete_roles_for_user_in_domain('alice', 'admin', 'domain1')
176+
e.add_role_for_user_in_domain('bob', 'admin', 'domain1')
177+
178+
self.assertEqual(e.get_users_for_role_in_domain('admin', 'domain1'), ['bob'])
179+
self.assertEqual(e.get_users_for_role_in_domain('non_exist', 'domain1'), [])
180+
self.assertEqual(e.get_users_for_role_in_domain('admin', 'domain2'), ['bob'])
181+
self.assertEqual(e.get_users_for_role_in_domain('non_exist', 'domain2'), [])
182+
183+
def test_enforce_get_roles_with_domain(self):
184+
e = get_enforcer(get_examples("rbac_with_domains_model.conf"),
185+
get_examples("rbac_with_domains_policy.csv"))
186+
self.assertEqual(e.get_roles_for_user_in_domain('alice', 'domain1'), ['admin'])
187+
self.assertEqual(e.get_roles_for_user_in_domain('bob', 'domain1'), [])
188+
self.assertEqual(e.get_roles_for_user_in_domain('admin', 'domain1'), [])
189+
self.assertEqual(e.get_roles_for_user_in_domain('non_exist', 'domain1'), [])
190+
191+
self.assertEqual(e.get_roles_for_user_in_domain('alice', 'domain2'), [])
192+
self.assertEqual(e.get_roles_for_user_in_domain('bob', 'domain2'), ['admin'])
193+
self.assertEqual(e.get_roles_for_user_in_domain('admin', 'domain2'), [])
194+
self.assertEqual(e.get_roles_for_user_in_domain('non_exist', 'domain2'), [])
195+
196+
e.delete_roles_for_user_in_domain('alice', 'admin', 'domain1')
197+
e.add_role_for_user_in_domain('bob', 'admin', 'domain1')
198+
199+
self.assertEqual(e.get_roles_for_user_in_domain('alice', 'domain1'), [])
200+
self.assertEqual(e.get_roles_for_user_in_domain('bob', 'domain1'), ['admin'])
201+
self.assertEqual(e.get_roles_for_user_in_domain('admin', 'domain1'), [])
202+
self.assertEqual(e.get_roles_for_user_in_domain('non_exist', 'domain1'), [])
203+
204+
self.assertEqual(e.get_roles_for_user_in_domain('alice', 'domain2'), [])
205+
self.assertEqual(e.get_roles_for_user_in_domain('bob', 'domain2'), ['admin'])
206+
self.assertEqual(e.get_roles_for_user_in_domain('admin', 'domain2'), [])
207+
self.assertEqual(e.get_roles_for_user_in_domain('non_exist', 'domain2'), [])

0 commit comments

Comments
 (0)