Skip to content

Commit e0587ea

Browse files
author
Ilya Volodarsky
committed
The async thread no longer exits while there is anything left to flush; added timeouts to requests.
1 parent d5855d3 commit e0587ea

File tree

4 files changed

+59
-69
lines changed

4 files changed

+59
-69
lines changed

analytics/client.py

+41-39
Original file line numberDiff line numberDiff line change
@@ -8,13 +8,12 @@
88
import requests
99

1010
from stats import Statistics
11-
from errors import ApiError, BatchError
11+
from errors import ApiError
1212
from utils import guess_timezone
1313

1414
import options
1515

1616

17-
1817
logging_enabled = True
1918
import logging
2019
logger = logging.getLogger('analytics')
@@ -65,29 +64,32 @@ def request(client, url, data):
6564
try:
6665

6766
response = requests.post(url, data=json.dumps(data),
68-
headers={'content-type': 'application/json'})
67+
headers={'content-type': 'application/json'}, timeout=client.timeout)
6968

7069
log('debug', 'Finished Segment.io request.')
7170

7271
package_response(client, data, response)
7372

73+
return response.status_code == 200
74+
7475
except requests.ConnectionError as e:
7576
package_exception(client, data, e)
77+
except requests.Timeout as e:
78+
package_exception(client, data, e)
79+
80+
return False
7681

7782

7883
class FlushThread(threading.Thread):
7984

80-
def __init__(self, client, url, batches):
85+
def __init__(self, client):
8186
threading.Thread.__init__(self)
8287
self.client = client
83-
self.url = url
84-
self.batches = batches
8588

8689
def run(self):
8790
log('debug', 'Flushing thread running ...')
8891

89-
for data in self.batches:
90-
request(self.client, self.url, data)
92+
self.client._sync_flush()
9193

9294
log('debug', 'Flushing thread done.')
9395

@@ -102,7 +104,8 @@ def __init__(self, secret=None,
102104
log_level=logging.INFO, log=True,
103105
flush_at=20, flush_after=datetime.timedelta(0, 10),
104106
async=True, max_queue_size=10000,
105-
stats=Statistics()):
107+
stats=Statistics(),
108+
timeout=10):
106109
"""Create a new instance of a analytics-python Client
107110
108111
:param str secret: The Segment.io API secret
@@ -117,6 +120,7 @@ def __init__(self, secret=None,
117120
: param bool async: True to have the client flush to the server on another
118121
thread, therefore not blocking code (this is the default). False to
119122
enable blocking and making the request on the calling thread.
123+
: param float timeout: Number of seconds before timing out request to Segment.io
120124
121125
"""
122126

