Commit

Chore: Clean up code (#2)
gabrielchua authored Oct 1, 2024
1 parent 5deb312 commit ab25593
Showing 5 changed files with 448 additions and 228 deletions.
238 changes: 87 additions & 151 deletions app.py
@@ -8,69 +8,48 @@
import time
from pathlib import Path
from tempfile import NamedTemporaryFile
from typing import List, Literal, Tuple, Optional
from typing import List, Tuple, Optional

# Third-party imports
import gradio as gr
import random
from loguru import logger
from pydantic import BaseModel, Field
from pypdf import PdfReader
from pydub import AudioSegment

# Local imports
from prompts import SYSTEM_PROMPT
from utils import generate_script, generate_podcast_audio, parse_url


class DialogueItem(BaseModel):
"""A single dialogue item."""

speaker: Literal["Host (Jane)", "Guest"]
text: str


class ShortDialogue(BaseModel):
"""The dialogue between the host and guest."""

scratchpad: str
name_of_guest: str
dialogue: List[DialogueItem] = Field(..., description="A list of dialogue items, typically between 5 to 9 items")


class MediumDialogue(BaseModel):
"""The dialogue between the host and guest."""

scratchpad: str
name_of_guest: str
dialogue: List[DialogueItem] = Field(..., description="A list of dialogue items, typically between 8 to 13 items")


LANGUAGE_MAPPING = {
"English": "en",
"Chinese": "zh",
"French": "fr",
"German": "de",
"Hindi": "hi",
"Italian": "it",
"Japanese": "ja",
"Korean": "ko",
"Polish": "pl",
"Portuguese": "pt",
"Russian": "ru",
"Spanish": "es",
"Turkish": "tr"
}

MELO_TTS_LANGUAGE_MAPPING = {
"en": "EN",
"es": "ES",
"fr": "FR",
"zh": "ZJ",
"ja": "JP",
"ko": "KR",
}


from constants import (
APP_TITLE,
CHARACTER_LIMIT,
ERROR_MESSAGE_NOT_PDF,
ERROR_MESSAGE_NO_INPUT,
ERROR_MESSAGE_NOT_SUPPORTED_IN_MELO_TTS,
ERROR_MESSAGE_READING_PDF,
ERROR_MESSAGE_TOO_LONG,
GRADIO_CACHE_DIR,
GRADIO_CLEAR_CACHE_OLDER_THAN,
MELO_TTS_LANGUAGE_MAPPING,
NOT_SUPPORTED_IN_MELO_TTS,
SUNO_LANGUAGE_MAPPING,
UI_ALLOW_FLAGGING,
UI_API_NAME,
UI_CACHE_EXAMPLES,
UI_CONCURRENCY_LIMIT,
UI_DESCRIPTION,
UI_EXAMPLES,
UI_INPUTS,
UI_OUTPUTS,
UI_SHOW_API,
)
from prompts import (
LANGUAGE_MODIFIER,
LENGTH_MODIFIERS,
QUESTION_MODIFIER,
SYSTEM_PROMPT,
TONE_MODIFIER,
)
from schema import ShortDialogue, MediumDialogue
from utils import generate_podcast_audio, generate_script, parse_url
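
# Illustrative sketch, not part of this diff: the `schema` module imported
# above is assumed to hold the Pydantic models removed from app.py in this
# commit (DialogueItem, ShortDialogue, MediumDialogue), roughly as follows.
# The actual schema.py shipped with this commit is not shown on this page.
#
#     from typing import List, Literal
#     from pydantic import BaseModel, Field
#
#     class DialogueItem(BaseModel):
#         """A single dialogue item."""
#         speaker: Literal["Host (Jane)", "Guest"]
#         text: str
#
#     class ShortDialogue(BaseModel):
#         """The dialogue between the host and guest."""
#         scratchpad: str
#         name_of_guest: str
#         dialogue: List[DialogueItem] = Field(..., description="A list of dialogue items, typically between 5 to 9 items")
#
#     class MediumDialogue(BaseModel):
#         """The dialogue between the host and guest."""
#         scratchpad: str
#         name_of_guest: str
#         dialogue: List[DialogueItem] = Field(..., description="A list of dialogue items, typically between 8 to 13 items")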


def generate_podcast(
@@ -84,32 +63,30 @@ def generate_podcast(
) -> Tuple[str, str]:
"""Generate the audio and transcript from the PDFs and/or URL."""



text = ""

# Check if the selected language is supported by MeloTTS when not using advanced audio
if not use_advanced_audio and language in ['German', 'Hindi', 'Italian', 'Polish', 'Portuguese', 'Russian', 'Turkish']:
raise gr.Error(f"The selected language '{language}' is not supported without advanced audio generation. Please enable advanced audio generation or choose a supported language.")
# Choose random number from 0 to 9
random_voice_number = random.randint(0, 9) # this is for suno model

if not use_advanced_audio and language in NOT_SUPPORTED_IN_MELO_TTS:
raise gr.Error(ERROR_MESSAGE_NOT_SUPPORTED_IN_MELO_TTS)

# Check if at least one input is provided
if not files and not url:
raise gr.Error("Please provide at least one PDF file or a URL.")
raise gr.Error(ERROR_MESSAGE_NO_INPUT)

# Process PDFs if any
if files:
for file in files:
if not file.lower().endswith(".pdf"):
raise gr.Error(
f"File {file} is not a PDF. Please upload only PDF files."
)
raise gr.Error(ERROR_MESSAGE_NOT_PDF)

try:
with Path(file).open("rb") as f:
reader = PdfReader(f)
text += "\n\n".join([page.extract_text() for page in reader.pages])
except Exception as e:
raise gr.Error(f"Error reading the PDF file {file}: {str(e)}")
raise gr.Error(f"{ERROR_MESSAGE_READING_PDF}: {str(e)}")

# Process URL if provided
if url:
@@ -120,34 +97,27 @@ def generate_podcast(
raise gr.Error(str(e))

# Check total character count
if len(text) > 100000:
raise gr.Error(
"The total content is too long. Please ensure the combined text from PDFs and URL is fewer than ~100,000 characters."
)

if len(text) > CHARACTER_LIMIT:
raise gr.Error(ERROR_MESSAGE_TOO_LONG)

# Modify the system prompt based on the user input
modified_system_prompt = SYSTEM_PROMPT

if question:
modified_system_prompt += f"\n\PLEASE ANSWER THE FOLLOWING QN: {question}"
modified_system_prompt += f"\n\n{QUESTION_MODIFIER} {question}"
if tone:
modified_system_prompt += f"\n\nTONE: The tone of the podcast should be {tone}."
modified_system_prompt += f"\n\n{TONE_MODIFIER} {tone}."
if length:
length_instructions = {
"Short (1-2 min)": "Keep the podcast brief, around 1-2 minutes long.",
"Medium (3-5 min)": "Aim for a moderate length, about 3-5 minutes.",
}
modified_system_prompt += f"\n\nLENGTH: {length_instructions[length]}"
modified_system_prompt += f"\n\n{LENGTH_MODIFIERS[length]}"
if language:
modified_system_prompt += (
f"\n\nOUTPUT LANGUAGE <IMPORTANT>: The the podcast should be {language}."
)
modified_system_prompt += f"\n\n{LANGUAGE_MODIFIER} {language}."

# Call the LLM
if length == "Short (1-2 min)":
llm_output = generate_script(modified_system_prompt, text, ShortDialogue)
else:
llm_output = generate_script(modified_system_prompt, text, MediumDialogue)

logger.info(f"Generated dialogue: {llm_output}")

# Process the dialogue
@@ -164,14 +134,14 @@ def generate_podcast(
transcript += speaker + "\n\n"
total_characters += len(line.text)

language_for_tts = LANGUAGE_MAPPING[language]
language_for_tts = SUNO_LANGUAGE_MAPPING[language]

if not use_advanced_audio:
language_for_tts = MELO_TTS_LANGUAGE_MAPPING[language_for_tts]

# Get audio file path
audio_file_path = generate_podcast_audio(
line.text, line.speaker, language_for_tts, use_advanced_audio
line.text, line.speaker, language_for_tts, use_advanced_audio, random_voice_number
)
# Read the audio file into an AudioSegment
audio_segment = AudioSegment.from_file(audio_file_path)
@@ -181,7 +151,7 @@ def generate_podcast(
combined_audio = sum(audio_segments)

# Export the combined audio to a temporary file
temporary_directory = "./gradio_cached_examples/tmp/"
temporary_directory = GRADIO_CACHE_DIR
os.makedirs(temporary_directory, exist_ok=True)

temporary_file = NamedTemporaryFile(
@@ -193,7 +163,10 @@

# Delete any files in the temp directory that end with .mp3 and are over a day old
for file in glob.glob(f"{temporary_directory}*.mp3"):
if os.path.isfile(file) and time.time() - os.path.getmtime(file) > 24 * 60 * 60:
if (
os.path.isfile(file)
and time.time() - os.path.getmtime(file) > GRADIO_CLEAR_CACHE_OLDER_THAN
):
os.remove(file)

logger.info(f"Generated {total_characters} characters of audio")
@@ -202,90 +175,53 @@


demo = gr.Interface(
title="Open NotebookLM",
description="""
<table style="border-collapse: collapse; border: none; padding: 20px;">
<tr style="border: none;">
<td style="border: none; vertical-align: top; padding-right: 30px; padding-left: 30px;">
<img src="https://raw.githubusercontent.com/gabrielchua/daily-ai-papers/main/_includes/icon.png" alt="Open NotebookLM" width="120" style="margin-bottom: 10px;">
</td>
<td style="border: none; vertical-align: top; padding: 10px;">
<p style="margin-bottom: 15px;"><strong>Convert</strong> your PDFs into podcasts with open-source AI models (Llama 3.1 405B and MeloTTS).</p>
<p style="margin-top: 15px;">Note: Only the text content of the PDFs will be processed. Images and tables are not included. The total content should be no more than 100,000 characters due to the context length of Llama 3.1 405B.</p>
</td>
</tr>
</table>
""",
title=APP_TITLE,
description=UI_DESCRIPTION,
fn=generate_podcast,
inputs=[
gr.File(
label="1. 📄 Upload your PDF(s)", file_types=[".pdf"], file_count="multiple"
label=UI_INPUTS["file_upload"]["label"], # Step 1: File upload
file_types=UI_INPUTS["file_upload"]["file_types"],
file_count=UI_INPUTS["file_upload"]["file_count"],
),
gr.Textbox(
label="2. 🔗 Paste a URL (optional)",
placeholder="Enter a URL to include its content",
label=UI_INPUTS["url"]["label"], # Step 2: URL
placeholder=UI_INPUTS["url"]["placeholder"],
),
gr.Textbox(label="3. 🤔 Do you have a specific question or topic in mind?"),
gr.Textbox(label=UI_INPUTS["question"]["label"]), # Step 3: Question
gr.Dropdown(
choices=["Fun", "Formal"],
label="4. 🎭 Choose the tone",
value="Fun"
label=UI_INPUTS["tone"]["label"], # Step 4: Tone
choices=UI_INPUTS["tone"]["choices"],
value=UI_INPUTS["tone"]["value"],
),
gr.Dropdown(
choices=["Short (1-2 min)", "Medium (3-5 min)"],
label="5. ⏱️ Choose the length",
value="Medium (3-5 min)"
label=UI_INPUTS["length"]["label"], # Step 5: Length
choices=UI_INPUTS["length"]["choices"],
value=UI_INPUTS["length"]["value"],
),
gr.Dropdown(
choices=list(LANGUAGE_MAPPING.keys()),
value="English",
label="6. 🌐 Choose the language"
choices=UI_INPUTS["language"]["choices"], # Step 6: Language
value=UI_INPUTS["language"]["value"],
label=UI_INPUTS["language"]["label"],
),
gr.Checkbox(
label="7. 🔄 Use advanced audio generation? (Experimental)",
value=False
)
label=UI_INPUTS["advanced_audio"]["label"],
value=UI_INPUTS["advanced_audio"]["value"],
),
],
outputs=[
gr.Audio(label="Podcast", format="mp3"),
gr.Markdown(label="Transcript"),
gr.Audio(
label=UI_OUTPUTS["audio"]["label"], format=UI_OUTPUTS["audio"]["format"]
),
gr.Markdown(label=UI_OUTPUTS["transcript"]["label"]),
],
allow_flagging="never",
api_name="generate_podcast",
allow_flagging=UI_ALLOW_FLAGGING,
api_name=UI_API_NAME,
theme=gr.themes.Soft(),
concurrency_limit=3,
examples=[
[
[str(Path("examples/1310.4546v1.pdf"))],
"",
"Explain this paper to me like I'm 5 years old",
"Fun",
"Short (1-2 min)",
"English",
True
],
[
[],
"https://en.wikipedia.org/wiki/Hugging_Face",
"How did Hugging Face become so successful?",
"Fun",
"Short (1-2 min)",
"English",
False
],
[
[],
"https://simple.wikipedia.org/wiki/Taylor_Swift",
"Why is Taylor Swift so popular?",
"Fun",
"Short (1-2 min)",
"English",
False
],
],
cache_examples=True,
concurrency_limit=UI_CONCURRENCY_LIMIT,
# examples=UI_EXAMPLES,
# cache_examples=UI_CACHE_EXAMPLES,
)

if __name__ == "__main__":
demo.launch(show_api=True)
demo.launch(show_api=UI_SHOW_API)
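
For orientation, here is a minimal sketch of the values the new `constants` module is assumed to expose, reconstructed from the inline literals this commit removes from app.py; the actual constants.py in this commit may name or phrase them differently.

# constants.py (illustrative sketch; reconstructed from the literals removed above)

CHARACTER_LIMIT = 100_000  # replaces the hard-coded 100,000-character check
GRADIO_CACHE_DIR = "./gradio_cached_examples/tmp/"  # replaces the hard-coded temp directory
GRADIO_CLEAR_CACHE_OLDER_THAN = 24 * 60 * 60  # replaces the hard-coded one-day threshold (seconds)

ERROR_MESSAGE_NO_INPUT = "Please provide at least one PDF file or a URL."  # wording taken from the removed inline error

# Languages the old code rejected when advanced audio generation was disabled
NOT_SUPPORTED_IN_MELO_TTS = [
    "German", "Hindi", "Italian", "Polish", "Portuguese", "Russian", "Turkish"
]

# Formerly LANGUAGE_MAPPING, defined inline in app.py
SUNO_LANGUAGE_MAPPING = {
    "English": "en",
    "Chinese": "zh",
    "French": "fr",
    "German": "de",
    "Hindi": "hi",
    "Italian": "it",
    "Japanese": "ja",
    "Korean": "ko",
    "Polish": "pl",
    "Portuguese": "pt",
    "Russian": "ru",
    "Spanish": "es",
    "Turkish": "tr",
}

# Same mapping that was previously defined inline in app.py
MELO_TTS_LANGUAGE_MAPPING = {
    "en": "EN",
    "es": "ES",
    "fr": "FR",
    "zh": "ZJ",
    "ja": "JP",
    "ko": "KR",
}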