1
1
import libzfs
2
2
3
- from middlewared .schema import accepts , Any , Bool , Dict , Int , List , Ref , Str
4
3
from middlewared .service import CallError , job , Service
5
4
from middlewared .utils import filter_list
6
5
@@ -15,20 +14,23 @@ class Config:
15
14
private = True
16
15
process_pool = True
17
16
18
- @accepts (
19
- Ref ('query-filters' ),
20
- Ref ('query-options' ),
21
- List (
22
- 'additional_information' ,
23
- items = [Str ('desideratum' , enum = ['SIZE' , 'RO' , 'DEVID' , 'ATTACHMENT' ])]
24
- )
25
- )
26
- def unlocked_zvols_fast (self , filters , options , additional_information ):
17
+ def unlocked_zvols_fast (
18
+ self ,
19
+ filters : list | None = None ,
20
+ options : dict | None = None ,
21
+ additional_information : list | None = None
22
+ ):
27
23
"""
28
24
Fast check for zvol information. Supports `additional_information` to
29
25
expand output on an as-needed basis. Adding additional_information to
30
26
the output may impact performance of 'fast' method.
31
27
"""
28
+ if filters is None :
29
+ filters = []
30
+ if options is None :
31
+ options = dict ()
32
+ if additional_information is None :
33
+ additional_information = []
32
34
33
35
def get_attachments ():
34
36
extents = self .middleware .call_sync (
@@ -113,17 +115,15 @@ def common_encryption_checks(self, id_, ds):
113
115
if not ds .encrypted :
114
116
raise CallError (f'{ id_ } is not encrypted' )
115
117
116
- @accepts (
117
- Str ('id' ),
118
- Dict (
119
- 'load_key_options' ,
120
- Bool ('mount' , default = True ),
121
- Bool ('recursive' , default = False ),
122
- Any ('key' , default = None , null = True ),
123
- Str ('key_location' , default = None , null = True ),
124
- ),
125
- )
126
- def load_key (self , id_ , options ):
118
+ def load_key (self , id_ : str , options : dict | None = None ):
119
+ if options is None :
120
+ options = {
121
+ 'mount' : True ,
122
+ 'recursive' : False ,
123
+ 'key' : None ,
124
+ 'key_location' : None ,
125
+ }
126
+
127
127
mount_ds = options .pop ('mount' )
128
128
recursive = options .pop ('recursive' )
129
129
try :
@@ -138,18 +138,16 @@ def load_key(self, id_, options):
138
138
if mount_ds :
139
139
self .middleware .call_sync ('zfs.dataset.mount' , id_ , {'recursive' : recursive })
140
140
141
- @accepts (
142
- Str ('id' ),
143
- Dict (
144
- 'check_key' ,
145
- Any ('key' , default = None , null = True ),
146
- Str ('key_location' , default = None , null = True ),
147
- )
148
- )
149
- def check_key (self , id_ , options ):
141
+ def check_key (self , id_ : str , options : dict | None = None ):
150
142
"""
151
143
Returns `true` if the `key` is valid, `false` otherwise.
152
144
"""
145
+ if options is None :
146
+ options = {
147
+ 'key' : None ,
148
+ 'key_location' : None ,
149
+ }
150
+
153
151
try :
154
152
with libzfs .ZFS () as zfs :
155
153
ds = zfs .get_dataset (id_ )
@@ -159,16 +157,14 @@ def check_key(self, id_, options):
159
157
self .logger .error (f'Failed to check key for { id_ } ' , exc_info = True )
160
158
raise CallError (f'Failed to check key for { id_ } : { e } ' )
161
159
162
- @accepts (
163
- Str ('id' ),
164
- Dict (
165
- 'unload_key_options' ,
166
- Bool ('recursive' , default = False ),
167
- Bool ('force_umount' , default = False ),
168
- Bool ('umount' , default = False ),
169
- )
170
- )
171
- def unload_key (self , id_ , options ):
160
+ def unload_key (self , id_ : str , options : dict | None = None ):
161
+ if options is None :
162
+ options = {
163
+ 'recursive' : False ,
164
+ 'force_umount' : False ,
165
+ 'umount' : False ,
166
+ }
167
+
172
168
force = options .pop ('force_umount' )
173
169
if options .pop ('umount' ) and self .middleware .call_sync (
174
170
'zfs.dataset.query' , [['id' , '=' , id_ ]], {'extra' : {'retrieve_children' : False }, 'get' : True }
@@ -185,21 +181,14 @@ def unload_key(self, id_, options):
185
181
self .logger .error (f'Failed to unload key for { id_ } ' , exc_info = True )
186
182
raise CallError (f'Failed to unload key for { id_ } : { e } ' )
187
183
188
- @accepts (
189
- Str ('id' ),
190
- Dict (
191
- 'change_key_options' ,
192
- Dict (
193
- 'encryption_properties' ,
194
- Str ('keyformat' ),
195
- Str ('keylocation' ),
196
- Int ('pbkdf2iters' )
197
- ),
198
- Bool ('load_key' , default = True ),
199
- Any ('key' , default = None , null = True ),
200
- ),
201
- )
202
- def change_key (self , id_ , options ):
184
+ def change_key (self , id_ : str , options : dict | None = None ):
185
+ if options is None :
186
+ options = {
187
+ 'encryption_properties' : {},
188
+ 'load_key' : True ,
189
+ 'key' : None ,
190
+ }
191
+
203
192
try :
204
193
with libzfs .ZFS () as zfs :
205
194
ds = zfs .get_dataset (id_ )
@@ -209,24 +198,19 @@ def change_key(self, id_, options):
209
198
self .logger .error (f'Failed to change key for { id_ } ' , exc_info = True )
210
199
raise CallError (f'Failed to change key for { id_ } : { e } ' )
211
200
212
- @accepts (
213
- Str ('id' ),
214
- Dict (
215
- 'change_encryption_root_options' ,
216
- Bool ('load_key' , default = True ),
217
- )
218
- )
219
- def change_encryption_root (self , id_ , options ):
201
+ def change_encryption_root (self , id_ : str , options : dict | None = None ):
202
+ if options is None :
203
+ options = {'load_key' : True }
204
+
220
205
try :
221
206
with libzfs .ZFS () as zfs :
222
207
ds = zfs .get_dataset (id_ )
223
208
ds .change_key (load_key = options ['load_key' ], inherit = True )
224
209
except libzfs .ZFSException as e :
225
210
raise CallError (f'Failed to change encryption root for { id_ } : { e } ' )
226
211
227
- @accepts (Str ('name' ), List ('params' , private = True ))
228
212
@job ()
229
- def bulk_process (self , job , name , params ):
213
+ def bulk_process (self , job , name : str , params : list ):
230
214
f = getattr (self , name , None )
231
215
if not f :
232
216
raise CallError (f'{ name } method not found in zfs.dataset' )
0 commit comments