-
Notifications
You must be signed in to change notification settings - Fork 16
/
Copy pathmain.ml
1769 lines (1677 loc) · 61.6 KB
/
main.ml
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
(*
* Copyright (C) Citrix Systems Inc.
*
* This program is free software; you can redistribute it and/or modify
* it under the terms of the GNU Lesser General Public License as published
* by the Free Software Foundation; version 2.1 only. with the special
* exception on linking described in file LICENSE.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Lesser General Public License for more details.
*)
module U = Unix
module R = Rpc
module B = Backtrace
open Core
open Async
open Xapi_storage_script_types
module Plugin_client = Xapi_storage.Plugin.Plugin (Rpc_async.GenClient ())
module Volume_client = Xapi_storage.Control.Volume (Rpc_async.GenClient ())
module Sr_client = Xapi_storage.Control.Sr (Rpc_async.GenClient ())
module Datapath_client = Xapi_storage.Data.Datapath (Rpc_async.GenClient ())
let ( >>>= ) = Deferred.Result.( >>= )
(** Functions for returning SMAPIv2 errors *)
(** Exception returned by fork_exec_rpc when the script invocation fails *)
exception Fork_exec_error of Storage_interface.Errors.error
let backend_error name args =
let open Storage_interface in
Storage_interface.Errors.Backend_error (name, args)
let backend_backtrace_error name args backtrace =
let open Storage_interface in
match args with
| ["Activated_on_another_host"; uuid] ->
Storage_interface.Errors.Activated_on_another_host uuid
| _ ->
let backtrace = rpc_of_backtrace backtrace |> Jsonrpc.to_string in
Storage_interface.Errors.Backend_error_with_backtrace
(name, backtrace :: args)
let missing_uri () =
backend_error "MISSING_URI" ["Please include a URI in the device-config"]
(** Functions to wrap calls to the above client modules and convert their
exceptions and errors into SMAPIv2 errors of type
[Storage_interface.Exception.exnty]. The above client modules should only
be used with these functions, otherwise the Fork_exec_error exception
raised by fork_exec_rpc will not be caught and the main thread will fail. *)
(* fork_exec_rpc either raises a Fork_exec_error exception or
returns a successful RPC response *)
let return_rpc typ result =
(* Operator to unwrap the wrapped async return type of ocaml-rpc's Rpc_async *)
let ( >*= ) a b = a |> Rpc_async.T.get >>= b in
Monitor.try_with ~extract_exn:true (fun () ->
(* We need to delay the evaluation of [result] until now, because
when fork_exec_rpc is called by GenClient.declare, it
might immediately raise a Fork_exec_error *)
result () >*= fun result ->
(* In practice we'll always get a successful RPC response here (Ok),
but we still have to transform the Error to make the types match: *)
let result =
Result.map_error result ~f:(fun err ->
backend_error "SCRIPT_RETURNED_RPC_ERROR"
[Rpcmarshal.marshal typ err |> R.to_string])
in
return result)
>>= function
| Ok result ->
return result
| Error (Fork_exec_error err) ->
return (Error err)
(* We should not get any other exception from fork_exec_rpc: *)
| Error e ->
return
(Error
(backend_error "SCRIPT_FAILED"
["Unexpected exception:" ^ Exn.to_string e]))
let return_volume_rpc result =
return_rpc Xapi_storage.Control.typ_of_exns result
let return_plugin_rpc result = return_rpc Xapi_storage.Common.typ_of_exnt result
let return_data_rpc result = return_rpc Xapi_storage.Common.typ_of_exnt result
let use_syslog = ref false
let log level fmt =
Printf.ksprintf
(fun s ->
if !use_syslog then
(* FIXME: this is synchronous and will block other I/O.
* This should use Log_extended.Syslog, but that brings in Core's Syslog module
* which conflicts with ours *)
Syslog.log Syslog.Daemon level s
else
let w = Lazy.force Writer.stderr in
Writer.write w s ; Writer.newline w)
fmt
let debug fmt = log Syslog.Debug fmt
let info fmt = log Syslog.Info fmt
let warn fmt = log Syslog.Warning fmt
let error fmt = log Syslog.Err fmt
let pvs_version = "3.0"
let supported_api_versions = [pvs_version; "5.0"]
let api_max = List.fold_left ~f:String.max supported_api_versions ~init:""
let id x = x
(** A function that changes the input to make it compatible with an older
script *)
type compat_in = R.t -> R.t
(** A function that changes the output of an older script to make it
compatible with the new interface and ensure it is unmarshalled without
error. *)
type compat_out = R.t -> R.t
module Compat (V : sig
val version : string option ref
end) : sig
(** Module for making the inputs and outputs compatible with the old PVS
version of the storage scripts. *)
type device_config = (Core.String.t, string) Core.List.Assoc.t
val compat_out_volume : compat_out
(** Add the missing [sharable] field to the Dict in [rpc], to ensure the
volume in the output match the new volume record type and is successfully
parsed by rpclib. *)
val compat_out_volumes : compat_out
(** Add the missing [sharable] field to the Dicts in [rpc], to ensure the
volumes in the output match the new volume record type and are
successfully parsed by rpclib. *)
val sr_create :
device_config
-> ( device_config * compat_in * compat_out
, Storage_interface.Errors.error )
Deferred.Result.t
(** Compatiblity for the old PVS version of SR.create, which had signature
[uri -> name -> desc -> config -> unit] *)
val sr_attach :
device_config
-> (compat_in, Storage_interface.Errors.error) Deferred.Result.t
(** Compatiblity for the old PVS version of SR.attach, which had signature
[uri -> sr (=string)] *)
end = struct
type device_config = (Core.String.t, string) Core.List.Assoc.t
type compat_in = R.t -> R.t
type compat_out = R.t -> R.t
let remove field rpc =
match (!V.version, rpc) with
| Some v, R.Dict d when String.(v = pvs_version) ->
R.Dict (List.filter ~f:(fun (k, _) -> String.(k <> field)) d)
| _ ->
rpc
let with_pvs_version f rpc =
match !V.version with
| Some v when String.(v = pvs_version) ->
f rpc
| _ ->
rpc
let add_param_to_input params =
with_pvs_version (function
(* Currently all parameters must be named. In this case, rpclib
currently puts them into a Dict. *)
| R.Dict d ->
R.Dict (List.rev_append params d)
| rpc ->
rpc)
let add_fields_to_dict fields = function
| R.Dict d ->
R.Dict (List.rev_append fields d)
| rpc ->
rpc
let add_fields_to_record_output fields =
with_pvs_version (function
| R.Dict _ as d ->
add_fields_to_dict fields d
| rpc ->
rpc)
let add_fields_to_record_list_output fields =
with_pvs_version (function
| R.Enum l ->
R.Enum (List.map ~f:(add_fields_to_dict fields) l)
| rpc ->
rpc)
let compat_out_volume =
add_fields_to_record_output [("sharable", R.Bool false)]
let compat_out_volumes =
add_fields_to_record_list_output [("sharable", R.Bool false)]
(** Adds the uri parameter to the call from device_config when talking to the
old PVS scripts *)
let compat_uri device_config =
match !V.version with
| Some version when String.(version = pvs_version) -> (
match List.Assoc.find ~equal:String.equal device_config "uri" with
| None ->
return (Error (missing_uri ()))
| Some uri ->
return (Ok (add_param_to_input [("uri", R.String uri)]))
)
| _ ->
return (Ok id)
let sr_create device_config =
compat_uri device_config >>>= fun compat_in ->
let compat_out =
match !V.version with
| Some v when String.(v = pvs_version) -> (
function
(* The PVS version will return nothing *)
| R.Null ->
Rpcmarshal.marshal Xapi_storage.Control.typ_of_configuration
device_config
| rpc ->
rpc
)
| _ ->
id
in
return (Ok (device_config, compat_in, compat_out))
let sr_attach = compat_uri
end
let check_plugin_version_compatible query_result =
let Xapi_storage.Plugin.{name; required_api_version; _} = query_result in
if String.(required_api_version <> api_max) then
warn
"Using deprecated SMAPIv3 API version %s, latest is %s. Update your %s \
plugin!"
required_api_version api_max name ;
if List.mem ~equal:String.equal supported_api_versions required_api_version
then
Deferred.Result.return ()
else
let msg =
Printf.sprintf "%s requires unknown SMAPI API version %s, supported: %s"
name required_api_version
(String.concat ~sep:"," supported_api_versions)
in
return (Error (Storage_interface.Errors.No_storage_plugin_for_sr msg))
module RRD = struct
open Message_switch_async.Protocol_async
let ( >>|= ) m f =
m >>= function
| Ok x ->
f x
| Error y ->
let b = Buffer.create 16 in
let fmt = Format.formatter_of_buffer b in
Client.pp_error fmt y ;
Format.pp_print_flush fmt () ;
raise (Failure (Buffer.contents b))
let switch_rpc queue_name string_of_call response_of_string call =
Client.connect ~switch:queue_name () >>|= fun t ->
Client.rpc ~t ~queue:queue_name ~body:(string_of_call call) () >>|= fun s ->
return (response_of_string s)
let rpc =
switch_rpc !Rrd_interface.queue_name Jsonrpc.string_of_call
Jsonrpc.response_of_string
module Client = Rrd_interface.RPC_API (Rpc_async.GenClient ())
end
let _nonpersistent = "NONPERSISTENT"
let _clone_on_boot_key = "clone-on-boot"
let _vdi_type_key = "vdi-type"
let _snapshot_time_key = "snapshot_time"
let _is_a_snapshot_key = "is_a_snapshot"
let _snapshot_of_key = "snapshot_of"
let is_executable path =
Sys.is_file ~follow_symlinks:true path >>= function
| `No | `Unknown ->
return (Error (`missing path))
| `Yes -> (
Unix.access path [`Exec] >>= function
| Error exn ->
return (Error (`not_executable (path, exn)))
| Ok () ->
return (Ok ())
)
module Script = struct
(** We cache (lowercase script name -> original script name) mapping for the
scripts in the root directory of every registered plugin. *)
let name_mapping = String.Table.create ~size:4 ()
let update_mapping ~script_dir =
Sys.readdir script_dir >>| Array.to_list >>| fun files ->
(* If there are multiple files which map to the same lowercase string, we
just take the first one, instead of failing *)
let mapping =
List.zip_exn files files
|> String.Caseless.Map.of_alist_reduce ~f:String.min
in
Hashtbl.set name_mapping ~key:script_dir ~data:mapping
let path ~script_dir ~script_name =
let find () =
let cached_script_name =
let ( >>?= ) = Option.( >>= ) in
Hashtbl.find name_mapping script_dir >>?= fun mapping ->
Core.String.Caseless.Map.find mapping script_name
in
let script_name = Option.value cached_script_name ~default:script_name in
let path = Filename.concat script_dir script_name in
is_executable path >>| function Ok () -> Ok path | Error _ as e -> e
in
find () >>= function
| Ok path ->
return (Ok path)
| Error _ ->
update_mapping ~script_dir >>= fun () -> find ()
end
(** Call the script named after the RPC method in the [script_dir]
directory. The arguments (not the whole JSON-RPC call) are passed as JSON
to its stdin, and stdout is returned. In case of a non-zero exit code,
stdout is treated as the error report.
The rest of the parameters are for compatiblity with the old PVS scripts:
- The PVS storage scripts are missing some calls. If [missing] is [Some
value], this [value] will be returned in case the required script is missing.
- If [compat_in] or [compat_out] are defined, they will convert the input to
the script and the output from the script, respectively, to ensure that
the script can understand the input and that rpclib can unmarshal its
output.
This function either returns a successful RPC response, or raises
Fork_exec_error with a suitable SMAPIv2 error if the call failed. *)
let fork_exec_rpc :
script_dir:string
-> ?missing:R.t
-> ?compat_in:compat_in
-> ?compat_out:compat_out
-> R.call
-> R.response Deferred.t =
fun ~script_dir ?missing ?(compat_in = id) ?(compat_out = id) ->
let invoke_script call script_name :
(R.response, Storage_interface.Errors.error) Deferred.Result.t =
Process.create ~prog:script_name ~args:["--json"] () >>= function
| Error e ->
error "%s failed: %s" script_name (Error.to_string_hum e) ;
return
(Error
(backend_error "SCRIPT_FAILED"
[script_name; Error.to_string_hum e]))
| Ok p -> (
(* Send the request as json on stdin *)
let w = Process.stdin p in
(* We pass just the args, not the complete JSON-RPC call.
Currently the Python code generated by rpclib requires all params to
be named - they will be converted into a name->value Python dict.
Rpclib currently puts all named params into a dict, so we expect
params to be a single Dict, if all the params are named. *)
( match call.R.params with
| [(R.Dict _ as d)] ->
return (Ok d)
| _ ->
return
(Error
(backend_error "INCORRECT_PARAMETERS"
[
script_name
; "All the call parameters should be named and should be \
in a RPC Dict"
]))
)
>>>= fun args ->
let args = compat_in args in
Writer.write w (Jsonrpc.to_string args) ;
Writer.close w >>= fun () ->
Process.collect_output_and_wait p >>= fun output ->
match output.Process.Output.exit_status with
| Error (`Exit_non_zero code) -> (
(* Expect an exception and backtrace on stdout *)
match
Or_error.try_with (fun () ->
Jsonrpc.of_string output.Process.Output.stdout)
with
| Error _ ->
error "%s failed and printed bad error json: %s" script_name
output.Process.Output.stdout ;
error "%s failed, stderr: %s" script_name
output.Process.Output.stderr ;
return
(Error
(backend_error "SCRIPT_FAILED"
[
script_name
; "non-zero exit and bad json on stdout"
; string_of_int code
; output.Process.Output.stdout
; output.Process.Output.stdout
]))
| Ok response -> (
match Or_error.try_with (fun () -> error_of_rpc response) with
| Error _ ->
error "%s failed and printed bad error json: %s" script_name
output.Process.Output.stdout ;
error "%s failed, stderr: %s" script_name
output.Process.Output.stderr ;
return
(Error
(backend_error "SCRIPT_FAILED"
[
script_name
; "non-zero exit and bad json on stdout"
; string_of_int code
; output.Process.Output.stdout
; output.Process.Output.stdout
]))
| Ok x ->
return
(Error (backend_backtrace_error x.code x.params x.backtrace))
)
)
| Error (`Signal signal) ->
error "%s caught a signal and failed" script_name ;
return
(Error
(backend_error "SCRIPT_FAILED"
[
script_name
; "signalled"
; Signal.to_string signal
; output.Process.Output.stdout
; output.Process.Output.stdout
]))
| Ok () -> (
(* Parse the json on stdout. We get back a JSON-RPC
value from the scripts, not a complete JSON-RPC response *)
match
Or_error.try_with (fun () ->
Jsonrpc.of_string output.Process.Output.stdout)
with
| Error _ ->
error "%s succeeded but printed bad json: %s" script_name
output.Process.Output.stdout ;
return
(Error
(backend_error "SCRIPT_FAILED"
[
script_name
; "bad json on stdout"
; output.Process.Output.stdout
]))
| Ok response ->
info "%s succeeded: %s" script_name output.Process.Output.stdout ;
let response = compat_out response in
let response = R.success response in
return (Ok response)
)
)
in
let script_rpc call :
(R.response, Storage_interface.Errors.error) Deferred.Result.t =
info "%s" (Jsonrpc.string_of_call call) ;
Script.path ~script_dir ~script_name:call.R.name >>= function
| Error (`missing path) -> (
error "%s is not a file" path ;
match missing with
| None ->
return
(Error
(backend_error "SCRIPT_MISSING"
[
path
; "Check whether the file exists and has correct \
permissions"
]))
| Some m ->
warn
"Deprecated: script '%s' is missing, treating as no-op. Update \
your plugin!"
path ;
return (Ok (R.success m))
)
| Error (`not_executable (path, exn)) ->
error "%s is not executable" path ;
return
(Error
(backend_error "SCRIPT_NOT_EXECUTABLE" [path; Exn.to_string exn]))
| Ok path ->
invoke_script call path
in
(* The Errors we return from this function and the special error format
returned by the scripts are not included in the error types of the various
SMAPIv3 interfaces, therefore we have to propagate them as exceptions
instead of returning an RPC call with an error, because rpclib would fail
to unmarshal that error.
Therefore we either return a successful RPC response, or raise
Fork_exec_error with a suitable SMAPIv2 error if the call failed. *)
let rpc : R.call -> R.response Deferred.t =
fun call ->
script_rpc call >>= fun result ->
Result.map_error ~f:(fun e -> Fork_exec_error e) result
|> Result.ok_exn
|> return
in
rpc
module Attached_SRs = struct
type state = {sr: string; uids: string list} [@@deriving sexp]
let sr_table : state String.Table.t ref = ref (String.Table.create ())
let state_path = ref None
let add smapiv2 plugin uids =
let key = Storage_interface.Sr.string_of smapiv2 in
Hashtbl.set !sr_table ~key ~data:{sr= plugin; uids} ;
( match !state_path with
| None ->
return ()
| Some path ->
let contents =
String.Table.sexp_of_t sexp_of_state !sr_table
|> Sexplib.Sexp.to_string
in
let dir = Filename.dirname path in
Unix.mkdir ~p:() dir >>= fun () -> Writer.save path ~contents
)
>>= fun () -> return (Ok ())
let find smapiv2 =
let key = Storage_interface.Sr.string_of smapiv2 in
match Hashtbl.find !sr_table key with
| None ->
let open Storage_interface in
return (Error (Errors.Sr_not_attached key))
| Some {sr; _} ->
return (Ok sr)
let get_uids smapiv2 =
let key = Storage_interface.Sr.string_of smapiv2 in
match Hashtbl.find !sr_table key with
| None ->
let open Storage_interface in
return (Error (Errors.Sr_not_attached key))
| Some {uids; _} ->
return (Ok uids)
let remove smapiv2 =
let key = Storage_interface.Sr.string_of smapiv2 in
Hashtbl.remove !sr_table key ;
return (Ok ())
let reload path =
state_path := Some path ;
Sys.is_file ~follow_symlinks:true path >>= function
| `No | `Unknown ->
return ()
| `Yes ->
Reader.file_contents path >>= fun contents ->
sr_table :=
contents
|> Sexplib.Sexp.of_string
|> String.Table.t_of_sexp state_of_sexp ;
return ()
end
module Datapath_plugins = struct
let table = String.Table.create ()
let register ~datapath_root datapath_plugin_name =
let result =
let script_dir = Filename.concat datapath_root datapath_plugin_name in
return_plugin_rpc (fun () ->
Plugin_client.query (fork_exec_rpc ~script_dir) "register")
>>>= fun response ->
check_plugin_version_compatible response >>= function
| Ok () ->
info "Registered datapath plugin %s" datapath_plugin_name ;
Hashtbl.set table ~key:datapath_plugin_name
~data:(script_dir, response) ;
return (Ok ())
| Error e ->
let err_msg =
Storage_interface.(rpc_of Errors.error) e |> Jsonrpc.to_string
in
info "Failed to register datapath plugin %s: %s" datapath_plugin_name
err_msg ;
return (Error e)
in
(* We just do not register the plugin if we've encountered any error. In
the future we might want to change that, so we keep the error result
above. *)
result >>= fun _ -> return ()
let unregister datapath_plugin_name =
Hashtbl.remove table datapath_plugin_name ;
return ()
let supports_feature scheme feature =
match Hashtbl.find table scheme with
| None ->
false
| Some (_script_dir, query_result) ->
List.mem query_result.Xapi_storage.Plugin.features feature
~equal:String.equal
end
let vdi_of_volume x =
let find key ~default ~of_string =
match
List.Assoc.find x.Xapi_storage.Control.keys key ~equal:String.equal
with
| None ->
default
| Some v ->
v |> of_string
in
let find_string = find ~of_string:id in
let open Storage_interface in
{
vdi= Vdi.of_string x.Xapi_storage.Control.key
; uuid= x.Xapi_storage.Control.uuid
; content_id= ""
; name_label= x.Xapi_storage.Control.name
; name_description= x.Xapi_storage.Control.description
; ty= find_string _vdi_type_key ~default:""
; metadata_of_pool= ""
; is_a_snapshot=
find _is_a_snapshot_key ~default:false ~of_string:bool_of_string
; snapshot_time= find_string _snapshot_time_key ~default:"19700101T00:00:00Z"
; snapshot_of= Vdi.of_string (find_string _snapshot_of_key ~default:"")
; read_only= not x.Xapi_storage.Control.read_write
; cbt_enabled= false
; virtual_size= x.Xapi_storage.Control.virtual_size
; physical_utilisation= x.Xapi_storage.Control.physical_utilisation
; sm_config= []
; sharable= x.Xapi_storage.Control.sharable
; persistent= true
}
let choose_datapath ?(persistent = true) domain response =
(* We can only use a URI with a valid scheme, since we use the scheme
to name the datapath plugin. *)
let possible =
List.filter_map
~f:(fun x ->
let uri = Uri.of_string x in
match Uri.scheme uri with
| None ->
None
| Some scheme ->
Some (scheme, x))
response.Xapi_storage.Control.uri
in
(* We can only use URIs whose schemes correspond to registered plugins *)
let possible =
List.filter_map
~f:(fun (scheme, uri) ->
match Hashtbl.find Datapath_plugins.table scheme with
| Some (script_dir, _query_result) ->
Some (script_dir, scheme, uri)
| None ->
None)
possible
in
(* If we want to be non-persistent, we prefer if the datapath plugin supports it natively *)
let preference_order =
if persistent then
possible
else
let supports_nonpersistent, others =
List.partition_tf
~f:(fun (_script_dir, scheme, _uri) ->
Datapath_plugins.supports_feature scheme _nonpersistent)
possible
in
supports_nonpersistent @ others
in
match preference_order with
| [] ->
return (Error (missing_uri ()))
| (script_dir, scheme, u) :: us ->
return (Ok (fork_exec_rpc ~script_dir, scheme, u, domain))
(* Bind the implementations *)
let bind ~volume_script_dir =
(* Each plugin has its own version, see the call to listen
where `process` is partially applied. *)
let module S = Storage_interface.StorageAPI (Rpc_async.GenServer ()) in
let version = ref None in
let volume_rpc = fork_exec_rpc ~script_dir:volume_script_dir in
let module Compat = Compat (struct let version = version end) in
let stat ~dbg ~sr ~vdi =
(* TODO add default value to sharable? *)
return_volume_rpc (fun () ->
Volume_client.stat
(volume_rpc ~compat_out:Compat.compat_out_volume)
dbg sr vdi)
in
let clone ~dbg ~sr ~vdi =
return_volume_rpc (fun () -> Volume_client.clone volume_rpc dbg sr vdi)
in
let destroy ~dbg ~sr ~vdi =
return_volume_rpc (fun () -> Volume_client.destroy volume_rpc dbg sr vdi)
in
let set ~dbg ~sr ~vdi ~key ~value =
(* this is wrong, we loose the VDI type, but old pvsproxy didn't have
* Volume.set and Volume.unset *)
(* TODO handle this properly? *)
let missing =
Option.bind !version (fun v ->
if String.(v = pvs_version) then Some (R.rpc_of_unit ()) else None)
in
return_volume_rpc (fun () ->
Volume_client.set (volume_rpc ?missing) dbg sr vdi key value)
in
let unset ~dbg ~sr ~vdi ~key =
let missing =
Option.bind !version (fun v ->
if String.(v = pvs_version) then Some (R.rpc_of_unit ()) else None)
in
return_volume_rpc (fun () ->
Volume_client.unset (volume_rpc ?missing) dbg sr vdi key)
in
let update_keys ~dbg ~sr ~key ~value response =
let open Deferred.Result.Monad_infix in
match value with
| None ->
Deferred.Result.return response
| Some value ->
set ~dbg ~sr ~vdi:response.Xapi_storage.Control.key ~key ~value
>>= fun () ->
Deferred.Result.return
{response with keys= (key, value) :: response.keys}
in
let vdi_attach_common dbg sr vdi domain =
let open Deferred.Result.Monad_infix in
Attached_SRs.find sr >>= fun sr ->
(* Discover the URIs using Volume.stat *)
stat ~dbg ~sr ~vdi >>= fun response ->
(* If we have a clone-on-boot volume then use that instead *)
( match
List.Assoc.find response.Xapi_storage.Control.keys _clone_on_boot_key
~equal:String.equal
with
| None ->
return (Ok response)
| Some temporary ->
stat ~dbg ~sr ~vdi:temporary
)
>>= fun response ->
choose_datapath domain response >>= fun (rpc, datapath, uri, domain) ->
return_data_rpc (fun () -> Datapath_client.attach rpc dbg uri domain)
in
let wrap th = Rpc_async.T.put th in
(* the actual API call for this plugin, sharing same version ref across all calls *)
let query_impl dbg =
let th =
return_plugin_rpc (fun () -> Plugin_client.query volume_rpc dbg)
>>>= fun response ->
let required_api_version =
response.Xapi_storage.Plugin.required_api_version
in
(* the first call to a plugin must be a Query.query that sets the version *)
version := Some required_api_version ;
check_plugin_version_compatible response >>>= fun () ->
(* Convert between the xapi-storage interface and the SMAPI *)
let features =
List.map
~f:(function "VDI_DESTROY" -> "VDI_DELETE" | x -> x)
response.Xapi_storage.Plugin.features
in
(* Look for executable scripts and automatically add capabilities *)
let rec loop acc = function
| [] ->
return (Ok acc)
| (script_name, capability) :: rest -> (
Script.path ~script_dir:volume_script_dir ~script_name >>= function
| Error _ ->
loop acc rest
| Ok _ ->
loop (capability :: acc) rest
)
in
loop []
[
("SR.attach", "SR_ATTACH")
; ("SR.create", "SR_CREATE")
; ("SR.destroy", "SR_DELETE")
; ("SR.detach", "SR_DETACH")
; ("SR.ls", "SR_SCAN")
; ("SR.stat", "SR_UPDATE")
; ("SR.probe", "SR_PROBE")
; ("Volume.create", "VDI_CREATE")
; ("Volume.clone", "VDI_CLONE")
; ("Volume.snapshot", "VDI_SNAPSHOT")
; ("Volume.resize", "VDI_RESIZE")
; ("Volume.destroy", "VDI_DELETE")
; ("Volume.stat", "VDI_UPDATE")
]
>>>= fun x ->
let features = features @ x in
(* Add the features we always have *)
let features =
features
@ [
"VDI_ATTACH"
; "VDI_DETACH"
; "VDI_ACTIVATE"
; "VDI_DEACTIVATE"
; "VDI_INTRODUCE"
]
in
(* If we have the ability to clone a disk then we can provide
clone on boot. *)
let features =
if List.mem features "VDI_CLONE" ~equal:String.equal then
"VDI_RESET_ON_BOOT/2" :: features
else
features
in
let name = response.Xapi_storage.Plugin.name in
Deferred.Result.return
{
Storage_interface.driver= response.Xapi_storage.Plugin.plugin
; name
; description= response.Xapi_storage.Plugin.description
; vendor= response.Xapi_storage.Plugin.vendor
; copyright= response.Xapi_storage.Plugin.copyright
; version= response.Xapi_storage.Plugin.version
; required_api_version
; features
; configuration= response.Xapi_storage.Plugin.configuration
; required_cluster_stack=
response.Xapi_storage.Plugin.required_cluster_stack
}
in
wrap th
in
S.Query.query query_impl ;
let query_diagnostics_impl dbg =
let th =
let open Deferred.Result.Monad_infix in
return_plugin_rpc (fun () -> Plugin_client.diagnostics volume_rpc dbg)
>>= fun response -> Deferred.Result.return response
in
wrap th
in
S.Query.diagnostics query_diagnostics_impl ;
let sr_attach_impl dbg sr device_config =
let th =
Compat.sr_attach device_config >>>= fun compat_in ->
return_volume_rpc (fun () ->
Sr_client.attach (volume_rpc ~compat_in) dbg device_config)
>>>= fun attach_response ->
(* Stat the SR to look for datasources *)
(* SR.stat should take the attached URI *)
return_volume_rpc (fun () ->
Sr_client.stat volume_rpc dbg attach_response)
>>>= fun stat ->
let rec loop acc = function
| [] ->
return acc
| datasource :: datasources -> (
let uri = Uri.of_string datasource in
match Uri.scheme uri with
| Some "xeno+shm" -> (
let uid = Uri.path uri in
let uid =
if String.length uid > 1 then
String.sub uid ~pos:1 ~len:(String.length uid - 1)
else
uid
in
RRD.Client.Plugin.Local.register RRD.rpc uid Rrd.Five_Seconds
Rrd_interface.V2
|> Rpc_async.T.get
>>= function
| Ok _ ->
loop (uid :: acc) datasources
| Error x ->
raise Rrd_interface.(Rrdd_error x)
)
| _ ->
loop acc datasources
)
in
loop [] stat.Xapi_storage.Control.datasources >>= fun uids ->
(* associate the 'sr' from the plugin with the SR reference passed in *)
Attached_SRs.add sr attach_response uids >>>= fun () ->
Deferred.Result.return ()
in
wrap th
in
S.SR.attach sr_attach_impl ;
let sr_detach_impl dbg sr =
let th =
Attached_SRs.find sr >>= function
| Error _ ->
(* ensure SR.detach is idempotent *)
Deferred.Result.return ()
| Ok sr' ->
return_volume_rpc (fun () -> Sr_client.detach volume_rpc dbg sr')
>>>= fun response ->
Attached_SRs.get_uids sr >>>= fun uids ->
let rec loop = function
| [] ->
return ()
| datasource :: datasources -> (
let uri = Uri.of_string datasource in
match Uri.scheme uri with
| Some "xeno+shm" -> (
let uid = Uri.path uri in
let uid =
if String.length uid > 1 then
String.sub uid ~pos:1 ~len:(String.length uid - 1)
else
uid
in
RRD.Client.Plugin.Local.deregister RRD.rpc uid
|> Rpc_async.T.get
>>= function
| Ok _ ->
loop datasources
| Error x ->
raise Rrd_interface.(Rrdd_error x)
)
| _ ->
loop datasources
)
in
loop uids >>= fun () ->
let open Deferred.Result.Monad_infix in
Attached_SRs.remove sr >>= fun () -> Deferred.Result.return response
in
wrap th
in
S.SR.detach sr_detach_impl ;
let sr_probe_impl dbg queue device_config sm_config =
let th =
return_volume_rpc (fun () -> Sr_client.probe volume_rpc dbg device_config)
>>>= fun response ->
let pp_probe_result () probe_result =
Rpcmarshal.marshal Xapi_storage.Control.typ_of_probe_result probe_result
|> Jsonrpc.to_string
in
response
|> List.map ~f:(fun probe_result ->
let uuid =
List.Assoc.find probe_result.Xapi_storage.Control.configuration
~equal:String.equal "sr_uuid"
in
let open Deferred.Or_error in
let smapiv2_probe ?sr_info () =
{
Storage_interface.configuration= probe_result.configuration
; complete= probe_result.complete
; sr= sr_info
; extra_info= probe_result.extra_info
}
in
match
( probe_result.Xapi_storage.Control.sr