-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathhttp_server_extension.py
111 lines (89 loc) · 3.69 KB
/
http_server_extension.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
import asyncio
from aiohttp import web
import json
from ten import (
AsyncExtension,
AsyncTenEnv,
Cmd,
Data,
StatusCode,
CmdResult,
)
class HTTPServerExtension(AsyncExtension):
    """Extension that exposes an aiohttp server forwarding POSTs into TEN.

    Two routes are registered when the extension starts:

    * ``POST /cmd/{cmd_name}``   -- builds a ``Cmd`` named ``cmd_name`` from
      the JSON body, sends it and replies with the command result.
    * ``POST /data/{data_name}`` -- builds a ``Data`` named ``data_name``
      from the JSON body and sends it.

    The listen address and port default to ``127.0.0.1:8888`` and are
    overridden by the ``listen_addr`` / ``listen_port`` properties when they
    exist.
    """

    # Seconds to wait for a command result before replying 504.
    CMD_TIMEOUT_SECONDS: float = 5.0

    def __init__(self, name: str):
        """Create the extension with default listen settings and an idle app."""
        super().__init__(name)
        self.listen_addr: str = "127.0.0.1"
        self.listen_port: int = 8888
        # Set in on_start and cleared in on_stop.
        self.ten_env: AsyncTenEnv = None

        # http server instances
        self.app = web.Application()
        # Created in on_start; stays None until the server has been set up.
        self.runner = None

    async def _read_json_body(self, ten_env, request) -> str:
        """Parse the request body as JSON, log it, and return it re-serialised.

        Args:
            ten_env: environment used for the debug log line.
            request: the incoming aiohttp request.

        Returns:
            The body re-encoded with ``json.dumps(..., ensure_ascii=False)``.

        Raises:
            json.JSONDecodeError: when the body is not valid JSON; callers
                translate this into a 400 response.
        """
        req_json = await request.json()
        input_json = json.dumps(req_json, ensure_ascii=False)
        ten_env.log_debug(
            f"process incoming request {request.method} {request.path} {input_json}"
        )
        return input_json

    # POST /cmd/{cmd_name}
    async def handle_post_cmd(self, request):
        """Forward the JSON body as a command and reply with its result.

        Responses:
            200 -- the command result status is ``StatusCode.OK``.
            502 -- the command completed with any other status.
            400 -- the request body is not valid JSON.
            504 -- no result arrived within ``CMD_TIMEOUT_SECONDS``.
            500 -- any other exception (logged as a warning).
        """
        # Capture the environment once so the whole request uses the same one.
        ten_env = self.ten_env

        try:
            cmd_name = request.match_info.get("cmd_name")
            input_json = await self._read_json_body(ten_env, request)

            cmd = Cmd.create(cmd_name)
            cmd.set_property_from_json("", input_json)
            cmd_result, _ = await asyncio.wait_for(
                ten_env.send_cmd(cmd), self.CMD_TIMEOUT_SECONDS
            )

            # return response
            status = 200 if cmd_result.get_status_code() == StatusCode.OK else 502
            # NOTE(review): if get_property_to_json returns an already
            # serialised JSON string, json_response will encode it a second
            # time -- confirm the payload shape clients expect.
            return web.json_response(
                cmd_result.get_property_to_json(""), status=status
            )
        except json.JSONDecodeError:
            return web.Response(status=400)
        except asyncio.TimeoutError:
            return web.Response(status=504)
        except Exception as e:
            ten_env.log_warn(
                f"failed to handle request with unknown exception, err {e}"
            )
            return web.Response(status=500)

    # POST /data/{data_name}
    async def handle_post_data(self, request):
        """Forward the JSON body as a data message.

        Responses:
            200 -- the data message was sent.
            400 -- the request body is not valid JSON.
            500 -- any other exception (logged as a warning).
        """
        ten_env = self.ten_env

        try:
            data_name = request.match_info.get("data_name")
            input_json = await self._read_json_body(ten_env, request)

            data = Data.create(data_name)
            data.set_property_from_json("", input_json)
            await ten_env.send_data(data)

            # return response
            return web.Response(status=200)
        except json.JSONDecodeError:
            return web.Response(status=400)
        except Exception as e:
            ten_env.log_warn(
                f"failed to handle request with unknown exception, err {e}"
            )
            return web.Response(status=500)

    async def on_start(self, async_ten_env: AsyncTenEnv):
        """Read listen settings, register the routes and start the server."""
        if await async_ten_env.is_property_exist("listen_addr"):
            self.listen_addr = await async_ten_env.get_property_string("listen_addr")
        if await async_ten_env.is_property_exist("listen_port"):
            self.listen_port = await async_ten_env.get_property_int("listen_port")
        # Handlers read self.ten_env, so publish it before accepting requests.
        self.ten_env = async_ten_env

        self.app.router.add_post("/cmd/{cmd_name}", self.handle_post_cmd)
        self.app.router.add_post("/data/{data_name}", self.handle_post_data)

        self.runner = web.AppRunner(self.app)
        await self.runner.setup()
        site = web.TCPSite(self.runner, self.listen_addr, self.listen_port)
        await site.start()

        # Log only after start() returned, so a failed bind is not reported
        # as a listening server.
        async_ten_env.log_info(
            f"http server listening on {self.listen_addr}:{self.listen_port}"
        )

    async def on_stop(self, _: AsyncTenEnv):
        """Shut the server down and drop the stored environment.

        Safe to call when the server was never started or was already
        stopped: the runner is only cleaned up if it exists.
        """
        if self.runner is not None:
            await self.runner.cleanup()
            self.runner = None
        self.ten_env = None

    async def on_cmd(self, async_ten_env: AsyncTenEnv, cmd: Cmd):
        """Acknowledge every incoming command with an OK result."""
        cmd_name = cmd.get_name()
        async_ten_env.log_debug(f"on_cmd {cmd_name}")
        # NOTE(review): return_result is not awaited here -- confirm it is a
        # synchronous call in the ten runtime version this file targets.
        async_ten_env.return_result(CmdResult.create(StatusCode.OK), cmd)