|
| 1 | +"""example code for ding-dong-bot with oop style""" |
| 2 | +from typing import List, Optional, Union |
| 3 | +import asyncio |
| 4 | +from datetime import datetime |
| 5 | +from wechaty_puppet import get_logger |
| 6 | +from wechaty import ( |
| 7 | + MessageType, |
| 8 | + FileBox, |
| 9 | + RoomMemberQueryFilter, |
| 10 | + Wechaty, |
| 11 | + Contact, |
| 12 | + Room, |
| 13 | + Message, |
| 14 | + Image, |
| 15 | + MiniProgram, |
| 16 | + Friendship, |
| 17 | + FriendshipType, |
| 18 | + EventReadyPayload |
| 19 | +) |
| 20 | + |
| 21 | +logger = get_logger(__name__) |
| 22 | + |
| 23 | + |
| 24 | +class MyBot(Wechaty): |
| 25 | + """ |
| 26 | + listen wechaty event with inherited functions, which is more friendly for |
| 27 | + oop developer |
| 28 | + """ |
| 29 | + |
| 30 | + def __init__(self) -> None: |
| 31 | + """initialization function |
| 32 | + """ |
| 33 | + self.login_user: Optional[Contact] = None |
| 34 | + super().__init__() |
| 35 | + |
| 36 | + async def on_ready(self, payload: EventReadyPayload) -> None: |
| 37 | + """listen for on-ready event""" |
| 38 | + logger.info('ready event %s...', payload) |
| 39 | + |
| 40 | + # pylint: disable=R0912,R0914,R0915 |
| 41 | + async def on_message(self, msg: Message) -> None: |
| 42 | + """ |
| 43 | + listen for message event |
| 44 | + """ |
| 45 | + from_contact: Contact = msg.talker() |
| 46 | + text: str = msg.text() |
| 47 | + room: Optional[Room] = msg.room() |
| 48 | + msg_type: MessageType = msg.type() |
| 49 | + file_box: Optional[FileBox] = None |
| 50 | + if text == 'ding': |
| 51 | + conversation: Union[ |
| 52 | + Room, Contact] = from_contact if room is None else room |
| 53 | + await conversation.ready() |
| 54 | + await conversation.say('dong') |
| 55 | + file_box = FileBox.from_url( |
| 56 | + 'https://ss3.bdstatic.com/70cFv8Sh_Q1YnxGkpoWK1HF6hhy/it/' |
| 57 | + 'u=1116676390,2305043183&fm=26&gp=0.jpg', |
| 58 | + name='ding-dong.jpg') |
| 59 | + await conversation.say(file_box) |
| 60 | + |
| 61 | + elif msg_type == MessageType.MESSAGE_TYPE_IMAGE: |
| 62 | + logger.info('receving image file') |
| 63 | + # file_box: FileBox = await msg.to_file_box() |
| 64 | + image: Image = msg.to_image() |
| 65 | + |
| 66 | + hd_file_box: FileBox = await image.hd() |
| 67 | + await hd_file_box.to_file('./hd-image.jpg', overwrite=True) |
| 68 | + |
| 69 | + thumbnail_file_box: FileBox = await image.thumbnail() |
| 70 | + await thumbnail_file_box.to_file('./thumbnail-image.jpg', overwrite=True) |
| 71 | + artwork_file_box: FileBox = await image.artwork() |
| 72 | + await artwork_file_box.to_file('./artwork-image.jpg', overwrite=True) |
| 73 | + # reply the image |
| 74 | + await msg.say(hd_file_box) |
| 75 | + |
| 76 | + # pylint: disable=C0301 |
| 77 | + elif msg_type in [MessageType.MESSAGE_TYPE_AUDIO, MessageType.MESSAGE_TYPE_ATTACHMENT, MessageType.MESSAGE_TYPE_VIDEO]: |
| 78 | + logger.info('receving file ...') |
| 79 | + file_box = await msg.to_file_box() |
| 80 | + if file_box: |
| 81 | + await file_box.to_file(file_box.name) |
| 82 | + |
| 83 | + elif msg_type == MessageType.MESSAGE_TYPE_MINI_PROGRAM: |
| 84 | + logger.info('receving mini-program ...') |
| 85 | + mini_program: Optional[MiniProgram] = await msg.to_mini_program() |
| 86 | + if mini_program: |
| 87 | + await msg.say(mini_program) |
| 88 | + |
| 89 | + elif text == 'get room members' and room: |
| 90 | + logger.info('get room members ...') |
| 91 | + room_members: List[Contact] = await room.member_list() |
| 92 | + names: List[str] = [ |
| 93 | + room_member.name for room_member in room_members] |
| 94 | + await msg.say('\n'.join(names)) |
| 95 | + |
| 96 | + elif text.startswith('remove room member:'): |
| 97 | + logger.info('remove room member:') |
| 98 | + if not room: |
| 99 | + await msg.say('this is not room zone') |
| 100 | + return |
| 101 | + |
| 102 | + room_member_name = text[len('remove room member:') + 1:] |
| 103 | + |
| 104 | + room_member: Optional[Contact] = await room.member( |
| 105 | + query=RoomMemberQueryFilter(name=room_member_name) |
| 106 | + ) |
| 107 | + if room_member: |
| 108 | + if self.login_user and self.login_user.contact_id in room.payload.admin_ids: |
| 109 | + await room.delete(room_member) |
| 110 | + else: |
| 111 | + await msg.say('登录用户不是该群管理员...') |
| 112 | + |
| 113 | + else: |
| 114 | + await msg.say(f'can not fine room member by name<{room_member_name}>') |
| 115 | + elif text.startswith('get room topic'): |
| 116 | + logger.info('get room topic') |
| 117 | + if room: |
| 118 | + topic: Optional[str] = await room.topic() |
| 119 | + if topic: |
| 120 | + await msg.say(topic) |
| 121 | + |
| 122 | + elif text.startswith('rename room topic:'): |
| 123 | + logger.info('rename room topic ...') |
| 124 | + if room: |
| 125 | + new_topic = text[len('rename room topic:') + 1:] |
| 126 | + await msg.say(new_topic) |
| 127 | + elif text.startswith('add new friend:'): |
| 128 | + logger.info('add new friendship ...') |
| 129 | + identity_info = text[len('add new friend:'):] |
| 130 | + weixin_contact: Optional[Contact] = await self.Friendship.search(weixin=identity_info) |
| 131 | + phone_contact: Optional[Contact] = await self.Friendship.search(phone=identity_info) |
| 132 | + contact: Optional[Contact] = weixin_contact or phone_contact |
| 133 | + if contact: |
| 134 | + await self.Friendship.add(contact, 'hello world ...') |
| 135 | + |
| 136 | + elif text.startswith('at me'): |
| 137 | + if room: |
| 138 | + talker = msg.talker() |
| 139 | + await room.say('hello', mention_ids=[talker.contact_id]) |
| 140 | + |
| 141 | + elif text.startswith('my alias'): |
| 142 | + talker = msg.talker() |
| 143 | + alias = await talker.alias() |
| 144 | + await msg.say('your alias is:' + (alias or '')) |
| 145 | + |
| 146 | + elif text.startswith('set alias:'): |
| 147 | + talker = msg.talker() |
| 148 | + new_alias = text[len('set alias:'):] |
| 149 | + |
| 150 | + # set your new alias |
| 151 | + alias = await talker.alias(new_alias) |
| 152 | + # get your new alias |
| 153 | + alias = await talker.alias() |
| 154 | + await msg.say('your new alias is:' + (alias or '')) |
| 155 | + |
| 156 | + elif text.startswith('find friends:'): |
| 157 | + friend_name: str = text[len('find friends:'):] |
| 158 | + friend = await self.Contact.find(friend_name) |
| 159 | + if friend: |
| 160 | + logger.info('find only one friend <%s>', friend) |
| 161 | + |
| 162 | + friends: List[Contact] = await self.Contact.find_all(friend_name) |
| 163 | + |
| 164 | + logger.info('find friend<%d>', len(friends)) |
| 165 | + logger.info(friends) |
| 166 | + |
| 167 | + else: |
| 168 | + pass |
| 169 | + |
| 170 | + if msg.type() == MessageType.MESSAGE_TYPE_UNSPECIFIED: |
| 171 | + talker = msg.talker() |
| 172 | + assert isinstance(talker, Contact) |
| 173 | + |
| 174 | + async def on_login(self, contact: Contact) -> None: |
| 175 | + """login event |
| 176 | +
|
| 177 | + Args: |
| 178 | + contact (Contact): the account logined |
| 179 | + """ |
| 180 | + logger.info('Contact<%s> has logined ...', contact) |
| 181 | + self.login_user = contact |
| 182 | + |
| 183 | + async def on_friendship(self, friendship: Friendship) -> None: |
| 184 | + """when receive a new friendship application, or accept a new friendship |
| 185 | +
|
| 186 | + Args: |
| 187 | + friendship (Friendship): contains the status and friendship info, |
| 188 | + eg: hello text, friend contact object |
| 189 | + """ |
| 190 | + MAX_ROOM_MEMBER_COUNT = 500 |
| 191 | + # 1. receive a new friendship from someone |
| 192 | + if friendship.type() == FriendshipType.FRIENDSHIP_TYPE_RECEIVE: |
| 193 | + hello_text: str = friendship.hello() |
| 194 | + |
| 195 | + # accept friendship when there is a keyword in hello text |
| 196 | + if 'wechaty' in hello_text.lower(): |
| 197 | + await friendship.accept() |
| 198 | + |
| 199 | + # 2. you have a new friend to your contact list |
| 200 | + elif friendship.type() == FriendshipType.FRIENDSHIP_TYPE_CONFIRM: |
| 201 | + # 2.1 invite the user to wechaty group |
| 202 | + # find the topic of room which contains Wechaty keyword |
| 203 | + wechaty_rooms: List[Room] = await self.Room.find_all('Wechaty') |
| 204 | + |
| 205 | + # 2.2 find the suitable room |
| 206 | + for wechaty_room in wechaty_rooms: |
| 207 | + members: List[Contact] = await wechaty_room.member_list() |
| 208 | + if len(members) < MAX_ROOM_MEMBER_COUNT: |
| 209 | + contact: Contact = friendship.contact() |
| 210 | + await wechaty_room.add(contact) |
| 211 | + break |
| 212 | + |
| 213 | + async def on_room_join(self, room: Room, invitees: List[Contact], |
| 214 | + inviter: Contact, date: datetime) -> None: |
| 215 | + """on_room_join when there are new contacts to the room |
| 216 | +
|
| 217 | + Args: |
| 218 | + room (Room): the room instance |
| 219 | + invitees (List[Contact]): the new contacts to the room |
| 220 | + inviter (Contact): the inviter who share qrcode or manual invite someone |
| 221 | + date (datetime): the datetime to join the room |
| 222 | + """ |
| 223 | + # 1. say something to welcome the new arrivals |
| 224 | + names: List[str] = [] |
| 225 | + for invitee in invitees: |
| 226 | + await invitee.ready() |
| 227 | + names.append(invitee.name) |
| 228 | + |
| 229 | + await room.say(f'welcome {",".join(names)} to the wechaty group !') |
| 230 | + |
| 231 | + |
| 232 | +async def main() -> None: |
| 233 | + """doc""" |
| 234 | + bot = MyBot() |
| 235 | + await bot.start() |
| 236 | + |
| 237 | +asyncio.run(main()) |
0 commit comments