Skip to content

Commit 2fb4770

Browse files
committed
feat: add test
1 parent c1ea804 commit 2fb4770

File tree

2 files changed

+172
-0
lines changed

2 files changed

+172
-0
lines changed

casbin/persist/adapters/filtered_file_adapter.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,11 @@ def load_filtered_policy(self, model, filter):
5252

5353
try:
5454
filter_value = [filter.__dict__["P"]] + [filter.__dict__["G"]]
55+
is_empty_filter = all(not f for f in filter_value) or all(
56+
all(not x.strip() for x in f) if f else True for f in filter_value
57+
)
58+
if is_empty_filter:
59+
return self.load_policy(model)
5560
except:
5661
raise RuntimeError("invalid filter type")
5762

tests/test_filter.py

Lines changed: 167 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,10 @@
1616
from unittest import TestCase
1717
from tests.test_enforcer import get_examples
1818
from casbin.persist.adapters import FilteredFileAdapter
19+
from casbin.persist.adapters.filtered_file_adapter import filter_line, filter_words
20+
import tempfile
21+
import shutil
22+
import os
1923

2024

2125
class Filter:
@@ -141,3 +145,166 @@ def test_filtered_adapter_invalid_filepath(self):
141145

142146
with self.assertRaises(RuntimeError):
143147
e.load_filtered_policy(None)
148+
149+
def test_empty_filter_array(self):
150+
"""Test filter for empty array."""
151+
adapter = FilteredFileAdapter(get_examples("rbac_with_domains_policy.csv"))
152+
e = casbin.Enforcer(get_examples("rbac_with_domains_model.conf"), adapter)
153+
filter = Filter()
154+
filter.P = []
155+
filter.G = []
156+
157+
e.load_filtered_policy(filter)
158+
self.assertTrue(e.has_policy(["admin", "domain1", "data1", "read"]))
159+
self.assertTrue(e.has_policy(["admin", "domain2", "data2", "read"]))
160+
161+
def test_empty_string_filter(self):
162+
"""Test the filter for all empty strings."""
163+
adapter = FilteredFileAdapter(get_examples("rbac_with_domains_policy.csv"))
164+
e = casbin.Enforcer(get_examples("rbac_with_domains_model.conf"), adapter)
165+
filter = Filter()
166+
filter.P = ["", "", ""]
167+
filter.G = ["", "", ""]
168+
169+
e.load_filtered_policy(filter)
170+
self.assertTrue(e.has_policy(["admin", "domain1", "data1", "read"]))
171+
self.assertTrue(e.has_policy(["admin", "domain2", "data2", "read"]))
172+
173+
def test_mixed_empty_filter(self):
174+
"""测试混合空字符串和非空字符串的过滤器"""
175+
adapter = FilteredFileAdapter(get_examples("rbac_with_domains_policy.csv"))
176+
e = casbin.Enforcer(get_examples("rbac_with_domains_model.conf"), adapter)
177+
filter = Filter()
178+
filter.P = ["", "domain1", ""]
179+
filter.G = ["", "", "domain1"]
180+
181+
e.load_filtered_policy(filter)
182+
self.assertTrue(e.has_policy(["admin", "domain1", "data1", "read"]))
183+
self.assertFalse(e.has_policy(["admin", "domain2", "data2", "read"]))
184+
185+
def test_nonexistent_domain_filter(self):
186+
"""Testing the filter for a non-existent domain."""
187+
adapter = FilteredFileAdapter(get_examples("rbac_with_domains_policy.csv"))
188+
e = casbin.Enforcer(get_examples("rbac_with_domains_model.conf"), adapter)
189+
filter = Filter()
190+
filter.P = ["", "domain3"]
191+
filter.G = ["", "", "domain3"]
192+
193+
e.load_filtered_policy(filter)
194+
self.assertFalse(e.has_policy(["admin", "domain3", "data3", "read"]))
195+
196+
def test_empty_filter_array(self):
197+
"""Test filter for empty array."""
198+
adapter = FilteredFileAdapter(get_examples("rbac_with_domains_policy.csv"))
199+
e = casbin.Enforcer(get_examples("rbac_with_domains_model.conf"), adapter)
200+
filter = Filter()
201+
filter.P = []
202+
filter.G = []
203+
204+
try:
205+
e.load_filtered_policy(filter)
206+
except:
207+
raise RuntimeError("unexpected error with empty filter arrays")
208+
209+
self.assertFalse(e.is_filtered(), "Adapter should not be marked as filtered with empty filters")
210+
211+
self.assertTrue(e.has_policy(["admin", "domain1", "data1", "read"]))
212+
self.assertTrue(e.has_policy(["admin", "domain2", "data2", "read"]))
213+
214+
def test_empty_string_filter(self):
215+
"""Test the filter for all empty strings."""
216+
adapter = FilteredFileAdapter(get_examples("rbac_with_domains_policy.csv"))
217+
e = casbin.Enforcer(get_examples("rbac_with_domains_model.conf"), adapter)
218+
filter = Filter()
219+
filter.P = ["", "", ""]
220+
filter.G = ["", "", ""]
221+
222+
try:
223+
e.load_filtered_policy(filter)
224+
except:
225+
raise RuntimeError("unexpected error with empty string filters")
226+
227+
self.assertFalse(e.is_filtered(), "Adapter should not be marked as filtered with empty string filters")
228+
229+
try:
230+
e.save_policy()
231+
except:
232+
raise RuntimeError("unexpected error in SavePolicy with empty string filters")
233+
234+
self.assertTrue(e.has_policy(["admin", "domain1", "data1", "read"]))
235+
self.assertTrue(e.has_policy(["admin", "domain2", "data2", "read"]))
236+
237+
def test_mixed_empty_filter(self):
238+
"""Test the filter for mixed empty and non-empty strings."""
239+
adapter = FilteredFileAdapter(get_examples("rbac_with_domains_policy.csv"))
240+
e = casbin.Enforcer(get_examples("rbac_with_domains_model.conf"), adapter)
241+
filter = Filter()
242+
filter.P = ["", "domain1", ""]
243+
filter.G = ["", "", "domain1"]
244+
245+
try:
246+
e.load_filtered_policy(filter)
247+
except:
248+
raise RuntimeError("unexpected error with mixed empty filters")
249+
250+
self.assertTrue(e.is_filtered(), "Adapter should be marked as filtered")
251+
252+
with self.assertRaises(RuntimeError):
253+
e.save_policy()
254+
255+
self.assertTrue(e.has_policy(["admin", "domain1", "data1", "read"]))
256+
self.assertFalse(e.has_policy(["admin", "domain2", "data2", "read"]))
257+
258+
def test_whitespace_filter(self):
259+
"""Test the filter for all blank characters."""
260+
adapter = FilteredFileAdapter(get_examples("rbac_with_domains_policy.csv"))
261+
e = casbin.Enforcer(get_examples("rbac_with_domains_model.conf"), adapter)
262+
filter = Filter()
263+
filter.P = [" ", " ", "\t"]
264+
filter.G = ["\n", " ", " "]
265+
266+
e.load_filtered_policy(filter)
267+
268+
self.assertFalse(e.is_filtered())
269+
self.assertTrue(e.has_policy(["admin", "domain1", "data1", "read"]))
270+
self.assertTrue(e.has_policy(["admin", "domain2", "data2", "read"]))
271+
272+
def test_filter_line_edge_cases(self):
273+
"""Test the boundary cases of the filter_line function."""
274+
adapter = FilteredFileAdapter(get_examples("rbac_with_domains_policy.csv"))
275+
276+
self.assertFalse(filter_line("", [[""], [""]]))
277+
278+
self.assertFalse(filter_line("invalid_line", [[""], [""]]))
279+
280+
self.assertFalse(filter_line("p, admin, domain1, data1, read", None))
281+
282+
def test_filter_words_edge_cases(self):
283+
"""Test the boundary cases of the filter_words function."""
284+
self.assertTrue(filter_words(["p"], ["filter1", "filter2"]))
285+
286+
self.assertFalse(filter_words(["p", "admin", "domain1"], []))
287+
288+
line = ["admin", "domain1", "data*", "read"]
289+
filter = ["", "", "data1", ""]
290+
self.assertTrue(filter_words(line, filter))
291+
292+
def test_load_filtered_policy_with_comments(self):
293+
"""Test loading filtering policies with comments."""
294+
temp_file = tempfile.NamedTemporaryFile(delete=False)
295+
try:
296+
shutil.copyfile(get_examples("rbac_with_domains_policy.csv"), temp_file.name)
297+
298+
with open(temp_file.name, "a") as f:
299+
f.write("\n# This is a comment\np, admin, domain1, data3, read")
300+
301+
adapter = FilteredFileAdapter(temp_file.name)
302+
e = casbin.Enforcer(get_examples("rbac_with_domains_model.conf"), adapter)
303+
filter = Filter()
304+
filter.P = ["", "domain1"]
305+
filter.G = ["", "", "domain1"]
306+
307+
e.load_filtered_policy(filter)
308+
self.assertTrue(e.has_policy(["admin", "domain1", "data3", "read"]))
309+
finally:
310+
os.unlink(temp_file.name)

0 commit comments

Comments
 (0)