Skip to content

System-Channel iftk interface subclasses #5

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 5 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
79 changes: 61 additions & 18 deletions examples/api_helpers_stream.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,17 @@
# LICENSE file in the root directory of this source tree.

import asyncio
from typing import AsyncIterator
from collections import deque
from typing import Any, AsyncIterator, Callable, Iterator

from context import iftk
from dotenv import dotenv_values
from elevenlabs.client import ElevenLabs
from elevenlabs.play import stream

from iftk.channel import DequeChannel
from iftk.helpers import deepgram, elevenlabs, groq, pyaudio
from iftk.system import System

DOTENV = dotenv_values(".env")
GROQ_API_KEY = DOTENV["GROQ_API_KEY"]
Expand All @@ -23,25 +26,65 @@
messages = [{"role": "system", "content": "Answer to the user in a few sentences."}]


class RemoteChannel(DequeChannel):
def __init__(
self,
notify_readable: Callable[[None], None] = None,
) -> None:
super().__init__(notify_readable)
self.groq_client = groq.groq.AsyncClient(api_key=GROQ_API_KEY)
self.elevenlabs_client = ElevenLabs(api_key=ELEVENLABS_API_KEY)
self.q = deque()

async def read_to_stream(self) -> AsyncIterator[bytes]:
"""Convert the deque into an AsyncIterator."""
while True:
while not self.q:
continue

while self.q:
try:
message = self.q.popleft()
if message is not None:
yield message
except IndexError:
continue

async def read(self) -> AsyncIterator[Iterator[bytes]]:
self.deepgram_stream = deepgram.deepgram_stream(
key=DEEPGRAM_API_KEY, audio_stream=self.read_to_stream()
)
async for user_message in self.deepgram_stream:
messages.append({"role": "user", "content": user_message})
llm_stream = await self.groq_client.chat.completions.create(
messages=messages, model=model_id, stream=True
)
groq_sentence_stream: AsyncIterator = groq.groq_sentence_stream(
llm_stream=llm_stream
)
async for sentence in elevenlabs.eleven_stream(
sentences=groq_sentence_stream, eleven_client=self.elevenlabs_client
):
yield sentence


class RemoteInferenceSystem(System):
async def create_async_channel(
self, notify_readable: Callable[[None], None] = None, **kwargs
) -> iftk.AsyncChannel:
super().__init__()
return RemoteChannel()


async def main():
groq_client = groq.groq.AsyncClient(api_key=GROQ_API_KEY)
elevenlabs_client = ElevenLabs(api_key=ELEVENLABS_API_KEY)
mic_stream: AsyncIterator = pyaudio.microphone(rate=RATE, frames_per_buffer=CHUNK)
deepgram_stream: AsyncIterator = deepgram.deepgram_stream(
key=DEEPGRAM_API_KEY, audio_stream=mic_stream
)
async for user_message in deepgram_stream:
messages.append({"role": "user", "content": user_message})
llm_stream = await groq_client.chat.completions.create(
messages=messages, model=model_id, stream=True
)
groq_sentence_stream: AsyncIterator = groq.groq_sentence_stream(
llm_stream=llm_stream
)
async for sentence in elevenlabs.eleven_stream(
sentences=groq_sentence_stream, eleven_client=elevenlabs_client
):
await asyncio.to_thread(stream, sentence)
system = RemoteInferenceSystem()
channel = await system.create_async_channel()
async for audio_chunk in mic_stream:
channel.write(audio_chunk)
async for output in channel.read():
if output:
await asyncio.to_thread(stream, output)


if __name__ == "__main__":
Expand Down
2 changes: 1 addition & 1 deletion iftk/helpers/deepgram.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

import asyncio
import json
from typing import AsyncIterator
from typing import Any, AsyncIterator, Callable

import websockets

Expand Down
4 changes: 2 additions & 2 deletions iftk/helpers/elevenlabs.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
# This source code is licensed under the MIT license found in the
# LICENSE file in the root directory of this source tree.

from typing import AsyncIterator, Iterator, Optional
from typing import Any, AsyncIterator, Callable, Iterator, Optional

from elevenlabs.client import ElevenLabs

Expand All @@ -21,7 +21,7 @@ async def eleven_stream(
messages (dict, optional): The message history of the dialogue system.

Yields:
Iterator[AsyncIterator]: A bytes iterator for playing audio using 11Labs stream audio playing function.
Iterator[bytes]: A bytes iterator for playing audio using 11Labs stream audio playing function.
"""
async for sentence in sentences:
if sentence:
Expand Down
2 changes: 1 addition & 1 deletion iftk/helpers/groq.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
# This source code is licensed under the MIT license found in the
# LICENSE file in the root directory of this source tree.

from typing import AsyncIterator
from typing import AsyncIterator, Callable

import groq

Expand Down