Skip to content

Commit d3c3a3e

Browse files
authored
send email with ai (#91)
* send email with ai * schedule workflow * fix openai * workflow failure simulate * restack ai update
1 parent d58bfcb commit d3c3a3e

14 files changed

+835
-0
lines changed

email_sender/.env.example

+4
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
2+
SENDGRID_API_KEY=your-sendgrid-api-key
3+
OPENAI_API_KEY=your-openai-api-key
4+

email_sender/README.md

Whitespace-only changes.

email_sender/poetry.lock

+547
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

email_sender/pyproject.toml

+26
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
[tool.poetry]
2+
name = "email_sender"
3+
version = "0.0.1"
4+
description = "Send emails with sendgrid"
5+
authors = [
6+
"Restack Team <[email protected]>",
7+
]
8+
readme = "README.md"
9+
packages = [{include = "src"}]
10+
11+
[tool.poetry.dependencies]
12+
python = "3.12.7"
13+
pydantic = "2.9.2"
14+
python-dotenv = "1.0.1"
15+
sendgrid = "6.11.0"
16+
openai = "1.56.2"
17+
restack-ai = "0.0.42"
18+
19+
[build-system]
20+
requires = ["poetry-core"]
21+
build-backend = "poetry.core.masonry.api"
22+
23+
[tool.poetry.scripts]
24+
services = "src.services:run_services"
25+
schedule = "schedule_workflow:run_schedule_workflow"
26+
schedule_failure = "schedule_workflow_failure:run_schedule_workflow_failure"

email_sender/schedule_workflow.py

+45
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
import asyncio
2+
import time
3+
from restack_ai import Restack
4+
from dataclasses import dataclass
5+
import os
6+
from dotenv import load_dotenv
7+
8+
load_dotenv()
9+
10+
@dataclass
11+
class InputParams:
12+
email_context: str
13+
subject: str
14+
to: str
15+
16+
async def main():
17+
client = Restack()
18+
19+
workflow_id = f"{int(time.time() * 1000)}-SendEmailWorkflow"
20+
to_email = os.getenv("TO_EMAIL")
21+
if not to_email:
22+
raise Exception("TO_EMAIL environment variable is not set")
23+
24+
run_id = await client.schedule_workflow(
25+
workflow_name="SendEmailWorkflow",
26+
workflow_id=workflow_id,
27+
input={
28+
"email_context": "This email should contain a greeting. And telling user we have launched a new AI feature with Restack workflows. Workflows now offer logging and automatic retries when one of its steps fails. Name of user is not provided. You can set as goodbye message on the email just say 'Best regards' or something like that. No need to mention name of user or name of person sending the email.",
29+
"subject": "Hello from Restack",
30+
"to": to_email
31+
}
32+
)
33+
34+
await client.get_workflow_result(
35+
workflow_id=workflow_id,
36+
run_id=run_id
37+
)
38+
39+
exit(0)
40+
41+
def run_schedule_workflow():
42+
asyncio.run(main())
43+
44+
if __name__ == "__main__":
45+
run_schedule_workflow()
+46
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
import asyncio
2+
import time
3+
from restack_ai import Restack
4+
from dataclasses import dataclass
5+
import os
6+
from dotenv import load_dotenv
7+
8+
load_dotenv()
9+
10+
@dataclass
11+
class InputParams:
12+
email_context: str
13+
subject: str
14+
to: str
15+
16+
async def main():
17+
client = Restack()
18+
19+
workflow_id = f"{int(time.time() * 1000)}-SendEmailWorkflow"
20+
to_email = os.getenv("TO_EMAIL")
21+
if not to_email:
22+
raise Exception("TO_EMAIL environment variable is not set")
23+
24+
run_id = await client.schedule_workflow(
25+
workflow_name="SendEmailWorkflow",
26+
workflow_id=workflow_id,
27+
input={
28+
"email_context": "This email should contain a greeting. And telling user we have launched a new AI feature with Restack workflows. Workflows now offer logging and automatic retries when one of its steps fails. Name of user is not provided. You can set as goodbye message on the email just say 'Best regards' or something like that. No need to mention name of user or name of person sending the email.",
29+
"subject": "Hello from Restack",
30+
"to": to_email,
31+
"simulate_failure": True
32+
}
33+
)
34+
35+
await client.get_workflow_result(
36+
workflow_id=workflow_id,
37+
run_id=run_id
38+
)
39+
40+
exit(0)
41+
42+
def run_schedule_workflow_failure():
43+
asyncio.run(main())
44+
45+
if __name__ == "__main__":
46+
run_schedule_workflow_failure()

email_sender/src/__init__.py

Whitespace-only changes.

email_sender/src/client.py

+19
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
import os
2+
from restack_ai import Restack
3+
from restack_ai.restack import CloudConnectionOptions
4+
from dotenv import load_dotenv
5+
6+
# Load environment variables from a .env file
7+
load_dotenv()
8+
9+
10+
engine_id = os.getenv("RESTACK_ENGINE_ID")
11+
address = os.getenv("RESTACK_ENGINE_ADDRESS")
12+
api_key = os.getenv("RESTACK_ENGINE_API_KEY")
13+
14+
connection_options = CloudConnectionOptions(
15+
engine_id=engine_id,
16+
address=address,
17+
api_key=api_key
18+
)
19+
client = Restack(connection_options)

email_sender/src/functions/__init__.py

Whitespace-only changes.
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
from restack_ai.function import function, log, FunctionFailure
2+
from dataclasses import dataclass
3+
from openai import OpenAI
4+
import os
5+
from dotenv import load_dotenv
6+
7+
load_dotenv()
8+
9+
tries = 0
10+
11+
@dataclass
12+
class GenerateEmailInput:
13+
email_context: str
14+
simulate_failure: bool = False
15+
16+
@function.defn()
17+
async def generate_email_content(input: GenerateEmailInput):
18+
global tries
19+
20+
if input.simulate_failure and tries == 0:
21+
tries += 1
22+
raise FunctionFailure("Simulated failure", non_retryable=False)
23+
24+
if (os.environ.get("OPENAI_API_KEY") is None):
25+
raise FunctionFailure("OPENAI_API_KEY is not set", non_retryable=True)
26+
27+
client = OpenAI(api_key=os.environ.get("OPENAI_API_KEY"))
28+
29+
response = client.chat.completions.create(
30+
model="gpt-4",
31+
messages=[
32+
{
33+
"role": "system",
34+
"content": "You are a helpful assistant that generates short emails based on the provided context."
35+
},
36+
{
37+
"role": "user",
38+
"content": f"Generate a short email based on the following context: {input.email_context}"
39+
}
40+
],
41+
max_tokens=150
42+
)
43+
44+
return response.choices[0].message.content
45+
+40
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
import sendgrid
2+
from sendgrid.helpers.mail import Mail
3+
import os
4+
from restack_ai.function import function, FunctionFailure
5+
from dataclasses import dataclass
6+
from dotenv import load_dotenv
7+
8+
load_dotenv()
9+
10+
@dataclass
11+
class SendEmailInput:
12+
text: str
13+
subject: str
14+
to: str
15+
16+
@function.defn()
17+
async def send_email(input: SendEmailInput) -> None:
18+
from_email = os.getenv('FROM_EMAIL')
19+
20+
if not from_email:
21+
raise FunctionFailure('FROM_EMAIL is not set', non_retryable=True)
22+
23+
sendgrid_api_key = os.getenv('SENDGRID_API_KEY')
24+
25+
if not sendgrid_api_key:
26+
raise FunctionFailure('SENDGRID_API_KEY is not set', non_retryable=True)
27+
28+
29+
message = Mail(
30+
from_email=from_email,
31+
to_emails=input.to,
32+
subject=input.subject,
33+
plain_text_content=input.text
34+
)
35+
36+
try:
37+
sg = sendgrid.SendGridAPIClient(sendgrid_api_key)
38+
sg.send(message)
39+
except Exception:
40+
raise FunctionFailure('Failed to send email', non_retryable=False)

email_sender/src/services.py

+19
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
import asyncio
2+
from src.client import client
3+
from src.workflows.send_email import SendEmailWorkflow
4+
from src.functions.send_email import send_email
5+
from src.functions.generate_email_content import generate_email_content
6+
7+
async def main():
8+
await asyncio.gather(
9+
client.start_service(
10+
workflows=[SendEmailWorkflow],
11+
functions=[generate_email_content, send_email]
12+
)
13+
)
14+
15+
def run_services():
16+
asyncio.run(main())
17+
18+
if __name__ == "__main__":
19+
run_services()

email_sender/src/workflows/__init__.py

Whitespace-only changes.
+44
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
from restack_ai.workflow import workflow, import_functions, log, RetryPolicy
2+
from dataclasses import dataclass
3+
from datetime import timedelta
4+
5+
with import_functions():
6+
from src.functions.send_email import send_email, SendEmailInput
7+
from src.functions.generate_email_content import generate_email_content, GenerateEmailInput
8+
9+
@dataclass
10+
class WorkflowInputParams:
11+
email_context: str
12+
subject: str
13+
to: str
14+
simulate_failure: bool = False
15+
16+
@workflow.defn()
17+
class SendEmailWorkflow:
18+
@workflow.run
19+
async def run(self, input: WorkflowInputParams):
20+
log.info("SendEmailWorkflow started", input=input)
21+
22+
text = await workflow.step(
23+
generate_email_content,
24+
GenerateEmailInput(
25+
email_context=input.email_context,
26+
simulate_failure=input.simulate_failure,
27+
),
28+
retry_policy=RetryPolicy(
29+
initial_interval=timedelta(seconds=10),
30+
backoff_coefficient=1,
31+
),
32+
)
33+
34+
await workflow.step(
35+
send_email,
36+
SendEmailInput(
37+
text=text,
38+
subject=input.subject,
39+
to=input.to,
40+
),
41+
start_to_close_timeout=timedelta(seconds=120)
42+
)
43+
44+
return 'Email sent successfully'

0 commit comments

Comments
 (0)