@@ -140,6 +144,8 @@ def __init__(self, secret=None,
140144
self.flush_at = flush_at
141145
self.flush_after = flush_after
142146

147+
self.timeout = timeout
148+
143149
self.stats = stats
144150

145151
self.flush_lock = threading.Lock()
@@ -321,7 +327,7 @@ def _enqueue(self, action):
321327
submitted = True
322328

323329
else:
324-
log('warn', 'Segment.io queue is full')
330+
log('warn', 'analytics-python queue is full')
325331

326332
if self._should_flush():
327333
self.flush()
@@ -350,8 +356,6 @@ def flush(self, async=None):
350356

351357
flushing = False
352358

353-
url = options.host + options.endpoints['batch']
354-
355359
# if the async parameter is provided, it overrides the client's settings
356360
if async == None:
357361
async = self.async
@@ -363,54 +367,52 @@ def flush(self, async=None):
363367

364368
if self._flush_thread_is_free():
365369

366-
log('debug', 'Attempting asynchronous flush ...')
367-
368-
batches = self._get_batches()
369-
if len(batches) > 0:
370-
371-
self.flushing_thread = FlushThread(self,
372-
url, batches)
370+
log('debug', 'Initiating asynchronous flush ..')
373371

374-
self.flushing_thread.start()
372+
self.flushing_thread = FlushThread(self)
373+
self.flushing_thread.start()
375374

376-
flushing = True
375+
flushing = True
377376

378377
else:
379-
log('debug', 'The flushing thread is still active, ' +
380-
'cant flush right now')
378+
log('debug', 'The flushing thread is still active.')
381379
else:
382380

383381
# Flushes on this thread
384-
log('debug', 'Starting synchronous flush ...')
385-
386-
batches = self._get_batches()
387-
if len(batches) > 0:
388-
for data in batches:
389-
request(self, url, data)
390-
flushing = True
391-
392-
log('debug', 'Finished synchronous flush.')
382+
log('debug', 'Initiating synchronous flush ..')
383+
self._sync_flush()
384+
flushing = True
393385

394386
if flushing:
395387
self.last_flushed = datetime.datetime.now()
396388
self.stats.flushes += 1
397389

398390
return flushing
399391

400-
def _get_batches(self):
392+
def _sync_flush(self):
393+
394+
log('debug', 'Starting flush ..')
395+
396+
successful = 0
397+
failed = 0
401398

402-
batches = []
399+
url = options.host + options.endpoints['batch']
403400

404401
while len(self.queue) > 0:
402+
405403
batch = []
406404
for i in range(self.max_flush_size):
407405
if len(self.queue) == 0:
408406
break
407+
409408
batch.append(self.queue.pop())
410409

411-
batches.append({
412-
'batch': batch,
413-
'secret': self.secret
414-
})
410+
payload = {'batch': batch, 'secret': self.secret}
411+
412+
if request(self, url, payload):
413+
successful += len(batch)
414+
else:
415+
failed += len(batch)
415416

416-
return batches
417+
log('debug', 'Successfully flushed ' + str(successful) + ' items [' +
418+
str(failed) + ' failed].')

analytics/errors.py

-12
Original file line numberDiff line numberDiff line change
@@ -11,15 +11,3 @@ def __repr__(self):
1111

1212
def __str__(self):
1313
return repr(self.message)
14-
15-
16-
class BatchError(Exception):
17-
18-
def __init__(self, errors):
19-
self.errors = errors
20-
21-
def __repr__(self):
22-
return self.__str__()
23-
24-
def __str__(self):
25-
return repr(self.errors)

setup.py

+1-1
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@
1313

1414
setup(
1515
name='analytics-python',
16-
version='0.2.8',
16+
version='0.3.0',
1717
url='https://github.com/segmentio/analytics-python',
1818
author='Ilya Volodarsky',
1919
author_email='[email protected]',

test.py

+17-17
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@ def test_timezone_utils(self):
4545

4646
shouldnt_be_edited = analytics.utils.guess_timezone(utcnow)
4747

48-
self.assertTrue(utcnow == shouldnt_be_edited)
48+
self.assertEqual(utcnow, shouldnt_be_edited)
4949

5050
def test_clean(self):
5151
supported = {
@@ -67,7 +67,7 @@ def test_clean(self):
6767

6868
analytics.default_client._clean(combined)
6969

70-
self.assertTrue(combined == supported)
70+
self.assertEqual(combined, supported)
7171

7272
def test_async_basic_identify(self):
7373
# flush after every message
@@ -83,20 +83,20 @@ def test_async_basic_identify(self):
8383
"Friends": 30
8484
})
8585

86-
self.assertTrue(analytics.stats.identifies == last_identifies + 1)
86+
self.assertEqual(analytics.stats.identifies, last_identifies + 1)
8787

8888
# this should flush because we set the flush_at to 1
89-
self.assertTrue(analytics.stats.flushes == last_flushes + 1)
89+
self.assertEqual(analytics.stats.flushes, last_flushes + 1)
9090

9191
# this should do nothing, as the async thread is currently active
9292
analytics.flush()
9393

9494
# we should see no more flushes here
95-
self.assertTrue(analytics.stats.flushes == last_flushes + 1)
95+
self.assertEqual(analytics.stats.flushes, last_flushes + 1)
9696

9797
sleep(1)
9898

99-
self.assertTrue(analytics.stats.successful == last_successful + 1)
99+
self.assertEqual(analytics.stats.successful, last_successful + 1)
100100

101101
def test_async_basic_track(self):
102102

@@ -111,13 +111,13 @@ def test_async_basic_track(self):
111111
"Song": "Eleanor Rigby"
112112
})
113113

