3
3
4
4
(c) 2006-2009, Armin Rigo, Holger Krekel, Maciej Fijalkowski
5
5
"""
6
+ from __future__ import annotations
7
+
6
8
import os
7
9
import stat
8
10
from hashlib import md5
9
11
from queue import Queue
12
+ from typing import Callable
13
+ from typing import Type
10
14
11
15
import execnet .rsync_remote
16
+ from execnet .gateway_base import Channel
17
+ from execnet .multi import MultiChannel
12
18
13
19
14
20
class RSync :
@@ -21,51 +27,54 @@ class RSync:
21
27
a path on remote side).
22
28
"""
23
29
24
- def __init__ (self , sourcedir , callback = None , verbose = True ):
30
+ def __init__ (self , sourcedir : str | os . PathLike [ str ] , callback = None , verbose = True ):
25
31
self ._sourcedir = str (sourcedir )
26
32
self ._verbose = verbose
27
33
assert callback is None or hasattr (callback , "__call__" )
28
34
self ._callback = callback
29
- self ._channels = {}
30
- self ._receivequeue = Queue ()
31
- self ._links = []
35
+ self ._channels : dict [Channel , Callable [[], None ] | None ] = {}
36
+ self ._links : list [tuple [str , str , str ]] = []
32
37
33
38
def filter (self , path ):
34
39
return True
35
40
36
- def _end_of_channel (self , channel ):
41
+ def _end_of_channel (self , channel , data ):
37
42
if channel in self ._channels :
38
43
# too early! we must have got an error
39
44
channel .waitclose ()
40
45
# or else we raise one
41
46
raise OSError (f"connection unexpectedly closed: { channel .gateway } " )
42
47
43
- def _process_link (self , channel ):
48
+ def _process_link (self , channel , data ):
44
49
for link in self ._links :
45
50
channel .send (link )
46
51
# completion marker, this host is done
47
52
channel .send (42 )
48
53
49
- def _done (self , channel ):
54
+ def _done (self , channel , data ):
50
55
"""Call all callbacks"""
51
56
finishedcallback = self ._channels .pop (channel )
52
57
if finishedcallback :
53
58
finishedcallback ()
54
59
channel .waitclose ()
55
60
56
- def _list_done (self , channel ):
61
+ def _ack (self , channel , data ):
62
+ if self ._callback :
63
+ self ._callback ("ack" , self ._paths [data ], channel )
64
+
65
+ def _list_done (self , channel , data ):
57
66
# sum up all to send
58
67
if self ._callback :
59
- s = sum ([ self ._paths [i ] for i in self ._to_send [channel ] ])
68
+ s = sum (self ._paths [i ] for i in self ._to_send [channel ])
60
69
self ._callback ("list" , s , channel )
61
70
62
71
def _send_item (self , channel , data ):
63
72
"""Send one item"""
64
73
modified_rel_path , checksum = data
65
74
modifiedpath = os .path .join (self ._sourcedir , * modified_rel_path )
66
75
try :
67
- f = open (modifiedpath , "rb" )
68
- data = f .read ()
76
+ with open (modifiedpath , "rb" ) as fp :
77
+ data = fp .read ()
69
78
except OSError :
70
79
data = None
71
80
@@ -81,7 +90,6 @@ def _send_item(self, channel, data):
81
90
# print "sending", modified_rel_path, data and len(data) or 0, checksum
82
91
83
92
if data is not None :
84
- f .close ()
85
93
if checksum is not None and checksum == md5 (data ).digest ():
86
94
data = None # not really modified
87
95
else :
@@ -92,7 +100,7 @@ def _report_send_file(self, gateway, modified_rel_path):
92
100
if self ._verbose :
93
101
print (f"{ gateway } <= { modified_rel_path } " )
94
102
95
- def send (self , raises = True ):
103
+ def send (self , raises : bool = True ) -> None :
96
104
"""Sends a sourcedir to all added targets. Flag indicates
97
105
whether to raise an error or return in case of lack of
98
106
targets
@@ -110,45 +118,34 @@ def send(self, raises=True):
110
118
111
119
# paths and to_send are only used for doing
112
120
# progress-related callbacks
113
- self ._paths = {}
114
- self ._to_send = {}
121
+ self ._paths : dict [str , int ] = {}
122
+ self ._to_send : dict [Channel , list [str ]] = {}
123
+
124
+ mch = MultiChannel (list (self ._channels ))
125
+ rq = mch .make_receive_queue (endmarker = (None , None ))
115
126
116
127
# send modified file to clients
128
+ commands : dict [str | None , Callable ] = {
129
+ None : self ._end_of_channel ,
130
+ "links" : self ._process_link ,
131
+ "done" : self ._done ,
132
+ "ack" : self ._ack ,
133
+ "send" : self ._send_item ,
134
+ "list_done" : self ._list_done ,
135
+ }
136
+
117
137
while self ._channels :
118
- channel , req = self ._receivequeue .get ()
119
- if req is None :
120
- self ._end_of_channel (channel )
121
- else :
122
- command , data = req
123
- if command == "links" :
124
- self ._process_link (channel )
125
- elif command == "done" :
126
- self ._done (channel )
127
- elif command == "ack" :
128
- if self ._callback :
129
- self ._callback ("ack" , self ._paths [data ], channel )
130
- elif command == "list_done" :
131
- self ._list_done (channel )
132
- elif command == "send" :
133
- self ._send_item (channel , data )
134
- del data
135
- else :
136
- assert "Unknown command %s" % command
137
-
138
- def add_target (self , gateway , destdir , finishedcallback = None , ** options ):
138
+ channel , (command , data ) = rq .get ()
139
+ assert command in commands , "Unknown command %s" % command
140
+ commands [command ](channel , data )
141
+
142
+ def add_target (self , gateway , destdir , finishedcallback = None , delete : bool = False ):
139
143
"""Adds a remote target specified via a gateway
140
144
and a remote destination directory.
141
145
"""
142
- for name in options :
143
- assert name in ("delete" ,)
144
-
145
- def itemcallback (req ):
146
- self ._receivequeue .put ((channel , req ))
147
-
148
- channel = gateway .remote_exec (execnet .rsync_remote )
149
- channel .reconfigure (py2str_as_py3str = False , py3str_as_py2str = False )
150
- channel .setcallback (itemcallback , endmarker = None )
151
- channel .send ((str (destdir ), options ))
146
+ channel = gateway .remote_exec (
147
+ execnet .rsync_remote .serve_rsync , destdir = str (destdir ), delete = delete
148
+ )
152
149
self ._channels [channel ] = finishedcallback
153
150
154
151
def _broadcast (self , msg ):
0 commit comments