Skip to content

Commit ad276e1

Browse files
authored
Merge pull request #490 from gerrymeixiong/gm/thread-cls-override
Add threading.Thread Class Override
2 parents 7df7b27 + e4b2ab7 commit ad276e1

File tree

2 files changed

+40
-11
lines changed

2 files changed

+40
-11
lines changed

deepgram/clients/common/v1/abstract_sync_websocket.py

Lines changed: 18 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44
import json
55
import time
66
import logging
7-
from typing import Dict, Union, Optional, cast, Any, Callable
7+
from typing import Dict, Union, Optional, cast, Any, Callable, Type
88
from datetime import datetime
99
import threading
1010
from abc import ABC, abstractmethod
@@ -38,6 +38,12 @@ class AbstractSyncWebSocketClient(ABC): # pylint: disable=too-many-instance-att
3838
3939
This class provides methods to establish a WebSocket connection generically for
4040
use in all WebSocket clients.
41+
42+
Args:
43+
config (DeepgramClientOptions): all the options for the client
44+
endpoint (str): the endpoint to connect to
45+
thread_cls (Type[threading.Thread]): optional thread class to use for creating threads,
46+
defaults to threading.Thread. Useful for custom thread management like ContextVar support.
4147
"""
4248

4349
_logger: verboselogs.VerboseLogger
@@ -52,12 +58,19 @@ class AbstractSyncWebSocketClient(ABC): # pylint: disable=too-many-instance-att
5258
_listen_thread: Union[threading.Thread, None]
5359
_delegate: Optional[Speaker] = None
5460

61+
_thread_cls: Type[threading.Thread]
62+
5563
_kwargs: Optional[Dict] = None
5664
_addons: Optional[Dict] = None
5765
_options: Optional[Dict] = None
5866
_headers: Optional[Dict] = None
5967

60-
def __init__(self, config: DeepgramClientOptions, endpoint: str = ""):
68+
def __init__(
69+
self,
70+
config: DeepgramClientOptions,
71+
endpoint: str = "",
72+
thread_cls: Type[threading.Thread] = threading.Thread,
73+
):
6174
if config is None:
6275
raise DeepgramError("Config is required")
6376
if endpoint == "":
@@ -73,6 +86,8 @@ def __init__(self, config: DeepgramClientOptions, endpoint: str = ""):
7386

7487
self._listen_thread = None
7588

89+
self._thread_cls = thread_cls
90+
7691
# exit
7792
self._exit_event = threading.Event()
7893

@@ -152,7 +167,7 @@ def start(
152167
self._delegate.set_push_callback(self._process_message)
153168
else:
154169
self._logger.notice("create _listening thread")
155-
self._listen_thread = threading.Thread(target=self._listening)
170+
self._listen_thread = self._thread_cls(target=self._listening)
156171
self._listen_thread.start()
157172

158173
# debug the threads

deepgram/clients/listen/v1/websocket/client.py

Lines changed: 22 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44
import json
55
import time
66
import logging
7-
from typing import Dict, Union, Optional, cast, Any, Callable
7+
from typing import Dict, Union, Optional, cast, Any, Callable, Type
88
from datetime import datetime
99
import threading
1010

@@ -38,10 +38,12 @@ class ListenWebSocketClient(
3838
"""
3939
Client for interacting with Deepgram's live transcription services over WebSockets.
4040
41-
This class provides methods to establish a WebSocket connection for live transcription and handle real-time transcription events.
41+
This class provides methods to establish a WebSocket connection for live transcription and handle real-time transcription events.
4242
43-
Args:
44-
config (DeepgramClientOptions): all the options for the client.
43+
Args:
44+
config (DeepgramClientOptions): all the options for the client.
45+
thread_cls (Type[threading.Thread]): optional thread class to use for creating threads,
46+
defaults to threading.Thread. Useful for custom thread management like ContextVar support.
4547
"""
4648

4749
_logger: verboselogs.VerboseLogger
@@ -55,12 +57,18 @@ class ListenWebSocketClient(
5557
_flush_thread: Union[threading.Thread, None]
5658
_last_datagram: Optional[datetime] = None
5759

60+
_thread_cls: Type[threading.Thread]
61+
5862
_kwargs: Optional[Dict] = None
5963
_addons: Optional[Dict] = None
6064
_options: Optional[Dict] = None
6165
_headers: Optional[Dict] = None
6266

63-
def __init__(self, config: DeepgramClientOptions):
67+
def __init__(
68+
self,
69+
config: DeepgramClientOptions,
70+
thread_cls: Type[threading.Thread] = threading.Thread,
71+
):
6472
if config is None:
6573
raise DeepgramError("Config is required")
6674

@@ -78,13 +86,19 @@ def __init__(self, config: DeepgramClientOptions):
7886
self._last_datagram = None
7987
self._lock_flush = threading.Lock()
8088

89+
self._thread_cls = thread_cls
90+
8191
# init handlers
8292
self._event_handlers = {
8393
event: [] for event in LiveTranscriptionEvents.__members__.values()
8494
}
8595

8696
# call the parent constructor
87-
super().__init__(self._config, self._endpoint)
97+
super().__init__(
98+
config=self._config,
99+
endpoint=self._endpoint,
100+
thread_cls=self._thread_cls,
101+
)
88102

89103
# pylint: disable=too-many-statements,too-many-branches
90104
def start(
@@ -154,15 +168,15 @@ def start(
154168
# keepalive thread
155169
if self._config.is_keep_alive_enabled():
156170
self._logger.notice("keepalive is enabled")
157-
self._keep_alive_thread = threading.Thread(target=self._keep_alive)
171+
self._keep_alive_thread = self._thread_cls(target=self._keep_alive)
158172
self._keep_alive_thread.start()
159173
else:
160174
self._logger.notice("keepalive is disabled")
161175

162176
# flush thread
163177
if self._config.is_auto_flush_reply_enabled():
164178
self._logger.notice("autoflush is enabled")
165-
self._flush_thread = threading.Thread(target=self._flush)
179+
self._flush_thread = self._thread_cls(target=self._flush)
166180
self._flush_thread.start()
167181
else:
168182
self._logger.notice("autoflush is disabled")

0 commit comments

Comments
 (0)