-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy path03.py
99 lines (66 loc) · 2.66 KB
/
03.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
import asyncio
from typing import Optional
from time import sleep
from motor.motor_asyncio import AsyncIOMotorClient
from pydantic import BaseModel
from beanie import Document, Indexed, init_beanie
from beanie_batteries_queue import Task, State, Queue
from beanie_batteries_queue.worker import Worker
from beanie_batteries_queue.runner import Runner
from beanie_batteries_queue.scheduled_task import ScheduledTask
class Category(BaseModel):
    """Plain pydantic model describing a product category.

    Used as the type of the ``category`` field on ``Product``.
    """

    # Name of the category.
    name: str
    # Free-text description of the category.
    description: str
class Product(Document):
    """Beanie document describing a product.

    Registered with beanie through ``init_beanie`` in ``example()``.
    """

    # Ordinary Python types are declared exactly as in a pydantic model.
    name: str
    # Optional field; defaults to None when not supplied.
    description: Optional[str] = None
    # Wrapping the type in Indexed() marks the field as corresponding to an index.
    price: Indexed(float)
    # Pydantic models can be used as field types as well.
    category: Category
class SimpleTask(Task):
    """Queue task carrying a single string payload in ``s``."""

    # Payload; defaults to the empty string.
    s: str = ""

    async def run(self):
        """Print the payload, upper-case it, and call ``save()`` on the task."""
        print(f"Processing task with data: {self.s}")
        shouted = self.s.upper()
        self.s = shouted
        await self.save()
class ProcessTask(Task):
    """Queue task with a required string ``data`` field."""

    # Payload to be upper-cased by run().
    data: str

    async def run(self):
        """Replace ``data`` with its upper-cased form, then call ``save()``."""
        upper_data = self.data.upper()
        self.data = upper_data
        await self.save()
class AnotherTask(Task):
    """Second task type with the same shape as ``ProcessTask``.

    This is the only task class handed to the ``Runner`` in ``example()``.
    """

    # Payload to be upper-cased by run().
    data: str

    async def run(self):
        """Upper-case ``data`` in place and call ``save()`` on the task."""
        converted = self.data.upper()
        self.data = converted
        await self.save()
# The beanie and task calls below are awaited, so the demo lives in a coroutine.
async def example():
    """Register the document models, push three tasks, and start a runner.

    Steps, in order:
      1. Connect to MongoDB at ``mongodb://localhost:27017`` and initialise
         beanie with ``Product``, the three task classes and ``ScheduledTask``.
      2. Producer: create one ``SimpleTask``, one ``ProcessTask`` and one
         ``AnotherTask`` and ``push()`` each of them.
      3. Consumer: start a ``Runner`` whose ``task_classes`` contains only
         ``AnotherTask``.
    """
    # Beanie uses the Motor async client under the hood.
    mongo = AsyncIOMotorClient("mongodb://localhost:27017")
    database = mongo.db_name

    # Every document class used below has to be registered with beanie,
    # not just Product.
    registered_models = [
        Product,
        SimpleTask,
        ProcessTask,
        AnotherTask,
        ScheduledTask,
    ]
    await init_beanie(database=database, document_models=registered_models)

    # Producer: build and push the tasks one after another, in this order.
    pending = (
        (SimpleTask, {"s": "test SimpleTask"}),
        (ProcessTask, {"data": "test ProcessTask"}),
        (AnotherTask, {"data": "test AnotherTask"}),
    )
    for task_cls, fields in pending:
        await task_cls(**fields).push()

    # Consumer: only AnotherTask is listed, so the SimpleTask and ProcessTask
    # pushed above are not among this runner's task classes.
    consumer = Runner(task_classes=[AnotherTask])
    consumer.start()

    # Disabled alternative kept from earlier experiments (in-process Worker):
    #     worker = Worker(task_classes=[ProcessTask, AnotherTask])
    #     await worker.start()

    # NOTE(review): this prints as soon as start() returns; whether the tasks
    # have actually been processed by then depends on Runner -- confirm.
    print("Task is finished")
# Script entry point: run the async demo to completion, then report.
if __name__ == "__main__":
    demo = example()
    asyncio.run(demo)
    # NOTE(review): assumed to belong inside the guard; the flattened source
    # does not show the original indentation -- confirm.
    print("Done")