Skip to content

Commit 2671390

Browse files
committed
fix: Publisher API is missing some request forms
* Added the missing `EndStream` item for publish request * Moved publish end stream code and response end stream code into their respective messages * Removed `Acknowledgement` wrapper around `BlockAcknowledgement`. * Removed unnecessary enum name prefixes * Discovered another PBJ bug (unit tests generation) related to message name conflicts * Fixed all references in code and tests Fixes #1151 Signed-off-by: Joseph S <[email protected]>
1 parent 9feb19c commit 2671390

File tree

8 files changed

+213
-166
lines changed

8 files changed

+213
-166
lines changed

block-node/publisher/src/main/java/org/hiero/block/node/publisher/BlockStreamProducerSession.java

+5-7
Original file line numberDiff line numberDiff line change
@@ -20,13 +20,12 @@
2020
import java.util.concurrent.locks.ReentrantLock;
2121
import java.util.function.Consumer;
2222
import org.hiero.block.api.PublishStreamResponse;
23-
import org.hiero.block.api.PublishStreamResponse.Acknowledgement;
2423
import org.hiero.block.api.PublishStreamResponse.BlockAcknowledgement;
2524
import org.hiero.block.api.PublishStreamResponse.EndOfStream;
2625
import org.hiero.block.api.PublishStreamResponse.ResendBlock;
2726
import org.hiero.block.api.PublishStreamResponse.ResponseOneOfType;
2827
import org.hiero.block.api.PublishStreamResponse.SkipBlock;
29-
import org.hiero.block.api.PublishStreamResponseCode;
28+
import org.hiero.block.api.PublishStreamResponse.EndOfStream.Code;
3029
import org.hiero.block.internal.BlockItemUnparsed;
3130
import org.hiero.block.node.publisher.UpdateCallback.UpdateType;
3231
import org.hiero.block.node.spi.BlockNodePlugin;
@@ -201,9 +200,8 @@ void sendDuplicateAck(final long latestAckBlock) {
201200
// sending a duplicate ack should also update the latestAck.
202201
latestAcknowledgedBlock = latestAckBlock;
203202
final BlockAcknowledgement ack = new BlockAcknowledgement(latestAckBlock, null, true);
204-
final Acknowledgement acknowledgement = new Acknowledgement(ack);
205203
final PublishStreamResponse duplicateResponse =
206-
new PublishStreamResponse(new OneOf<>(ResponseOneOfType.ACKNOWLEDGEMENT, acknowledgement));
204+
new PublishStreamResponse(new OneOf<>(ResponseOneOfType.ACKNOWLEDGEMENT, ack));
207205
sendResponse(duplicateResponse);
208206
}
209207

@@ -215,7 +213,7 @@ void sendStreamItemsBehind(final long latestAckBlock) {
215213
currentBlockState = BlockState.WAITING_FOR_RESEND;
216214
newBlockItems.clear();
217215

218-
final EndOfStream endOfStream = new EndOfStream(PublishStreamResponseCode.STREAM_ITEMS_BEHIND, latestAckBlock);
216+
final EndOfStream endOfStream = new EndOfStream(Code.BEHIND, latestAckBlock);
219217
final PublishStreamResponse response =
220218
new PublishStreamResponse(new OneOf<>(ResponseOneOfType.END_STREAM, endOfStream));
221219
sendResponse(response);
@@ -249,7 +247,7 @@ void close() {
249247

250248
final PublishStreamResponse closeResponse = new PublishStreamResponse(new OneOf<>(
251249
ResponseOneOfType.END_STREAM,
252-
new EndOfStream(PublishStreamResponseCode.STREAM_ITEMS_SUCCESS, currentBlockNumber)));
250+
new EndOfStream(Code.SUCCESS, currentBlockNumber)));
253251
sendResponse(closeResponse);
254252

255253
if (subscription != null) {
@@ -287,7 +285,7 @@ void handlePersisted(PersistedNotification notification) {
287285
// TODO BlockAcknowledgement block hash should be removed from spec as not needed
288286
final PublishStreamResponse goodBlockResponse = new PublishStreamResponse(new OneOf<>(
289287
ResponseOneOfType.ACKNOWLEDGEMENT,
290-
new Acknowledgement(new BlockAcknowledgement(blockToSend, null, false))));
288+
new BlockAcknowledgement(blockToSend, null, false)));
291289
// send the acknowledgment to the client
292290
sendResponse(goodBlockResponse);
293291
blockToSend++;

