-
Notifications
You must be signed in to change notification settings - Fork 49
Expand file tree
/
Copy pathtarantool_admin.py
More file actions
110 lines (83 loc) · 2.47 KB
/
tarantool_admin.py
File metadata and controls
110 lines (83 loc) · 2.47 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
"""
This module provides helpers to set up a running Tarantool server.
"""
import socket
import re
import yaml
from .version import parse_version
class TarantoolAdmin:
    """
    Helper to set up a running Tarantool server.

    Sends single-line commands over a TCP socket and parses the replies
    as YAML. An instance can be used as a context manager (connects on
    enter, disconnects on exit) and can be called directly as a shorthand
    for :meth:`execute`.
    """

    # Byte sequences that mark the end of a reply (the YAML document end
    # marker on a line of its own, with LF or CRLF line endings).
    _RESPONSE_TERMINATORS = (b"\n...\n", b"\r\n...\r\n")

    # Maximum number of bytes requested from the socket per recv() call.
    _RECV_BUFSIZE = 4096

    def __init__(self, host, port):
        """
        Store the server address; no connection is opened here.

        :param host: server host passed to ``socket.create_connection``.
        :param port: server port passed to ``socket.create_connection``.
        """
        self.host = host
        self.port = port
        self.is_connected = False
        self.socket = None
        # Cache for the ``tnt_version`` property.
        self._tnt_version = None

    def connect(self):
        """
        Connect to running Tarantool server.

        :raises OSError: if the connection cannot be established or the
            greeting cannot be read.
        """
        self.socket = socket.create_connection((self.host, self.port))
        self.socket.setsockopt(socket.SOL_TCP, socket.TCP_NODELAY, 1)
        # Mark as connected before reading so that disconnect() still
        # closes the socket if reading the greeting fails.
        self.is_connected = True
        # Skip greeting.
        # NOTE(review): assumes the greeting arrives in one recv() of at
        # most 256 bytes - confirm against the server's greeting size.
        self.socket.recv(256)

    def disconnect(self):
        """
        Disconnect from the Tarantool server.

        Does nothing when there is no open connection.
        """
        if self.is_connected:
            self.socket.close()
            self.socket = None
            self.is_connected = False

    def reconnect(self):
        """
        Reconnect to the running Tarantool server.
        """
        self.disconnect()
        self.connect()

    def __enter__(self):
        """
        Open the connection and return this instance.
        """
        self.connect()
        return self

    def __exit__(self, exception_type, exception_value, exception_traceback):
        """
        Close the connection; exceptions from the ``with`` body propagate.
        """
        self.disconnect()

    def __call__(self, command):
        """
        Shorthand for :meth:`execute`.
        """
        return self.execute(command)

    def execute(self, command):
        """
        Evaluate some Lua code on the Tarantool server.

        Connects first if there is no open connection.

        :param command: code to evaluate; embedded newlines are replaced
            with spaces so that the command is sent as a single line.
        :return: the reply parsed with ``yaml.safe_load``, or ``None``
            when ``command`` is empty.
        :raises OSError: if sending still fails after one reconnect, or
            if receiving the reply fails.
        """
        if not command:
            return None

        if not self.is_connected:
            self.connect()

        cmd = (command.replace('\n', ' ') + '\n').encode()
        try:
            self.socket.sendall(cmd)
        except socket.error:
            # reconnect and try again
            self.reconnect()
            self.socket.sendall(cmd)

        return yaml.safe_load(self._read_response())

    def _read_response(self):
        """
        Read one reply from the socket and return it as text.

        Reads until a reply terminator has been received or the peer
        closes the connection. The raw bytes are accumulated and decoded
        once at the end, so a multi-byte UTF-8 character split across two
        ``recv()`` calls is decoded correctly.

        :return: everything received, decoded as UTF-8.
        """
        received = bytearray()
        while True:
            chunk = self.socket.recv(self._RECV_BUFSIZE)
            if not chunk:
                # Connection closed by the peer: return what we have.
                break
            received += chunk
            if any(mark in received for mark in self._RESPONSE_TERMINATORS):
                break
        return received.decode()

    @property
    def tnt_version(self):
        """
        Connected Tarantool server version.

        The value is obtained by evaluating ``box.info.version`` on the
        server, keeping the leading run of digits and dots, and passing
        it to ``parse_version``. It is computed on first access and
        cached for the lifetime of the instance.

        :raises AttributeError: if the reported version does not start
            with digits or dots.
        """
        if self._tnt_version is not None:
            return self._tnt_version

        raw_version = re.match(
            r'[\d.]+', self.execute('box.info.version')[0]
        ).group()

        self._tnt_version = parse_version(raw_version)
        return self._tnt_version