Skip to content

Commit 2bf09b2

Browse files
committed
fix: Enhance WeChat Work (Wecom) adapter with comprehensive media handling and message conversion
1 parent e45d2a0 commit 2bf09b2

File tree

1 file changed

+135
-70
lines changed

1 file changed

+135
-70
lines changed

plugins/im_wecom_adapter/adapter.py

Lines changed: 135 additions & 70 deletions
Original file line numberDiff line numberDiff line change
@@ -1,32 +1,35 @@
1-
from typing import Any
1+
from typing import Any, Optional
22

3+
from framework.im.sender import ChatSender
34
from framework.workflow.core.dispatch.dispatcher import WorkflowDispatcher
45

56
# 兼容新旧版本的 wechatpy 导入
67
try:
7-
from wechatpy.enterprise import create_reply, parse_message
8+
from wechatpy.enterprise import parse_message
89
from wechatpy.enterprise.client import WeChatClient
910
from wechatpy.enterprise.crypto import WeChatCrypto
1011
from wechatpy.enterprise.exceptions import InvalidCorpIdException
1112
except ImportError:
1213
from wechatpy.work.crypto import WeChatCrypto
1314
from wechatpy.work.client import WeChatClient
1415
from wechatpy.work.exceptions import InvalidCorpIdException
15-
from wechatpy.work import parse_message, create_reply
16+
from wechatpy.work import parse_message
1617

1718
import asyncio
1819
import base64
20+
import os
1921
from io import BytesIO
2022

23+
import aiohttp
2124
from pydantic import BaseModel, ConfigDict, Field
2225
from quart import Quart, abort, request
2326
from wechatpy.exceptions import InvalidSignatureException
2427

2528
from framework.im.adapter import IMAdapter
26-
from framework.im.message import ImageMessage, IMMessage, TextMessage, VoiceMessage
27-
from framework.logger import get_logger
28-
29+
from framework.im.message import FileElement, ImageMessage, IMMessage, TextMessage, VideoElement, VoiceMessage
30+
from framework.logger import HypercornLoggerWrapper, get_logger
2931

