-
-
Notifications
You must be signed in to change notification settings - Fork 4
/
Copy pathasynchronous.py
211 lines (164 loc) · 9.36 KB
/
asynchronous.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
import asyncio
import time
import aiohttp
from typing import Optional, List
from Weverse import WeverseClientAsync, InvalidToken, BeingRateLimited, models
from dotenv import load_dotenv
from os import getenv
"""
asynchronous.py
Asynchronous Examples for Weverse.
This file should be able to run if a token is supplied.
If you would like to view more specific information,
then it would be suggested to look at the API Docs at https://weverse.readthedocs.io/en/latest/
or run this file in a debugger.
In order to get a token: https://weverse.readthedocs.io/en/latest/api.html#get-account-token
In order to set a token, Rename .env.example to .env and set WEVERSE_AUTH to the token (recommended).
You may also set the variable weverse_token to the token (not recommended).
There is also now the availability to login using a usernane and password instead of a token.
This file will display both auth methods.
If you are running in a Synchronous environment, go take a look at the Synchronous examples file.
"""
def get_formatted_time(seconds):
"""Turn seconds into days, hours, minutes, and seconds.
Not related to Weverse.
:param seconds: Amount of seconds to convert.
"""
seconds = round(seconds)
minute, sec = divmod(seconds, 60)
hour, minute = divmod(minute, 60)
day, hour = divmod(hour, 24)
return f"{f'{day}d ' if day else ''}" \
f"{f'{hour}h ' if hour else ''}" \
f"{f'{minute}m ' if minute else ''}" \
f"{f'{sec}s' if sec else ''}" \
f"{f'0s' if seconds < 1 else ''}"
class Example:
def __init__(self):
self.weverse_client: Optional[WeverseClientAsync] = None
async def check_new_notifications(self) -> Optional[List[models.Notification]]:
"""Check for new weverse notifications.
This should be called in a loop to constantly check for notifications.
"""
if not self.weverse_client.cache_loaded:
# This is more of an issue with async, but we should always make sure the cache is loaded.
return
if not await self.weverse_client.check_new_user_notifications():
# This will check if there are new notifications and will process them into objects for us.
return
# They should now be loaded into notification cache, but just to confirm nothing went wrong, we will check.
if not self.weverse_client.user_notifications:
return
# We can now get the latest notifications
# get_new_notifications is new in V1.0.3. Separates new notifications from older ones.
latest_notifications = self.weverse_client.get_new_notifications()
return latest_notifications
async def start(self):
"""
Managing Weverse Client and starting the notification loop.
"""
load_dotenv() # loads up .env in memory
weverse_token = getenv("WEVERSE_AUTH") or "fake_token"
# you have the option to pass in a token or a username and password.
weverse_username = getenv("WEVERSE_USERNAME") or None
weverse_password = getenv("WEVERSE_PASSWORD") or None
web_session = aiohttp.ClientSession() # create a web session for Weverse to use (Not needed, but recommended)
client_kwargs = {
"verbose": True, # Will print warning messages for links that have failed to connect or were not found.
"web_session": web_session, # Existing web session
# Auth Token to connect to Weverse. Not needed if you are using the login method.
"authorization": weverse_token,
"username": weverse_username, # Username if you are using the login method.
"password": weverse_password, # Password if you are using the login method.
"hook": self.on_new_notifications, # The method that will receive new notifications.
"loop": asyncio.get_event_loop() # current event loop
}
self.weverse_client = WeverseClientAsync(**client_kwargs)
# start the client process.
start_kwargs = {
# Will load ALL followed community posts. If following majority communities, this will take
# more than 10 minutes to complete. If you are not in need of OLDER posts, you can set it to False and
# it should load within a few seconds.
"create_old_posts": True,
# Will load up the past 20 notifications. This is a quick process, so you may as well load them up.
"create_notifications": True,
# Will load ALL media posts. This will take a very long time if enabled.
"create_media": True
}
# You can actually check if the token works before starting the cache process and receiving an exception.
# If you are using a normal login, do not check.
# token_works = await self.weverse_client.check_token_works()
try:
print("Weverse cache is currently being created.")
start_time = time.time() # Start Time for Weverse Cache
await self.weverse_client.start(**start_kwargs) # start the client process to all needed data.
end_time = time.time() # The time when Weverse Cache has loaded.
print(f"Weverse cache has loaded after {get_formatted_time(end_time - start_time)}")
except InvalidToken:
raise RuntimeError("An Invalid Token was passed in")
except BeingRateLimited:
raise RuntimeError("Weverse Client is being rate-limited.") # Note that this issue has never occurred yet.
except Exception as e:
raise e # more exceptions regarding incorrect logins would be raised.
# We now have all of the data we would want to modify.
# loop the notifications
if not self.weverse_client._hook: # we are just checking to see if we passed in a hook method
await self.loop_notifications()
async def loop_notifications(self):
"""Check for notifications on a loop.
THIS IS THE MANUAL WAY TO CHECK FOR NOTIFICATIONS.
IT IS RECOMMENDED TO PASS IN A HOOK TO THE CLIENT INSTEAD.
"""
continue_loop = True
while continue_loop: # Just a warning that this would block if not under it's own task.
# This is a loop running to check for notifications.
new_notifications = await self.check_new_notifications()
if not new_notifications:
continue
for new_notification in new_notifications:
await self.handle_notification(new_notification)
continue_loop = False # no longer want to check for notifications.
async def on_new_notifications(self, notifications: List[models.Notification]):
"""
Decides what to do with a list of Notifications.
This is a hook method. Whenever there is a notification, this method will be called.
"""
for notification in notifications:
await self.handle_notification(notification)
async def handle_notification(self, new_notification: models.Notification):
"""Handle a notification."""
# Two ways to determine the community name.
community_name = new_notification.community_name or new_notification.bold_element
if not community_name:
# In the case a notification faults from Weverse, this is a safety measure.
return
# We do not know the type of notification it is because it is usually hidden inside the message.
# The wrapper does not automatically check for this, so we must call the method.
notification_type = self.weverse_client.determine_notification_type(new_notification.message)
possible_notification_types = ["comment", "post", "media", "announcement"]
# The content ID is the ID of the actual post we are looking for.
content_id = new_notification.contents_id
# In the below conditions, you should do any logic needed for knowing the types...
if notification_type == possible_notification_types[0]: # comment
comment = self.weverse_client.get_comment_by_id(content_id)
... # Do stuff with the comment here
elif notification_type == possible_notification_types[1]: # post
post = self.weverse_client.get_post_by_id(content_id)
... # Do stuff with the post here
elif notification_type == possible_notification_types[2]: # media
media = self.weverse_client.get_media_by_id(content_id)
... # Do stuff with the media here
elif notification_type == possible_notification_types[3]: # announcement
announcement = self.weverse_client.get_announcement_by_id(content_id)
... # Do stuff with the announcement here
else: # No Type Found (Perhaps a new type was created but wrapper is not updated)
print(f"{new_notification.id} has an unrecognized type.")
... # do more stuff if you want to.
if __name__ == '__main__':
loop = asyncio.get_event_loop()
loop.run_until_complete(Example().start())
# Please note that if you are running Weverse in a huge program and you are loading the entire cache, this should
# be run using asyncio.create_task
# Assuming you will not be running the Weverse section from the main part (this function) of your program,
# it would be advised to do something along the lines of the below line
# task = asyncio.create_task(Example().start())