-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathbot.py
206 lines (185 loc) · 9.79 KB
/
bot.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
__author__ = "Tatiana Kadykova"
import telebot
import pickle
from os.path import isfile
from channel_analyzer import MessagesCollector
from channels_handler import ChannelsHandler
from secrets import telebot_token
def is_int(s):
"""
@param {str} s A string to check if it is an int
@returns {bool} True, if int(s) can be used, else False
"""
try:
int(s)
return True
except:
return False
bot = telebot.TeleBot(telebot_token)
msg_collector = MessagesCollector()
all_users = dict()
def answer_success(bot, chat_id, success, ok_message, bad_message, *args, **kwargs):
"""
@param {telebot.TeleBot} bot A telegram bot to send message from
@param {int} chat_id The chat id to send message to
@param {dict} success Information about command execution result
@param {str} ok_message What to reply if the execution succeeded
and there're nothing to reply from command
@param {str} bad_message What to reply if the execution didn't succeed
and there're nothing to reply from command
@params {ANY} *args These arguments will be given to send_message
@params {ANY} **kwargs These arguments will be given to send_message
"""
if 'user_message' in success:
bot.send_message(chat_id, success['user_message'], *args, **kwargs)
elif success['ok']:
bot.send_message(chat_id, ok_message, *args, **kwargs)
else:
bot.send_message(chat_id, bad_message, *args, **kwargs)
def unfalling(func):
def ans(message, *args, **kwargs):
try:
result = func(message, *args, **kwargs)
return result
except KeyboardInterrupt:
exit(0)
except Exception as e:
print(e)
try:
bot.reply_to(message, "Извините, произошла ошибка. Пожалуйста, обратитесь к разработчикам. Бот продолжит работать в своем обычном режиме")
except:
pass
return ans
def safe_user_access(func):
def ans(message, *args, **kwargs):
if(message.chat.id not in all_users.keys()):
all_users[message.chat.id] = ChannelsHandler(
bot,
message.chat.id,
msg_collector
)
return func(message, *args, **kwargs)
return ans
def autodump(func):
def ans(*args, **kwargs):
result = func(*args, **kwargs)
with open("userdata.pickle", "wb") as f:
pickle.dump(
{k: v.dumps() for k, v in all_users.items()},
f
)
return result
return ans
@bot.message_handler(commands=['start'])
@unfalling
def init(message):
if message.chat.id in all_users:
bot.reply_to(message, 'Вы уже зарегистрированы. Если вы хотите сбросить все настройки, то попрощайтесь со мной (команда /stop), а затем познакомьтесь снова (/start)')
else:
bot.reply_to(message, 'Привет! Я симулирую ленту наиболее популярных новостей по выбранным вами каналам. \nДля того, чтобы начать, пришлите мне "Добавить <@Упоминание_канала> [M] [N]". Я начну присылать вам N новостей по этому каналу каждые M часов. \n\nЧтобы посмотреть полный список команд, пришлите /help')
@bot.message_handler(commands=['help'])
@unfalling
@safe_user_access
def comands(message):
bot.reply_to(message, 'Полный список команд:\n\n\
"Добавить <@Упоминание_канала> <M> <N>" - Я начну присылать вам N самых популярных новостей выбранного канала каждые M часов. Отсчет времени начнется с момента добавления канала\n\n\
"Удалить <@Упоминание_канала> - Я больше не буду присылать вам новости этого канала\n\n\
"Изменить количество <@Упоминание_канала> <N>" - Теперь я буду присылать вам N новостей по этому каналу\n\n\
"Изменить частоту <@Упоминание_канала> <M>" - Я буду присылать вам новости этого канала каждые M часов\n\n\n\
/list - посмотреть мои каналы\n\
/stop - Прекратить общение со мной')
@bot.message_handler(commands=['list'])
@unfalling
def list_channels(message):
if message.chat.id not in all_users.keys():
bot.reply_to(message, "Кажется, мы еще не знакомы...")
elif all_users[message.chat.id] == {}:
bot.reply_to(message, "У вас нет отслеживаемых каналов")
else:
ans = ''
for channel in all_users[message.chat.id].channels.values():
ans += 'Для канала {} присылаю {} сообщений каждые {} часов\n'.format(
channel.channel_id,
channel.count,
channel.frequency
)
bot.reply_to(message, ans)
@bot.message_handler(commands=['stop'])
@unfalling
def stop(message):
if message.chat.id in all_users:
bot.reply_to(message, 'Вы уверены, что хотите прекратить общаться? Все ваши настройки сбросятся! Если вы согласны, то пришлите мне /yesstop.')
return
bot.reply_to(message, "Вы просите меня забыть вас, но мы даже не были знакомы... Чтобы познакомиться, пришлите /start")
@bot.message_handler(commands=['yesstop'])
@unfalling
@safe_user_access
@autodump
def del_user(message):
bot.reply_to(message, 'Можете считать, что меня никогда не существовало.. Пока-пока')
del all_users[message.chat.id]
@bot.message_handler()
@unfalling
@safe_user_access
@autodump
def msg_handler(message):
text = message.text.lower()
if(len(text.split()) == 4 and text.split()[0] == "добавить" and is_int(text.split()[2]) \
and is_int(text.split()[3])):
#добавляем новый канал
answer_success(
bot,
message.chat.id,
all_users[message.chat.id].add_channel(text.split()[1], int(text.split()[2]), int(text.split()[3])),
"Канал был успешно добавлен",
"Не удалось добавить канал. Возможно, следует обратиться к разработчикам",
reply_to_message_id=message.message_id
)
elif(len(text.split()) == 2 and text.split()[0] == "удалить"):
#удаляем канал
answer_success(
bot,
message.chat.id,
all_users[message.chat.id].del_channel(text.split()[1]),
"Я навсегда забыл про этот канал",
"Не удалось забыть канал. Возможно, следует обратиться к разработчикам",
reply_to_message_id=message.message_id
)
elif(len(text.split()) == 4 and " ".join(text.split()[0:2]) == "изменить количество" \
and is_int(text.split()[-1])):
#изменяем количество новостей
answer_success(
bot,
message.chat.id,
all_users[message.chat.id].edit_channel(text.split()[2], new_count=int(text.split()[-1])),
"Теперь я буду присылать вам другое количество новостей по этому каналу",
"Не удалось выполнить операцию. Возможно, следует обратиться к разработчикам",
reply_to_message_id=message.message_id
)
elif(len(text.split()) == 4 and " ".join(text.split()[0:2]) == "изменить частоту" \
and is_int(text.split()[-1])):
#изменяем частоту
answer_success(
bot,
message.chat.id,
all_users[message.chat.id].edit_channel(text.split()[2], new_frequency=int(text.split()[3])),
"Частота успешно обновлена!",
"Не удалось выполнить операцию. Возможно, следует обратиться к разработчикам",
reply_to_message_id=message.message_id
)
else:
bot.reply_to(message, 'Неверный формат. Попробуйте /help')
if __name__ == "__main__":
if isfile("userdata.pickle"):
with open("userdata.pickle", "rb") as f:
all_users = {k: ChannelsHandler(bot, k, msg_collector).loads(v) for k, v in pickle.load(f).items()}
for user in list(all_users.keys())[:]:
try:
bot.send_message(user, "Извините, бот был перезагружен. Но не волнуйтесь! Мы сохранили ваши настройки. Время отправки новостей будет отсчитываться с текущего момента")
except Exception as e:
try:
if(e.result.json()['description'] == 'Forbidden: bot was blocked by the user'):
del all_users[user]
except:
pass
bot.polling(none_stop=False)