-
Notifications
You must be signed in to change notification settings - Fork 31
/
Copy pathmitm_rainbird.py
98 lines (81 loc) · 2.75 KB
/
mitm_rainbird.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
"""
An mitmproxy Addon for rainbird traffic.
See https://docs.mitmproxy.org/stable/addons-overview/ for more technical details of
mitm addons.
"""
import json
import logging
import os
from typing import Optional
from mitmproxy import contentviews, flow, http
import pyrainbird
import pyrainbird.encryption
from pyrainbird import rainbird
class DecodeRainbirdView(contentviews.View):
    """mitmproxy content view that decrypts and decodes Rainbird payloads.

    The message body is decrypted with the password read from the
    ``RAINBIRD_PASSWORD`` environment variable and parsed as JSON. Any
    ``data`` value found under ``params`` (treated as a request) or
    ``result`` (treated as a response) is additionally passed through
    ``rainbird.decode``.
    """

    name = "rainbird"

    def __call__(
        self,
        data: bytes,
        *,
        content_type: Optional[str] = None,
        flow: Optional[flow.Flow] = None,
        http_message: Optional[http.Message] = None,
        **unknown_metadata,
    ) -> contentviews.TViewResult:
        """Render *data* as raw, decrypted and decoded sections.

        Args:
            data: The encrypted message body.
            content_type: Unused here.
            flow: Unused here.
            http_message: Unused here.
            **unknown_metadata: Ignored extra metadata.

        Returns:
            A ``(description, rows)`` tuple where ``rows`` yields one
            single-span text line per collected output entry.

        Raises:
            KeyError: If ``RAINBIRD_PASSWORD`` is not set in the environment.
            UnicodeDecodeError: If the decrypted bytes are not valid UTF-8.
            json.JSONDecodeError: If the decrypted text is not valid JSON.
        """
        logging.debug("raw %s", data)
        output = ["--- Raw ---", data]

        passwd = os.environ["RAINBIRD_PASSWORD"]
        # Trailing bytes are stripped in this exact order; presumably they are
        # padding left behind by the cipher -- TODO confirm against pyrainbird.
        decrypted_data = (
            pyrainbird.encryption.decrypt(data, passwd)
            .decode("UTF-8")
            .rstrip("\x10")
            .rstrip("\x0A")
            .rstrip("\x00")
            .rstrip()
        )
        logging.debug("decrypted %s", decrypted_data)
        output.extend(["--- Decrypted ---", decrypted_data])

        json_body = json.loads(decrypted_data)
        if "params" in json_body:
            # Request
            params = json_body["params"]
            output.extend(["--- request params ---", str(params)])
            # Only an object can carry a "data" member; anything else is
            # shown as-is above without attempting to decode it.
            if isinstance(params, dict) and (params_data := params.get("data")):
                decoded_data = rainbird.decode(params_data)
                output.extend(["--- decoded request data ---", str(decoded_data)])
        elif "result" in json_body:
            # Response
            result = json_body["result"]
            logging.info("response: %s", result)
            output.extend(["--- response result ---", str(result)])
            if isinstance(result, dict) and (result_data := result.get("data")):
                decoded_data = rainbird.decode(result_data)
                output.extend(["--- decoded response data ---", str(decoded_data)])

        def rows():
            # Each output entry becomes one line made of a single text span.
            for row in output:
                logging.info("yielding: %s", row)
                yield [("text", row)]

        return "rainbird", rows()

    def render_priority(
        self,
        data: bytes,
        *,
        content_type: Optional[str] = None,
        flow: Optional[flow.Flow] = None,
        http_message: Optional[http.Message] = None,
        **unknown_metadata,
    ) -> float:
        """Return 1 for messages this view should handle, otherwise 0.

        A message qualifies when it has an HTTP message and a content type of
        ``application/octet-stream``, belongs to an HTTP flow, and the request
        path contains ``/stick``.
        """
        if not content_type or not http_message:
            return 0
        if content_type != "application/octet-stream":
            return 0
        # Explicit guard instead of ``assert``: asserts vanish under ``-O`` and
        # a missing or non-HTTP flow should simply mean "not for this view".
        if not isinstance(flow, http.HTTPFlow):
            return 0
        if flow.request and "/stick" in flow.request.path:
            return 1
        return 0
# Module-level view instance shared by load() (which adds it) and done()
# (which removes it).
view = DecodeRainbirdView()
def load(_l):
    """Addon ``load`` hook: register the Rainbird content view.

    The single argument passed by the caller is not used.
    """
    contentviews.add(view)
def done():
    """Addon ``done`` hook: unregister the Rainbird content view."""
    contentviews.remove(view)