block-node/publisher/src/main/java/org/hiero/block/node/publisher/PublisherServicePlugin.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -96,7 +96,7 @@ public final class PublisherServicePlugin implements BlockNodePlugin, ServiceInt
9696
/** The current ACKED block number */
9797
private long latestAckedBlockNumber = UNKNOWN_BLOCK_NUMBER;
9898
/** The current chosen primary consensus node session, or null if there is no primary */
99-
private BlockStreamProducerSession currentPrimarySession = null;
99+
private BlockStreamProducerSession currentPrimarySession;
100100
/** The next session id to use when a new session is created */
101101
private long nextSessionId = 0;
102102

block-node/publisher/src/test/java/org/hiero/block/node/publisher/PublisherTest.java

+8-9
Original file line numberDiff line numberDiff line change
@@ -24,9 +24,8 @@
2424
import org.hiero.block.api.PublishStreamRequest;
2525
import org.hiero.block.api.PublishStreamRequest.RequestOneOfType;
2626
import org.hiero.block.api.PublishStreamResponse;
27-
import org.hiero.block.api.PublishStreamResponse.Acknowledgement;
2827
import org.hiero.block.api.PublishStreamResponse.BlockAcknowledgement;
29-
import org.hiero.block.api.PublishStreamResponseCode;
28+
import org.hiero.block.api.PublishStreamResponse.EndOfStream.Code;
3029
import org.hiero.block.internal.BlockUnparsed;
3130
import org.hiero.block.node.app.fixtures.plugintest.GrpcPluginTestBase;
3231
import org.hiero.block.node.app.fixtures.plugintest.NoBlocksHistoricalBlockFacility;
@@ -108,14 +107,14 @@ void testPublisherSendsOnBlockPersistedNotification() throws ParseException {
108107
assertEquals(1, fromPluginBytes.size());
109108
PublishStreamResponse response = PublishStreamResponse.PROTOBUF.parse(fromPluginBytes.getFirst());
110109
assertEquals(ACKNOWLEDGEMENT, response.response().kind());
111-
final Acknowledgement ack = response.response().as();
112-
assertEquals(new BlockAcknowledgement(0, null, false), ack.blockAck());
110+
final BlockAcknowledgement ack = response.response().as();
111+
assertEquals(new BlockAcknowledgement(0, null, false), ack);
113112
// check second block
114113
blockMessaging.sendBlockPersisted(new PersistedNotification(1, 1, 1));
115114
PublishStreamResponse response1 = PublishStreamResponse.PROTOBUF.parse(fromPluginBytes.getLast());
116115
assertEquals(ACKNOWLEDGEMENT, response1.response().kind());
117-
final Acknowledgement ack1 = response1.response().as();
118-
assertEquals(new BlockAcknowledgement(1, null, false), ack1.blockAck());
116+
final BlockAcknowledgement ack1 = response1.response().as();
117+
assertEquals(new BlockAcknowledgement(1, null, false), ack1);
119118
}
120119

121120
@Test
@@ -142,7 +141,7 @@ void testPublisherDuplicateBlock() throws ParseException {
142141
// last fromPluginBytes should be Duplicate
143142
PublishStreamResponse response = PublishStreamResponse.PROTOBUF.parse(fromPluginBytes.getLast());
144143
assertEquals(ACKNOWLEDGEMENT, response.response().kind());
145-
BlockAcknowledgement ack = response.acknowledgement().blockAck();
144+
BlockAcknowledgement ack = response.acknowledgement();
146145
BlockAcknowledgement expectedAck = new BlockAcknowledgement(0, null, true);
147146
assertEquals(expectedAck, ack, "Expected an ACK with flag of duplicate (blockAlreadyExists=true)");
148147
}
@@ -171,9 +170,9 @@ void testFutureBlock() throws ParseException {
171170
assertEquals(2, fromPluginBytes.size());
172171
PublishStreamResponse aheadResponse = PublishStreamResponse.PROTOBUF.parse(fromPluginBytes.getLast());
173172

174-
// verify we get an endOfStream with status code STREAM_ITEMS_BEHIND
173+
// verify we get an endOfStream with status code BEHIND
175174
assertEquals(
176-
PublishStreamResponseCode.STREAM_ITEMS_BEHIND,
175+
Code.BEHIND,
177176
aheadResponse.endStream().status());
178177
assertEquals(0, aheadResponse.endStream().blockNumber());
179178
}

0 commit comments

Comments
 (0)