-
Notifications
You must be signed in to change notification settings - Fork 151
/
Copy pathrequest.py
96 lines (78 loc) · 2.73 KB
/
request.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
from datetime import date, datetime
from io import BytesIO
from gzip import GzipFile
import logging
import json
from dateutil.tz import tzutc
from requests.auth import HTTPBasicAuth
from requests import sessions
from segment.analytics.version import VERSION
from segment.analytics.utils import remove_trailing_slash
_session = sessions.Session()
def post(write_key, host=None, gzip=False, timeout=15, proxies=None, oauth_manager=None, **kwargs):
"""Post the `kwargs` to the API"""
log = logging.getLogger('segment')
body = kwargs
if not "sentAt" in body.keys():
body["sentAt"] = datetime.now(tz=tzutc()).isoformat()
body["writeKey"] = write_key
url = remove_trailing_slash(host or 'https://api.segment.io') + '/v1/batch'
auth = None
if oauth_manager:
auth = oauth_manager.get_token()
data = json.dumps(body, cls=DatetimeSerializer)
log.debug('making request: %s', data)
headers = {
'Content-Type': 'application/json',
'User-Agent': 'analytics-python/' + VERSION
}
if auth:
headers['Authorization'] = 'Bearer {}'.format(auth)
if gzip:
headers['Content-Encoding'] = 'gzip'
buf = BytesIO()
with GzipFile(fileobj=buf, mode='w') as gz:
# 'data' was produced by json.dumps(),
# whose default encoding is utf-8.
gz.write(data.encode('utf-8'))
data = buf.getvalue()
kwargs = {
"data": data,
"headers": headers,
"timeout": timeout,
}
if proxies:
kwargs['proxies'] = {
'http': proxies,
'https': proxies,
}
try:
res = _session.post(url, **kwargs)
except Exception as e:
log.error(e)
raise e
if res.status_code == 200:
log.debug('data uploaded successfully')
return res
if oauth_manager and res.status_code in [400, 401, 403]:
oauth_manager.clear_token()
try:
payload = res.json()
log.debug('received response: %s', payload)
raise APIError(res.status_code, payload['code'], payload['message'])
except ValueError:
log.error('Unknown error: [%s] %s', res.status_code, res.reason)
raise APIError(res.status_code, 'unknown', res.text)
class APIError(Exception):
def __init__(self, status, code, message):
self.message = message
self.status = status
self.code = code
def __str__(self):
msg = "[Segment] {0}: {1} ({2})"
return msg.format(self.code, self.message, self.status)
class DatetimeSerializer(json.JSONEncoder):
def default(self, obj):
if isinstance(obj, (date, datetime)):
return obj.isoformat()
return json.JSONEncoder.default(self, obj)