diff --git a/examples/api_helpers_stream.py b/examples/api_helpers_stream.py
index ce1b779..6b55a79 100644
--- a/examples/api_helpers_stream.py
+++ b/examples/api_helpers_stream.py
@@ -4,14 +4,17 @@
 # LICENSE file in the root directory of this source tree.
 
 import asyncio
-from typing import AsyncIterator
+from collections import deque
+from typing import Any, AsyncIterator, Callable, Iterator, Optional
 
 from context import iftk
 from dotenv import dotenv_values
 from elevenlabs.client import ElevenLabs
 from elevenlabs.play import stream
 
+from iftk.channel import DequeChannel
 from iftk.helpers import deepgram, elevenlabs, groq, pyaudio
+from iftk.system import System
 
 DOTENV = dotenv_values(".env")
 GROQ_API_KEY = DOTENV["GROQ_API_KEY"]
@@ -23,25 +26,90 @@
 messages = [{"role": "system", "content": "Answer to the user in a few sentences."}]
 
 
+class RemoteChannel(DequeChannel):
+    """Channel that feeds queued audio through the remote speech pipeline.
+
+    Chunks found in ``self.q`` go to ``deepgram.deepgram_stream``; every
+    message it yields is appended to ``messages``, sent to a Groq chat
+    completion, and the reply is passed sentence by sentence to
+    ``elevenlabs.eleven_stream``.
+    """
+
+    def __init__(
+        self,
+        notify_readable: Optional[Callable[[], None]] = None,
+    ) -> None:
+        super().__init__(notify_readable)
+        self.groq_client = groq.groq.AsyncClient(api_key=GROQ_API_KEY)
+        self.elevenlabs_client = ElevenLabs(api_key=ELEVENLABS_API_KEY)
+        self.q = deque()
+
+    async def read_to_stream(self) -> AsyncIterator[bytes]:
+        """Convert the deque into an AsyncIterator."""
+        while True:
+            if not self.q:
+                # Suspend instead of spinning: a bare busy-wait never yields
+                # to the event loop, so no other coroutine could ever run.
+                await asyncio.sleep(0.01)
+                continue
+            try:
+                message = self.q.popleft()
+            except IndexError:
+                # Drained between the check and the pop; wait again.
+                continue
+            if message is not None:
+                yield message
+
+    async def read(self) -> AsyncIterator[Iterator[bytes]]:
+        """Yield the output of ``elevenlabs.eleven_stream`` for each reply."""
+        self.deepgram_stream = deepgram.deepgram_stream(
+            key=DEEPGRAM_API_KEY, audio_stream=self.read_to_stream()
+        )
+        async for user_message in self.deepgram_stream:
+            messages.append({"role": "user", "content": user_message})
+            llm_stream = await self.groq_client.chat.completions.create(
+                messages=messages, model=model_id, stream=True
+            )
+            groq_sentence_stream: AsyncIterator = groq.groq_sentence_stream(
+                llm_stream=llm_stream
+            )
+            async for sentence in elevenlabs.eleven_stream(
+                sentences=groq_sentence_stream, eleven_client=self.elevenlabs_client
+            ):
+                yield sentence
+
+
+class RemoteInferenceSystem(System):
+    """System whose async channels are ``RemoteChannel`` instances."""
+
+    async def create_async_channel(
+        self, notify_readable: Optional[Callable[[], None]] = None, **kwargs
+    ) -> iftk.AsyncChannel:
+        """Return a new ``RemoteChannel`` bound to ``notify_readable``."""
+        # The system is already initialised by its constructor; re-running
+        # ``__init__`` here would re-initialise it on every channel creation.
+        return RemoteChannel(notify_readable)
+
+
 async def main():
-    groq_client = groq.groq.AsyncClient(api_key=GROQ_API_KEY)
-    elevenlabs_client = ElevenLabs(api_key=ELEVENLABS_API_KEY)
     mic_stream: AsyncIterator = pyaudio.microphone(rate=RATE, frames_per_buffer=CHUNK)
-    deepgram_stream: AsyncIterator = deepgram.deepgram_stream(
-        key=DEEPGRAM_API_KEY, audio_stream=mic_stream
-    )
-    async for user_message in deepgram_stream:
-        messages.append({"role": "user", "content": user_message})
-        llm_stream = await groq_client.chat.completions.create(
-            messages=messages, model=model_id, stream=True
-        )
-        groq_sentence_stream: AsyncIterator = groq.groq_sentence_stream(
-            llm_stream=llm_stream
-        )
-        async for sentence in elevenlabs.eleven_stream(
-            sentences=groq_sentence_stream, eleven_client=elevenlabs_client
-        ):
-            await asyncio.to_thread(stream, sentence)
+    system = RemoteInferenceSystem()
+    channel = await system.create_async_channel()
+
+    async def pump_microphone() -> None:
+        """Copy microphone chunks into the channel."""
+        async for audio_chunk in mic_stream:
+            channel.write(audio_chunk)
+
+    # Feed the channel concurrently with reading it: writing the whole
+    # microphone stream first would postpone playback until capture ends.
+    mic_task = asyncio.create_task(pump_microphone())
+    try:
+        async for output in channel.read():
+            if output:
+                await asyncio.to_thread(stream, output)
+    finally:
+        mic_task.cancel()
 
 
 if __name__ == "__main__":
diff --git a/iftk/helpers/deepgram.py b/iftk/helpers/deepgram.py
index 6260297..8394865 100644
--- a/iftk/helpers/deepgram.py
+++ b/iftk/helpers/deepgram.py
@@ -5,7 +5,7 @@
 
 import asyncio
 import json
-from typing import AsyncIterator
+from typing import Any, AsyncIterator, Callable
 
 import websockets
 
diff --git a/iftk/helpers/elevenlabs.py b/iftk/helpers/elevenlabs.py
index 2550307..420c302 100644
--- a/iftk/helpers/elevenlabs.py
+++ b/iftk/helpers/elevenlabs.py
@@ -3,7 +3,7 @@
 # This source code is licensed under the MIT license found in the
 # LICENSE file in the root directory of this source tree.
 
-from typing import AsyncIterator, Iterator, Optional
+from typing import Any, AsyncIterator, Callable, Iterator, Optional
 
 from elevenlabs.client import ElevenLabs
 
@@ -21,7 +21,7 @@ async def eleven_stream(
         messages (dict, optional): The message history of the dialogue system.
 
     Yields:
-        Iterator[AsyncIterator]: A bytes iterator for playing audio using 11Labs stream audio playing function.
+        Iterator[bytes]: A bytes iterator for playing audio using 11Labs stream audio playing function.
     """
     async for sentence in sentences:
         if sentence:
diff --git a/iftk/helpers/groq.py b/iftk/helpers/groq.py
index 74813ce..a76f7ca 100644
--- a/iftk/helpers/groq.py
+++ b/iftk/helpers/groq.py
@@ -3,7 +3,7 @@
 # This source code is licensed under the MIT license found in the
 # LICENSE file in the root directory of this source tree.
 
-from typing import AsyncIterator
+from typing import AsyncIterator, Callable
 
 import groq
 