8
8
import pathlib
9
9
import secrets
10
10
import urllib
11
+ import urllib .parse
11
12
import urllib .request
12
13
from typing import Any , Dict , List , Optional , Tuple , Union
13
14
20
21
logger = logging .getLogger ("zocalo.util.rabbitmq" )
21
22
22
23
24
+ def _quote (arg : str ) -> str :
25
+ """URL-quote a VHost name (which can contain /)"""
26
+ return urllib .parse .quote (arg , safe = [])
27
+
28
+
23
29
class MessageStats (BaseModel ):
24
30
publish : Optional [int ] = Field (None , description = "Count of messages published." )
25
31
@@ -750,7 +756,7 @@ def bindings(
750
756
) -> List [BindingInfo ]:
751
757
endpoint = "bindings"
752
758
if vhost is not None :
753
- endpoint = f"{ endpoint } /{ vhost } "
759
+ endpoint = f"{ endpoint } /{ _quote ( vhost ) } "
754
760
_check = {source , destination , destination_type }
755
761
if None in _check and len (_check ) > 1 :
756
762
raise ValueError (
@@ -789,7 +795,9 @@ def bindings_delete(
789
795
):
790
796
# If properties_key is not specified then all bindings between the specified
791
797
# source and destination are deleted
792
- endpoint = f"bindings/{ vhost } /e/{ source } /{ destination_type } /{ destination } "
798
+ endpoint = (
799
+ f"bindings/{ _quote (vhost )} /e/{ source } /{ destination_type } /{ destination } "
800
+ )
793
801
if properties_key is None :
794
802
dest_map = {"queue" : "q" , "exchange" : "e" }
795
803
@@ -834,11 +842,11 @@ def exchanges(
834
842
) -> Union [List [ExchangeInfo ], ExchangeInfo ]:
835
843
endpoint = "exchanges"
836
844
if vhost is not None and name is not None :
837
- endpoint = f"{ endpoint } /{ vhost } /{ name } /"
845
+ endpoint = f"{ endpoint } /{ _quote ( vhost ) } /{ name } /"
838
846
response = self .get (endpoint )
839
847
return ExchangeInfo (** response .json ())
840
848
elif vhost is not None :
841
- endpoint = f"{ endpoint } /{ vhost } /"
849
+ endpoint = f"{ endpoint } /{ _quote ( vhost ) } /"
842
850
elif name is not None :
843
851
raise ValueError ("name can not be set without vhost" )
844
852
response = self .get (endpoint )
@@ -853,19 +861,19 @@ def exchange_declare(self, exchange: ExchangeSpec):
853
861
response .raise_for_status ()
854
862
855
863
def exchange_delete (self , vhost : str , name : str , if_unused : bool = False ):
856
- endpoint = f"exchanges/{ vhost } /{ name } "
864
+ endpoint = f"exchanges/{ _quote ( vhost ) } /{ name } "
857
865
response = self .delete (endpoint , params = {"if-unused" : if_unused })
858
866
response .raise_for_status ()
859
867
860
868
def policies (self , vhost : Optional [str ] = None ) -> List [PolicySpec ]:
861
869
endpoint = "policies"
862
870
if vhost is not None :
863
- endpoint = f"{ endpoint } /{ vhost } /"
871
+ endpoint = f"{ endpoint } /{ _quote ( vhost ) } /"
864
872
response = self .get (endpoint )
865
873
return [PolicySpec (** p ) for p in response .json ()]
866
874
867
875
def policy (self , vhost : str , name : str ) -> PolicySpec :
868
- endpoint = f"policies/{ vhost } /{ name } /"
876
+ endpoint = f"policies/{ _quote ( vhost ) } /{ name } /"
869
877
response = self .get (endpoint )
870
878
return PolicySpec (** response .json ())
871
879
@@ -880,7 +888,7 @@ def set_policy(self, policy: PolicySpec):
880
888
response .raise_for_status ()
881
889
882
890
def clear_policy (self , vhost : str , name : str ):
883
- endpoint = f"policies/{ vhost } /{ name } /"
891
+ endpoint = f"policies/{ _quote ( vhost ) } /{ name } /"
884
892
response = self .delete (endpoint )
885
893
response .raise_for_status ()
886
894
@@ -889,11 +897,11 @@ def queues(
889
897
) -> Union [List [QueueInfo ], QueueInfo ]:
890
898
endpoint = "queues"
891
899
if vhost is not None and name is not None :
892
- endpoint = f"{ endpoint } /{ vhost } /{ name } "
900
+ endpoint = f"{ endpoint } /{ _quote ( vhost ) } /{ name } "
893
901
response = self .get (endpoint )
894
902
return QueueInfo (** response .json ())
895
903
elif vhost is not None :
896
- endpoint = f"{ endpoint } /{ vhost } "
904
+ endpoint = f"{ endpoint } /{ _quote ( vhost ) } "
897
905
elif name is not None :
898
906
raise ValueError ("name can not be set without vhost" )
899
907
response = self .get (endpoint )
@@ -910,7 +918,8 @@ def queue_declare(self, queue: QueueSpec):
910
918
def queue_delete (
911
919
self , vhost : str , name : str , if_unused : bool = False , if_empty : bool = False
912
920
):
913
- endpoint = f"queues/{ vhost } /{ name } "
921
+ logger .debug (f"Deleting queue { _quote (vhost )} /{ name } " )
922
+ endpoint = f"queues/{ _quote (vhost )} /{ name } "
914
923
response = self .delete (
915
924
endpoint , params = {"if-unused" : if_unused , "if-empty" : if_empty }
916
925
)
@@ -931,7 +940,7 @@ def permissions(
931
940
) -> List [PermissionSpec ] | PermissionSpec :
932
941
endpoint = "permissions"
933
942
if vhost is not None and user is not None :
934
- endpoint = f"{ endpoint } /{ vhost } /{ user } /"
943
+ endpoint = f"{ endpoint } /{ _quote ( vhost ) } /{ user } /"
935
944
response = self .get (endpoint )
936
945
return PermissionSpec (** response .json ())
937
946
elif vhost is not None or user is not None :
@@ -950,7 +959,7 @@ def set_permissions(self, permission: PermissionSpec):
950
959
response .raise_for_status ()
951
960
952
961
def clear_permissions (self , vhost : str , user : str ):
953
- endpoint = f"permissions/{ vhost } /{ user } /"
962
+ endpoint = f"permissions/{ _quote ( vhost ) } /{ user } /"
954
963
response = self .delete (endpoint )
955
964
response .raise_for_status ()
956
965
0 commit comments