Commit b4eab8b
addressing review comments
ankitk-me committed Feb 4, 2025
1 parent 3d7ba71 commit b4eab8b
Showing 2 changed files with 74 additions and 59 deletions.
2 changes: 1 addition & 1 deletion runtime/binding-grpc-kafka/pom.xml
@@ -24,7 +24,7 @@
     </licenses>

     <properties>
-        <jacoco.coverage.ratio>0.87</jacoco.coverage.ratio>
+        <jacoco.coverage.ratio>0.88</jacoco.coverage.ratio>
         <jacoco.missed.count>0</jacoco.missed.count>
     </properties>

@@ -1287,7 +1287,7 @@ private void onGrpcEnd(

             assert initialAck <= initialSeq;

-            producer.doKafkaEnd(traceId, authorization, true);
+            producer.doKafkaEnd(traceId, authorization);
         }

         private void onGrpcAbort(
@@ -1470,7 +1470,7 @@ protected void onKafkaEnd(
         {
             if (GrpcKafkaState.initialClosed(state))
             {
-                producer.doKafkaEnd(traceId, authorization, true);
+                producer.doKafkaEnd(traceId, authorization);
                 correlater.doKafkaEnd(traceId, authorization);
             }
         }
@@ -1651,7 +1651,7 @@ private void doGrpcReset(

     private final class GrpcProduceNoReplyProxy extends GrpcProxy
     {
-        private final KafkaProduceProxy delegate;
+        private final KafkaProduceNoReplyProxy delegate;
         private final long resolvedId;

         private GrpcProduceNoReplyProxy(
@@ -1664,7 +1664,7 @@ private GrpcProduceNoReplyProxy(
         {
             super(grpc, originId, routedId, initialId);
             this.resolvedId = resolvedId;
-            this.delegate = new KafkaProduceProxy(routedId, resolvedId, this, result);
+            this.delegate = new KafkaProduceNoReplyProxy(routedId, resolvedId, this, result);
         }

         private void onGrpcMessage(
@@ -1785,7 +1785,7 @@ private void onGrpcEnd(

             assert initialAck <= initialSeq;

-            delegate.doKafkaEnd(traceId, authorization, false);
+            delegate.doKafkaEnd(traceId, authorization);

             if (!GrpcKafkaState.replyOpening(state))
             {
@@ -1903,26 +1903,14 @@ protected void onKafkaBegin(
             }
         }

-        @Override
-        protected void onKafkaData(
-            long traceId,
-            long authorization,
-            long budgetId,
-            int reserved,
-            int flags,
-            OctetsFW payload,
-            KafkaDataExFW kafkaDataEx)
-        {
-        }
-
         @Override
         protected void onKafkaEnd(
             long traceId,
             long authorization)
         {
             if (GrpcKafkaState.initialClosed(state))
             {
-                delegate.doKafkaEnd(traceId, authorization, false);
+                delegate.doKafkaEnd(traceId, authorization);
             }
         }

@@ -2045,31 +2033,31 @@ private void doGrpcReset(
         }
     }

-    private final class KafkaProduceProxy
+    private class KafkaProduceProxy
     {
-        private MessageConsumer kafka;
-        private final long originId;
-        private final long routedId;
-        private final long initialId;
-        private final long replyId;
-        private final GrpcKafkaWithProduceResult result;
-        private final GrpcProxy delegate;
+        protected MessageConsumer kafka;
+        protected final long originId;
+        protected final long routedId;
+        protected final long initialId;
+        protected final long replyId;
+        protected final GrpcKafkaWithProduceResult result;
+        protected final GrpcProxy delegate;

-        private int state;
+        protected int state;

-        private long initialSeq;
-        private long initialAck;
-        private int initialMax;
-        private long initialBud;
+        protected long initialSeq;
+        protected long initialAck;
+        protected int initialMax;
+        protected long initialBud;

-        private long replySeq;
-        private long replyAck;
-        private int replyMax;
-        private long replyBud;
-        private int replyPad;
-        private int replyCap;
+        protected long replySeq;
+        protected long replyAck;
+        protected int replyMax;
+        protected long replyBud;
+        protected int replyPad;
+        protected int replyCap;

-        private KafkaProduceProxy(
+        protected KafkaProduceProxy(
             long originId,
             long routedId,
             GrpcProxy delegate,
@@ -2083,7 +2071,7 @@ private KafkaProduceProxy(
             this.replyId = supplyReplyId.applyAsLong(initialId);
         }

-        private void doKafkaBegin(
+        protected void doKafkaBegin(
             long traceId,
             long authorization,
             long affinity)
@@ -2097,7 +2085,7 @@ private void doKafkaBegin(
                 traceId, authorization, affinity, result);
         }

-        private void doKafkaData(
+        protected void doKafkaData(
             long traceId,
             long authorization,
             long budgetId,
@@ -2114,10 +2102,9 @@ private void doKafkaData(
             assert initialSeq <= initialAck + initialMax;
         }

-        private void doKafkaEnd(
+        protected void doKafkaEnd(
             long traceId,
-            long authorization,
-            boolean tombstone)
+            long authorization)
         {
             if (!GrpcKafkaState.initialClosed(state))
             {
@@ -2126,17 +2113,14 @@ private void doKafkaEnd(
                 initialMax = delegate.initialMax;
                 state = GrpcKafkaState.closeInitial(state);

-                if (tombstone)
-                {
-                    doKafkaDataNull(traceId, authorization);
-                }
+                doKafkaDataNull(traceId, authorization);

                 doEnd(kafka, originId, routedId, initialId, initialSeq, initialAck, initialMax,
                     traceId, authorization);
             }
         }

-        private void doKafkaAbort(
+        protected void doKafkaAbort(
             long traceId,
             long authorization)
         {
@@ -2185,7 +2169,7 @@ private void onKafkaMessage(
             }
         }

-        private void onKafkaBegin(
+        protected void onKafkaBegin(
             BeginFW begin)
         {
             final long sequence = begin.sequence();
@@ -2205,7 +2189,7 @@ private void onKafkaBegin(
             doKafkaWindow(traceId);
         }

-        private void onKafkaEnd(
+        protected void onKafkaEnd(
             EndFW end)
         {
             final long sequence = end.sequence();
@@ -2224,7 +2208,7 @@ private void onKafkaEnd(
             delegate.onKafkaEnd(traceId, authorization);
         }

-        private void onKafkaAbort(
+        protected void onKafkaAbort(
             AbortFW abort)
         {
             final long sequence = abort.sequence();
@@ -2243,7 +2227,7 @@ private void onKafkaAbort(
             delegate.onKafkaAbort(traceId, authorization);
         }

-        private void onKafkaWindow(
+        protected void onKafkaWindow(
             WindowFW window)
         {
             final long sequence = window.sequence();
@@ -2271,15 +2255,17 @@ private void onKafkaWindow(
             doKafkaEndAck(authorization, traceId);
         }

-        private void doKafkaEndAck(long authorization, long traceId)
+        protected void doKafkaEndAck(
+            long authorization,
+            long traceId)
         {
             if (GrpcKafkaState.initialClosing(state) && initialSeq == initialAck)
             {
-                doKafkaEnd(traceId, authorization, true);
+                doKafkaEnd(traceId, authorization);
             }
         }

-        private void onKafkaReset(
+        protected void onKafkaReset(
             ResetFW reset)
         {
             final long sequence = reset.sequence();
@@ -2300,7 +2286,7 @@ private void onKafkaReset(
             doKafkaReset(traceId, authorization);
         }

-        private void doKafkaReset(
+        protected void doKafkaReset(
             long traceId,
             long authorization)
         {
@@ -2313,7 +2299,7 @@ private void onKafkaReset(
             }
         }

-        private void doKafkaWindow(
+        protected void doKafkaWindow(
             long traceId)
         {
             if (kafka != null && !GrpcKafkaState.replyClosed(state))
@@ -2329,7 +2315,7 @@ private void doKafkaWindow(
             }
         }

-        private void doKafkaDataNull(
+        protected void doKafkaDataNull(
             long traceId,
             long authorization)
         {
@@ -2347,6 +2333,35 @@ private void doKafkaDataNull(
         }
     }

+    private final class KafkaProduceNoReplyProxy extends KafkaProduceProxy
+    {
+        private KafkaProduceNoReplyProxy(
+            long originId,
+            long routedId,
+            GrpcProxy delegate,
+            GrpcKafkaWithProduceResult result)
+        {
+            super(originId, routedId, delegate, result);
+        }
+
+        @Override
+        protected void doKafkaEnd(
+            long traceId,
+            long authorization)
+        {
+            if (!GrpcKafkaState.initialClosed(state))
+            {
+                initialSeq = delegate.initialSeq;
+                initialAck = delegate.initialAck;
+                initialMax = delegate.initialMax;
+                state = GrpcKafkaState.closeInitial(state);
+
+                doEnd(kafka, originId, routedId, initialId, initialSeq, initialAck, initialMax,
+                    traceId, authorization);
+            }
+        }
+    }
+
     private final class KafkaCorrelateProxy
     {
         private MessageConsumer kafka;