Skip to content

Commit b9ca958

Browse files
chore: repaired tests
Signed-off-by: Matt Peterson <[email protected]>
1 parent 0fed3db commit b9ca958

File tree

10 files changed

+51
-36
lines changed

10 files changed

+51
-36
lines changed

server/src/main/java/com/hedera/block/server/BlockStreamService.java

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -108,12 +108,8 @@ StreamObserver<PublishStreamRequest> publishBlockStream(
108108
System.Logger.Level.DEBUG,
109109
"Executing bidirectional publishBlockStream gRPC method");
110110

111-
if (serviceStatus.isRunning()) {
112-
return new ProducerBlockItemObserver(
113-
streamMediator, publishStreamResponseObserver, itemAckBuilder, serviceStatus);
114-
}
115-
116-
return null;
111+
return new ProducerBlockItemObserver(
112+
streamMediator, publishStreamResponseObserver, itemAckBuilder, serviceStatus);
117113
}
118114

119115
void subscribeBlockStream(

server/src/main/java/com/hedera/block/server/Server.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -78,7 +78,6 @@ public static void main(final String[] args) {
7878
serviceStatus.setWebServer(webServer);
7979
streamMediator.register(serviceStatus);
8080

81-
8281
// Start the web server
8382
webServer.start();
8483
} catch (IOException e) {
@@ -87,7 +86,8 @@ public static void main(final String[] args) {
8786
}
8887

8988
private static StreamMediator<BlockItem, ObjectEvent<SubscribeStreamResponse>>
90-
buildStreamMediator(final Config config, final ServiceStatus serviceStatus) throws IOException {
89+
buildStreamMediator(final Config config, final ServiceStatus serviceStatus)
90+
throws IOException {
9191
return new LiveStreamMediatorImpl(
9292
new ConcurrentHashMap<>(32),
9393
new FileSystemPersistenceHandler(

server/src/main/java/com/hedera/block/server/ServiceStatus.java

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,19 @@
1+
/*
2+
* Copyright (C) 2024 Hedera Hashgraph, LLC
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
117
package com.hedera.block.server;
218

319
import io.helidon.webserver.WebServer;

server/src/main/java/com/hedera/block/server/ServiceStatusImpl.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,6 @@
1717
package com.hedera.block.server;
1818

1919
import io.helidon.webserver.WebServer;
20-
2120
import java.util.concurrent.atomic.AtomicBoolean;
2221

2322
public class ServiceStatusImpl implements ServiceStatus {

server/src/main/java/com/hedera/block/server/mediator/LiveStreamMediatorImpl.java

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,8 @@
1616

1717
package com.hedera.block.server.mediator;
1818

19+
import static com.hedera.block.protos.BlockStreamService.*;
20+
1921
import com.hedera.block.server.ServiceStatus;
2022
import com.hedera.block.server.ServiceStatusImpl;
2123
import com.hedera.block.server.consumer.BlockItemEventHandler;
@@ -26,15 +28,12 @@
2628
import com.lmax.disruptor.RingBuffer;
2729
import com.lmax.disruptor.dsl.Disruptor;
2830
import com.lmax.disruptor.util.DaemonThreadFactory;
29-
3031
import java.io.IOException;
3132
import java.util.Map;
3233
import java.util.concurrent.ConcurrentHashMap;
3334
import java.util.concurrent.ExecutorService;
3435
import java.util.concurrent.Executors;
3536

36-
import static com.hedera.block.protos.BlockStreamService.*;
37-
3837
/**
3938
* LiveStreamMediatorImpl is the implementation of the StreamMediator interface. It is responsible
4039
* for managing the subscribe and unsubscribe operations of downstream consumers. It also proxies
@@ -87,6 +86,12 @@ public LiveStreamMediatorImpl(
8786
this(new ConcurrentHashMap<>(), blockPersistenceHandler, new ServiceStatusImpl());
8887
}
8988

89+
public LiveStreamMediatorImpl(
90+
final BlockPersistenceHandler<BlockItem, Block> blockPersistenceHandler,
91+
final ServiceStatus serviceStatus) {
92+
this(new ConcurrentHashMap<>(), blockPersistenceHandler, serviceStatus);
93+
}
94+
9095
@Override
9196
public void publishEvent(final BlockItem blockItem) throws IOException {
9297

@@ -130,11 +135,6 @@ public void publishEvent(final BlockItem blockItem) throws IOException {
130135
}
131136
}
132137

133-
@Override
134-
public boolean isPublishing() {
135-
return serviceStatus.isRunning();
136-
}
137-
138138
@Override
139139
public void subscribe(
140140
final BlockItemEventHandler<ObjectEvent<SubscribeStreamResponse>> handler) {

server/src/main/java/com/hedera/block/server/mediator/StreamMediator.java

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@
1818

1919
import com.hedera.block.server.ServiceStatus;
2020
import com.hedera.block.server.consumer.BlockItemEventHandler;
21-
2221
import java.io.IOException;
2322

2423
/**
@@ -36,8 +35,6 @@ public interface StreamMediator<U, V> {
3635

3736
void publishEvent(final U blockItem) throws IOException;
3837

39-
boolean isPublishing();
40-
4138
void subscribe(final BlockItemEventHandler<V> handler);
4239

4340
void unsubscribe(final BlockItemEventHandler<V> handler);

server/src/main/java/com/hedera/block/server/producer/ProducerBlockItemObserver.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -16,17 +16,16 @@
1616

1717
package com.hedera.block.server.producer;
1818

19+
import static com.hedera.block.protos.BlockStreamService.*;
20+
import static com.hedera.block.protos.BlockStreamService.PublishStreamResponse.*;
21+
1922
import com.hedera.block.server.ServiceStatus;
2023
import com.hedera.block.server.data.ObjectEvent;
2124
import com.hedera.block.server.mediator.StreamMediator;
2225
import io.grpc.stub.StreamObserver;
23-
2426
import java.io.IOException;
2527
import java.security.NoSuchAlgorithmException;
2628

27-
import static com.hedera.block.protos.BlockStreamService.*;
28-
import static com.hedera.block.protos.BlockStreamService.PublishStreamResponse.*;
29-
3029
/**
3130
* The ProducerBlockStreamObserver class plugs into Helidon's server-initiated bidirectional gRPC
3231
* service implementation. Helidon calls methods on this class as networking events occur with the
@@ -71,7 +70,7 @@ public void onNext(final PublishStreamRequest publishStreamRequest) {
7170
try {
7271
// Publish the block to all the subscribers unless
7372
// there's an issue with the StreamMediator.
74-
if (streamMediator.isPublishing()) {
73+
if (serviceStatus.isRunning()) {
7574

7675
// Publish the block to the mediator
7776
streamMediator.publishEvent(blockItem);
@@ -106,6 +105,7 @@ public void onNext(final PublishStreamRequest publishStreamRequest) {
106105
}
107106

108107
private static PublishStreamResponse buildErrorStreamResponse() {
108+
// TODO: Replace this with a real error enum.
109109
final EndOfStream endOfStream =
110110
EndOfStream.newBuilder()
111111
.setStatus(PublishStreamResponseCode.STREAM_ITEMS_UNKNOWN)

server/src/test/java/com/hedera/block/server/BlockStreamServiceIT.java

Lines changed: 12 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -19,8 +19,7 @@
1919
import static com.hedera.block.protos.BlockStreamService.*;
2020
import static com.hedera.block.protos.BlockStreamService.PublishStreamResponse.*;
2121
import static com.hedera.block.server.util.PersistTestUtils.generateBlockItems;
22-
import static org.junit.jupiter.api.Assertions.assertFalse;
23-
import static org.junit.jupiter.api.Assertions.assertTrue;
22+
import static org.junit.jupiter.api.Assertions.*;
2423
import static org.mockito.Mockito.*;
2524

2625
import com.hedera.block.server.consumer.BlockItemEventHandler;
@@ -73,6 +72,7 @@ public class BlockStreamServiceIT {
7372
@Mock private StreamObserver<SubscribeStreamResponse> subscribeStreamObserver6;
7473

7574
@Mock private WebServer webServer;
75+
@Mock private ServiceStatus serviceStatus;
7676

7777
@Mock private BlockReader<Block> blockReader;
7878
@Mock private BlockWriter<BlockItem> blockWriter;
@@ -104,8 +104,7 @@ public void testPublishBlockStreamRegistrationAndExecution()
104104

105105
final BlockStreamService blockStreamService =
106106
new BlockStreamService(50L, new ItemAckBuilder(), streamMediator);
107-
108-
when(streamMediator.isPublishing()).thenReturn(true);
107+
blockStreamService.register(webServer);
109108

110109
final StreamObserver<PublishStreamRequest> streamObserver =
111110
blockStreamService.publishBlockStream(publishStreamResponseObserver);
@@ -420,9 +419,14 @@ public void testMediatorExceptionHandlingWhenPersistenceFailure()
420419
final List<BlockItem> blockItems = generateBlockItems(1);
421420
final PublishStreamRequest publishStreamRequest =
422421
PublishStreamRequest.newBuilder().setBlockItem(blockItems.getFirst()).build();
423-
424422
streamObserver.onNext(publishStreamRequest);
425423

424+
// Simulate another producer attempting to connect to the Block Node.
425+
// Later, verify they received a response indicating the stream is closed.
426+
final StreamObserver<PublishStreamRequest> expectedNoOpStreamObserver =
427+
blockStreamService.publishBlockStream(publishStreamResponseObserver);
428+
expectedNoOpStreamObserver.onNext(publishStreamRequest);
429+
426430
synchronized (lock) {
427431
lock.wait(50);
428432
}
@@ -453,7 +457,7 @@ public void testMediatorExceptionHandlingWhenPersistenceFailure()
453457
.build();
454458
final var endOfStreamResponse =
455459
PublishStreamResponse.newBuilder().setStatus(endOfStream).build();
456-
verify(publishStreamResponseObserver, times(1)).onNext(endOfStreamResponse);
460+
verify(publishStreamResponseObserver, times(2)).onNext(endOfStreamResponse);
457461
verify(webServer, times(1)).stop();
458462

459463
// Now verify the block was removed from the file system.
@@ -529,7 +533,8 @@ private StreamMediator<BlockItem, ObjectEvent<SubscribeStreamResponse>> buildStr
529533
final ServiceStatus serviceStatus = new ServiceStatusImpl();
530534
serviceStatus.setWebServer(webServer);
531535
return new LiveStreamMediatorImpl(
532-
subscribers, new FileSystemPersistenceHandler(blockReader, blockWriter),
536+
subscribers,
537+
new FileSystemPersistenceHandler(blockReader, blockWriter),
533538
serviceStatus);
534539
}
535540

server/src/test/java/com/hedera/block/server/persistence/storage/BlockAsDirectoryTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -228,7 +228,7 @@ public void testPartialBlockRemoval() throws IOException {
228228
JUNIT, testConfig, blockRemover, Util.defaultPerms, 23);
229229

230230
// Write all the block items for 2 blocks
231-
for (int i = 0;i < 20;i++) {
231+
for (int i = 0; i < 20; i++) {
232232
blockWriter.write(blockItems.get(i));
233233
}
234234

server/src/test/java/com/hedera/block/server/producer/ProducerBlockItemObserverTest.java

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
import com.google.protobuf.ByteString;
2828
import com.hedera.block.protos.BlockStreamService;
2929
import com.hedera.block.server.ServiceStatus;
30+
import com.hedera.block.server.ServiceStatusImpl;
3031
import com.hedera.block.server.consumer.ConsumerBlockItemObserver;
3132
import com.hedera.block.server.data.ObjectEvent;
3233
import com.hedera.block.server.mediator.LiveStreamMediatorImpl;
@@ -73,7 +74,7 @@ public void testProducerOnNext()
7374
new ItemAckBuilder(),
7475
serviceStatus);
7576

76-
when(streamMediator.isPublishing()).thenReturn(true);
77+
when(serviceStatus.isRunning()).thenReturn(true);
7778

7879
BlockItem blockHeader = blockItems.getFirst();
7980
PublishStreamRequest publishStreamRequest =
@@ -106,9 +107,10 @@ public void testProducerOnNext()
106107
public void testProducerToManyConsumers() throws IOException, InterruptedException {
107108
final long TIMEOUT_THRESHOLD_MILLIS = 100L;
108109
final long TEST_TIME = 1_719_427_664_950L;
110+
final ServiceStatus serviceStatus = new ServiceStatusImpl();
109111
final var streamMediator =
110112
new LiveStreamMediatorImpl(
111-
new FileSystemPersistenceHandler(blockReader, blockWriter));
113+
new FileSystemPersistenceHandler(blockReader, blockWriter), serviceStatus);
112114

113115
final var concreteObserver1 =
114116
new ConsumerBlockItemObserver(
@@ -201,7 +203,7 @@ public void testItemAckBuilderExceptionTest()
201203
itemAckBuilder,
202204
serviceStatus);
203205

204-
when(streamMediator.isPublishing()).thenReturn(true);
206+
when(serviceStatus.isRunning()).thenReturn(true);
205207
when(itemAckBuilder.buildAck(any()))
206208
.thenThrow(new NoSuchAlgorithmException("Test exception"));
207209

0 commit comments

Comments
 (0)