20
20
import java .util .concurrent .locks .ReentrantLock ;
21
21
import java .util .function .Consumer ;
22
22
import org .hiero .block .api .PublishStreamResponse ;
23
- import org .hiero .block .api .PublishStreamResponse .Acknowledgement ;
24
23
import org .hiero .block .api .PublishStreamResponse .BlockAcknowledgement ;
25
24
import org .hiero .block .api .PublishStreamResponse .EndOfStream ;
25
+ import org .hiero .block .api .PublishStreamResponse .EndOfStream .Code ;
26
26
import org .hiero .block .api .PublishStreamResponse .ResendBlock ;
27
27
import org .hiero .block .api .PublishStreamResponse .ResponseOneOfType ;
28
28
import org .hiero .block .api .PublishStreamResponse .SkipBlock ;
29
- import org .hiero .block .api .PublishStreamResponseCode ;
30
29
import org .hiero .block .internal .BlockItemUnparsed ;
31
30
import org .hiero .block .node .publisher .UpdateCallback .UpdateType ;
32
31
import org .hiero .block .node .spi .BlockNodePlugin ;
@@ -201,9 +200,8 @@ void sendDuplicateAck(final long latestAckBlock) {
201
200
// sending a duplicate ack should also update the latestAck.
202
201
latestAcknowledgedBlock = latestAckBlock ;
203
202
final BlockAcknowledgement ack = new BlockAcknowledgement (latestAckBlock , null , true );
204
- final Acknowledgement acknowledgement = new Acknowledgement (ack );
205
203
final PublishStreamResponse duplicateResponse =
206
- new PublishStreamResponse (new OneOf <>(ResponseOneOfType .ACKNOWLEDGEMENT , acknowledgement ));
204
+ new PublishStreamResponse (new OneOf <>(ResponseOneOfType .ACKNOWLEDGEMENT , ack ));
207
205
sendResponse (duplicateResponse );
208
206
}
209
207
@@ -215,7 +213,7 @@ void sendStreamItemsBehind(final long latestAckBlock) {
215
213
currentBlockState = BlockState .WAITING_FOR_RESEND ;
216
214
newBlockItems .clear ();
217
215
218
- final EndOfStream endOfStream = new EndOfStream (PublishStreamResponseCode . STREAM_ITEMS_BEHIND , latestAckBlock );
216
+ final EndOfStream endOfStream = new EndOfStream (Code . BEHIND , latestAckBlock );
219
217
final PublishStreamResponse response =
220
218
new PublishStreamResponse (new OneOf <>(ResponseOneOfType .END_STREAM , endOfStream ));
221
219
sendResponse (response );
@@ -247,9 +245,8 @@ void close() {
247
245
currentBlockState = BlockState .DISCONNECTED ;
248
246
// try to send a close response to the client
249
247
250
- final PublishStreamResponse closeResponse = new PublishStreamResponse (new OneOf <>(
251
- ResponseOneOfType .END_STREAM ,
252
- new EndOfStream (PublishStreamResponseCode .STREAM_ITEMS_SUCCESS , currentBlockNumber )));
248
+ final PublishStreamResponse closeResponse = new PublishStreamResponse (
249
+ new OneOf <>(ResponseOneOfType .END_STREAM , new EndOfStream (Code .SUCCESS , currentBlockNumber )));
253
250
sendResponse (closeResponse );
254
251
255
252
if (subscription != null ) {
@@ -286,8 +283,7 @@ void handlePersisted(PersistedNotification notification) {
286
283
latestAcknowledgedBlock = blockToSend ;
287
284
// TODO BlockAcknowledgement block hash should be removed from spec as not needed
288
285
final PublishStreamResponse goodBlockResponse = new PublishStreamResponse (new OneOf <>(
289
- ResponseOneOfType .ACKNOWLEDGEMENT ,
290
- new Acknowledgement (new BlockAcknowledgement (blockToSend , null , false ))));
286
+ ResponseOneOfType .ACKNOWLEDGEMENT , new BlockAcknowledgement (blockToSend , null , false )));
291
287
// send the acknowledgment to the client
292
288
sendResponse (goodBlockResponse );
293
289
blockToSend ++;
0 commit comments