32+
WECOM_TEMP_DIR = os.path.join(os.getcwd(), 'data', 'temp', 'wecom')
3033
class WecomConfig(BaseModel):
3134
"""企业微信配置
3235
文档: https://work.weixin.qq.com/api/doc/90000/90136/91770
@@ -43,6 +46,41 @@ class WecomConfig(BaseModel):
4346
model_config = ConfigDict(extra="allow")
4447

4548

49+
class WeComUtils:
50+
"""企业微信相关的工具类"""
51+
52+
def __init__(self, access_token: str):
53+
self.access_token = access_token
54+
self.logger = get_logger("WeComUtils")
55+
56+
async def download_and_save_media(self, media_id: str, file_name: str) -> Optional[str]:
57+
"""下载并保存媒体文件到本地"""
58+
file_path = os.path.join(WECOM_TEMP_DIR, file_name)
59+
try:
60+
media_data = await self.download_media(media_id)
61+
if media_data:
62+
os.makedirs(os.path.dirname(file_path), exist_ok=True)
63+
with open(file_path, "wb") as f:
64+
f.write(media_data)
65+
return file_path
66+
except Exception as e:
67+
self.logger.error(f"Failed to save media: {str(e)}")
68+
return None
69+
70+
async def download_media(self, media_id: str) -> Optional[bytes]:
71+
"""下载企业微信的媒体文件"""
72+
url = f"https://qyapi.weixin.qq.com/cgi-bin/media/get?access_token={self.access_token}&media_id={media_id}"
73+
try:
74+
async with aiohttp.ClientSession() as session:
75+
async with session.get(url) as response:
76+
if response.status == 200:
77+
return await response.read()
78+
self.logger.error(f"Failed to download media: {response.status}")
79+
except Exception as e:
80+
self.logger.error(f"Failed to download media: {str(e)}")
81+
return None
82+
83+
4684
class WecomAdapter(IMAdapter):
4785
"""企业微信适配器"""
4886

@@ -59,101 +97,128 @@ def __init__(self, config: WecomConfig):
5997
self.setup_routes()
6098

6199
def setup_routes(self):
62-
@self.app.route("/wechat", methods=["GET", "POST"])
63-
async def wechat():
100+
@self.app.get("/wechat")
101+
async def handle_check_request():
102+
"""处理 GET 请求"""
64103
signature = request.args.get("msg_signature", "")
65104
timestamp = request.args.get("timestamp", "")
66105
nonce = request.args.get("nonce", "")
67-
68-
if request.method == "GET":
69-
echo_str = request.args.get("echostr", "")
70-
try:
71-
echo_str = self.crypto.check_signature(
72-
signature, timestamp, nonce, echo_str
73-
)
74-
except InvalidSignatureException:
75-
abort(403)
76-
return echo_str
77-
else:
78-
try:
79-
msg = self.crypto.decrypt_message(
80-
await request.data, signature, timestamp, nonce
81-
)
82-
except (InvalidSignatureException, InvalidCorpIdException):
83-
abort(403)
84-
msg = parse_message(msg)
85-
if msg.type == "text":
86-
# 构造消息对象
87-
message = self.convert_to_message(msg)
88-
# 分发消息
89-
await self.dispatcher.dispatch(self, message)
90-
return "ok"
91-
else:
92-
reply = create_reply("暂不支持该类型消息", msg).render()
93-
return self.crypto.encrypt_message(reply, nonce, timestamp)
94-
95-
def convert_to_message(self, raw_message: Any) -> IMMessage:
106+
echo_str = request.args.get("echostr", "")
107+
try:
108+
echo_str = self.crypto.check_signature(
109+
signature, timestamp, nonce, echo_str
110+
)
111+
except InvalidSignatureException:
112+
abort(403)
113+
return echo_str
114+
115+
@self.app.post("/wechat")
116+
async def handle_message():
117+
"""处理 POST 请求"""
118+
signature = request.args.get("msg_signature", "")
119+
timestamp = request.args.get("timestamp", "")
120+
nonce = request.args.get("nonce", "")
121+
try:
122+
msg = self.crypto.decrypt_message(
123+
await request.data, signature, timestamp, nonce
124+
)
125+
except (InvalidSignatureException, InvalidCorpIdException):
126+
abort(403)
127+
msg = parse_message(msg)
128+
129+
# 预处理媒体消息
130+
media_path = None
131+
if msg.type in ["voice", "video", "file"]:
132+
media_id = msg.media_id
133+
file_name = f"temp_{msg.type}_{media_id}.{msg.type}"
134+
media_path = await self.wecom_utils.download_and_save_media(media_id, file_name)
135+
136+
# 转换消息
137+
message = self.convert_to_message(msg, media_path)
138+
# 分发消息
139+
await self.dispatcher.dispatch(self, message)
140+
return "ok"
141+
142+
def convert_to_message(self, raw_message: Any, media_path: Optional[str] = None) -> IMMessage:
96143
"""将企业微信消息转换为统一消息格式"""
97-
sender = raw_message.source
144+
# 企业微信应用似乎没有群聊的概念,所以这里只能用单聊
145+
sender = ChatSender.from_c2c_chat(raw_message.source, raw_message.source)
146+
98147
message_elements = []
99148
raw_message_dict = raw_message.__dict__
100149

101-
# 处理文本消息
102150
if raw_message.type == "text":
103-
text_element = TextMessage(text=raw_message.content)
104-
message_elements.append(text_element)
151+
message_elements.append(TextMessage(text=raw_message.content))
152+
elif raw_message.type == "image":
153+
message_elements.append(ImageMessage(url=raw_message.image))
154+
elif raw_message.type == "voice" and media_path:
155+
message_elements.append(VoiceMessage(url=media_path))
156+
elif raw_message.type == "video" and media_path:
157+
message_elements.append(VideoElement(file=media_path))
158+
elif raw_message.type == "file" and media_path:
159+
message_elements.append(FileElement(path=media_path))
160+
elif raw_message.type == "location":
161+
location_text = f"[Location] {raw_message.label} (X: {raw_message.location_x}, Y: {raw_message.location_y})"
162+
message_elements.append(TextMessage(text=location_text))
163+
elif raw_message.type == "link":
164+
link_text = f"[Link] {raw_message.title}: {raw_message.description} ({raw_message.url})"
165+
message_elements.append(TextMessage(text=link_text))
166+
else:
167+
message_elements.append(TextMessage(text=f"Unsupported message type: {raw_message.type}"))
105168

106169
return IMMessage(
107170
sender=sender,
108171
message_elements=message_elements,
109172
raw_message=raw_message_dict,
110173
)
111174

112-
async def send_message(self, message: IMMessage, recipient: Any):
113-
"""发送消息到企业微信"""
114-
user_id = recipient
115-
116-
for element in message.message_elements:
117-
if isinstance(element, TextMessage):
118-
await self._send_text(user_id, element.text)
119-
elif isinstance(element, ImageMessage):
120-
await self._send_image(user_id, element.url)
121-
elif isinstance(element, VoiceMessage):
122-
await self._send_voice(user_id, element.url)
123-
124175
async def _send_text(self, user_id: str, text: str):
125176
"""发送文本消息"""
126177
try:
127-
self.client.message.send_text(self.config.agent_id, user_id, text)
178+
return self.client.message.send_text(self.config.agent_id, user_id, text)
128179
except Exception as e:
129180
self.logger.error(f"Failed to send text message: {e}")
130181

131-
async def _send_image(self, user_id: str, image_data: str):
132-
"""发送图片消息"""
133-
try:
134-
image_bytes = BytesIO(base64.b64decode(image_data))
135-
media_id = self.client.media.upload("image", image_bytes)["media_id"]
136-
self.client.message.send_image(self.config.agent_id, user_id, media_id)
137-
except Exception as e:
138-
self.logger.error(f"Failed to send image message: {e}")
139-
140-
async def _send_voice(self, user_id: str, voice_data: str):
141-
"""发送语音消息"""
182+
async def _send_media(self, user_id: str, media_data: str, media_type: str):
183+
"""发送媒体消息的通用方法"""
142184
try:
143-
voice_bytes = BytesIO(base64.b64decode(voice_data))
144-
media_id = self.client.media.upload("voice", voice_bytes)["media_id"]
145-
self.client.message.send_voice(self.config.agent_id, user_id, media_id)
185+
media_bytes = BytesIO(base64.b64decode(media_data))
186+
media_id = self.client.media.upload(media_type, media_bytes)["media_id"]
187+
send_method = getattr(self.client.message, f"send_{media_type}")
188+
return send_method(self.config.agent_id, user_id, media_id)
146189
except Exception as e:
147-
self.logger.error(f"Failed to send voice message: {e}")
190+
self.logger.error(f"Failed to send {media_type} message: {e}")
148191

192+
async def send_message(self, message: IMMessage, recipient: ChatSender):
193+
"""发送消息到企业微信"""
194+
user_id = recipient.user_id
195+
res = None
196+
for element in message.message_elements:
197+
if isinstance(element, TextMessage) and element.text:
198+
res = await self._send_text(user_id, element.text)
199+
elif isinstance(element, ImageMessage) and element.url:
200+
res = await self._send_media(user_id, element.url, "image")
201+
elif isinstance(element, VoiceMessage) and element.url:
202+
res = await self._send_media(user_id, element.url, "voice")
203+
elif isinstance(element, VideoElement) and element.file:
204+
res = await self._send_media(user_id, element.file, "video")
205+
elif isinstance(element, FileElement) and element.path:
206+
res = await self._send_media(user_id, element.path, "file")
207+
if res:
208+
print(res)
149209
async def start(self):
150210
"""启动服务"""
151211
from hypercorn.asyncio import serve
152212
from hypercorn.config import Config
213+
from hypercorn.logging import Logger
153214

154215
config = Config()
155216
config.bind = [f"{self.config.host}:{self.config.port}"]
156-
config._log = get_logger("Wecom-API")
217+
# config._log = get_logger("Wecom-API")
218+
# hypercorn 的 logger 需要做转换
219+
config._log = Logger(config)
220+
config._log.access_logger = HypercornLoggerWrapper(self.logger)
221+
config._log.error_logger = HypercornLoggerWrapper(self.logger)
157222

158223
self.server_task = asyncio.create_task(serve(self.app, config))
159224

0 commit comments

Comments
 (0)