Skip to content

Commit 706f4ba

Browse files
wip: refactored class names
Signed-off-by: Matt Peterson <[email protected]>
1 parent 2c30b16 commit 706f4ba

File tree

6 files changed

+17
-26
lines changed

6 files changed

+17
-26
lines changed

server/src/main/java/com/hedera/block/server/BlockStreamService.java

Lines changed: 4 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -18,20 +18,14 @@
1818

1919
import com.google.protobuf.Descriptors;
2020
import com.hedera.block.protos.BlockStreamServiceGrpcProto;
21-
import com.hedera.block.server.consumer.ConsumerBlockStreamObserver;
21+
import com.hedera.block.server.consumer.ConsumerBlockItemObserver;
2222
import com.hedera.block.server.data.ObjectEvent;
2323
import com.hedera.block.server.mediator.StreamMediator;
24-
import com.hedera.block.server.producer.ProducerBlockStreamObserver;
25-
import com.lmax.disruptor.BatchEventProcessorBuilder;
26-
import com.lmax.disruptor.RingBuffer;
27-
import com.lmax.disruptor.dsl.Disruptor;
28-
import com.lmax.disruptor.util.DaemonThreadFactory;
24+
import com.hedera.block.server.producer.ProducerBlockItemObserver;
2925
import io.grpc.stub.StreamObserver;
3026
import io.helidon.webserver.grpc.GrpcService;
3127

3228
import java.time.Clock;
33-
import java.util.concurrent.ExecutorService;
34-
import java.util.concurrent.Executors;
3529

3630
import static com.hedera.block.server.Constants.*;
3731

@@ -105,7 +99,7 @@ private StreamObserver<BlockStreamServiceGrpcProto.Block> streamSink(
10599
final StreamObserver<BlockStreamServiceGrpcProto.BlockResponse> responseStreamObserver) {
106100
LOGGER.log(System.Logger.Level.DEBUG, "Executing bidirectional streamSink method");
107101

108-
return new ProducerBlockStreamObserver(streamMediator, responseStreamObserver);
102+
return new ProducerBlockItemObserver(streamMediator, responseStreamObserver);
109103
}
110104

