Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

IGNITE-23856 Ignite Thin Client channels unwanted duplication #11855

Open
wants to merge 13 commits into
base: master
Choose a base branch
from
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,10 @@
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
Expand All @@ -38,7 +40,6 @@
import java.util.function.BiFunction;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.ignite.IgniteBinary;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.client.ClientAuthenticationException;
Expand Down Expand Up @@ -535,8 +536,11 @@ private void onChannelFailure(
) {
log.warning("Channel failure [channel=" + ch + ", err=" + t.getMessage() + ']', t);

if (ch != null && ch == hld.ch)
if (ch != null && ch == hld.ch) {
hld.closeChannel();
if (channelsCnt.get() == 0 && attemptsLimit == 1 && partitionAwarenessEnabled)
attemptsLimit = clientCfg.getRetryLimit() > 0 ? Math.min(clientCfg.getRetryLimit(), 2) : 2;
}

chFailLsnrs.forEach(Runnable::run);

Expand Down Expand Up @@ -625,90 +629,65 @@ synchronized void initChannelHolders() {

Collection<List<InetSocketAddress>> newAddrs = discoveryCtx.getEndpoints();

if (newAddrs == null) {
if (newAddrs == null || newAddrs.isEmpty()) {
finishChannelsReInit = System.currentTimeMillis();

return;
}

// Add connected channels to the list to avoid unnecessary reconnects, unless address finder is used.
if (holders != null && clientCfg.getAddressesFinder() == null) {
// Do not modify the original list.
newAddrs = new ArrayList<>(newAddrs);

for (ClientChannelHolder h : holders) {
ClientChannel ch = h.ch;

if (ch != null && !ch.closed())
newAddrs.add(h.getAddresses());
}
}

Map<InetSocketAddress, ClientChannelHolder> curAddrs = new HashMap<>();

Set<InetSocketAddress> newAddrsSet = newAddrs.stream().flatMap(Collection::stream).collect(Collectors.toSet());

// Close obsolete holders or map old but valid addresses to holders
if (holders != null) {
for (ClientChannelHolder h : holders) {
boolean found = false;

for (InetSocketAddress addr : h.getAddresses()) {
// If new endpoints contain at least one of channel addresses, don't close this channel.
if (newAddrsSet.contains(addr)) {
ClientChannelHolder oldHld = curAddrs.putIfAbsent(addr, h);

if (oldHld == null || oldHld == h) // If not duplicate.
found = true;
}
}

if (!found)
h.close();
}
}
// Add connected channels to the list to avoid unnecessary reconnects, unless address finder is used.
if (holders != null && clientCfg.getAddressesFinder() == null)
for (ClientChannelHolder h : holders)
h.getAddresses().forEach(addr -> curAddrs.putIfAbsent(addr, h));

List<ClientChannelHolder> reinitHolders = new ArrayList<>();

int idx = curChIdx;

// The variable holds a new index of default channel after topology change.
// Suppose that reuse of the channel is better than open new connection.
int dfltChannelIdx = -1;
ClientChannelHolder currDfltHolder = (idx != -1 && holders != null) ? holders.get(idx) : null;

ClientChannelHolder currDfltHolder = null;
// Process new addresses merging with existing holders.
for (List<InetSocketAddress> addrs : newAddrs) {
// Try to find already created channel holder. If not found, create the new one.
ClientChannelHolder hld = addrs.stream()
.map(curAddrs::get)
.filter(Objects::nonNull)
.findFirst()
.orElseGet(() -> new ClientChannelHolder(new ClientChannelConfiguration(clientCfg, addrs)));

int idx = curChIdx;
// Expanding the list of addresses by adding new addresses to the existing holder.
Set<InetSocketAddress> existingAddrs = new HashSet<>(hld.getAddresses());
existingAddrs.addAll(addrs);

if (idx != -1)
currDfltHolder = holders.get(idx);
List<InetSocketAddress> updatedAddrs = new ArrayList<>(existingAddrs);

for (List<InetSocketAddress> addrs : newAddrs) {
ClientChannelHolder hld = null;
if (hld.getAddresses().size() != updatedAddrs.size())
hld.setConfiguration(new ClientChannelConfiguration(clientCfg, updatedAddrs));

// Try to find already created channel holder.
for (InetSocketAddress addr : addrs) {
hld = curAddrs.get(addr);
reinitHolders.add(hld);

if (hld != null) {
if (!hld.getAddresses().equals(addrs)) // Enrich holder addresses.
hld.setConfiguration(new ClientChannelConfiguration(clientCfg, addrs));
updatedAddrs.forEach(addr -> curAddrs.putIfAbsent(addr, hld));
}

break;
}
}
// Add old holders that were not processed via newAddrs.
if (holders != null) {
Set<ClientChannelHolder> existingInReinit = new HashSet<>(reinitHolders);

if (hld == null) { // If not found, create the new one.
hld = new ClientChannelHolder(new ClientChannelConfiguration(clientCfg, addrs));
for (ClientChannelHolder h : holders) {
if (!existingInReinit.contains(h)) {
reinitHolders.add(h);

for (InetSocketAddress addr : addrs)
curAddrs.putIfAbsent(addr, hld);
existingInReinit.add(h);
}
}

reinitHolders.add(hld);

if (hld == currDfltHolder)
dfltChannelIdx = reinitHolders.size() - 1;
}

int dfltChannelIdx = reinitHolders.indexOf(currDfltHolder);

if (dfltChannelIdx == -1) {
// If holder is not specified get the random holder from the range of holders with the same port.
reinitHolders.sort(Comparator.comparingInt(h -> F.first(h.getAddresses()).getPort()));
Expand Down