Skip to content

Commit d5b1a9f

Browse files
fix: repaired type errors after proto change
Signed-off-by: Matt Peterson <[email protected]>
1 parent 582cfae commit d5b1a9f

File tree

13 files changed

+111
-111
lines changed

13 files changed

+111
-111
lines changed

server/src/main/java/com/hedera/block/server/BlockStreamService.java

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -42,15 +42,15 @@ public class BlockStreamService implements GrpcService {
4242
private final System.Logger LOGGER = System.getLogger(getClass().getName());
4343

4444
private final long timeoutThresholdMillis;
45-
private final StreamMediator<ObjectEvent<BlockStreamServiceGrpcProto.Block>, BlockStreamServiceGrpcProto.BlockResponse> streamMediator;
45+
private final StreamMediator<ObjectEvent<BlockStreamServiceGrpcProto.BlockItem>, BlockStreamServiceGrpcProto.BlockItemResponse> streamMediator;
4646

4747
/**
4848
* Constructor for the BlockStreamService class.
4949
*
5050
* @param timeoutThresholdMillis the timeout threshold in milliseconds
5151
*/
5252
public BlockStreamService(final long timeoutThresholdMillis,
53-
final StreamMediator<ObjectEvent<BlockStreamServiceGrpcProto.Block>, BlockStreamServiceGrpcProto.BlockResponse> streamMediator) {
53+
final StreamMediator<ObjectEvent<BlockStreamServiceGrpcProto.BlockItem>, BlockStreamServiceGrpcProto.BlockItemResponse> streamMediator) {
5454
this.timeoutThresholdMillis = timeoutThresholdMillis;
5555
this.streamMediator = streamMediator;
5656
}
@@ -92,11 +92,11 @@ public void update(final Routing routing) {
9292
*
9393
* @param responseStreamObserver Helidon provides a StreamObserver to handle responses back to the producer.
9494
*
95-
* @return a custom StreamObserver to handle streaming blocks from the producer to all subscribed consumer
95+
* @return a custom StreamObserver to handle streaming blockItems from the producer to all subscribed consumer
9696
* via the streamMediator as well as sending responses back to the producer.
9797
*/
98-
private StreamObserver<BlockStreamServiceGrpcProto.Block> streamSink(
99-
final StreamObserver<BlockStreamServiceGrpcProto.BlockResponse> responseStreamObserver) {
98+
private StreamObserver<BlockStreamServiceGrpcProto.BlockItem> streamSink(
99+
final StreamObserver<BlockStreamServiceGrpcProto.BlockItemResponse> responseStreamObserver) {
100100
LOGGER.log(System.Logger.Level.DEBUG, "Executing bidirectional streamSink method");
101101

102102
return new ProducerBlockItemObserver(streamMediator, responseStreamObserver);
@@ -111,7 +111,7 @@ private StreamObserver<BlockStreamServiceGrpcProto.Block> streamSink(
111111
* @return a custom StreamObserver to handle streaming blocks from the producer to the consumer as well
112112
* as handling responses from the consumer.
113113
*/
114-
private StreamObserver<BlockStreamServiceGrpcProto.BlockResponse> streamSource(final StreamObserver<BlockStreamServiceGrpcProto.Block> responseStreamObserver) {
114+
private StreamObserver<BlockStreamServiceGrpcProto.BlockItemResponse> streamSource(final StreamObserver<BlockStreamServiceGrpcProto.BlockItem> responseStreamObserver) {
115115
LOGGER.log(System.Logger.Level.DEBUG, "Executing bidirectional streamSource method");
116116

117117
// Return a custom StreamObserver to handle streaming blocks from the producer.

server/src/main/java/com/hedera/block/server/Server.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -38,8 +38,8 @@
3838
public class Server {
3939

4040
// Function stubs to satisfy the bidi routing param signatures. The implementations are in the service class.
41-
private static ServerCalls.BidiStreamingMethod<Stream<BlockStreamServiceGrpcProto.Block>, StreamObserver<BlockStreamServiceGrpcProto.Block>> clientBidiStreamingMethod;
42-
private static ServerCalls.BidiStreamingMethod<Stream<BlockStreamServiceGrpcProto.BlockResponse>, StreamObserver<BlockStreamServiceGrpcProto.Block>> serverBidiStreamingMethod;
41+
private static ServerCalls.BidiStreamingMethod<Stream<BlockStreamServiceGrpcProto.BlockItem>, StreamObserver<BlockStreamServiceGrpcProto.BlockItem>> clientBidiStreamingMethod;
42+
private static ServerCalls.BidiStreamingMethod<Stream<BlockStreamServiceGrpcProto.BlockItemResponse>, StreamObserver<BlockStreamServiceGrpcProto.BlockItem>> serverBidiStreamingMethod;
4343

4444
private static final System.Logger LOGGER = System.getLogger(Server.class.getName());
4545

@@ -62,7 +62,7 @@ public static void main(final String[] args) {
6262
final long consumerTimeoutThreshold = config.get(BLOCKNODE_SERVER_CONSUMER_TIMEOUT_THRESHOLD_KEY).asLong().orElse(1500L);
6363

6464
// Initialize the block storage, cache, and service
65-
final BlockStorage<BlockStreamServiceGrpcProto.Block> blockStorage = new FileSystemBlockStorage(BLOCKNODE_STORAGE_ROOT_PATH_KEY, config);
65+
final BlockStorage<BlockStreamServiceGrpcProto.BlockItem> blockStorage = new FileSystemBlockStorage(BLOCKNODE_STORAGE_ROOT_PATH_KEY, config);
6666
final BlockStreamService blockStreamService = new BlockStreamService(consumerTimeoutThreshold,
6767
new LiveStreamMediatorImpl(new WriteThroughCacheHandler(blockStorage)));
6868

server/src/main/java/com/hedera/block/server/consumer/ConsumerBlockItemObserver.java

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -29,11 +29,11 @@
2929
* The LiveStreamObserverImpl class implements the LiveStreamObserver interface to pass blocks to the downstream consumer
3030
* via the notify method and manage the bidirectional stream to the consumer via the onNext, onError, and onCompleted methods.
3131
*/
32-
public class ConsumerBlockItemObserver implements BlockItemEventHandler<ObjectEvent<BlockStreamServiceGrpcProto.Block>, BlockStreamServiceGrpcProto.BlockResponse> {
32+
public class ConsumerBlockItemObserver implements BlockItemEventHandler<ObjectEvent<BlockStreamServiceGrpcProto.BlockItem>, BlockStreamServiceGrpcProto.BlockItemResponse> {
3333

3434
private final System.Logger LOGGER = System.getLogger(getClass().getName());
3535

36-
private final StreamObserver<BlockStreamServiceGrpcProto.Block> responseStreamObserver;
36+
private final StreamObserver<BlockStreamServiceGrpcProto.BlockItem> responseStreamObserver;
3737

3838
private final long timeoutThresholdMillis;
3939

@@ -45,7 +45,7 @@ public class ConsumerBlockItemObserver implements BlockItemEventHandler<ObjectEv
4545

4646
private final CountDownLatch shutdownLatch = new CountDownLatch(1);
4747

48-
private final StreamMediator<ObjectEvent<BlockStreamServiceGrpcProto.Block>, BlockStreamServiceGrpcProto.BlockResponse> streamMediator;
48+
private final StreamMediator<ObjectEvent<BlockStreamServiceGrpcProto.BlockItem>, BlockStreamServiceGrpcProto.BlockItemResponse> streamMediator;
4949

5050
/**
5151
* Constructor for the LiveStreamObserverImpl class.
@@ -56,8 +56,8 @@ public ConsumerBlockItemObserver(
5656
final long timeoutThresholdMillis,
5757
final InstantSource producerLivenessClock,
5858
final InstantSource consumerLivenessClock,
59-
final StreamMediator<ObjectEvent<BlockStreamServiceGrpcProto.Block>, BlockStreamServiceGrpcProto.BlockResponse> streamMediator,
60-
final StreamObserver<BlockStreamServiceGrpcProto.Block> responseStreamObserver) {
59+
final StreamMediator<ObjectEvent<BlockStreamServiceGrpcProto.BlockItem>, BlockStreamServiceGrpcProto.BlockItemResponse> streamMediator,
60+
final StreamObserver<BlockStreamServiceGrpcProto.BlockItem> responseStreamObserver) {
6161

6262
this.timeoutThresholdMillis = timeoutThresholdMillis;
6363
this.producerLivenessClock = producerLivenessClock;
@@ -75,7 +75,7 @@ public ConsumerBlockItemObserver(
7575
*
7676
*/
7777
@Override
78-
public void onEvent(final ObjectEvent<BlockStreamServiceGrpcProto.Block> event, final long l, final boolean b) throws Exception {
78+
public void onEvent(final ObjectEvent<BlockStreamServiceGrpcProto.BlockItem> event, final long l, final boolean b) throws Exception {
7979

8080
// Check if the consumer has timed out. If so, unsubscribe the observer from the mediator.
8181
if (isThresholdExceeded(consumerLivenessMillis)) {
@@ -91,18 +91,18 @@ public void onEvent(final ObjectEvent<BlockStreamServiceGrpcProto.Block> event,
9191
/**
9292
* The onNext() method is triggered by Helidon when a consumer sends a blockResponse via the bidirectional stream.
9393
*
94-
* @param blockResponse the BlockResponse passed back to the server via the bidirectional stream to the downstream consumer.
94+
* @param blockItemResponse the BlockItemResponse passed back to the server via the bidirectional stream to the downstream consumer.
9595
*/
9696
@Override
97-
public void onNext(final BlockStreamServiceGrpcProto.BlockResponse blockResponse) {
97+
public void onNext(final BlockStreamServiceGrpcProto.BlockItemResponse blockItemResponse) {
9898

9999
// Check if the producer has timed out. If so, unsubscribe the observer from the mediator.
100100
if (isThresholdExceeded(producerLivenessMillis)) {
101101
LOGGER.log(System.Logger.Level.DEBUG, "Producer timeout threshold exceeded. Unsubscribing observer.");
102102
streamMediator.unsubscribe(this);
103103
} else {
104104
// Refresh the consumer liveness
105-
LOGGER.log(System.Logger.Level.DEBUG, "Received response block " + blockResponse);
105+
LOGGER.log(System.Logger.Level.DEBUG, "Received response block " + blockItemResponse);
106106
consumerLivenessMillis = consumerLivenessClock.millis();
107107
}
108108
}

server/src/main/java/com/hedera/block/server/mediator/LiveStreamMediatorImpl.java

Lines changed: 13 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -36,45 +36,45 @@
3636
* managing the subscribe and unsubscribe operations of downstream consumers. It also proxies live
3737
* blocks to the subscribers as they arrive and persists the blocks to the block persistence store.
3838
*/
39-
public class LiveStreamMediatorImpl implements StreamMediator<ObjectEvent<BlockStreamServiceGrpcProto.Block>, BlockStreamServiceGrpcProto.BlockResponse> {
39+
public class LiveStreamMediatorImpl implements StreamMediator<ObjectEvent<BlockStreamServiceGrpcProto.BlockItem>, BlockStreamServiceGrpcProto.BlockItemResponse> {
4040

4141
private final System.Logger LOGGER = System.getLogger(getClass().getName());
4242

43-
private final RingBuffer<ObjectEvent<BlockStreamServiceGrpcProto.Block>> ringBuffer;
43+
private final RingBuffer<ObjectEvent<BlockStreamServiceGrpcProto.BlockItem>> ringBuffer;
4444
private final ExecutorService executor;
4545

46-
private final Map<BlockItemEventHandler<ObjectEvent<BlockStreamServiceGrpcProto.Block>, BlockStreamServiceGrpcProto.BlockResponse>,
47-
BatchEventProcessor<ObjectEvent<BlockStreamServiceGrpcProto.Block>>> subscribers = new HashMap<>();
46+
private final Map<BlockItemEventHandler<ObjectEvent<BlockStreamServiceGrpcProto.BlockItem>, BlockStreamServiceGrpcProto.BlockItemResponse>,
47+
BatchEventProcessor<ObjectEvent<BlockStreamServiceGrpcProto.BlockItem>>> subscribers = new HashMap<>();
4848

49-
private final BlockPersistenceHandler<BlockStreamServiceGrpcProto.Block> blockPersistenceHandler;
49+
private final BlockPersistenceHandler<BlockStreamServiceGrpcProto.BlockItem> blockPersistenceHandler;
5050

5151
/**
5252
* Constructor for the LiveStreamMediatorImpl class.
5353
*
5454
* @param blockPersistenceHandler the block persistence handler
5555
*/
56-
public LiveStreamMediatorImpl(final BlockPersistenceHandler<BlockStreamServiceGrpcProto.Block> blockPersistenceHandler) {
56+
public LiveStreamMediatorImpl(final BlockPersistenceHandler<BlockStreamServiceGrpcProto.BlockItem> blockPersistenceHandler) {
5757
this.blockPersistenceHandler = blockPersistenceHandler;
5858

5959
// Initialize and start the disruptor
60-
final Disruptor<ObjectEvent<BlockStreamServiceGrpcProto.Block>> disruptor = new Disruptor<>(ObjectEvent::new, 1024, DaemonThreadFactory.INSTANCE);
60+
final Disruptor<ObjectEvent<BlockStreamServiceGrpcProto.BlockItem>> disruptor = new Disruptor<>(ObjectEvent::new, 1024, DaemonThreadFactory.INSTANCE);
6161
this.ringBuffer = disruptor.start();
6262
this.executor = Executors.newCachedThreadPool(DaemonThreadFactory.INSTANCE);
6363
}
6464

6565
@Override
66-
public void publishEvent(BlockStreamServiceGrpcProto.Block block) {
66+
public void publishEvent(BlockStreamServiceGrpcProto.BlockItem blockItem) {
6767

6868
// Publish the block for all subscribers to receive
69-
LOGGER.log(System.Logger.Level.INFO, "Publishing block: {0}", block);
70-
ringBuffer.publishEvent((event, sequence) -> event.set(block));
69+
LOGGER.log(System.Logger.Level.INFO, "Publishing block: {0}", blockItem);
70+
ringBuffer.publishEvent((event, sequence) -> event.set(blockItem));
7171

7272
// Block persistence
73-
blockPersistenceHandler.persist(block);
73+
blockPersistenceHandler.persist(blockItem);
7474
}
7575

7676
@Override
77-
public void subscribe(final BlockItemEventHandler<ObjectEvent<BlockStreamServiceGrpcProto.Block>, BlockStreamServiceGrpcProto.BlockResponse> handler) {
77+
public void subscribe(final BlockItemEventHandler<ObjectEvent<BlockStreamServiceGrpcProto.BlockItem>, BlockStreamServiceGrpcProto.BlockItemResponse> handler) {
7878

7979
// Initialize the batch event processor and set it on the ring buffer
8080
final var batchEventProcessor = new BatchEventProcessorBuilder()
@@ -87,7 +87,7 @@ public void subscribe(final BlockItemEventHandler<ObjectEvent<BlockStreamService
8787
}
8888

8989
@Override
90-
public void unsubscribe(final BlockItemEventHandler<ObjectEvent<BlockStreamServiceGrpcProto.Block>, BlockStreamServiceGrpcProto.BlockResponse> handler) {
90+
public void unsubscribe(final BlockItemEventHandler<ObjectEvent<BlockStreamServiceGrpcProto.BlockItem>, BlockStreamServiceGrpcProto.BlockItemResponse> handler) {
9191
final var batchEventProcessor = subscribers.remove(handler);
9292

9393
// Stop the processor

server/src/main/java/com/hedera/block/server/mediator/StreamMediator.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@
3232
*/
3333
public interface StreamMediator<U, V> {
3434

35-
void publishEvent(final BlockStreamServiceGrpcProto.Block block);
35+
void publishEvent(final BlockStreamServiceGrpcProto.BlockItem blockItem);
3636

3737
void subscribe(final BlockItemEventHandler<U, V> handler);
3838

server/src/main/java/com/hedera/block/server/persistence/WriteThroughCacheHandler.java

Lines changed: 11 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -28,31 +28,31 @@
2828
* Write-Through cache handler coordinates between the block storage and the block cache to ensure the block
2929
* is persisted to the storage before being cached.
3030
*/
31-
public class WriteThroughCacheHandler implements BlockPersistenceHandler<BlockStreamServiceGrpcProto.Block> {
31+
public class WriteThroughCacheHandler implements BlockPersistenceHandler<BlockStreamServiceGrpcProto.BlockItem> {
3232

33-
private final BlockStorage<BlockStreamServiceGrpcProto.Block> blockStorage;
33+
private final BlockStorage<BlockStreamServiceGrpcProto.BlockItem> blockStorage;
3434

3535
/**
3636
* Constructor for the WriteThroughCacheHandler class.
3737
*
3838
* @param blockStorage the block storage
3939
*/
40-
public WriteThroughCacheHandler(final BlockStorage<BlockStreamServiceGrpcProto.Block> blockStorage) {
40+
public WriteThroughCacheHandler(final BlockStorage<BlockStreamServiceGrpcProto.BlockItem> blockStorage) {
4141
this.blockStorage = blockStorage;
4242
}
4343

4444
/**
4545
* Persists the block to the block storage and cache the block.
4646
*
47-
* @param block the block to persist
47+
* @param blockItem the block to persist
4848
* @return the block id
4949
*/
5050
@Override
51-
public Long persist(final BlockStreamServiceGrpcProto.Block block) {
51+
public Long persist(final BlockStreamServiceGrpcProto.BlockItem blockItem) {
5252

5353
// Write-Through cache
54-
blockStorage.write(block);
55-
return block.getId();
54+
blockStorage.write(blockItem);
55+
return blockItem.getId();
5656
}
5757

5858
/**
@@ -63,10 +63,10 @@ public Long persist(final BlockStreamServiceGrpcProto.Block block) {
6363
* @return a queue of blocks
6464
*/
6565
@Override
66-
public Queue<BlockStreamServiceGrpcProto.Block> readRange(final long startBlockId, final long endBlockId) {
67-
final Queue<BlockStreamServiceGrpcProto.Block> blocks = new LinkedList<>();
66+
public Queue<BlockStreamServiceGrpcProto.BlockItem> readRange(final long startBlockId, final long endBlockId) {
67+
final Queue<BlockStreamServiceGrpcProto.BlockItem> blocks = new LinkedList<>();
6868
for (long count = startBlockId; count <= endBlockId; count++) {
69-
final Optional<BlockStreamServiceGrpcProto.Block> blockOpt = read(count);
69+
final Optional<BlockStreamServiceGrpcProto.BlockItem> blockOpt = read(count);
7070
blockOpt.ifPresent(blocks::add);
7171
}
7272

@@ -82,7 +82,7 @@ public Queue<BlockStreamServiceGrpcProto.Block> readRange(final long startBlockI
8282
* @return an Optional with the block
8383
*/
8484
@Override
85-
public Optional<BlockStreamServiceGrpcProto.Block> read(final long id) {
85+
public Optional<BlockStreamServiceGrpcProto.BlockItem> read(final long id) {
8686
return blockStorage.read(id);
8787
}
8888
}

server/src/main/java/com/hedera/block/server/persistence/storage/BlockStorage.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -28,10 +28,10 @@ public interface BlockStorage<V> {
2828
/**
2929
* Writes a block to storage.
3030
*
31-
* @param block the block to write
31+
* @param blockItem the block to write
3232
* @return the id of the block
3333
*/
34-
Optional<Long> write(final V block);
34+
Optional<Long> write(final V blockItem);
3535

3636
/**
3737
* Reads a block from storage.

server/src/main/java/com/hedera/block/server/persistence/storage/FileSystemBlockStorage.java

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@
3232
/**
3333
* The FileSystemBlockStorage class implements the BlockStorage interface to store blocks to the filesystem.
3434
*/
35-
public class FileSystemBlockStorage implements BlockStorage<BlockStreamServiceGrpcProto.Block> {
35+
public class FileSystemBlockStorage implements BlockStorage<BlockStreamServiceGrpcProto.BlockItem> {
3636

3737
public static final String BLOCK_FILE_EXTENSION = ".blk";
3838

@@ -74,16 +74,16 @@ public FileSystemBlockStorage(final String key, final Config config) throws IOEx
7474
/**
7575
* Writes a block to the filesystem.
7676
*
77-
* @param block the block to write
77+
* @param blockItem the block to write
7878
* @return the id of the block
7979
*/
8080
@Override
81-
public Optional<Long> write(final BlockStreamServiceGrpcProto.Block block) {
82-
Long id = block.getId();
81+
public Optional<Long> write(final BlockStreamServiceGrpcProto.BlockItem blockItem) {
82+
Long id = blockItem.getId();
8383
final String fullPath = resolvePath(id);
8484

8585
try (FileOutputStream fos = new FileOutputStream(fullPath)) {
86-
block.writeTo(fos);
86+
blockItem.writeTo(fos);
8787
LOGGER.log(System.Logger.Level.DEBUG, "Successfully wrote the block file: " + fullPath);
8888

8989
return Optional.of(id);
@@ -100,14 +100,14 @@ public Optional<Long> write(final BlockStreamServiceGrpcProto.Block block) {
100100
* @return the block
101101
*/
102102
@Override
103-
public Optional<BlockStreamServiceGrpcProto.Block> read(final Long id) {
103+
public Optional<BlockStreamServiceGrpcProto.BlockItem> read(final Long id) {
104104
return read(resolvePath(id));
105105
}
106106

107-
private Optional<BlockStreamServiceGrpcProto.Block> read(final String filePath) {
107+
private Optional<BlockStreamServiceGrpcProto.BlockItem> read(final String filePath) {
108108

109109
try (FileInputStream fis = new FileInputStream(filePath)) {
110-
return Optional.of(BlockStreamServiceGrpcProto.Block.parseFrom(fis));
110+
return Optional.of(BlockStreamServiceGrpcProto.BlockItem.parseFrom(fis));
111111
} catch (FileNotFoundException io) {
112112
LOGGER.log(System.Logger.Level.ERROR, "Error reading file: " + filePath, io);
113113
return Optional.empty();

0 commit comments

Comments
 (0)