-
Notifications
You must be signed in to change notification settings - Fork 1.2k
/
Copy pathmanager.py
140 lines (122 loc) · 5.95 KB
/
manager.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
from __future__ import annotations
import asyncio
import time
from collections.abc import Sequence
from rich.console import Console
from agents import Runner, RunResult, custom_span, gen_trace_id, trace
from .agents.financials_agent import financials_agent
from .agents.planner_agent import FinancialSearchItem, FinancialSearchPlan, planner_agent
from .agents.risk_agent import risk_agent
from .agents.search_agent import search_agent
from .agents.verifier_agent import VerificationResult, verifier_agent
from .agents.writer_agent import FinancialReportData, writer_agent
from .printer import Printer
async def _summary_extractor(run_result: RunResult) -> str:
"""Custom output extractor for sub‑agents that return an AnalysisSummary."""
# The financial/risk analyst agents emit an AnalysisSummary with a `summary` field.
# We want the tool call to return just that summary text so the writer can drop it inline.
return str(run_result.final_output.summary)
class FinancialResearchManager:
"""
Orchestrates the full flow: planning, searching, sub‑analysis, writing, and verification.
"""
def __init__(self) -> None:
self.console = Console()
self.printer = Printer(self.console)
async def run(self, query: str) -> None:
trace_id = gen_trace_id()
with trace("Financial research trace", trace_id=trace_id):
self.printer.update_item(
"trace_id",
f"View trace: https://platform.openai.com/traces/{trace_id}",
is_done=True,
hide_checkmark=True,
)
self.printer.update_item(
"start", "Starting financial research...", is_done=True)
search_plan = await self._plan_searches(query)
search_results = await self._perform_searches(search_plan)
report = await self._write_report(query, search_results)
verification = await self._verify_report(report)
final_report = f"Report summary\n\n{report.short_summary}"
self.printer.update_item(
"final_report", final_report, is_done=True)
self.printer.end()
# Print to stdout
print("\n\n=====REPORT=====\n\n")
print(f"Report:\n{report.markdown_report}")
print("\n\n=====FOLLOW UP QUESTIONS=====\n\n")
print("\n".join(report.follow_up_questions))
print("\n\n=====VERIFICATION=====\n\n")
print(verification)
async def _plan_searches(self, query: str) -> FinancialSearchPlan:
self.printer.update_item("planning", "Planning searches...")
result = await Runner.run(planner_agent, f"Query: {query}")
self.printer.update_item(
"planning",
f"Will perform {len(result.final_output.searches)} searches",
is_done=True,
)
return result.final_output_as(FinancialSearchPlan)
async def _perform_searches(self, search_plan: FinancialSearchPlan) -> Sequence[str]:
with custom_span("Search the web"):
self.printer.update_item("searching", "Searching...")
tasks = [asyncio.create_task(self._search(item))
for item in search_plan.searches]
results: list[str] = []
num_completed = 0
for task in asyncio.as_completed(tasks):
result = await task
if result is not None:
results.append(result)
num_completed += 1
self.printer.update_item(
"searching", f"Searching... {num_completed}/{len(tasks)} completed"
)
self.printer.mark_item_done("searching")
return results
async def _search(self, item: FinancialSearchItem) -> str | None:
input_data = f"Search term: {item.query}\nReason: {item.reason}"
try:
result = await Runner.run(search_agent, input_data)
return str(result.final_output)
except Exception:
return None
async def _write_report(self, query: str, search_results: Sequence[str]) -> FinancialReportData:
# Expose the specialist analysts as tools so the writer can invoke them inline
# and still produce the final FinancialReportData output.
fundamentals_tool = financials_agent.as_tool(
tool_name="fundamentals_analysis",
tool_description="Use to get a short write‑up of key financial metrics",
custom_output_extractor=_summary_extractor,
)
risk_tool = risk_agent.as_tool(
tool_name="risk_analysis",
tool_description="Use to get a short write‑up of potential red flags",
custom_output_extractor=_summary_extractor,
)
writer_with_tools = writer_agent.clone(
tools=[fundamentals_tool, risk_tool])
self.printer.update_item("writing", "Thinking about report...")
input_data = f"Original query: {query}\nSummarized search results: {search_results}"
result = Runner.run_streamed(writer_with_tools, input_data)
update_messages = [
"Planning report structure...",
"Writing sections...",
"Finalizing report...",
]
last_update = time.time()
next_message = 0
async for _ in result.stream_events():
if time.time() - last_update > 5 and next_message < len(update_messages):
self.printer.update_item(
"writing", update_messages[next_message])
next_message += 1
last_update = time.time()
self.printer.mark_item_done("writing")
return result.final_output_as(FinancialReportData)
async def _verify_report(self, report: FinancialReportData) -> VerificationResult:
self.printer.update_item("verifying", "Verifying report...")
result = await Runner.run(verifier_agent, report.markdown_report)
self.printer.mark_item_done("verifying")
return result.final_output_as(VerificationResult)