114-
self.assertTrue(analytics.stats.tracks == last_tracks + 1)
114+
self.assertEqual(analytics.stats.tracks, last_tracks + 1)
115115

116116
analytics.flush()
117117

118118
sleep(1)
119119

120-
self.assertTrue(analytics.stats.successful == last_successful + 1)
120+
self.assertEqual(analytics.stats.successful, last_successful + 1)
121121

122122
def test_async_full_identify(self):
123123

@@ -147,11 +147,11 @@ def test_async_full_identify(self):
147147
analytics.identify('[email protected]', traits,
148148
context=context, timestamp=datetime.now())
149149

150-
self.assertTrue(analytics.stats.identifies == last_identifies + 1)
150+
self.assertEqual(analytics.stats.identifies, last_identifies + 1)
151151

152152
sleep(1)
153153

154-
self.assertTrue(analytics.stats.successful == last_successful + 1)
154+
self.assertEqual(analytics.stats.successful, last_successful + 1)
155155

156156
def test_async_full_track(self):
157157

@@ -169,11 +169,11 @@ def test_async_full_track(self):
169169
analytics.track('[email protected]', 'Played a Song',
170170
properties, timestamp=datetime.now())
171171

172-
self.assertTrue(analytics.stats.tracks == last_tracks + 1)
172+
self.assertEqual(analytics.stats.tracks, last_tracks + 1)
173173

174174
sleep(1)
175175

176-
self.assertTrue(analytics.stats.successful == last_successful + 1)
176+
self.assertEqual(analytics.stats.successful, last_successful + 1)
177177

178178
def test_blocking_flush(self):
179179

@@ -191,8 +191,8 @@ def test_blocking_flush(self):
191191
analytics.track('[email protected]', 'Played a Song',
192192
properties, timestamp=datetime.today())
193193

194-
self.assertTrue(analytics.stats.tracks == last_tracks + 1)
195-
self.assertTrue(analytics.stats.successful == last_successful + 1)
194+
self.assertEqual(analytics.stats.tracks, last_tracks + 1)
195+
self.assertEqual(analytics.stats.successful, last_successful + 1)
196196

197197
def test_time_policy(self):
198198

@@ -221,7 +221,7 @@ def test_time_policy(self):
221221
})
222222

223223
# that shouldn't have triggered a flush
224-
self.assertTrue(analytics.stats.flushes == last_flushes)
224+
self.assertEqual(analytics.stats.flushes, last_flushes)
225225

226226
# sleep past the time-flush policy
227227
sleep(1.2)
@@ -232,7 +232,7 @@ def test_time_policy(self):
232232
"Song": "Eleanor Rigby"
233233
})
234234

235-
self.assertTrue(analytics.stats.flushes == last_flushes + 1)
235+
self.assertEqual(analytics.stats.flushes, last_flushes + 1)
236236

237237
def test_performance(self):
238238

@@ -243,7 +243,7 @@ def test_performance(self):
243243
analytics.default_client.async = True
244244
analytics.default_client.flush_at = 200
245245
analytics.default_client.max_flush_size = 50
246-
analytics.default_client.set_log_level(logging.WARN)
246+
analytics.default_client.set_log_level(logging.DEBUG)
247247

248248
for i in range(to_send):
249249
analytics.track('[email protected]', 'Played a Song', {

0 commit comments

Comments
 (0)