14
14
import pytest
15
15
from pyfakefs .fake_filesystem import FakeFileOpen , FakeFilesystem
16
16
17
- from xcp .cpiofile import CpioFile
17
+ from xcp .cpiofile import CpioFile , StreamError
18
18
19
19
binary_data = b"\x00 \x1b \x5b \x95 \xb1 \xb2 \xb3 \xb4 \xb5 \xb6 \xb7 \xb8 \xb9 \xcc \xdd \xee \xff "
20
20
@@ -48,53 +48,79 @@ def test_cpiofile_modes(fs):
48
48
if comp == "xz" and filetype == ":" :
49
49
continue # streaming xz is not implemented (supported only as file)
50
50
check_archive_mode (filetype + comp , fs )
51
+ if filetype == "|" :
52
+ check_archive_mode (filetype + comp , fs , filename = "archive." + comp )
51
53
52
54
53
- def check_archive_mode ( archive_mode , fs ):
54
- # type: (str, FakeFilesystem ) -> None
55
+ def create_cpio_archive ( fs , archive_mode , filename = None ):
56
+ # type: (FakeFilesystem, str, str | None ) -> io.BytesIO | None
55
57
"""
56
- Test CpioFile in the given archive mode with verification of the archive contents .
58
+ Create a CpioFile archive with files and directories from a FakeFilesystem .
57
59
60
+ :param fs: `FakeFilesystem` fixture representing a simulated file system for testing
58
61
:param archive_mode: The archive mode is a string parameter that specifies the mode
59
62
in which the CpioFile object should be opened.
60
- :param fs: `FakeFilesystem` fixture representing a simulated file system for testing
63
+ :param filename: The name of the file to create the cpio archive
61
64
"""
62
- # Step 1: Create and populate a cpio archive in a BytesIO buffer
63
- bytesio = io . BytesIO ()
64
- archive = CpioFile .open (fileobj = bytesio , mode = "w" + archive_mode )
65
- pyfakefs_populate_archive (archive , fs )
65
+ cpiofile = None if filename else io . BytesIO ()
66
+ fs . reset ()
67
+ cpio = CpioFile .open (name = filename , fileobj = cpiofile , mode = "w" + archive_mode )
68
+ pyfakefs_populate_archive (cpio , fs )
66
69
if archive_mode == "|gz" :
67
- archive .list (verbose = True )
68
- archive .close ()
70
+ cpio .list (verbose = True )
71
+ cpio .close ()
72
+ if not cpiofile :
73
+ cpio_data = FakeFileOpen (fs )(filename , "rb" ).read ()
74
+ fs .reset ()
75
+ fs .create_file (filename , contents = cast (str , cpio_data ))
76
+ return None
77
+ fs .reset ()
78
+ cpiofile .seek (0 )
79
+ return cpiofile
80
+
69
81
82
+ def check_archive_mode (archive_mode , fs , filename = None ):
83
+ # type: (str, FakeFilesystem, str | None) -> None
84
+ """
85
+ Test CpioFile in the given archive mode with verification of the archive contents.
86
+
87
+ :param archive_mode: The archive mode is a string parameter that specifies the mode
88
+ in which the CpioFile object should be opened.
89
+ :param fs: `FakeFilesystem` fixture representing a simulated file system for testing
90
+ """
70
91
# Step 2: Extract the archive in a clean filesystem and verify the extracted contents
71
- fs .reset ()
72
- bytesio .seek (0 )
73
- archive = CpioFile .open (fileobj = bytesio , mode = "r" + archive_mode )
92
+ cpiofile = create_cpio_archive (fs , archive_mode , filename )
93
+ archive = CpioFile .open (name = filename , fileobj = cpiofile , mode = "r" + archive_mode )
74
94
archive .extractall ()
75
95
pyfakefs_verify_filesystem (fs )
76
- assert archive .getnames () == ["dirname" , "dirname/filename" , "dir2/symlink " ]
96
+ assert archive .getnames () == ["dirname" , "dirname/filename" , "symlink" , " dir2/file_2 " ]
77
97
dirs = [cpioinfo .name for cpioinfo in archive .getmembers () if cpioinfo .isdir ()]
78
98
files = [cpioinfo .name for cpioinfo in archive .getmembers () if cpioinfo .isreg ()]
79
99
symlinks = [cpioinfo .name for cpioinfo in archive .getmembers () if cpioinfo .issym ()]
80
100
assert dirs == ["dirname" ]
81
- assert files == ["dirname/filename" ]
82
- assert symlinks == ["dir2/symlink" ]
83
- assert archive .getmember (symlinks [0 ]).linkname == "symlink_target"
101
+ assert files == ["dirname/filename" , "dir2/file_2" ]
102
+ assert symlinks == ["symlink" ]
103
+ assert archive .getmember (symlinks [0 ]).linkname == "dirname/filename"
104
+
105
+ # Test extracting a symlink to a file object:
106
+ if archive_mode .startswith ("|" ): # Non-seekable streams raise StreamError
107
+ with pytest .raises (StreamError ):
108
+ archive .extractfile ("symlink" )
109
+ else : # Expect a seekable fileobj for this test (not a stream) to work:
110
+ fileobj = archive .extractfile ("symlink" )
111
+ assert fileobj and fileobj .read () == binary_data
84
112
archive .close ()
85
113
86
114
# Step 3: Extract the archive a second time using another method
87
- fs .reset ()
88
- bytesio .seek (0 )
89
- archive = CpioFile .open (fileobj = bytesio , mode = "r" + archive_mode )
115
+ cpiofile = create_cpio_archive (fs , archive_mode , filename )
116
+ archive = CpioFile .open (name = filename , fileobj = cpiofile , mode = "r" + archive_mode )
90
117
if archive_mode [0 ] != "|" :
91
118
for cpioinfo in archive :
92
119
archive .extract (cpioinfo )
93
120
pyfakefs_verify_filesystem (fs )
94
121
if archive_mode == "|xz" :
95
122
archive .list (verbose = True )
96
123
archive .close ()
97
- bytesio .close ()
98
124
99
125
100
126
def pyfakefs_populate_archive (archive , fs ):
@@ -105,11 +131,12 @@ def pyfakefs_populate_archive(archive, fs):
105
131
:param archive: Instance of the CpioFile class to create a new cpio archive
106
132
:param fs: `FakeFilesystem` fixture representing a simulated file system for testing
107
133
"""
108
- fs .reset ()
109
134
110
135
fs .create_file ("dirname/filename" , contents = cast (str , binary_data ))
111
136
archive .add ("dirname" , recursive = True )
112
- fs .create_symlink ("directory/symlink" , "symlink_target" )
137
+ fs .create_file ("directory/file_2" , contents = cast (str , binary_data ))
138
+ fs .create_symlink ("symlink" , "dirname/filename" )
139
+ archive .add ("symlink" )
113
140
114
141
# Test special code path of archive.add(".", ...):
115
142
os .chdir ("directory" )
@@ -129,7 +156,10 @@ def pyfakefs_verify_filesystem(fs):
129
156
130
157
:param fs: `FakeFilesystem` fixture representing a simulated file system for testing
131
158
"""
132
- assert fs .islink ("dir2/ symlink" )
159
+ assert fs .islink ("symlink" )
133
160
assert fs .isfile ("dirname/filename" )
161
+ assert fs .isfile ("dir2/file_2" )
134
162
with FakeFileOpen (fs )("dirname/filename" , "rb" ) as contents :
135
163
assert contents .read () == binary_data
164
+ with FakeFileOpen (fs )("dir2/file_2" , "rb" ) as contents :
165
+ assert contents .read () == binary_data
0 commit comments