From 4a759f9f86c6750a8b1d6fe5cd13ef14c9c11b81 Mon Sep 17 00:00:00 2001 From: Sebastian Ponce Date: Thu, 5 Sep 2024 23:56:54 -0600 Subject: [PATCH 1/5] Add system and channel subclass interfaces for Deepgram and ElevenLabs --- iftk/helpers/deepgram.py | 35 +++++++++++++++++++++++++++- iftk/helpers/elevenlabs.py | 47 ++++++++++++++++++++++++++++++++++++-- 2 files changed, 79 insertions(+), 3 deletions(-) diff --git a/iftk/helpers/deepgram.py b/iftk/helpers/deepgram.py index 6260297..193255c 100644 --- a/iftk/helpers/deepgram.py +++ b/iftk/helpers/deepgram.py @@ -5,10 +5,13 @@ import asyncio import json -from typing import AsyncIterator +from typing import Any, AsyncIterator, Callable import websockets +from iftk.channel import AsyncChannel +from iftk.system import System + WSS_URL = "wss://api.deepgram.com/v1/listen?endpointing=500&encoding=linear16&sample_rate=16000&channels=1&interim_results=false" @@ -65,3 +68,33 @@ async def receiver(ws): tasks = [sender_task, keep_alive_task] for task in tasks: task.cancel() + + +class DeepgramChannel(AsyncChannel): + def __init__( + self, + deepgram_key: str, + input_stream: AsyncIterator[bytes], + notify_readable: Callable[[None], None] = None, + ) -> None: + super().__init__(notify_readable) + self.deepgram_stream = deepgram_stream(deepgram_key, input_stream) + + async def read(self) -> AsyncIterator[str]: + yield await anext(self.deepgram_stream) + + +class DeepgramSystem(System): + async def create_async_channel( + self, + deepgram_key: str, + input_stream: AsyncIterator[bytes], + notify_readable: Callable[[None], None], + **kwargs, + ) -> AsyncChannel: + deepgram_channel = DeepgramChannel( + deepgram_key=deepgram_key, + input_stream=input_stream, + notify_readable=notify_readable, + ) + return deepgram_channel diff --git a/iftk/helpers/elevenlabs.py b/iftk/helpers/elevenlabs.py index 2550307..7f1f672 100644 --- a/iftk/helpers/elevenlabs.py +++ b/iftk/helpers/elevenlabs.py @@ -3,10 +3,13 @@ # This source code is licensed under the MIT license found in the # LICENSE file in the root directory of this source tree. -from typing import AsyncIterator, Iterator, Optional +from typing import Any, AsyncIterator, Callable, Iterator, Optional from elevenlabs.client import ElevenLabs +from iftk.channel import AsyncChannel +from iftk.system import System + async def eleven_stream( sentences: AsyncIterator[str], @@ -21,7 +24,7 @@ async def eleven_stream( messages (dict, optional): The message history of the dialogue system. Yields: - Iterator[AsyncIterator]: A bytes iterator for playing audio using 11Labs stream audio playing function. + Iterator[bytes]: A bytes iterator for playing audio using 11Labs stream audio playing function. """ async for sentence in sentences: if sentence: @@ -29,3 +32,43 @@ async def eleven_stream( text=sentence, stream=True, voice=voice ) yield audio_stream + + +class ElevenlabsChannel(AsyncChannel): + def __init__( + self, + sentence_stream: AsyncChannel, + eleven_client: ElevenLabs, + voice: Optional[str] = "Jessica", + notify_readable: Callable[[None], None] = None, + ) -> None: + super().__init__(notify_readable) + self.sentence_stream = sentence_stream + self.eleven_stream = eleven_stream( + sentences=self.write(), eleven_client=eleven_client, voice=voice + ) + + async def read(self) -> AsyncIterator[bytes]: + yield await anext(self.eleven_stream) + + async def write(self) -> AsyncIterator[str]: + async for sentence in self.sentence_stream: + yield sentence + + +class ElevenlabsSystem(System): + async def create_async_channel( + self, + sentence_stream: AsyncChannel, + eleven_client: ElevenLabs, + notify_readable: Callable[[None], None] = None, + voice: Optional[str] = "Jessica", + **kwargs, + ) -> AsyncChannel: + eleven_channel = ElevenlabsChannel( + sentence_stream=sentence_stream, + eleven_client=eleven_client, + voice=voice, + notify_readable=notify_readable, + ) + return eleven_channel From 97601915fec76c16d03f9dd8fb65b7565a871e8b Mon Sep 17 00:00:00 2001 From: Sebastian Ponce Date: Fri, 6 Sep 2024 00:08:17 -0600 Subject: [PATCH 2/5] Groq system-channel subclass interface --- iftk/helpers/groq.py | 30 +++++++++++++++++++++++++++++- 1 file changed, 29 insertions(+), 1 deletion(-) diff --git a/iftk/helpers/groq.py b/iftk/helpers/groq.py index 74813ce..aad2654 100644 --- a/iftk/helpers/groq.py +++ b/iftk/helpers/groq.py @@ -3,10 +3,13 @@ # This source code is licensed under the MIT license found in the # LICENSE file in the root directory of this source tree. -from typing import AsyncIterator +from typing import AsyncIterator, Callable import groq +from iftk.channel import AsyncChannel +from iftk.system import System + async def groq_sentence_stream(llm_stream: groq._client.AsyncStream) -> AsyncIterator: """An AsyncIterator wrapper for the groq generation stream. @@ -29,3 +32,28 @@ async def groq_sentence_stream(llm_stream: groq._client.AsyncStream) -> AsyncIte if sentence: yield sentence sentence = "" + + +class GroqChannel(AsyncChannel): + def __init__( + self, + sentence_stream: groq._client.AsyncStream, + notify_readable: Callable[[None], None] = None, + ) -> None: + super().__init__(notify_readable) + self.sentence_stream = sentence_stream + self.groq_stream = groq_sentence_stream(sentence_stream=sentence_stream) + + async def read(self) -> AsyncIterator[bytes]: + yield await anext(self.groq_stream) + + +class ElevenlabsSystem(System): + async def create_async_channel( + self, + sentence_stream: groq._client.AsyncStream, + notify_readable: Callable[[None], None] = None, + **kwargs, + ) -> AsyncChannel: + groq_channel = GroqChannel(sentence_stream=sentence_stream) + return groq_channel From bd1746e653ffc65d2c793c24e63cf5c78da7538c Mon Sep 17 00:00:00 2001 From: Sebastian Ponce Date: Fri, 6 Sep 2024 00:15:59 -0600 Subject: [PATCH 3/5] Fix typing issue --- iftk/helpers/groq.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/iftk/helpers/groq.py b/iftk/helpers/groq.py index aad2654..e851116 100644 --- a/iftk/helpers/groq.py +++ b/iftk/helpers/groq.py @@ -44,7 +44,7 @@ def __init__( self.sentence_stream = sentence_stream self.groq_stream = groq_sentence_stream(sentence_stream=sentence_stream) - async def read(self) -> AsyncIterator[bytes]: + async def read(self) -> AsyncIterator[str]: yield await anext(self.groq_stream) From c35752e7c222b169403789a9f6efb809009cdb4b Mon Sep 17 00:00:00 2001 From: Sebastian Ponce Date: Fri, 6 Sep 2024 09:45:34 -0600 Subject: [PATCH 4/5] Fix naming on GroqSystem --- iftk/helpers/groq.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/iftk/helpers/groq.py b/iftk/helpers/groq.py index e851116..acf9046 100644 --- a/iftk/helpers/groq.py +++ b/iftk/helpers/groq.py @@ -48,7 +48,7 @@ async def read(self) -> AsyncIterator[str]: yield await anext(self.groq_stream) -class ElevenlabsSystem(System): +class GroqSystem(System): async def create_async_channel( self, sentence_stream: groq._client.AsyncStream, From f0c2f5689b45ee66abdbdebb8251975e119c2800 Mon Sep 17 00:00:00 2001 From: Sebastian Ponce Date: Tue, 17 Sep 2024 01:41:45 -0600 Subject: [PATCH 5/5] System/Channel remote inference example --- examples/api_helpers_stream.py | 79 ++++++++++++++++++++++++++-------- iftk/helpers/deepgram.py | 33 -------------- iftk/helpers/elevenlabs.py | 43 ------------------ iftk/helpers/groq.py | 28 ------------ 4 files changed, 61 insertions(+), 122 deletions(-) diff --git a/examples/api_helpers_stream.py b/examples/api_helpers_stream.py index ce1b779..6b55a79 100644 --- a/examples/api_helpers_stream.py +++ b/examples/api_helpers_stream.py @@ -4,14 +4,17 @@ # LICENSE file in the root directory of this source tree. import asyncio -from typing import AsyncIterator +from collections import deque +from typing import Any, AsyncIterator, Callable, Iterator from context import iftk from dotenv import dotenv_values from elevenlabs.client import ElevenLabs from elevenlabs.play import stream +from iftk.channel import DequeChannel from iftk.helpers import deepgram, elevenlabs, groq, pyaudio +from iftk.system import System DOTENV = dotenv_values(".env") GROQ_API_KEY = DOTENV["GROQ_API_KEY"] @@ -23,25 +26,65 @@ messages = [{"role": "system", "content": "Answer to the user in a few sentences."}] +class RemoteChannel(DequeChannel): + def __init__( + self, + notify_readable: Callable[[None], None] = None, + ) -> None: + super().__init__(notify_readable) + self.groq_client = groq.groq.AsyncClient(api_key=GROQ_API_KEY) + self.elevenlabs_client = ElevenLabs(api_key=ELEVENLABS_API_KEY) + self.q = deque() + + async def read_to_stream(self) -> AsyncIterator[bytes]: + """Convert the deque into an AsyncIterator.""" + while True: + while not self.q: + continue + + while self.q: + try: + message = self.q.popleft() + if message is not None: + yield message + except IndexError: + continue + + async def read(self) -> AsyncIterator[Iterator[bytes]]: + self.deepgram_stream = deepgram.deepgram_stream( + key=DEEPGRAM_API_KEY, audio_stream=self.read_to_stream() + ) + async for user_message in self.deepgram_stream: + messages.append({"role": "user", "content": user_message}) + llm_stream = await self.groq_client.chat.completions.create( + messages=messages, model=model_id, stream=True + ) + groq_sentence_stream: AsyncIterator = groq.groq_sentence_stream( + llm_stream=llm_stream + ) + async for sentence in elevenlabs.eleven_stream( + sentences=groq_sentence_stream, eleven_client=self.elevenlabs_client + ): + yield sentence + + +class RemoteInferenceSystem(System): + async def create_async_channel( + self, notify_readable: Callable[[None], None] = None, **kwargs + ) -> iftk.AsyncChannel: + super().__init__() + return RemoteChannel() + + async def main(): - groq_client = groq.groq.AsyncClient(api_key=GROQ_API_KEY) - elevenlabs_client = ElevenLabs(api_key=ELEVENLABS_API_KEY) mic_stream: AsyncIterator = pyaudio.microphone(rate=RATE, frames_per_buffer=CHUNK) - deepgram_stream: AsyncIterator = deepgram.deepgram_stream( - key=DEEPGRAM_API_KEY, audio_stream=mic_stream - ) - async for user_message in deepgram_stream: - messages.append({"role": "user", "content": user_message}) - llm_stream = await groq_client.chat.completions.create( - messages=messages, model=model_id, stream=True - ) - groq_sentence_stream: AsyncIterator = groq.groq_sentence_stream( - llm_stream=llm_stream - ) - async for sentence in elevenlabs.eleven_stream( - sentences=groq_sentence_stream, eleven_client=elevenlabs_client - ): - await asyncio.to_thread(stream, sentence) + system = RemoteInferenceSystem() + channel = await system.create_async_channel() + async for audio_chunk in mic_stream: + channel.write(audio_chunk) + async for output in channel.read(): + if output: + await asyncio.to_thread(stream, output) if __name__ == "__main__": diff --git a/iftk/helpers/deepgram.py b/iftk/helpers/deepgram.py index 193255c..8394865 100644 --- a/iftk/helpers/deepgram.py +++ b/iftk/helpers/deepgram.py @@ -9,9 +9,6 @@ import websockets -from iftk.channel import AsyncChannel -from iftk.system import System - WSS_URL = "wss://api.deepgram.com/v1/listen?endpointing=500&encoding=linear16&sample_rate=16000&channels=1&interim_results=false" @@ -68,33 +65,3 @@ async def receiver(ws): tasks = [sender_task, keep_alive_task] for task in tasks: task.cancel() - - -class DeepgramChannel(AsyncChannel): - def __init__( - self, - deepgram_key: str, - input_stream: AsyncIterator[bytes], - notify_readable: Callable[[None], None] = None, - ) -> None: - super().__init__(notify_readable) - self.deepgram_stream = deepgram_stream(deepgram_key, input_stream) - - async def read(self) -> AsyncIterator[str]: - yield await anext(self.deepgram_stream) - - -class DeepgramSystem(System): - async def create_async_channel( - self, - deepgram_key: str, - input_stream: AsyncIterator[bytes], - notify_readable: Callable[[None], None], - **kwargs, - ) -> AsyncChannel: - deepgram_channel = DeepgramChannel( - deepgram_key=deepgram_key, - input_stream=input_stream, - notify_readable=notify_readable, - ) - return deepgram_channel diff --git a/iftk/helpers/elevenlabs.py b/iftk/helpers/elevenlabs.py index 7f1f672..420c302 100644 --- a/iftk/helpers/elevenlabs.py +++ b/iftk/helpers/elevenlabs.py @@ -7,9 +7,6 @@ from elevenlabs.client import ElevenLabs -from iftk.channel import AsyncChannel -from iftk.system import System - async def eleven_stream( sentences: AsyncIterator[str], @@ -32,43 +29,3 @@ async def eleven_stream( text=sentence, stream=True, voice=voice ) yield audio_stream - - -class ElevenlabsChannel(AsyncChannel): - def __init__( - self, - sentence_stream: AsyncChannel, - eleven_client: ElevenLabs, - voice: Optional[str] = "Jessica", - notify_readable: Callable[[None], None] = None, - ) -> None: - super().__init__(notify_readable) - self.sentence_stream = sentence_stream - self.eleven_stream = eleven_stream( - sentences=self.write(), eleven_client=eleven_client, voice=voice - ) - - async def read(self) -> AsyncIterator[bytes]: - yield await anext(self.eleven_stream) - - async def write(self) -> AsyncIterator[str]: - async for sentence in self.sentence_stream: - yield sentence - - -class ElevenlabsSystem(System): - async def create_async_channel( - self, - sentence_stream: AsyncChannel, - eleven_client: ElevenLabs, - notify_readable: Callable[[None], None] = None, - voice: Optional[str] = "Jessica", - **kwargs, - ) -> AsyncChannel: - eleven_channel = ElevenlabsChannel( - sentence_stream=sentence_stream, - eleven_client=eleven_client, - voice=voice, - notify_readable=notify_readable, - ) - return eleven_channel diff --git a/iftk/helpers/groq.py b/iftk/helpers/groq.py index acf9046..a76f7ca 100644 --- a/iftk/helpers/groq.py +++ b/iftk/helpers/groq.py @@ -7,9 +7,6 @@ import groq -from iftk.channel import AsyncChannel -from iftk.system import System - async def groq_sentence_stream(llm_stream: groq._client.AsyncStream) -> AsyncIterator: """An AsyncIterator wrapper for the groq generation stream. @@ -32,28 +29,3 @@ async def groq_sentence_stream(llm_stream: groq._client.AsyncStream) -> AsyncIte if sentence: yield sentence sentence = "" - - -class GroqChannel(AsyncChannel): - def __init__( - self, - sentence_stream: groq._client.AsyncStream, - notify_readable: Callable[[None], None] = None, - ) -> None: - super().__init__(notify_readable) - self.sentence_stream = sentence_stream - self.groq_stream = groq_sentence_stream(sentence_stream=sentence_stream) - - async def read(self) -> AsyncIterator[str]: - yield await anext(self.groq_stream) - - -class GroqSystem(System): - async def create_async_channel( - self, - sentence_stream: groq._client.AsyncStream, - notify_readable: Callable[[None], None] = None, - **kwargs, - ) -> AsyncChannel: - groq_channel = GroqChannel(sentence_stream=sentence_stream) - return groq_channel