-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathcyredis.pyx
81 lines (72 loc) · 2.9 KB
/
cyredis.pyx
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
import sys
import gevent
from gevent.lock import BoundedSemaphore
import six
from cywrap import CommandError, ConnectionError, Connection, ConnectionLimitExceeded
import sys
import gevent
from gevent.lock import BoundedSemaphore
import six
from cywrap import CommandError, ConnectionError, Connection, ConnectionLimitExceeded
cdef class ConnectionPool(object):
    """Bounded pool of reusable connections.

    Connections are created lazily by calling ``connection_class()`` with no
    arguments, up to ``max_connections``. Idle connections are kept on a list
    and handed out again, most recently released first. All pool bookkeeping
    is done while holding a gevent ``BoundedSemaphore``.

    NOTE(review): ``_reset`` replaces the idle list, the lock and the counter
    wholesale. Connections that are checked out by other greenlets at that
    moment are no longer counted -- confirm this is acceptable for the
    callers of this pool.
    """
    # Maximum number of connections that may exist at once.
    cdef _max_connections
    # Zero-argument callable used to build a new connection.
    cdef _connection_class
    # Idle connections, ready to be handed out (used as a stack).
    cdef available_connections
    # gevent BoundedSemaphore guarding the two bookkeeping fields.
    cdef lock
    # Number of connections created since the last reset and still counted.
    cdef created_connections

    def __init__(self, max_connections=10, connection_class=Connection):
        """Create an empty pool.

        Args:
            max_connections: upper bound on simultaneously counted
                connections.
            connection_class: callable invoked without arguments to create
                a connection; the result must provide ``call(command)``.
        """
        self._max_connections = max_connections
        self._connection_class = connection_class
        self._reset()

    cdef _reset(self):
        """Forget every pooled connection and start from an empty pool."""
        self.available_connections = []
        self.lock = BoundedSemaphore()
        self.created_connections = 0

    cdef _get_connection(self):
        """Return an idle connection, creating one if the limit allows.

        Raises:
            ConnectionLimitExceeded: no idle connection is available and
                ``max_connections`` connections are already counted.
        """
        with self.lock:
            if self.available_connections:
                connection = self.available_connections.pop()
            elif self.created_connections < self._max_connections:
                # Count the connection only after the constructor succeeded,
                # so a failed connect does not consume a slot.
                connection = self._connection_class()
                self.created_connections += 1
            else:
                raise ConnectionLimitExceeded("max connection limit exhausted, cannot create connection")
            return connection

    cdef _release(self, connection):
        """Put a healthy connection back on the idle list."""
        with self.lock:
            self.available_connections.append(connection)

    cdef _forget_slot(self):
        """Give back the slot of a connection that will not be reused.

        The counter is floored at zero because a concurrent ``_reset`` may
        already have cleared it.
        """
        with self.lock:
            if self.created_connections > 0:
                self.created_connections -= 1

    cdef _call_once(self, command):
        """Run ``command`` on one pooled connection and settle the pool.

        The connection is returned to the pool on success and after a
        ``CommandError``. After any other exception the connection is
        dropped and its slot freed, so it can neither be reused nor leak
        capacity.
        """
        connection = self._get_connection()
        try:
            retval = connection.call(command)
        except CommandError as e:
            # The connection goes back to the pool; only the command failed.
            self._release(connection)
            # Re-raise a fresh CommandError with the original traceback.
            # Slicing instead of indexing keeps an argument-less error from
            # turning into an IndexError here.
            six.reraise(CommandError, CommandError(*e.args[:1]), sys.exc_info()[2])
        except BaseException:
            # Connection failure or an unexpected error: do not hand this
            # connection out again, but free its slot before propagating.
            self._forget_slot()
            raise
        self._release(connection)
        return retval

    def call(self, command):
        """Execute ``command`` on a pooled connection and return its result.

        On ``ConnectionError`` the whole pool is reset and the command is
        retried exactly once on a newly obtained connection.

        Args:
            command: passed unchanged to the connection's ``call`` method.

        Returns:
            Whatever the connection's ``call`` returns.

        Raises:
            CommandError: the connection raised ``CommandError``; a new
                instance carrying the first argument of the original is
                raised with the original traceback.
            ConnectionError: the retry failed with a connection error too.
            ConnectionLimitExceeded: no connection could be obtained.
                NOTE(review): if this type derives from ``ConnectionError``
                it also triggers the reset-and-retry path -- confirm
                against ``cywrap``.
        """
        try:
            return self._call_once(command)
        except ConnectionError:
            # Pooled connections are presumed stale after a connection
            # failure: drop them all and try once more from scratch.
            self._reset()
            return self._call_once(command)