-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy path02.py
86 lines (57 loc) · 2.26 KB
/
02.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
import asyncio
from typing import Optional
from motor.motor_asyncio import AsyncIOMotorClient
from pydantic import BaseModel
from beanie import Document, Indexed, init_beanie
class Category(BaseModel):
name: str
description: str
class Product(Document):
name: str # You can use normal types just like in pydantic
description: Optional[str] = None
price: Indexed(float) # You can also specify that a field should correspond to an index
category: Category # You can include pydantic models as well
from beanie_batteries_queue import Task, Runner, State
from time import sleep
class SimpleTask(Task):
s: str = ""
sleep(5)
async def run(self):
# Implement the logic for processing the task
print(f"Processing task with data: {self.s}")
self.s = self.s.upper()
await self.save()
# This is an asynchronous example, so we will access it from an async function
async def example():
# Beanie uses Motor async client under the hood
client = AsyncIOMotorClient("mongodb://localhost:27017")
# Initialize beanie with the Product document class
await init_beanie(database=client.db_name, document_models=[Product, SimpleTask])
chocolate = Category(name="Chocolate", description="A preparation of roasted and ground cacao seeds.")
# Beanie documents work just like pydantic models
tonybar = Product(name="Tony's", price=5.95, category=chocolate)
# And can be inserted into the database
await tonybar.insert()
# You can find documents with pythonic syntax
product = await Product.find_one(Product.price < 10)
# And update them
await product.set({Product.name:"Gold bar"})
# Producer
task = SimpleTask(s="test")
await task.push()
# Consumer
async for task in SimpleTask.queue():
assert task.s == "test"
# Do some work
await task.finish()
break
# Check that the task is finished
task = await SimpleTask.find_one({"s": "test"})
assert task.state == State.FINISHED
queue = SimpleTask.queue()
#await
asyncio.create_task(queue.start())
print("Task is finished")
if __name__ == "__main__":
asyncio.run(example())
print("Done")