Skip to content

Commit d735957

Browse files
sylwiaszunejkofruch
authored and committed
Reresolve DNS as fallback when all hosts are unreachable
If all nodes in the cluster changed their IPs at the same time, the driver was never able to contact the cluster again; the only solution was to restart the driver. A fallback is added to the control connection logic so that, when no known host is reachable, Cluster once again resolves all the known hostnames and ControlConnection tries to connect to them.
1 parent 69e0100 commit d735957

File tree

1 file changed

+69
-47
lines changed

1 file changed

+69
-47
lines changed

cassandra/cluster.py

Lines changed: 69 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -1220,30 +1220,7 @@ def __init__(self,
12201220
self.endpoint_factory = endpoint_factory or DefaultEndPointFactory(port=self.port)
12211221
self.endpoint_factory.configure(self)
12221222

1223-
raw_contact_points = []
1224-
for cp in [cp for cp in self.contact_points if not isinstance(cp, EndPoint)]:
1225-
raw_contact_points.append(cp if isinstance(cp, tuple) else (cp, port))
1226-
1227-
self.endpoints_resolved = [cp for cp in self.contact_points if isinstance(cp, EndPoint)]
1228-
self._endpoint_map_for_insights = {repr(ep): '{ip}:{port}'.format(ip=ep.address, port=ep.port)
1229-
for ep in self.endpoints_resolved}
1230-
1231-
strs_resolved_map = _resolve_contact_points_to_string_map(raw_contact_points)
1232-
self.endpoints_resolved.extend(list(chain(
1233-
*[
1234-
[DefaultEndPoint(ip, port) for ip, port in xs if ip is not None]
1235-
for xs in strs_resolved_map.values() if xs is not None
1236-
]
1237-
)))
1238-
1239-
self._endpoint_map_for_insights.update(
1240-
{key: ['{ip}:{port}'.format(ip=ip, port=port) for ip, port in value]
1241-
for key, value in strs_resolved_map.items() if value is not None}
1242-
)
1243-
1244-
if contact_points and (not self.endpoints_resolved):
1245-
# only want to raise here if the user specified CPs but resolution failed
1246-
raise UnresolvableContactPoints(self._endpoint_map_for_insights)
1223+
self._resolve_hostnames()
12471224

12481225
self.compression = compression
12491226

@@ -1427,6 +1404,31 @@ def __init__(self,
14271404
if application_version is not None:
14281405
self.application_version = application_version
14291406

1407+
def _resolve_hostnames(self):
    """
    Rebuild ``self.endpoints_resolved`` from ``self.contact_points``.

    Contact points that are already ``EndPoint`` instances are kept as
    they are. Every other contact point is passed, as a ``(host, port)``
    pair, to ``_resolve_contact_points_to_string_map`` and each resolved
    address becomes a ``DefaultEndPoint``.
    ``self._endpoint_map_for_insights`` is rebuilt at the same time.

    Raises ``UnresolvableContactPoints`` when contact points are
    configured but no endpoint could be produced from them.
    """
    # Entries given without an explicit port fall back to the cluster port.
    pending = [
        entry if isinstance(entry, tuple) else (entry, self.port)
        for entry in self.contact_points
        if not isinstance(entry, EndPoint)
    ]

    # Start from the ready-made endpoints; resolved ones are appended below.
    self.endpoints_resolved = [
        entry for entry in self.contact_points if isinstance(entry, EndPoint)
    ]
    self._endpoint_map_for_insights = {
        repr(endpoint): '{ip}:{port}'.format(ip=endpoint.address, port=endpoint.port)
        for endpoint in self.endpoints_resolved
    }

    resolved_by_name = _resolve_contact_points_to_string_map(pending)

    # A None value marks a contact point that could not be resolved.
    looked_up = [
        DefaultEndPoint(ip, port)
        for addresses in resolved_by_name.values() if addresses is not None
        for ip, port in addresses if ip is not None
    ]
    self.endpoints_resolved.extend(looked_up)

    insights_for_names = {
        name: ['{ip}:{port}'.format(ip=ip, port=port) for ip, port in addresses]
        for name, addresses in resolved_by_name.items()
        if addresses is not None
    }
    self._endpoint_map_for_insights.update(insights_for_names)

    # only want to raise here if the user specified CPs but resolution failed
    if self.contact_points and not self.endpoints_resolved:
        raise UnresolvableContactPoints(self._endpoint_map_for_insights)
1431+
14301432
def _create_thread_pool_executor(self, **kwargs):
14311433
"""
14321434
Create a ThreadPoolExecutor for the cluster. In most cases, the built-in
@@ -1720,6 +1722,20 @@ def protocol_downgrade(self, host_endpoint, previous_version):
17201722
"http://datastax.github.io/python-driver/api/cassandra/cluster.html#cassandra.cluster.Cluster.protocol_version", self.protocol_version, new_version, host_endpoint)
17211723
self.protocol_version = new_version
17221724

1725+
def _add_resolved_hosts(self):
    """
    Register every endpoint in ``self.endpoints_resolved`` as a host.

    Hosts that ``add_host`` reports as new are marked up and announced to
    each registered listener through ``on_add``. Afterwards the profile
    manager and the load balancing policy are populated with the full
    host list from the cluster metadata.
    """
    for resolved in self.endpoints_resolved:
        host, is_new = self.add_host(resolved, signal=False)
        if not is_new:
            continue
        host.set_up()
        for registered in self.listeners:
            registered.on_add(host)

    # Each consumer gets its own weak proxy and its own host snapshot.
    self.profile_manager.populate(
        weakref.proxy(self),
        self.metadata.all_hosts(),
    )
    self.load_balancing_policy.populate(
        weakref.proxy(self),
        self.metadata.all_hosts(),
    )
1738+
17231739
def connect(self, keyspace=None, wait_for_all_pools=False):
17241740
"""
17251741
Creates and returns a new :class:`~.Session` object.
@@ -1740,18 +1756,8 @@ def connect(self, keyspace=None, wait_for_all_pools=False):
17401756
self.contact_points, self.protocol_version)
17411757
self.connection_class.initialize_reactor()
17421758
_register_cluster_shutdown(self)
1743-
for endpoint in self.endpoints_resolved:
1744-
host, new = self.add_host(endpoint, signal=False)
1745-
if new:
1746-
host.set_up()
1747-
for listener in self.listeners:
1748-
listener.on_add(host)
1749-
1750-
self.profile_manager.populate(
1751-
weakref.proxy(self), self.metadata.all_hosts())
1752-
self.load_balancing_policy.populate(
1753-
weakref.proxy(self), self.metadata.all_hosts()
1754-
)
1759+
1760+
self._add_resolved_hosts()
17551761

17561762
try:
17571763
self.control_connection.connect()
@@ -3585,16 +3591,8 @@ def _set_new_connection(self, conn):
35853591
if old:
35863592
log.debug("[control connection] Closing old connection %r, replacing with %r", old, conn)
35873593
old.close()
3588-
3589-
def _reconnect_internal(self):
3590-
"""
3591-
Tries to connect to each host in the query plan until one succeeds
3592-
or every attempt fails. If successful, a new Connection will be
3593-
returned. Otherwise, :exc:`NoHostAvailable` will be raised
3594-
with an "errors" arg that is a dict mapping host addresses
3595-
to the exception that was raised when an attempt was made to open
3596-
a connection to that host.
3597-
"""
3594+
3595+
def _connect_host_in_lbp(self):
35983596
errors = {}
35993597
lbp = (
36003598
self._cluster.load_balancing_policy
@@ -3604,7 +3602,7 @@ def _reconnect_internal(self):
36043602

36053603
for host in lbp.make_query_plan():
36063604
try:
3607-
return self._try_connect(host)
3605+
return (self._try_connect(host), None)
36083606
except ConnectionException as exc:
36093607
errors[str(host.endpoint)] = exc
36103608
log.warning("[control connection] Error connecting to %s:", host, exc_info=True)
@@ -3614,7 +3612,31 @@ def _reconnect_internal(self):
36143612
log.warning("[control connection] Error connecting to %s:", host, exc_info=True)
36153613
if self._is_shutdown:
36163614
raise DriverException("[control connection] Reconnection in progress during shutdown")
3615+
3616+
return (None, errors)
36173617

3618+
def _reconnect_internal(self):
    """
    Open a new control connection, re-resolving hostnames if needed.

    The hosts offered by ``_connect_host_in_lbp`` are tried first. If none
    of them yields a connection, the cluster resolves its contact points
    again, adds the resulting hosts, and the hosts are tried a second time.

    Returns the new Connection on success. Otherwise
    :exc:`NoHostAvailable` is raised with an "errors" arg that is a dict
    mapping host addresses to the exception raised when a connection to
    that host was attempted during the second pass.
    """
    connection, _ = self._connect_host_in_lbp()
    if connection is None:
        # Try to re-resolve hostnames as a fallback when all hosts are unreachable
        cluster = self._cluster
        cluster._resolve_hostnames()
        cluster._add_resolved_hosts()

        connection, errors = self._connect_host_in_lbp()
        if connection is None:
            raise NoHostAvailable("Unable to connect to any servers", errors)

    return connection
36193641

36203642
def _try_connect(self, host):

0 commit comments

Comments
 (0)