-
Notifications
You must be signed in to change notification settings - Fork 1.4k
/
Copy pathutil.py
137 lines (100 loc) · 3.49 KB
/
util.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
from collections import defaultdict
import struct
from threading import Thread, Event
from time import sleep, time
from kafka.common import BufferUnderflowError
def write_int_string(s):
    """
    Serialize a string behind a 4-byte big-endian signed length prefix.

    s: the payload to encode, or None

    Returns the packed bytes. None is written as a bare length of -1
    with no payload after it.
    """
    if s is None:
        return struct.pack('>i', -1)

    size = len(s)
    layout = '>i{0}s'.format(size)
    return struct.pack(layout, size, s)
def write_short_string(s):
    """
    Serialize a string behind a 2-byte big-endian signed length prefix.

    s: the payload to encode, or None

    Returns the packed bytes. None is written as a bare length of -1
    with no payload after it.
    """
    if s is None:
        return struct.pack('>h', -1)

    size = len(s)
    layout = '>h{0}s'.format(size)
    return struct.pack(layout, size, s)
def read_short_string(data, cur):
    """
    Decode a string that carries a 2-byte big-endian signed length prefix.

    data: buffer to read from
    cur: offset in data where the length prefix starts

    Returns a (value, new_offset) tuple; value is None when the prefix
    is -1. Raises BufferUnderflowError when data is too short to hold
    the prefix or the announced payload.
    """
    body_start = cur + 2
    if len(data) < body_start:
        raise BufferUnderflowError("Not enough data left")

    (strlen,) = struct.unpack('>h', data[cur:body_start])
    if strlen == -1:
        # A length of -1 marks a null string: only the prefix is consumed.
        return None, body_start

    body_end = body_start + strlen
    if len(data) < body_end:
        raise BufferUnderflowError("Not enough data left")

    return data[body_start:body_end], body_end
def read_int_string(data, cur):
    """
    Decode a string that carries a 4-byte big-endian signed length prefix.

    data: buffer to read from
    cur: offset in data where the length prefix starts

    Returns a (value, new_offset) tuple; value is None when the prefix
    is -1. Raises BufferUnderflowError when data is too short to hold
    the prefix or the announced payload.
    """
    available = len(data)
    if available < cur + 4:
        raise BufferUnderflowError(
            "Not enough data left to read string len (%d < %d)" %
            (available, cur + 4))

    body_start = cur + 4
    (strlen,) = struct.unpack('>i', data[cur:body_start])
    if strlen == -1:
        # A length of -1 marks a null string: only the prefix is consumed.
        return None, body_start

    body_end = body_start + strlen
    if len(data) < body_end:
        raise BufferUnderflowError("Not enough data left")

    return data[body_start:body_end], body_end
def relative_unpack(fmt, data, cur):
    """
    Unpack a struct format from data starting at a given offset.

    fmt: struct format string
    data: buffer to read from
    cur: offset in data where the packed values start

    Returns a (values, new_offset) tuple, where values is the tuple
    produced by struct.unpack. Raises BufferUnderflowError when fewer
    bytes than the format requires remain after cur.
    """
    end = cur + struct.calcsize(fmt)
    if len(data) < end:
        raise BufferUnderflowError("Not enough data left")

    return struct.unpack(fmt, data[cur:end]), end
def group_by_topic_and_partition(tuples):
    """
    Index items by their topic and then by their partition.

    tuples: iterable of objects exposing .topic and .partition

    Returns a defaultdict mapping topic -> {partition: item}. When
    several items share a topic and partition, the last one wins.
    """
    grouped = defaultdict(dict)
    for item in tuples:
        by_partition = grouped[item.topic]
        by_partition[item.partition] = item
    return grouped
class ReentrantTimer(object):
    """
    A timer that can be restarted, unlike threading.Timer.

    The callback is invoked repeatedly, once per interval, on a daemon
    thread until stop() is called. The wait between calls is done on a
    threading.Event so that stop() can interrupt it immediately.

    t: timer interval in milliseconds
    fn: a callable to invoke
    args: tuple of args to be passed to function
    kwargs: keyword arguments to be passed to function

    Raises ValueError if t is not positive or fn is not callable.
    """

    def __init__(self, t, fn, *args, **kwargs):
        if t <= 0:
            raise ValueError('Invalid timeout value')

        if not callable(fn):
            raise ValueError('fn must be callable')

        self.thread = None
        # Stored in seconds, the unit Event.wait() and Thread.join() expect.
        self.t = t / 1000.0
        self.fn = fn
        self.args = args
        self.kwargs = kwargs
        self.active = None

    def _timer(self, active):
        # Event.wait() returns False on timeout (time to fire) and True once
        # the event has been set by stop() (time to exit). The event is
        # passed in rather than read from self so that a restart, which
        # replaces self.active, cannot keep an old thread alive.
        while not active.wait(self.t):
            self.fn(*self.args, **self.kwargs)

    def start(self):
        """Start the timer, stopping any run that is already in progress."""
        if self.thread is not None:
            self.stop()

        self.active = Event()
        self.thread = Thread(target=self._timer, args=(self.active,))
        self.thread.daemon = True  # So the app exits when main thread exits
        self.thread.start()

    def stop(self):
        """Stop the timer; does nothing if it is not running."""
        if self.thread is None:
            return

        self.active.set()
        # Bounded join: do not hang forever if the callback is slow.
        self.thread.join(self.t + 1)
        # Drop the references so the timer reports itself as stopped and
        # the finished thread can be collected.
        self.thread = None
        self.active = None
DEFAULT_TOPIC_CREATION_TIMEOUT_SECONDS = 30


def ensure_topic_creation(client, topic,
                          timeout=DEFAULT_TOPIC_CREATION_TIMEOUT_SECONDS):
    """
    Poll the client until it reports metadata for a topic.

    client: object providing load_metadata_for_topics(topic) and
            has_metadata_for_topic(topic)
    topic: name of the topic to wait for
    timeout: maximum number of seconds to wait, or None to wait forever

    Returns None as soon as the client has metadata for the topic.
    Raises RuntimeError if the timeout elapses first; a timeout that is
    zero or negative raises without polling at all.
    """
    deadline = None if timeout is None else time() + timeout

    while True:
        if deadline is not None and deadline - time() <= 0:
            break

        client.load_metadata_for_topics(topic)
        if client.has_metadata_for_topic(topic):
            return

        if deadline is None:
            sleep(1)
            continue

        # Never sleep past the deadline: give up right away once it has
        # passed, and shorten the final pause to the time that is left.
        remaining = deadline - time()
        if remaining <= 0:
            break
        sleep(min(1, remaining))

    raise RuntimeError("Unable to create topic %s" % topic)