-
Notifications
You must be signed in to change notification settings - Fork 11
/
Copy pathapp.py
132 lines (89 loc) · 3.36 KB
/
app.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
from flask import Flask,jsonify
import os
from core.models import *
from core.core import feishu_type_choice,feishu_emoji_choice
from core.config import *
from feishu.event import *
from dingding.dingding import DingDing
# Working directories, resolved against the process's current working
# directory at import time:
#   chatfile/ - conversation context records
#   file/     - NOTE(review): purpose is not visible in this file; confirm
#               against the modules that read `file_path`.
cwd = os.getcwd()
chatfile_path = os.path.join(cwd, 'chatfile')
file_path = os.path.join(cwd, 'file')
# exist_ok=True replaces the exists()-then-makedirs() check, which can raise
# FileExistsError when two processes start together and both see the
# directory as missing.
os.makedirs(chatfile_path, exist_ok=True)
os.makedirs(file_path, exist_ok=True)
# Flask application object; the @app.route(...) views further down attach to it.
app = Flask(__name__)
from feishu.api import MessageApiClient
from feishu.event import EventManager
# init service
# Feishu message API client. APP_ID / APP_SECRET / LARK_HOST are not defined
# in this file - presumably provided by `from core.config import *`; confirm.
message_api_client = MessageApiClient(APP_ID, APP_SECRET, LARK_HOST)
# Registry behind the @event_manager.register("<event type>") decorators below;
# the /feishu/callback view asks it for the handler matching each request.
event_manager = EventManager()
@event_manager.register("url_verification")
def request_url_verify_handler(req_data: UrlVerificationEvent):
    """Answer the ``url_verification`` handshake event.

    Returns:
        A JSON response ``{"challenge": <event challenge>}`` when the token
        carried by the event equals ``VERIFICATION_TOKEN``.

    Raises:
        Exception: if the event's token does not match ``VERIFICATION_TOKEN``.
    """
    event = req_data.event
    if event.token == VERIFICATION_TOKEN:
        # Verification only requires echoing the challenge back.
        return jsonify({"challenge": event.challenge})
    raise Exception("VERIFICATION_TOKEN is invalid")
# Event reference:
# https://open.feishu.cn/document/uAjLw4CM/ukTMukTMukTM/reference/im-v1/message-reaction/events/created
@event_manager.register("im.message.reaction.created_v1")
def reaction_receive_event_handler(req_data: MessageReactionCreateEvent):
    """Handle an ``im.message.reaction.created_v1`` event.

    Fetches the root/parent ids of the reacted-to message through
    ``message_api_client.get_message``, then passes the reaction to
    ``feishu_emoji_choice`` unless ``is_characteristic_exist`` reports that
    this message/emoji pair was already seen.

    Returns:
        An empty JSON response in every case.
    """
    event = req_data.event
    message_id = event.message_id
    emoji_type = event.reaction_type.emoji_type

    root_id, parent_id = message_api_client.get_message(message_id)
    # Downstream code is given empty strings instead of None.
    if root_id is None:
        root_id = ''
    if parent_id is None:
        parent_id = ''

    # "<message_id>_<emoji_type>" identifies this reaction; it is checked to
    # guard against replayed events.
    characteristic = message_id + "_" + emoji_type
    if not is_characteristic_exist(characteristic):
        feishu_emoji_choice(root_id, parent_id, message_id, emoji_type, characteristic)
    return jsonify()
@event_manager.register("im.message.receive_v1")
def message_receive_event_handler(req_data: MessageReceiveEvent):
    """Handle an ``im.message.receive_v1`` event.

    Extracts the message id, content, type and (when readable) root/parent
    ids, then passes them to ``feishu_type_choice`` unless
    ``is_characteristic_exist`` reports the message id was already seen.

    Returns:
        An empty JSON response in every case.
    """
    message = req_data.event.message
    message_id = message.message_id
    msgcontent = message.content

    # root_id / parent_id fall back to '' when they cannot be read
    # (presumably absent on messages that are not replies - confirm).
    # The message type is defined in feishu.event, so the exact error raised
    # for a missing field is not visible here; `except Exception` keeps the
    # best-effort fallback without swallowing KeyboardInterrupt/SystemExit
    # the way a bare `except:` does.
    try:
        root_id = message.root_id
    except Exception:
        root_id = ''
    try:
        parent_id = message.parent_id
    except Exception:
        parent_id = ''

    message_type = message.message_type

    # The message id is checked to guard against replayed events.
    if not is_characteristic_exist(message_id):
        feishu_type_choice(message_id, root_id, parent_id, message_type, msgcontent)
    return jsonify()
@event_manager.register("im.message.message_read_v1")
def message_read_event_handler(req_data: MessageReceiveEvent):
    """Acknowledge an ``im.message.message_read_v1`` event without acting on it.

    NOTE(review): the parameter is annotated with ``MessageReceiveEvent``, the
    same type used by the receive handler - confirm whether a dedicated
    read-event type should be used instead.

    Returns:
        An empty JSON response.
    """
    ack = jsonify()
    return ack
@app.route("/feishu/callback", methods=["POST"])
def callback_event_handler():
    """Dispatch a POST to ``/feishu/callback`` to its registered handler.

    Asks ``event_manager.get_handler_with_event`` (given ``VERIFICATION_TOKEN``
    and ``ENCRYPT_KEY``) for a ``(handler, event)`` pair and returns whatever
    the handler returns for that event.

    Exceptions raised during lookup or by the handler are not caught here.
    """
    resolved = event_manager.get_handler_with_event(VERIFICATION_TOKEN, ENCRYPT_KEY)
    handler, payload = resolved
    return handler(payload)
@app.route("/dingding/callback", methods=["GET", "POST"])
def callback():
    """Handle requests to ``/dingding/callback``.

    For POST, the JSON body is parsed and printed. Every request, whatever
    its method, is answered with an empty JSON response.

    Returns:
        An empty JSON response.
    """
    # `request` is not imported by name at the top of this module (it would
    # only be available through one of the star imports), so bind it here.
    from flask import request

    if request.method == "POST":
        rs = request.get_json()
        # NOTE(review): the DingDing instance is created but never used;
        # the call is kept in case its constructor has side effects - confirm.
        dd = DingDing()
        print(rs)
    # A single exit means GET, POST and the HEAD requests Flask routes here
    # automatically all get a response; the view can never return None.
    return jsonify()
# When executed as a script, start Flask's built-in server listening on all
# interfaces, port 5000.
if __name__ == "__main__":
    app.run(host='0.0.0.0', port=5000)