-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathsocial_api.py
132 lines (119 loc) · 4.47 KB
/
social_api.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
from fastapi import FastAPI, BackgroundTasks
import uvicorn
import logfire
from typing import Literal
import os
import asyncio
from social.twitter.agent import initialize_agent as initialize_twitter_agent
from social.twitter.agent import check_mentions as check_twitter_mentions, post as post_to_twitter
from models.twitter import StartTwitterArgs, StopTwitterArgs, GetTwitterStatusArgs
from dotenv import load_dotenv
import time
load_dotenv()
app = FastAPI()
logfire.configure(
token=os.getenv("LOGFIRE_TOKEN"),
service_name="social_API",
send_to_logfire="if-token-present",
scrubbing=False
)
running_loop = False
running_social = {
"twitter": False,
#"farcaster": False #TODO: implement farcaster
}
global_prompt = 'Post on twitter about how cool SuperAssistant is!'
last_time_checked_twitter_mentions = 0
last_time_posted_on_twitter = 0
mentions_rate_limit = 60*60*8
post_rate_limit = 60*60*3
twitter_agent, config = initialize_twitter_agent()
responses = {
"twitter": {
"check_mentions": [],
"post": []
}
}
async def main_loop():
global global_prompt
global running_social
global running_loop
global last_time_checked_twitter_mentions
global last_time_posted_on_twitter
global responses
while True:
if not running_loop:
break
if running_social["twitter"]:
if time.time() > last_time_checked_twitter_mentions + mentions_rate_limit:
response = check_twitter_mentions(
agent_executor=twitter_agent,
config=config,
prompt=global_prompt,
status=responses
)
responses["twitter"]["check_mentions"].append({
"agent_response": response,
"timestamp": time.time()
})
last_time_checked_twitter_mentions = time.time()
if running_social["twitter"]:
if time.time() > last_time_posted_on_twitter + post_rate_limit:
response = post_to_twitter(
agent_executor=twitter_agent,
config=config,
prompt=global_prompt,
status=responses
)
responses["twitter"]["post"].append({
"agent_response": response,
"timestamp": time.time()
})
last_time_posted_on_twitter = time.time()
await asyncio.sleep(10)
@app.post("/start_with_prompt")
async def start_with_prompt(args: StartTwitterArgs, background_tasks: BackgroundTasks):
global running_loop
global global_prompt
global running_social
global_prompt = args.prompt if args.prompt != "default" else global_prompt
if not running_loop:
running_loop = True
running_social[args.social_network] = True
background_tasks.add_task(main_loop)
return f"Started {args.social_network} agent with prompt: {args.prompt}"
else:
return f"Already running {args.social_network} agent, changed prompt to: {args.prompt}"
@app.post("/stop")
async def stop_loop(args: StopTwitterArgs):
global running_social
global running_loop
if not args.social_network:
running_loop = False
running_social = {
"twitter": False,
}
else:
running_social[args.social_network] = False
if not running_social[args.social_network]:
running_loop = False
return f"Stopped {args.social_network}" if args.social_network else "Stopped all"
@app.get("/status")
async def get_status(args: GetTwitterStatusArgs):
return {
"running": running_loop,
"running_social": running_social,
"prompt": global_prompt,
"last_time_checked_mentions": last_time_checked_twitter_mentions if args.social_network == "twitter" else None, #already preapred to add more socials
"last_time_posted": last_time_posted_on_twitter if args.social_network == "twitter" else None, #already preapred to add more socials
"agent_responses": responses[args.social_network]
}
if __name__ == "__main__":
logfire.info("Starting social API", _tags=["social_api"])
try:
uvicorn.run(app, host='localhost', port=int(os.getenv("SOCIAL_API_PORT")) or 42070)
except KeyboardInterrupt:
logfire.info("Social API stopped", _tags=["social_api"])
except Exception as e:
logfire.error(f"Error running social API: {e}", _tags=["social_api"])
raise e