Skip to content

Commit

Permalink
Fix static member (#655)
Browse files Browse the repository at this point in the history
  • Loading branch information
akrambek authored Dec 24, 2023
1 parent ae64ed6 commit 724bb58
Showing 1 changed file with 7 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,6 @@ public final class KafkaClientConnectionPool extends KafkaClientSaslHandshaker
private static final int SIGNAL_STREAM_WINDOW = 0x80000006;
private static final int SIGNAL_CONNECTION_CLEANUP = 0x80000007;
private static final int SIGNAL_NEXT_REQUEST = 0x80000008;
private static final StringBuilder CLUSTER = new StringBuilder("");

private final BeginFW beginRO = new BeginFW();
private final DataFW dataRO = new DataFW();
Expand Down Expand Up @@ -173,29 +172,29 @@ private MessageConsumer newStream(
final ProxyBeginExFW proxyBeginEx = extension.get(proxyBeginExRO::tryWrap);

MessageConsumer newStream = null;
CLUSTER.setLength(0);
final StringBuilder cluster = new StringBuilder();

if (proxyBeginEx != null)
{
final ProxyAddressInetFW inet = proxyBeginEx.address().inet();
String host = inet.destination().asString();
int port = inet.destinationPort();

CLUSTER.append(host);
CLUSTER.append(":");
CLUSTER.append(port);
cluster.append(host);
cluster.append(":");
cluster.append(port);

if (proxyBeginEx.infos() != null)
{
proxyBeginEx.infos().forEach(i ->
{
CLUSTER.append(":");
CLUSTER.append(i.authority().asString());
cluster.append(":");
cluster.append(i.authority().asString());
});
}
}

final KafkaClientConnection connection = connectionPool.computeIfAbsent(CLUSTER.toString(), s ->
final KafkaClientConnection connection = connectionPool.computeIfAbsent(cluster.toString(), s ->
newConnection(originId, routedId, authorization));
newStream = connection.newStream(msgTypeId, buffer, index, length, sender);

Expand Down

0 comments on commit 724bb58

Please sign in to comment.