11import logging
2- from queue import Queue
3- from gi . repository import GObject
2+ from queue import Empty , Queue
3+ from threading import Lock
44
5+ from gi .repository import GObject
56from lib .commands import ControlServerCommands
6- from lib .tcpmulticonnection import TCPMultiConnection
77from lib .response import NotifyResponse
8+ from lib .tcpmulticonnection import TCPMultiConnection
89
910from vocto .port import Port
1011
@@ -18,6 +19,9 @@ def __init__(self, pipeline):
1819
1920 self .command_queue = Queue ()
2021
22+ self .on_loop_lock = Lock ()
23+ self .on_loop_active = False
24+
2125 self .commands = ControlServerCommands (pipeline )
2226
2327 def on_accepted (self , conn , addr ):
@@ -59,8 +63,11 @@ def on_data(self, conn, _, leftovers, *args):
5963 self .close_connection (conn )
6064 return False
6165
62- self .log .debug ('re-starting on_loop scheduling' )
63- GObject .idle_add (self .on_loop )
66+ with self .on_loop_lock :
67+ if not self .on_loop_active :
68+ self .log .debug ('re-starting on_loop scheduling' )
69+ GObject .idle_add (self .on_loop )
70+ self .on_loop_active = True
6471
6572 self .command_queue .put ((line , conn ))
6673
@@ -78,25 +85,23 @@ def on_loop(self):
7885 '''Command handler. Processes commands in the command queue whenever
7986 nothing else is happening (registered as GObject idle callback)'''
8087
81- self .log . debug ( 'on_loop called' )
82-
83- if self .command_queue .empty ():
84- self .log .debug ('command_queue is empty again, '
85- 'stopping on_loop scheduling' )
86- return False
87-
88- line , requestor = self . command_queue . get ()
88+ with self .on_loop_lock :
89+ try :
90+ line , requestor = self .command_queue .get_nowait ()
91+ self .log .debug (f'on_loop { line = } { requestor = } ' )
92+ except Empty :
93+ self . log . debug ( 'command_queue is empty again, stopping on_loop scheduling' )
94+ self . on_loop_active = False
95+ return False
8996
9097 words = line .split ()
9198 if len (words ) < 1 :
92- self .log .debug ('command_queue is empty again, '
93- 'stopping on_loop scheduling' )
99+ self .log .debug (f'command_queue contained { line !r} , which is invalid, returning early' )
94100 return True
95101
96- self .log .info ("processing command '%s'" , ' ' .join (words ))
97-
98102 command = words [0 ]
99103 args = words [1 :]
104+ self .log .debug (f"on_loop { command = } { args = } " )
100105
101106 response = None
102107 try :
@@ -106,16 +111,15 @@ def on_loop(self):
106111 raise KeyError ()
107112
108113 command_function = self .commands .__class__ .__dict__ [command ]
109-
110114 except KeyError as e :
111115 self .log .info ("Received unknown command %s" , command )
112116 response = "error unknown command %s\n " % command
113117
114118 else :
115119 try :
116120 responseObject = command_function (self .commands , * args )
117-
118121 except Exception as e :
122+ self .log .error (f'{ command } (*{ args !r} ) returned exception: { e !r} ' )
119123 message = str (e ) or "<no message>"
120124 response = "error %s\n " % message
121125
@@ -130,12 +134,12 @@ def on_loop(self):
130134 self ._schedule_write (conn , signal )
131135 else :
132136 response = "%s\n " % str (responseObject )
133-
134137 finally :
138+ self .log .debug (f'on_loop { response = } { requestor = } ' )
135139 if response is not None and requestor in self .currentConnections :
136140 self ._schedule_write (requestor , response )
137141
138- return False
142+ return True
139143
140144 def _schedule_write (self , conn , message ):
141145 queue = self .currentConnections [conn ]
@@ -153,13 +157,13 @@ def on_write(self, conn, *args):
153157 except KeyError :
154158 return False
155159
156- if queue .empty ():
157- self .log .debug ('write_queue[%u] is empty again, '
158- 'stopping on_write scheduling' ,
159- conn .fileno ())
160+ try :
161+ message = queue .get_nowait ()
162+ self .log .debug (f'on_write { message = } ' )
163+ except Empty :
164+ self .log .debug (f'write_queue[{ conn .fileno ()} ] is empty again, stopping on_write scheduling' )
160165 return False
161166
162- message = queue .get ()
163167 self .log .info ("Responding message '%s'" , message .strip ())
164168 try :
165169 conn .send (message .encode ())
0 commit comments