111105
/**
@@ -121,7 +115,7 @@ private StreamObserver<BlockStreamServiceGrpcProto.BlockResponse> streamSource(f
121115
LOGGER.log(System.Logger.Level.DEBUG, "Executing bidirectional streamSource method");
122116

123117
// Return a custom StreamObserver to handle streaming blocks from the producer.
124-
final var streamObserver = new ConsumerBlockStreamObserver(
118+
final var streamObserver = new ConsumerBlockItemObserver(
125119
timeoutThresholdMillis,
126120
Clock.systemDefaultZone(),
127121
Clock.systemDefaultZone(),

server/src/main/java/com/hedera/block/server/consumer/BlockEventHandler.java renamed to server/src/main/java/com/hedera/block/server/consumer/BlockItemEventHandler.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,6 @@
1818
import com.lmax.disruptor.EventHandler;
1919
import io.grpc.stub.StreamObserver;
2020

21-
public interface BlockEventHandler<U, V> extends StreamObserver<V>, EventHandler<U> {
21+
public interface BlockItemEventHandler<U, V> extends StreamObserver<V>, EventHandler<U> {
2222
void awaitShutdown() throws InterruptedException;
2323
}

server/src/main/java/com/hedera/block/server/consumer/ConsumerBlockStreamObserver.java renamed to server/src/main/java/com/hedera/block/server/consumer/ConsumerBlockItemObserver.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@
2929
* The LiveStreamObserverImpl class implements the LiveStreamObserver interface to pass blocks to the downstream consumer
3030
* via the notify method and manage the bidirectional stream to the consumer via the onNext, onError, and onCompleted methods.
3131
*/
32-
public class ConsumerBlockStreamObserver implements BlockEventHandler<ObjectEvent<BlockStreamServiceGrpcProto.Block>, BlockStreamServiceGrpcProto.BlockResponse> {
32+
public class ConsumerBlockItemObserver implements BlockItemEventHandler<ObjectEvent<BlockStreamServiceGrpcProto.Block>, BlockStreamServiceGrpcProto.BlockResponse> {
3333

3434
private final System.Logger LOGGER = System.getLogger(getClass().getName());
3535

@@ -52,7 +52,7 @@ public class ConsumerBlockStreamObserver implements BlockEventHandler<ObjectEven
5252
*
5353
* @param responseStreamObserver the response stream observer
5454
*/
55-
public ConsumerBlockStreamObserver(
55+
public ConsumerBlockItemObserver(
5656
final long timeoutThresholdMillis,
5757
final InstantSource producerLivenessClock,
5858
final InstantSource consumerLivenessClock,

server/src/main/java/com/hedera/block/server/mediator/LiveStreamMediatorImpl.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717
package com.hedera.block.server.mediator;
1818

1919
import com.hedera.block.protos.BlockStreamServiceGrpcProto;
20-
import com.hedera.block.server.consumer.BlockEventHandler;
20+
import com.hedera.block.server.consumer.BlockItemEventHandler;
2121
import com.hedera.block.server.data.ObjectEvent;
2222
import com.hedera.block.server.persistence.BlockPersistenceHandler;
2323
import com.lmax.disruptor.BatchEventProcessor;
@@ -43,7 +43,7 @@ public class LiveStreamMediatorImpl implements StreamMediator<ObjectEvent<BlockS
4343
private final RingBuffer<ObjectEvent<BlockStreamServiceGrpcProto.Block>> ringBuffer;
4444
private final ExecutorService executor;
4545

46-
private final Map<BlockEventHandler<ObjectEvent<BlockStreamServiceGrpcProto.Block>, BlockStreamServiceGrpcProto.BlockResponse>,
46+
private final Map<BlockItemEventHandler<ObjectEvent<BlockStreamServiceGrpcProto.Block>, BlockStreamServiceGrpcProto.BlockResponse>,
4747
BatchEventProcessor<ObjectEvent<BlockStreamServiceGrpcProto.Block>>> subscribers = new HashMap<>();
4848

4949
private final BlockPersistenceHandler<BlockStreamServiceGrpcProto.Block> blockPersistenceHandler;
@@ -74,7 +74,7 @@ public void publishEvent(BlockStreamServiceGrpcProto.Block block) {
7474
}
7575

7676
@Override
77-
public void subscribe(final BlockEventHandler<ObjectEvent<BlockStreamServiceGrpcProto.Block>, BlockStreamServiceGrpcProto.BlockResponse> handler) {
77+
public void subscribe(final BlockItemEventHandler<ObjectEvent<BlockStreamServiceGrpcProto.Block>, BlockStreamServiceGrpcProto.BlockResponse> handler) {
7878

7979
// Initialize the batch event processor and set it on the ring buffer
8080
final var batchEventProcessor = new BatchEventProcessorBuilder()
@@ -87,7 +87,7 @@ public void subscribe(final BlockEventHandler<ObjectEvent<BlockStreamServiceGrpc
8787
}
8888

8989
@Override
90-
public void unsubscribe(final BlockEventHandler<ObjectEvent<BlockStreamServiceGrpcProto.Block>, BlockStreamServiceGrpcProto.BlockResponse> handler) {
90+
public void unsubscribe(final BlockItemEventHandler<ObjectEvent<BlockStreamServiceGrpcProto.Block>, BlockStreamServiceGrpcProto.BlockResponse> handler) {
9191
final var batchEventProcessor = subscribers.remove(handler);
9292

9393
// Stop the processor

server/src/main/java/com/hedera/block/server/mediator/StreamMediator.java

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -17,9 +17,7 @@
1717
package com.hedera.block.server.mediator;
1818

1919
import com.hedera.block.protos.BlockStreamServiceGrpcProto;
20-
import com.hedera.block.server.consumer.BlockEventHandler;
21-
import com.hedera.block.server.data.ObjectEvent;
22-
import com.lmax.disruptor.EventTranslator;
20+
import com.hedera.block.server.consumer.BlockItemEventHandler;
2321

2422
/**
2523
* The StreamMediator interface represents a one-to-many bridge between a bidirectional stream of blocks from a
@@ -36,7 +34,7 @@ public interface StreamMediator<U, V> {
3634

3735
void publishEvent(final BlockStreamServiceGrpcProto.Block block);
3836

39-
void subscribe(final BlockEventHandler<U, V> handler);
37+
void subscribe(final BlockItemEventHandler<U, V> handler);
4038

41-
void unsubscribe(final BlockEventHandler<U, V> handler);
39+
void unsubscribe(final BlockItemEventHandler<U, V> handler);
4240
}

server/src/main/java/com/hedera/block/server/producer/ProducerBlockStreamObserver.java renamed to server/src/main/java/com/hedera/block/server/producer/ProducerBlockItemObserver.java

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@
1919
import com.hedera.block.protos.BlockStreamServiceGrpcProto;
2020
import com.hedera.block.server.data.ObjectEvent;
2121
import com.hedera.block.server.mediator.StreamMediator;
22-
import com.lmax.disruptor.RingBuffer;
2322
import io.grpc.stub.StreamObserver;
2423

2524
/**
@@ -28,7 +27,7 @@
2827
* with the connection to the upstream producer (e.g. blocks streamed from the Consensus Node to
2928
* the server).
3029
*/
31-
public class ProducerBlockStreamObserver implements StreamObserver<BlockStreamServiceGrpcProto.Block> {
30+
public class ProducerBlockItemObserver implements StreamObserver<BlockStreamServiceGrpcProto.Block> {
3231

3332
private final System.Logger LOGGER = System.getLogger(getClass().getName());
3433

@@ -42,8 +41,8 @@ public class ProducerBlockStreamObserver implements StreamObserver<BlockStreamSe
4241
*
4342
* @param responseStreamObserver the response stream observer
4443
*/
45-
public ProducerBlockStreamObserver(final StreamMediator<ObjectEvent<BlockStreamServiceGrpcProto.Block>, BlockStreamServiceGrpcProto.BlockResponse> streamMediator,
46-
final StreamObserver<BlockStreamServiceGrpcProto.BlockResponse> responseStreamObserver) {
44+
public ProducerBlockItemObserver(final StreamMediator<ObjectEvent<BlockStreamServiceGrpcProto.Block>, BlockStreamServiceGrpcProto.BlockResponse> streamMediator,
45+
final StreamObserver<BlockStreamServiceGrpcProto.BlockResponse> responseStreamObserver) {
4746
this.streamMediator = streamMediator;
4847
this.responseStreamObserver = responseStreamObserver;
4948
}

0 commit comments

Comments
 (0)