neolog.py
#!/usr/bin/env python
#
# neolog - read a NEO log
#
# Copyright (C) 2012-2019 Nexedi SA
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
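#
# Example invocations (an illustrative sketch; the exact options are defined
# in main() below and documented by --help):
#
#   neolog.py node.log                  # pretty-print a NEO log database
#   neolog.py -a node.log               # also decode packet bodies
#   neolog.py --from=-3600 node.log     # only records from the last hour
#   neolog.py -f -F <PID> node.log      # follow the file, asking PID to flush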
from __future__ import print_function
import argparse, bz2, gzip, errno, os, signal, sqlite3, sys, time
from bisect import insort
from itertools import chain
from logging import getLevelName
from zlib import decompress
try:
import zstd
except ImportError:
zstdcat = 'zstdcat'
else:
from cStringIO import StringIO
def zstdcat(path):
with open(path, 'rb') as f:
return StringIO(zstd.decompress(f.read()))
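# Supported compressed-log extensions: a string names an external decompressor
# command (run via subprocess in Log.__init__), anything else is called with
# the path and must return a readable file object.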
comp_dict = dict(bz2=bz2.BZ2File, gz=gzip.GzipFile, xz='xzcat', zst=zstdcat)
color_dict = dict(
DEBUG=34, # darkblue
INFO=32, # darkgreen
WARNING=33, # brown
ERROR=31, # darkred
CRITICAL='31;01', # red
)
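# A Log wraps one NEO log database, possibly compressed. Iterating it first
# yields once (only starting the read transaction, see emit_many), then yields
# (date, node, levelname, msg_list) tuples merging the 'log' and 'packet'
# tables by date.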
class Log(object):
_log_date = _packet_date = 0
_protocol_date = None
def __init__(self, db_path, decode=0, date_format=None,
filter_from=None, show_cluster=False, no_nid=False,
node_column=True, node_list=None):
self._date_format = '%F %T' if date_format is None else date_format
self._decode = decode
self._filter_from = filter_from
self._no_nid = no_nid
self._node_column = node_column
self._node_list = node_list
self._node_dict = {}
self._show_cluster = show_cluster
name = os.path.basename(db_path)
try:
name, ext = name.rsplit(os.extsep, 1)
ZipFile = comp_dict[ext]
except (KeyError, ValueError):
# BBB: Python 2 does not support URI so we can't open in read-only
# mode. See https://bugs.python.org/issue13773
os.stat(db_path) # do not create empty DB if file is missing
self._db = sqlite3.connect(db_path)
else:
import shutil, subprocess, tempfile
with tempfile.NamedTemporaryFile() as f:
if type(ZipFile) is str:
subprocess.check_call((ZipFile, db_path), stdout=f)
else:
shutil.copyfileobj(ZipFile(db_path), f)
self._db = sqlite3.connect(f.name)
name = name.rsplit(os.extsep, 1)[0]
self._default_name = name
def __iter__(self):
q = self._db.execute
try:
q("BEGIN")
yield
date = self._filter_from
if date and max(self._log_date, self._packet_date) < date:
log_args = packet_args = date,
date = " WHERE date>=?"
else:
self._filter_from = None
log_args = self._log_date,
packet_args = self._packet_date,
date = " WHERE date>?"
old = "SELECT date, name, NULL, NULL, %s FROM %s" + date
new = ("SELECT date, name, cluster, nid, %s"
" FROM %s LEFT JOIN node ON node=id" + date)
log = 'level, pathname, lineno, msg'
pkt = 'msg_id, code, peer, body'
try:
nl = q(new % (log, 'log'), log_args)
except sqlite3.OperationalError:
nl = q(old % (log, 'log'), log_args)
np = q(old % (pkt, 'packet'), packet_args)
else:
np = q(new % (pkt, 'packet'), packet_args)
try:
nl = chain(q(old % (log, 'log1'), log_args), nl)
np = chain(q(old % (pkt, 'packet1'), packet_args), np)
except sqlite3.OperationalError:
pass
try:
p = np.next()
self._reload(p[0])
except StopIteration:
p = None
except sqlite3.DatabaseError as e:
yield time.time(), None, 'PACKET', self._exc(e)
p = None
try:
for date, name, cluster, nid, level, pathname, lineno, msg in nl:
while p and p[0] < date:
yield self._packet(*p)
try:
p = next(np, None)
except sqlite3.DatabaseError as e:
yield time.time(), None, 'PACKET', self._exc(e)
p = None
self._log_date = date
yield (date, self._node(name, cluster, nid),
getLevelName(level), msg.splitlines())
except sqlite3.DatabaseError as e:
yield time.time(), None, 'LOG', self._exc(e)
if p:
yield self._packet(*p)
try:
for p in np:
yield self._packet(*p)
except sqlite3.DatabaseError as e:
yield time.time(), None, 'PACKET', self._exc(e)
finally:
self._db.rollback()
@staticmethod
def _exc(e):
return ('%s: %s' % (type(e).__name__, e)).splitlines()
def _node(self, name, cluster, nid):
if nid and not self._no_nid:
name = self.uuid_str(nid)
if self._show_cluster:
name = cluster + '/' + name
return name
def _reload(self, date):
q = self._db.execute
date, text = q("SELECT * FROM protocol WHERE date<=?"
" ORDER BY date DESC", (date,)).next()
if self._protocol_date == date:
return
self._protocol_date = date
g = {}
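        # Protocol definitions are stored bz2-compressed inside the log
        # database itself; executing them provides uuid_str, Packets, etc.
        # as they were in the NEO version that wrote this part of the log.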
exec bz2.decompress(text) in g
for x in 'uuid_str', 'Packets':
setattr(self, x, g[x])
x = {}
try:
Unpacker = g['Unpacker']
except KeyError:
unpackb = None
self.PacketMalformedError = g['PacketMalformedError']
else:
from msgpack import ExtraData, UnpackException
def unpackb(data):
u = Unpacker()
u.feed(data)
data = u.unpack()
if u.read_bytes(1):
raise ExtraData
return data
self.PacketMalformedError = UnpackException
self.unpackb = unpackb
if self._decode > 1:
try:
PStruct = g['PStruct']
except KeyError:
for p in self.Packets.itervalues():
data_path = getattr(p, 'data_path', (None,))
if p._code >> 15 == data_path[0]:
x[p._code] = data_path[1:]
else:
PBoolean = g['PBoolean']
def hasData(item):
items = item._items
for i, item in enumerate(items):
if isinstance(item, PStruct):
j = hasData(item)
if j:
return (i,) + j
elif (isinstance(item, PBoolean)
and item._name == 'compression'
and i + 2 < len(items)
and items[i+2]._name == 'data'):
return i,
for p in self.Packets.itervalues():
if p._fmt is not None:
path = hasData(p._fmt)
if path:
assert not hasattr(p, '_neolog'), p
x[p._code] = path
self._getDataPath = x.get
try:
self._next_protocol, = q("SELECT date FROM protocol WHERE date>?",
(date,)).next()
except StopIteration:
self._next_protocol = float('inf')
def _emit(self, date, name, levelname, msg_list, color=False):
if not name:
name = self._default_name
if self._node_list and name not in self._node_list:
return
prefix = self._date_format
if prefix:
d = int(date)
prefix = '%s.%04u ' % (time.strftime(prefix, time.localtime(d)),
int((date - d) * 10000))
if not color:
prefix += '%-9s ' % levelname
if self._node_column:
prefix += '%-10s ' % name
if color and levelname != 'PACKET':
x = '\x1b[%sm%s%%s\x1b[39;49;0m' % (color_dict[levelname], prefix)
for msg in msg_list:
print(x % msg)
else:
for msg in msg_list:
print(prefix + msg)
def _packet(self, date, name, cluster, nid, msg_id, code, peer, body):
self._packet_date = date
if self._next_protocol <= date:
self._reload(date)
try:
p = self.Packets[code]
msg = p.__name__
except KeyError:
msg = 'UnknownPacket[%u]' % code
body = None
msg = ['#0x%04x %-30s %s' % (msg_id, msg, peer)]
if body is not None:
log = getattr(p, '_neolog', None)
if log or self._decode:
try:
if self.unpackb:
args = self.unpackb(body)
else:
p = p()
p._body = body
args = p.decode()
except self.PacketMalformedError:
msg.append("Can't decode packet")
else:
if log:
args, extra = log(*args)
msg += extra
else:
path = self._getDataPath(code)
if path:
args = self._decompress(args, path)
if args and self._decode:
msg[0] += ' \t| ' + repr(args)
return date, self._node(name, cluster, nid), 'PACKET', msg
def _decompress(self, args, path):
if args:
args = list(args)
i = path[0]
path = path[1:]
if path:
args[i] = self._decompress(args[i], path)
else:
data = args[i+2]
if args[i]:
data = decompress(data)
args[i:i+3] = (len(data), data),
return tuple(args)
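# Merge the entries of several logs and print them in chronological order:
# pending events are kept in a list sorted by negated date, so the oldest
# event is always the one popped from the end.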
def emit_many(log_list, color=False):
log_list = [(log, iter(log).next) for log in log_list]
for x in log_list: # try to start all transactions at the same time
x[1]()
event_list = []
for log, next in log_list:
try:
event = next()
except StopIteration:
continue
event_list.append((-event[0], next, log._emit, event))
if event_list:
event_list.sort()
while True:
key, next, emit, event = event_list.pop()
try:
next_date = - event_list[-1][0]
except IndexError:
next_date = float('inf')
try:
while event[0] <= next_date:
emit(*event, color=color)
event = next()
except IOError as e:
if e.errno == errno.EPIPE:
sys.exit(1)
raise
except StopIteration:
if not event_list:
break
else:
insort(event_list, (-event[0], next, emit, event))
def main():
parser = argparse.ArgumentParser(description='NEO Log Reader')
_ = parser.add_argument
_('-a', '--all', action="store_true",
help='decode body of packets')
_('-A', '--decompress', action="store_true",
        help='decompress data when decoding the body of packets (implies --all)')
_('-c', '--color', action="store_true",
        help='color lines according to severity and remove the severity column')
_('-d', '--date', metavar='FORMAT',
help='custom date format, according to strftime(3)')
_('-f', '--follow', action="store_true",
help='output appended data as the file grows')
_('-F', '--flush', action="append", type=int, metavar='PID',
        help='with -f, tell process PID to flush logs approximately every N'
' seconds (see -s)')
_('-n', '--node', action="append",
help='only show log entries from the given node'
' (only useful for logs produced by threaded tests),'
" special value '-' hides the column")
_('-s', '--sleep-interval', type=float, default=1., metavar='N',
help='with -f, sleep for approximately N seconds (default %(default)s)'
' between iterations')
_('--from', dest='filter_from', metavar='N',
        help='show records more recent than timestamp N if N > 0, or now+N'
' if N < 0; N can also be a string that is parseable by dateutil')
_('file', nargs='+',
        help='log file, compressed (gz, bz2, xz or zst) or not')
_ = parser.add_mutually_exclusive_group().add_argument
_('-C', '--cluster', action="store_true",
help='show cluster name in node column')
_('-N', '--no-nid', action="store_true",
help='always show node name (instead of NID) in node column')
args = parser.parse_args()
if args.sleep_interval <= 0:
parser.error("sleep_interval must be positive")
filter_from = args.filter_from
if filter_from:
try:
filter_from = float(args.filter_from)
except ValueError:
from dateutil.parser import parse
x = parse(filter_from)
if x.tzinfo:
filter_from = (x - x.fromtimestamp(0, x.tzinfo)).total_seconds()
else:
filter_from = time.mktime(x.timetuple()) + x.microsecond * 1e-6
else:
if filter_from < 0:
filter_from += time.time()
node_list = args.node or []
try:
node_list.remove('-')
node_column = False
except ValueError:
node_column = True
log_list = [Log(db_path,
2 if args.decompress else 1 if args.all else 0,
args.date, filter_from, args.cluster, args.no_nid,
node_column, node_list)
for db_path in args.file]
if args.follow:
try:
pid_list = args.flush or ()
while True:
emit_many(log_list, args.color)
for pid in pid_list:
os.kill(pid, signal.SIGRTMIN)
time.sleep(args.sleep_interval)
except KeyboardInterrupt:
pass
else:
emit_many(log_list, args.color)
if __name__ == "__main__":
main()