OffsetFetch Request should connect to the coordinator instead of a random member of cluster (#654)
akrambek authored Dec 24, 2023
1 parent 4ff723b commit ae64ed6
Showing 43 changed files with 244 additions and 11 deletions.
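
In short: the coordinator's host and port now travel with the Kafka consumer and group Begin stream extensions, and the OffsetFetch client uses them to open its network connection to the group coordinator rather than to an arbitrary cluster member, so offset fetches land directly on the broker that coordinates the group. A minimal, self-contained sketch of that routing decision (CoordinatorRouting and Endpoint are hypothetical illustrations of the intent, not types from this repository):

    import java.util.Map;

    // Sketch only: models the commit's intent, not the Zilla API.
    public final class CoordinatorRouting
    {
        record Endpoint(String host, int port) { }

        private final Map<String, Endpoint> coordinatorsByGroupId;

        public CoordinatorRouting(Map<String, Endpoint> coordinatorsByGroupId)
        {
            this.coordinatorsByGroupId = coordinatorsByGroupId;
        }

        // Before this commit the OffsetFetch connection went to an arbitrary
        // cluster member; now it targets the group coordinator when known.
        public Endpoint offsetFetchTarget(String groupId, Endpoint anyBroker)
        {
            Endpoint coordinator = coordinatorsByGroupId.get(groupId);
            return coordinator != null ? coordinator : anyBroker;
        }
    }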
----------------------------------------------------------------------
@@ -453,6 +453,8 @@ final class KafkaCacheClientConsumerFan
         private long replySeq;
         private long replyAck;
         private int replyMax;
+        private String host;
+        private int port;


         private KafkaCacheClientConsumerFan(
@@ -720,6 +722,14 @@ private void onConsumerFanReplyBegin(
             BeginFW begin)
         {
             final long traceId = begin.traceId();
+            final OctetsFW extension = begin.extension();
+
+            final ExtensionFW beginEx = extensionRO.tryWrap(extension.buffer(), extension.offset(), extension.limit());
+            final KafkaBeginExFW kafkaBeginEx = beginEx.typeId() == kafkaTypeId ? extension.get(kafkaBeginExRO::wrap) : null;
+            final KafkaConsumerBeginExFW kafkaConsumerBeginEx = kafkaBeginEx != null ? kafkaBeginEx.consumer() : null;
+
+            host = kafkaConsumerBeginEx.host().asString();
+            port = kafkaConsumerBeginEx.port();

             state = KafkaState.openingReply(state);
@@ -1029,7 +1039,16 @@ private void doConsumerReplyBegin(
             state = KafkaState.openingReply(state);

             doBegin(sender, originId, routedId, replyId, replySeq, replyAck, replyMax,
-                traceId, authorization, affinity, EMPTY_EXTENSION);
+                traceId, authorization, affinity, ex -> ex.set((b, o, l) -> kafkaBeginExRW.wrap(b, o, l)
+                    .typeId(kafkaTypeId)
+                    .consumer(c -> c
+                        .groupId(fan.groupId)
+                        .consumerId(fan.consumerId)
+                        .host(fan.host)
+                        .port(fan.port)
+                        .timeout(fan.timeout)
+                        .topic(fan.topic))
+                    .build().sizeof()));
         }

         private void doConsumerReplyDataIfNecessary(
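One thing to note in the hunk above: `kafkaConsumerBeginEx` is computed as possibly null but is then dereferenced unconditionally for `host()` and `port()`. A null-guarded variant of the same read (a fragment against the flyweight types already in scope in this class, not something this commit changes) could look like:

    final ExtensionFW beginEx = extensionRO.tryWrap(extension.buffer(), extension.offset(), extension.limit());
    final KafkaBeginExFW kafkaBeginEx =
        beginEx != null && beginEx.typeId() == kafkaTypeId ? extension.get(kafkaBeginExRO::wrap) : null;

    // only adopt the coordinator endpoint when the consumer extension is present
    if (kafkaBeginEx != null)
    {
        final KafkaConsumerBeginExFW kafkaConsumerBeginEx = kafkaBeginEx.consumer();
        host = kafkaConsumerBeginEx.host().asString();
        port = kafkaConsumerBeginEx.port();
    }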
----------------------------------------------------------------------
@@ -815,7 +815,6 @@ private void onOffsetFetchInitialBegin(
         final long sequence = begin.sequence();
         final long acknowledge = begin.acknowledge();
         final long traceId = begin.traceId();
-        final long authorization = begin.authorization();
         final long affinity = begin.affinity();
         final OctetsFW extension = begin.extension();
----------------------------------------------------------------------
@@ -557,6 +557,8 @@ final class KafkaCacheServerConsumerFanout
         private String leaderId;
         private String memberId;
         private String instanceId;
+        private String host;
+        private int port;
         private int timeout;
         private int generationId;

@@ -846,6 +848,8 @@ private void onConsumerReplyBegin(
             final KafkaGroupBeginExFW kafkaGroupBeginEx = kafkaBeginEx != null ? kafkaBeginEx.group() : null;

             instanceId = kafkaGroupBeginEx.instanceId().asString();
+            host = kafkaGroupBeginEx.host().asString();
+            port = kafkaGroupBeginEx.port();

             state = KafkaState.openedReply(state);

@@ -1353,7 +1357,16 @@ private void doConsumerReplyBegin(
             state = KafkaState.openingReply(state);

             doBegin(sender, originId, routedId, replyId, replySeq, replyAck, replyMax,
-                traceId, authorization, affinity, EMPTY_OCTETS);
+                traceId, authorization, affinity, ex -> ex.set((b, o, l) -> kafkaBeginExRW.wrap(b, o, l)
+                    .typeId(kafkaTypeId)
+                    .consumer(c -> c
+                        .groupId(fanout.groupId)
+                        .consumerId(fanout.consumerId)
+                        .host(fanout.host)
+                        .port(fanout.port)
+                        .timeout(fanout.timeout)
+                        .topic(topic))
+                    .build().sizeof()));
         }

         private void doConsumerReplyData(
----------------------------------------------------------------------
@@ -95,7 +95,6 @@
 import io.aklivity.zilla.runtime.binding.kafka.internal.types.stream.ExtensionFW;
 import io.aklivity.zilla.runtime.binding.kafka.internal.types.stream.FlushFW;
 import io.aklivity.zilla.runtime.binding.kafka.internal.types.stream.KafkaBeginExFW;
-import io.aklivity.zilla.runtime.binding.kafka.internal.types.stream.KafkaDataExFW;
 import io.aklivity.zilla.runtime.binding.kafka.internal.types.stream.KafkaFlushExFW;
 import io.aklivity.zilla.runtime.binding.kafka.internal.types.stream.KafkaGroupBeginExFW;
 import io.aklivity.zilla.runtime.binding.kafka.internal.types.stream.KafkaGroupFlushExFW;
@@ -172,7 +171,6 @@ public final class KafkaClientGroupFactory extends KafkaClientSaslHandshaker imp
     private final ResetFW.Builder resetRW = new ResetFW.Builder();
     private final WindowFW.Builder windowRW = new WindowFW.Builder();
     private final KafkaBeginExFW.Builder kafkaBeginExRW = new KafkaBeginExFW.Builder();
-    private final KafkaDataExFW.Builder kafkaDataExRW = new KafkaDataExFW.Builder();
     private final KafkaFlushExFW.Builder kafkaFlushExRW = new KafkaFlushExFW.Builder();
     private final KafkaResetExFW.Builder kafkaResetExRW = new KafkaResetExFW.Builder();
     private final ProxyBeginExFW.Builder proxyBeginExRW = new ProxyBeginExFW.Builder();
@@ -1405,7 +1403,7 @@ private void onStreamFlush(
         final long sequence = flush.sequence();
         final long acknowledge = flush.acknowledge();
         final long traceId = flush.traceId();
-        final long authorizationId = flush.authorization();
+        final long budgetId = flush.budgetId();
         final int reserved = flush.reserved();
         final OctetsFW extension = flush.extension();

@@ -1440,7 +1438,14 @@ private void onStreamFlush(
             }
         });

-        coordinatorClient.doJoinGroupRequest(traceId);
+        if (host != null)
+        {
+            coordinatorClient.doJoinGroupRequest(traceId);
+        }
+        else
+        {
+            clusterClient.doEncodeRequestIfNecessary(traceId, budgetId);
+        }
     }
     else
     {
@@ -1525,6 +1530,8 @@ private void doStreamBegin(
                 .groupId(groupId)
                 .protocol(protocol)
                 .instanceId(groupMembership.instanceId)
+                .host(host)
+                .port(port)
                 .timeout(timeout))
             .build();

@@ -2709,6 +2716,7 @@ private void doNetworkBegin(

         final KafkaClientRoute clientRoute = supplyClientRoute.apply(routedId);
         final KafkaBrokerInfo broker = clientRoute.brokers.get(Long.parseLong(delegate.nodeId));
+
         if (broker != null)
         {
             extension = e -> e.set((b, o, l) -> proxyBeginExRW.wrap(b, o, l)
@@ -4008,7 +4016,7 @@ private void doJoinGroupRequest(
             encoders.add(encodeJoinGroupRequest);
             signaler.signalNow(originId, routedId, initialId, traceId, SIGNAL_NEXT_REQUEST, 0);
         }
-        else
+        else if (delegate.host != null)
        {
             delegate.doStreamBeginIfNecessary(traceId, authorization);
         }
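The onStreamFlush hunk above is the group factory's side of the handshake: when the coordinator host is already known, the client issues the join-group request directly; when it is not, the cluster client first has to encode a request (presumably coordinator discovery) before joining. A self-contained sketch of that branch (CoordinatorClient and ClusterClient are hypothetical stand-ins for the delegates used in this factory):

    // Sketch only: hypothetical interfaces, not the Zilla types.
    public final class JoinRouting
    {
        interface CoordinatorClient { void doJoinGroupRequest(long traceId); }
        interface ClusterClient { void doEncodeRequestIfNecessary(long traceId, long budgetId); }

        private final CoordinatorClient coordinatorClient;
        private final ClusterClient clusterClient;
        private final String host; // coordinator host from the group Begin extension, may be null

        public JoinRouting(CoordinatorClient coordinatorClient, ClusterClient clusterClient, String host)
        {
            this.coordinatorClient = coordinatorClient;
            this.clusterClient = clusterClient;
            this.host = host;
        }

        public void onStreamFlush(long traceId, long budgetId)
        {
            if (host != null)
            {
                // coordinator already known: join the group directly
                coordinatorClient.doJoinGroupRequest(traceId);
            }
            else
            {
                // coordinator unknown: ask the cluster who coordinates this group first
                clusterClient.doEncodeRequestIfNecessary(traceId, budgetId);
            }
        }
    }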
----------------------------------------------------------------------
@@ -15,6 +15,7 @@
  */
 package io.aklivity.zilla.runtime.binding.kafka.internal.stream;

+import static io.aklivity.zilla.runtime.binding.kafka.internal.types.ProxyAddressProtocol.STREAM;
 import static io.aklivity.zilla.runtime.engine.buffer.BufferPool.NO_SLOT;
 import static java.util.Objects.requireNonNull;
@@ -53,6 +54,7 @@
 import io.aklivity.zilla.runtime.binding.kafka.internal.types.stream.KafkaDataExFW;
 import io.aklivity.zilla.runtime.binding.kafka.internal.types.stream.KafkaOffsetFetchBeginExFW;
 import io.aklivity.zilla.runtime.binding.kafka.internal.types.stream.KafkaResetExFW;
+import io.aklivity.zilla.runtime.binding.kafka.internal.types.stream.ProxyBeginExFW;
 import io.aklivity.zilla.runtime.binding.kafka.internal.types.stream.ResetFW;
 import io.aklivity.zilla.runtime.binding.kafka.internal.types.stream.SignalFW;
 import io.aklivity.zilla.runtime.binding.kafka.internal.types.stream.WindowFW;
@@ -93,7 +95,7 @@ public final class KafkaClientOffsetFetchFactory extends KafkaClientSaslHandshaker
     private final AbortFW.Builder abortRW = new AbortFW.Builder();
     private final ResetFW.Builder resetRW = new ResetFW.Builder();
     private final WindowFW.Builder windowRW = new WindowFW.Builder();
-    private final KafkaBeginExFW.Builder kafkaBeginExRW = new KafkaBeginExFW.Builder();
+    private final ProxyBeginExFW.Builder proxyBeginExRW = new ProxyBeginExFW.Builder();
     private final KafkaDataExFW.Builder kafkaDataExRW = new KafkaDataExFW.Builder();
     private final KafkaResetExFW.Builder kafkaResetExRW = new KafkaResetExFW.Builder();

@@ -127,6 +129,7 @@ public final class KafkaClientOffsetFetchFactory extends KafkaClientSaslHandshaker
     private final KafkaOffsetFetchClientDecoder decodeReject = this::decodeReject;

     private final int kafkaTypeId;
+    private final int proxyTypeId;
     private final MutableDirectBuffer writeBuffer;
     private final MutableDirectBuffer extBuffer;
     private final BufferPool decodePool;
@@ -142,6 +145,7 @@ public KafkaClientOffsetFetchFactory(
     {
         super(config, context);
         this.kafkaTypeId = context.supplyTypeId(KafkaBinding.NAME);
+        this.proxyTypeId = context.supplyTypeId("proxy");
         this.signaler = context.signaler();
         this.streamFactory = context.streamFactory();
         this.writeBuffer = new UnsafeBuffer(new byte[context.writeBuffer().capacity()]);
@@ -173,6 +177,8 @@ public MessageConsumer newStream(
         assert kafkaBeginEx.kind() == KafkaBeginExFW.KIND_OFFSET_FETCH;
         final KafkaOffsetFetchBeginExFW kafkaOffsetFetchBeginEx = kafkaBeginEx.offsetFetch();
         final String groupId = kafkaOffsetFetchBeginEx.groupId().asString();
+        final String host = kafkaOffsetFetchBeginEx.host().asString();
+        final int port = kafkaOffsetFetchBeginEx.port();
         final String topic = kafkaOffsetFetchBeginEx.topic().asString();
         IntHashSet partitions = new IntHashSet();
         kafkaOffsetFetchBeginEx.partitions().forEach(p -> partitions.add(p.partitionId()));
@@ -196,6 +202,8 @@ public MessageConsumer newStream(
             affinity,
             resolvedId,
             groupId,
+            host,
+            port,
             topic,
             partitions,
             sasl)::onApplication;
@@ -757,6 +765,8 @@ private final class KafkaOffsetFetchStream
             long affinity,
             long resolvedId,
             String groupId,
+            String host,
+            int port,
             String topic,
             IntHashSet partitions,
             KafkaSaslConfig sasl)
@@ -767,7 +777,8 @@ private final class KafkaOffsetFetchStream
             this.initialId = initialId;
             this.replyId = supplyReplyId.applyAsLong(initialId);
             this.affinity = affinity;
-            this.client = new KafkaOffsetFetchClient(this, routedId, resolvedId, groupId, topic, partitions, sasl);
+            this.client = new KafkaOffsetFetchClient(this, routedId, resolvedId, groupId, host, port,
+                topic, partitions, sasl);
         }

         private void onApplication(
@@ -1020,6 +1031,8 @@ private final class KafkaOffsetFetchClient extends KafkaSaslClient

         private final KafkaOffsetFetchStream delegate;
         private final String groupId;
+        private final String host;
+        private final int port;
         private final String topic;
         private final IntHashSet partitions;
         private final ObjectHashSet<KafkaPartitionOffset> topicPartitions;
@@ -1060,13 +1073,17 @@ private final class KafkaOffsetFetchClient extends KafkaSaslClient
             long originId,
             long routedId,
             String groupId,
+            String host,
+            int port,
             String topic,
             IntHashSet partitions,
             KafkaSaslConfig sasl)
         {
             super(sasl, originId, routedId);
             this.delegate = delegate;
             this.groupId = requireNonNull(groupId);
+            this.host = host;
+            this.port = port;
             this.topic = topic;
             this.partitions = partitions;
             this.topicPartitions = new ObjectHashSet<>();
@@ -1274,8 +1291,19 @@ private void doNetworkBegin(
         {
             state = KafkaState.openingInitial(state);

+            Consumer<OctetsFW.Builder> extension = e -> e.set((b, o, l) -> proxyBeginExRW.wrap(b, o, l)
+                .typeId(proxyTypeId)
+                .address(a -> a.inet(i -> i.protocol(p -> p.set(STREAM))
+                    .source("0.0.0.0")
+                    .destination(host)
+                    .sourcePort(0)
+                    .destinationPort(port)))
+                .infos(i -> i.item(ii -> ii.authority(host)))
+                .build()
+                .sizeof());
+
             network = newStream(this::onNetwork, originId, routedId, initialId, initialSeq, initialAck, initialMax,
-                traceId, authorization, affinity, EMPTY_EXTENSION);
+                traceId, authorization, affinity, extension);
         }

         @Override
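The new proxy Begin extension above is what actually redirects the connection: instead of an empty extension, the network stream now tells the downstream binding to dial the coordinator's host and port, with the host also carried as the authority info (presumably for SNI). As a plain-socket analogy only, not how Zilla opens connections:

    import java.io.IOException;
    import java.net.InetSocketAddress;
    import java.net.Socket;

    public final class OffsetFetchDial
    {
        // Analogy only: connect the OffsetFetch exchange to the coordinator
        // endpoint carried in the Begin extension, not to a random member.
        public static Socket connect(String coordinatorHost, int coordinatorPort)
            throws IOException
        {
            Socket socket = new Socket();
            socket.connect(new InetSocketAddress(coordinatorHost, coordinatorPort));
            return socket;
        }
    }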
----------------------------------------------------------------------
@@ -75,6 +75,7 @@
 import io.aklivity.zilla.runtime.binding.kafka.internal.types.stream.FlushFW;
 import io.aklivity.zilla.runtime.binding.kafka.internal.types.stream.KafkaBeginExFW;
 import io.aklivity.zilla.runtime.binding.kafka.internal.types.stream.KafkaConsumerAssignmentFW;
+import io.aklivity.zilla.runtime.binding.kafka.internal.types.stream.KafkaConsumerBeginExFW;
 import io.aklivity.zilla.runtime.binding.kafka.internal.types.stream.KafkaConsumerDataExFW;
 import io.aklivity.zilla.runtime.binding.kafka.internal.types.stream.KafkaConsumerFlushExFW;
 import io.aklivity.zilla.runtime.binding.kafka.internal.types.stream.KafkaDataExFW;
@@ -2776,6 +2777,8 @@ private final class KafkaUnmergedConsumerStream
         private long replySeq;
         private long replyAck;
         private int replyMax;
+        private String host;
+        private int port;

         private KafkaUnmergedConsumerStream(
             KafkaMergedStream merged)
@@ -2926,9 +2929,17 @@ private void onConsumerReplyBegin(
             BeginFW begin)
         {
             final long traceId = begin.traceId();
+            final OctetsFW extension = begin.extension();

             state = KafkaState.openingReply(state);

+            final ExtensionFW beginEx = extensionRO.tryWrap(extension.buffer(), extension.offset(), extension.limit());
+            final KafkaBeginExFW kafkaBeginEx = beginEx.typeId() == kafkaTypeId ? extension.get(kafkaBeginExRO::wrap) : null;
+            final KafkaConsumerBeginExFW kafkaConsumerBeginEx = kafkaBeginEx != null ? kafkaBeginEx.consumer() : null;
+
+            host = kafkaConsumerBeginEx.host().asString();
+            port = kafkaConsumerBeginEx.port();
+
             doConsumerReplyWindow(traceId, 0, 8192);
         }

@@ -3148,6 +3159,8 @@ private void doOffsetFetchInitialBegin(
                 .typeId(kafkaTypeId)
                 .offsetFetch(c -> c
                     .groupId(merged.groupId)
+                    .host(merged.consumerStream.host)
+                    .port(merged.consumerStream.port)
                     .topic(merged.topic)
                     .partitions(p -> merged.leadersByAssignedId.forEach((k, v) ->
                         p.item(tp -> tp.partitionId(k))))