-
Notifications
You must be signed in to change notification settings - Fork 12
/
Copy pathtest_mountingaccessor.py
146 lines (104 loc) · 4.71 KB
/
test_mountingaccessor.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
"""pytest tests testing subclasses of xcp.accessor.MountingAccessor using pyfakefs"""
import sys
from io import BytesIO
from typing import TYPE_CHECKING, cast
from mock import patch
from pyfakefs.fake_filesystem import FakeFileOpen, FakeFilesystem
import xcp.accessor
import xcp.mount
from .test_httpaccessor import UTF8TEXT_LITERAL
if sys.version_info >= (3, 6):
from pytest_subprocess.fake_process import FakeProcess
if TYPE_CHECKING:
from typing_extensions import Literal
else:
import pytest
pytest.skip(allow_module_level=True)
sys.exit(0) # Let pyright know that this is a dead end
binary_data = b"\x00\x1b\x5b\x95\xb1\xb2\xb3\xb4\xb5\xb6\xb7\xb8\xb9\xcc\xdd\xee\xff"
def expect(fp, mount):
fp.register_subprocess(mount) # type: ignore[arg-type]
def test_device_accessor(fs, fp):
# type: (FakeFilesystem, FakeProcess) -> None
assert isinstance(fp, FakeProcess)
# Test xcp.mount.bindMount()
mount = [b"/bin/mount", b"--bind", b"src", b"mountpoint_dest"]
fp.register_subprocess(mount) # type: ignore[arg-type]
assert xcp.mount.bindMount("src", "mountpoint_dest") is None
expect(fp, [b"/bin/mount", b"-t", b"iso9660", b"-o", b"ro", b"/dev/device", b"/tmp"])
accessor = xcp.accessor.createAccessor("dev:///dev/device", False)
assert isinstance(accessor, xcp.accessor.MountingAccessorTypes)
check_mounting_accessor(accessor, fs, fp)
def test_nfs_accessor(fs, fp):
# type: (FakeFilesystem, FakeProcess) -> None
assert isinstance(fp, FakeProcess)
mount = [
b"/bin/mount",
b"-t",
b"nfs",
b"-o",
b"tcp,timeo=100,retrans=1,retry=0",
b"server/path",
b"/tmp",
]
expect(fp, mount)
accessor = xcp.accessor.createAccessor("nfs://server/path", False)
assert isinstance(accessor, xcp.accessor.NFSAccessor)
check_mounting_accessor(accessor, fs, fp)
def check_mounting_accessor(accessor, fs, fp):
# type: (Literal[False] | xcp.accessor.Mount, FakeFilesystem, FakeProcess) -> None
"""Test subclasses of MountingAccessor (with xcp.cmd.runCmd in xcp.mount mocked)"""
assert isinstance(accessor, xcp.accessor.MountingAccessorTypes)
with patch("tempfile.mkdtemp") as tempfile_mkdtemp:
tempfile_mkdtemp.return_value = "/tmp"
accessor.start()
assert accessor.location
assert fs.isdir(accessor.location)
location = accessor.location
if sys.version_info.major >= 3:
fs.add_mount_point(location)
assert check_binary_read(accessor, location, fs)
assert check_binary_write(accessor, location, fs)
assert open_text(accessor, location, fs, UTF8TEXT_LITERAL) == UTF8TEXT_LITERAL
if sys.version_info.major >= 3:
fs.mount_points.pop(location)
umount = [b"/bin/umount", b"-d", b"/tmp"]
fp.register_subprocess(umount) # type: ignore[arg-type]
accessor.finish()
assert not fs.exists(location)
assert not accessor.location
def check_binary_read(accessor, location, fs):
# type: (Literal[False] | xcp.accessor.AnyAccessor, str, FakeFilesystem) -> bool
"""Test the openAddress() method of different types of local Accessor classes"""
assert isinstance(accessor, xcp.accessor.LocalTypes)
name = "binary_file"
path = location + "/" + name
assert fs.create_file(path, contents=cast(str, binary_data))
assert accessor.access(name)
assert accessor.access("nonexisting filename") is False
assert accessor.lastError == 404
binary_file = accessor.openAddress(name)
assert not isinstance(binary_file, bool)
fs.remove(path)
return cast(bytes, binary_file.read()) == binary_data
def check_binary_write(accessor, location, fs):
# type: (Literal[False] | xcp.accessor.AnyAccessor, str, FakeFilesystem) -> bool
"""Test the writeFile() method of different types of local Accessor classes"""
assert isinstance(accessor, xcp.accessor.LocalTypes)
name = "binary_file_written_by_accessor"
accessor.writeFile(BytesIO(binary_data), name)
assert accessor.access(name)
with FakeFileOpen(fs, delete_on_close=True)(location + "/" + name, "rb") as written:
return cast(bytes, written.read()) == binary_data
def open_text(accessor, location, fs, text):
# type: (Literal[False] | xcp.accessor.AnyAccessor, str, FakeFilesystem, str) -> str
"""Test the openText() method of subclasses of xcp.accessor.MountingAccessor"""
assert isinstance(accessor, xcp.accessor.MountingAccessorTypes)
name = "textfile"
path = location + "/" + name
assert fs.create_file(path, contents=text, encoding="utf-8")
assert accessor.access(name)
with accessor.openText(name) as textfile:
assert not isinstance(textfile, bool)
fs.remove(path)
return textfile.read()