Skip to content

Commit

Permalink
fix(PDFService): fix support for target/context PDFs (#44)
Browse files — Browse the repository at this point in the history
* clean up PDF handling/saving and add type to PDFMetadata

* update prompts to use target vs context

* update test.py

* revert bad changes

* small typo

* types being overwritten

* ruff

* refactor

* update test

* ruff

* fix test

---------

Co-authored-by: Andrew Wang <[email protected]>
Co-authored-by: ishandhanani <[email protected]>
  • Loading branch information
3 people authored Dec 2, 2024
1 parent 39a0487 commit 9911f58
Show file tree
Hide file tree
Showing 9 changed files with 204 additions and 112 deletions.
4 changes: 2 additions & 2 deletions .github/workflows/pr-test.yaml
Original file line number · Diff line number · Diff line change
Expand Up @@ -35,8 +35,8 @@ jobs:
pip install -r tests/requirements-test.txt # Make sure this file exists with your test dependencies
- name: Run monologue E2E tests
run: python tests/test.py --monologue bofa-context.pdf citi-context.pdf
run: python tests/test.py --monologue --target bofa-context.pdf --context citi-context.pdf

- name: Run podcast E2E tests
run: python tests/test.py bofa-context.pdf citi-context.pdf
run: python tests/test.py --target bofa-context.pdf --context citi-context.pdf

60 changes: 42 additions & 18 deletions services/APIService/main.py
Original file line number · Diff line number · Diff line change
Expand Up @@ -34,7 +34,7 @@
import logging
import time
import asyncio
from typing import Dict, List, Union
from typing import Dict, List, Union, Tuple

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -163,7 +163,7 @@ async def websocket_endpoint(websocket: WebSocket, job_id: str):

def process_pdf_task(
job_id: str,
files_content: List[bytes],
files_and_types: List[Tuple[bytes, str]],
transcription_params: TranscriptionParams,
):
with telemetry.tracer.start_as_current_span("api.process_pdf_task") as span:
Expand All @@ -173,7 +173,7 @@ def process_pdf_task(
pubsub.subscribe("status_updates:all")

# Store all original PDFs
for idx, content in enumerate(files_content):
for idx, (content, _) in enumerate(files_and_types):
storage_manager.store_file(
transcription_params.userId,
job_id,
Expand All @@ -183,22 +183,27 @@ def process_pdf_task(
transcription_params,
)
logger.info(
f"Stored {len(files_content)} original PDFs for {job_id} in storage"
f"Stored {len(files_and_types)} original PDFs for {job_id} in storage"
)

# Send all PDFs to PDF Service
files = [
("files", (f"file_{i}.pdf", content, "application/pdf"))
for i, content in enumerate(files_content)
]
files = []
types = []
for i, (content, type) in enumerate(files_and_types):
files.append(("files", (f"file_{i}.pdf", content, "application/pdf")))
types.append(type)

logger.info(
f"Sending {len(files)} PDFs to PDF Service for {job_id} with VDB task: {transcription_params.vdb_task}"
)
requests.post(
f"{PDF_SERVICE_URL}/convert",
files=files,
data={"job_id": job_id, "vdb_task": transcription_params.vdb_task},
data={
"types": types,
"job_id": job_id,
"vdb_task": transcription_params.vdb_task,
},
)

# Monitor services
Expand Down Expand Up @@ -296,18 +301,34 @@ def process_pdf_task(
@app.post("/process_pdf", status_code=202)
async def process_pdf(
background_tasks: BackgroundTasks,
files: Union[UploadFile, List[UploadFile]] = File(...),
target_files: Union[UploadFile, List[UploadFile]] = File(...),
context_files: Union[UploadFile, List[UploadFile]] = File([]),
transcription_params: str = Form(...),
):
with telemetry.tracer.start_as_current_span("api.process_pdf") as span:
# Convert single file to list for consistent handling
files_list = [files] if isinstance(files, UploadFile) else files
target_files_list = (
[target_files] if isinstance(target_files, UploadFile) else target_files
)
context_files_list = (
[context_files] if isinstance(context_files, UploadFile) else context_files
)

span.set_attribute("request", transcription_params)
span.set_attribute("num_files", len(files_list))
span.set_attribute(
"num_files", len(target_files_list) + len(context_files_list)
)

# Validate all files are PDFs
for file in files_list:
for file in target_files_list:
if file.content_type != "application/pdf":
span.set_status(
status=StatusCode.ERROR, description="invalid file type"
)
raise HTTPException(
status_code=400, detail="Only PDF files are allowed"
)
for file in context_files_list:
if file.content_type != "application/pdf":
span.set_status(
status=StatusCode.ERROR, description="invalid file type"
Expand All @@ -328,14 +349,17 @@ async def process_pdf(
job_id = str(uuid.uuid4())
span.set_attribute("job_id", job_id)

# Read all files
files_content = []
for file in files_list:
# Read target and context files
files_and_types = []
for file in target_files_list:
content = await file.read()
files_and_types.append((content, "target"))
for file in context_files_list:
content = await file.read()
files_content.append(content)
files_and_types.append((content, "context"))

# Start processing
background_tasks.add_task(process_pdf_task, job_id, files_content, params)
background_tasks.add_task(process_pdf_task, job_id, files_and_types, params)
span.set_status(status=StatusCode.OK)

return {"job_id": job_id}
Expand Down
10 changes: 5 additions & 5 deletions services/AgentService/monologue_prompts.py
Original file line number · Diff line number · Diff line change
Expand Up @@ -55,7 +55,7 @@
Requirements:
1. Content Strategy
- Prioritize topics according to focus instructions
- Focus on the content in Target Documents, and use Context Documents as support and context
- Identify key financial metrics and trends
- Analyze potential stakeholder concerns
- Draw connections between documents and focus areas
Expand All @@ -78,7 +78,7 @@
- Express percentages in spoken form
- Write out mathematical operations
Output a structured outline that synthesizes insights across all documents. This should be a concise, actionable summary."""
Output a structured outline that synthesizes insights across all documents, emphasizing Target Documents while using Context Documents for support."""

MONOLOGUE_TRANSCRIPT_PROMPT_STR = """
Create a focused financial update based on this outline and source documents.
Expand All @@ -89,7 +89,7 @@
Available Source Documents:
{% for doc in documents %}
<document>
<is_important>true</is_important>
<type>{"Target Document" if doc.type == "target" else "Context Document"}</type>
<path>{{doc.filename}}</path>
<summary>
{{doc.summary}}
Expand All @@ -116,9 +116,9 @@
- Clear source attribution
2. Content Structure
- Follow the provided outline
- Prioritize insights from Target Documents
- Support with Context Documents where relevant
- Maintain logical flow between points
- Support key claims with evidence
- End with a clear takeaway
3. Text Formatting:
Expand Down
2 changes: 1 addition & 1 deletion services/AgentService/podcast_flow.py
Original file line number · Diff line number · Diff line change
Expand Up @@ -75,7 +75,7 @@ async def podcast_generate_raw_outline(
for pdf in summarized_pdfs:
doc_str = f"""
<document>
<is_important>true</is_important>
<type>{"Target Document" if pdf.type == "target" else "Context Document"}</type>
<path>{pdf.filename}</path>
<summary>
{pdf.summary}
Expand Down
8 changes: 4 additions & 4 deletions services/AgentService/podcast_prompts.py
Original file line number · Diff line number · Diff line change
Expand Up @@ -50,7 +50,7 @@
Requirements:
1. Content Strategy
- Prioritize topics according to focus instructions
- Focus on the content in Target Documents, and use Context Documents as support and context
- Identify key debates and differing viewpoints
- Analyze potential audience questions/concerns
- Draw connections between documents and focus areas
Expand All @@ -62,12 +62,12 @@
- Build natural narrative flow between topics
3. Coverage
- Comprehensive treatment of focus areas
- Strategic depth on key topics
- Comprehensive treatment of Target Documents
- Strategic integration of Context Documents for support
- Supporting evidence from all relevant documents
- Balance technical accuracy with engaging delivery
Ensure the outline creates a cohesive narrative that emphasizes the specified focus areas while maintaining overall context and accuracy.
Ensure the outline creates a cohesive narrative that emphasizes the Target Documents while using Context Documents to provide additional depth and background information.
"""

PODCAST_MULTI_PDF_STRUCUTRED_OUTLINE_PROMPT_STR = """
Expand Down
24 changes: 17 additions & 7 deletions services/PDFService/main.py
Original file line number · Diff line number · Diff line change
@@ -1,4 +1,4 @@
from fastapi import FastAPI, File, UploadFile, BackgroundTasks, HTTPException, Form
from fastapi import FastAPI, BackgroundTasks, HTTPException, Form, File, UploadFile
from shared.job import JobStatusManager
from shared.otel import OpenTelemetryInstrumentation, OpenTelemetryConfig
from opentelemetry.trace.status import StatusCode
Expand Down Expand Up @@ -153,11 +153,15 @@ async def convert_pdfs_to_markdown(
)


async def process_pdfs(
job_id: str, contents: List[bytes], filenames: List[str], vdb_task: bool = False
async def convert_pdfs(
job_id: str,
contents: List[bytes],
filenames: List[str],
types: List[str],
vdb_task: bool = False,
):
"""Process multiple PDFs and return metadata for each"""
with telemetry.tracer.start_as_current_span("pdf.process_pdfs") as span:
with telemetry.tracer.start_as_current_span("pdf.convert_pdfs") as span:
try:
logger.info(
f"Starting PDF processing for job {job_id} with {len(contents)} files"
Expand Down Expand Up @@ -194,13 +198,14 @@ async def process_pdfs(

# Create metadata list
pdf_metadata_list = []
for filename, result in zip(filenames, results):
for filename, result, type in zip(filenames, results, types):
try:
metadata = PDFMetadata(
filename=filename,
markdown=result.content
if result.status == ConversionStatus.SUCCESS
else "",
type=type,
status=result.status,
error=result.error,
)
Expand Down Expand Up @@ -257,6 +262,7 @@ async def process_pdfs(
async def convert_pdf(
background_tasks: BackgroundTasks,
files: List[UploadFile] = File(...),
types: List[str] = Form(...),
job_id: str = Form(...),
vdb_task: bool = Form(False),
):
Expand All @@ -272,16 +278,20 @@ async def convert_pdf(
# Read all file contents and filenames
contents = []
filenames = []
for file in files:
file_types = []
for file, type in zip(files, types):
content = await file.read()
contents.append(content)
filenames.append(file.filename)
file_types.append(type)

span.set_attribute("num_files", len(files))
job_manager.create_job(job_id)

# Start processing in background
background_tasks.add_task(process_pdfs, job_id, contents, filenames, vdb_task)
background_tasks.add_task(
convert_pdfs, job_id, contents, filenames, file_types, vdb_task
)

return {"job_id": job_id}

Expand Down
3 changes: 2 additions & 1 deletion shared/shared/pdf_types.py
Original file line number · Diff line number · Diff line change
@@ -1,5 +1,5 @@
from pydantic import BaseModel, Field
from typing import Optional
from typing import Optional, Union, Literal
from datetime import datetime
from enum import Enum

Expand All @@ -21,5 +21,6 @@ class PDFMetadata(BaseModel):
markdown: str = ""
summary: str = ""
status: ConversionStatus
type: Union[Literal["target"], Literal["context"]]
error: Optional[str] = None
created_at: datetime = Field(default_factory=datetime.utcnow)
2 changes: 1 addition & 1 deletion tests/prod-test.sh
Original file line number · Diff line number · Diff line change
Expand Up @@ -6,6 +6,6 @@ python3 test.py --monologue \
db-context.pdf \
gs-context.pdf \
hsbc-context.pdf \
investorpres-main.pdf \
investorpres-main.pdf target \
jpm-context.pdf \
keybanc-context.pdf
Loading

0 comments on commit 9911f58

Please sign in to comment.