2
2
import json
3
3
import ssl
4
4
from threading import Event
5
- from typing import Union , Optional
5
+ from typing import Union , Optional , Callable
6
6
7
7
from ovos_bus_client import Message as MycroftMessage , MessageBusClient as OVOSBusClient
8
8
from ovos_bus_client .session import Session
@@ -32,13 +32,14 @@ class HiveMessageWaiter:
32
32
message_type: message type to wait for
33
33
"""
34
34
35
- def __init__ (self , bus , message_type : Union [HiveMessageType , str ]):
35
+ def __init__ (self , bus : 'HiveMessageBusClient' ,
36
+ message_type : Union [HiveMessageType , str ]):
36
37
self .bus = bus
37
38
self .msg_type = message_type
38
39
self .received_msg = None
39
40
# Setup response handler
40
41
self .response_event = Event ()
41
- self .bus .on (message_type , self ._handler )
42
+ self .bus .on (self . msg_type , self ._handler )
42
43
43
44
def _handler (self , message ):
44
45
"""Receive response data."""
@@ -60,9 +61,12 @@ def wait(self, timeout=3.0):
60
61
61
62
62
63
class HivePayloadWaiter (HiveMessageWaiter ):
63
- def __init__ (self , payload_type : Union [HiveMessageType , str ],
64
+ def __init__ (self , bus : 'HiveMessageBusClient' ,
65
+ payload_type : Union [HiveMessageType , str ],
66
+ message_type : Union [HiveMessageType , str ] = HiveMessageType .BUS ,
64
67
* args , ** kwargs ):
65
- super (HivePayloadWaiter , self ).__init__ (* args , ** kwargs )
68
+ super (HivePayloadWaiter , self ).__init__ (bus = bus , message_type = message_type ,
69
+ * args , ** kwargs )
66
70
self .payload_type = payload_type
67
71
68
72
def _handler (self , message ):
@@ -355,6 +359,12 @@ def on(self, event_name, func):
355
359
LOG .debug (f"registering handler: { event_name } " )
356
360
self .emitter .on (event_name , func )
357
361
362
+ def remove (self , event_name : str , func : Callable ):
363
+ if event_name not in list (HiveMessageType ):
364
+ self .internal_bus .remove (event_name , func )
365
+ else : # hivemind message
366
+ self .emitter .remove_listener (event_name , func )
367
+
358
368
# utility
359
369
def wait_for_message (self , message_type : Union [HiveMessageType , str ], timeout = 3.0 ):
360
370
"""Wait for a message of a specific type.
@@ -404,10 +414,11 @@ def wait_for_response(self, message: Union[MycroftMessage, HiveMessage],
404
414
Returns:
405
415
The received message or None if the response timed out
406
416
"""
407
- if isinstance (message , MycroftMessage ):
408
- message = HiveMessage (msg_type = HiveMessageType .BUS , payload = message )
409
417
message_type = reply_type or message .msg_type
410
- waiter = HiveMessageWaiter (self , message_type ) # Setup response handler
418
+ if isinstance (message , MycroftMessage ):
419
+ waiter = HivePayloadWaiter (bus = self , payload_type = message_type )
420
+ else :
421
+ waiter = HiveMessageWaiter (bus = self , message_type = message_type ) # Setup response handler
411
422
# Send message and wait for it's response
412
423
self .emit (message )
413
424
return waiter .wait (timeout )
0 commit comments