-
Notifications
You must be signed in to change notification settings - Fork 196
Expand file tree
/
Copy pathworkflow_with_custom_function_step.py
More file actions
116 lines (96 loc) · 3 KB
/
workflow_with_custom_function_step.py
File metadata and controls
116 lines (96 loc) · 3 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
"""
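This example shows how to instrument an agno workflow (custom function steps,
a research team, and a writer agent) with OpenInference and send traces to
Arize Phoenix.

Set the ARIZE_PHOENIX_API_KEY environment variable before running; it is used
to build the Phoenix client headers.

Install dependencies:
    pip install openai opentelemetry-sdk opentelemetry-exporter-otlp
    pip install openinference-instrumentation-agno

The script also imports `agno` and `phoenix.otel`, so those packages must be
installed as well.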
"""
import os
from textwrap import dedent
from agno.agent import Agent
from agno.db.sqlite import SqliteDb
from agno.models.openai import OpenAIChat
from agno.team import Team
from agno.tools.duckduckgo import DuckDuckGoTools
from agno.tools.hackernews import HackerNewsTools
from agno.workflow.types import StepInput, StepOutput
from agno.workflow.workflow import Workflow
from phoenix.otel import register
# Phoenix credentials and collector address are supplied through environment
# variables, which are populated here before the tracer is registered.
_phoenix_api_key = os.getenv("ARIZE_PHOENIX_API_KEY")
if _phoenix_api_key:
    # Only build the header when a key exists; otherwise the literal string
    # "api_key=None" would be sent as the credential.
    os.environ["PHOENIX_CLIENT_HEADERS"] = f"api_key={_phoenix_api_key}"

# Keep an endpoint the user already configured; fall back to the hosted
# Phoenix collector instead of overwriting the variable with an empty string.
# NOTE(review): confirm this URL matches your Phoenix deployment/space.
if not os.environ.get("PHOENIX_COLLECTOR_ENDPOINT"):
    os.environ["PHOENIX_COLLECTOR_ENDPOINT"] = "https://app.phoenix.arize.com"

# Configure the Phoenix tracer.
tracer_provider = register(
    project_name="default",  # Default is 'default'
    auto_instrument=True,  # Automatically use the installed OpenInference instrumentation
)
# Agents used by the workflow. Each agent is given its own OpenAIChat model
# instance configured with the "gpt-4o-mini" model id.

# Web search agent, equipped with the DuckDuckGo toolkit.
web_agent = Agent(
    name="Web Agent",
    role="Search the web for the latest news and trends",
    model=OpenAIChat(id="gpt-4o-mini"),
    tools=[DuckDuckGoTools()],
)

# Hacker News agent, equipped with the HackerNews toolkit.
hackernews_agent = Agent(
    name="Hackernews Agent",
    role="Extract key insights and content from Hackernews posts",
    model=OpenAIChat(id="gpt-4o-mini"),
    tools=[HackerNewsTools()],
)

# Writer agent: no tools, only an instruction to write the blog post.
writer_agent = Agent(
    name="Writer Agent",
    instructions="Write a blog post on the topic",
    model=OpenAIChat(id="gpt-4o-mini"),
)
def prepare_input_for_web_search(step_input: StepInput) -> StepOutput:
    """Wrap the workflow input in a prompt asking for web research.

    Args:
        step_input: Step input whose ``input`` attribute holds the blog topic.

    Returns:
        A ``StepOutput`` whose ``content`` is the research prompt with the
        topic embedded between ``<topic>`` tags.
    """
    topic = step_input.input
    # Dedent the static template *before* interpolating: dedenting the already
    # formatted text lets a multi-line or indented topic change the common
    # margin, which either leaves the template indented or alters the topic.
    prompt_template = dedent(
        """\
        I'm writing a blog post on the topic
        <topic>
        {topic}
        </topic>
        Search the web for at least 10 articles"""
    )
    return StepOutput(content=prompt_template.format(topic=topic))
def prepare_input_for_writer(step_input: StepInput) -> StepOutput:
    """Build the writer prompt from the topic and the previous step's output.

    Args:
        step_input: Step input; ``input`` supplies the blog topic and
            ``previous_step_content`` supplies the research text to embed.

    Returns:
        A ``StepOutput`` whose ``content`` is the writing prompt, with the
        topic in ``<topic>`` tags and the research in ``<research_results>``
        tags.
    """
    topic = step_input.input
    research_team_output = step_input.previous_step_content
    # Dedent the static template *before* interpolating: the research text is
    # typically multi-line, and dedenting after formatting would let its lines
    # change the common margin (or have their own indentation stripped).
    prompt_template = dedent(
        """\
        I'm writing a blog post on the topic:
        <topic>
        {topic}
        </topic>
        Here is information from the web:
        <research_results>
        {research_team_output}
        </research_results>"""
    )
    # The closing tag above was previously a second opening tag, leaving the
    # research section unterminated in the prompt.
    return StepOutput(
        content=prompt_template.format(
            topic=topic,
            research_team_output=research_team_output,
        )
    )
# Research team combining the Hacker News agent and the web search agent.
research_team = Team(
    name="Research Team",
    instructions="Research tech topics from Hackernews and the web",
    members=[hackernews_agent, web_agent],
)
# Build the workflow and run it once when executed as a script.
if __name__ == "__main__":
    # SQLite database passed to the workflow as its `db`.
    session_db = SqliteDb(
        session_table="workflow_session",
        db_file="tmp/workflow.db",
    )

    # Custom function steps are interleaved with the team and agent steps:
    # each function builds the prompt for the step listed after it.
    workflow_steps = [
        prepare_input_for_web_search,
        research_team,
        prepare_input_for_writer,
        writer_agent,
    ]

    content_creation_workflow = Workflow(
        name="Blog Post Workflow",
        description="Automated blog post creation from Hackernews and the web",
        db=session_db,
        steps=workflow_steps,
    )

    # Run the workflow on a sample question with streaming enabled.
    content_creation_workflow.print_response(
        input="Compare machine learning algorithms for image classification?",
        stream=True,
        stream_events=True,
    )