Skip to content

Commit 48c8d90

Browse files
fix: enabled mockito. changed LiveStreamObserverImpl to use Java Clock. added tests"
Signed-off-by: Matt Peterson <[email protected]>
1 parent 95e4da4 commit 48c8d90

File tree

8 files changed

+290
-15
lines changed

8 files changed

+290
-15
lines changed

buildSrc/src/main/kotlin/com.hedera.block.jpms-modules.gradle.kts

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -87,4 +87,8 @@ extraJavaModuleInfo {
8787
}
8888
module("io.grpc:grpc-util", "io.grpc.util")
8989
module("io.perfmark:perfmark-api", "io.perfmark")
90+
91+
module("junit:junit", "junit")
92+
module("org.mockito:mockito-core", "org.mockito")
93+
module("org.mockito:mockito-junit-jupiter", "org.mockito.junit.jupiter")
9094
}

gradle/modules.properties

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,4 +3,4 @@ io.helidon.webserver=io.helidon.webserver:helidon-webserver
33
io.helidon.webserver.grpc=io.helidon.webserver:helidon-webserver-grpc
44
io.helidon.webserver.testing.junit5=io.helidon.webserver.testing.junit5:helidon-webserver-testing-junit5
55
io.grpc=io.grpc:grpc-stub
6-
grpc.protobuf=io.grpc:grpc-protobuf:1.20.0
6+
grpc.protobuf=io.grpc:grpc-protobuf

server/build.gradle.kts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,4 +26,6 @@ application {
2626

2727
testModuleInfo {
2828
requires("org.junit.jupiter.api")
29+
requires("org.mockito")
30+
requires("org.mockito.junit.jupiter")
2931
}

server/src/main/java/com/hedera/block/server/BlockStreamService.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,8 @@
2525
import io.grpc.stub.StreamObserver;
2626
import io.helidon.webserver.grpc.GrpcService;
2727

28+
import java.time.Clock;
29+
2830
import static com.hedera.block.server.Constants.*;
2931

3032
/**
@@ -117,6 +119,8 @@ private StreamObserver<BlockStreamServiceGrpcProto.BlockResponse> streamSource(f
117119
// Return a custom StreamObserver to handle streaming blocks from the producer.
118120
final LiveStreamObserver<BlockStreamServiceGrpcProto.Block, BlockStreamServiceGrpcProto.BlockResponse> streamObserver = new LiveStreamObserverImpl(
119121
timeoutThresholdMillis,
122+
Clock.systemDefaultZone(),
123+
Clock.systemDefaultZone(),
120124
streamMediator,
121125
responseStreamObserver);
122126

server/src/main/java/com/hedera/block/server/consumer/LiveStreamObserverImpl.java

Lines changed: 25 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,10 @@
2020
import com.hedera.block.server.mediator.StreamMediator;
2121
import io.grpc.stub.StreamObserver;
2222

23+
import java.time.Clock;
24+
import java.time.Duration;
25+
import java.time.Instant;
26+
2327
/**
2428
* The LiveStreamObserverImpl class implements the LiveStreamObserver interface to pass blocks to the downstream consumer
2529
* via the notify method and manage the bidirectional stream to the consumer via the onNext, onError, and onCompleted methods.
@@ -31,10 +35,14 @@ public class LiveStreamObserverImpl implements LiveStreamObserver<BlockStreamSer
3135
private final StreamMediator<BlockStreamServiceGrpcProto.Block, BlockStreamServiceGrpcProto.BlockResponse> mediator;
3236
private final StreamObserver<BlockStreamServiceGrpcProto.Block> responseStreamObserver;
3337

34-
private long consumerLivenessMillis;
35-
private long producerLivenessMillis;
3638
private final long timeoutThresholdMillis;
3739

40+
private final Clock producerLivenessClock;
41+
private Instant producerLivenessInstant;
42+
43+
private final Clock consumerLivenessClock;
44+
private Instant consumerLivenessInstant;
45+
3846
/**
3947
* Constructor for the LiveStreamObserverImpl class.
4048
*
@@ -43,15 +51,19 @@ public class LiveStreamObserverImpl implements LiveStreamObserver<BlockStreamSer
4351
*/
4452
public LiveStreamObserverImpl(
4553
final long timeoutThresholdMillis,
54+
final Clock producerLivenessClock,
55+
final Clock consumerLivenessClock,
4656
final StreamMediator<BlockStreamServiceGrpcProto.Block, BlockStreamServiceGrpcProto.BlockResponse> mediator,
4757
final StreamObserver<BlockStreamServiceGrpcProto.Block> responseStreamObserver) {
4858

59+
this.timeoutThresholdMillis = timeoutThresholdMillis;
60+
this.producerLivenessClock = producerLivenessClock;
61+
this.consumerLivenessClock = consumerLivenessClock;
4962
this.mediator = mediator;
5063
this.responseStreamObserver = responseStreamObserver;
5164

52-
this.timeoutThresholdMillis = timeoutThresholdMillis;
53-
this.consumerLivenessMillis = System.currentTimeMillis();
54-
this.producerLivenessMillis = System.currentTimeMillis();
65+
this.producerLivenessInstant = Instant.now(producerLivenessClock);
66+
this.consumerLivenessInstant = Instant.now(consumerLivenessClock);
5567
}
5668

5769
/**
@@ -62,13 +74,15 @@ public LiveStreamObserverImpl(
6274
@Override
6375
public void notify(final BlockStreamServiceGrpcProto.Block block) {
6476

65-
if (System.currentTimeMillis() - consumerLivenessMillis > timeoutThresholdMillis) {
77+
// Check if the consumer has timed out. If so, unsubscribe the observer from the mediator.
78+
if (Duration.between(consumerLivenessInstant, Instant.now(consumerLivenessClock)).toMillis() > timeoutThresholdMillis) {
6679
if (mediator.isSubscribed(this)) {
6780
LOGGER.log(System.Logger.Level.DEBUG, "Consumer timeout threshold exceeded. Unsubscribing observer.");
6881
mediator.unsubscribe(this);
6982
}
7083
} else {
71-
producerLivenessMillis = System.currentTimeMillis();
84+
// Refresh the producer liveness and pass the block to the observer.
85+
producerLivenessInstant = Instant.now(producerLivenessClock);
7286
responseStreamObserver.onNext(block);
7387
}
7488
}
@@ -81,14 +95,12 @@ public void notify(final BlockStreamServiceGrpcProto.Block block) {
8195
@Override
8296
public void onNext(final BlockStreamServiceGrpcProto.BlockResponse blockResponse) {
8397

84-
if (System.currentTimeMillis() - producerLivenessMillis > timeoutThresholdMillis) {
85-
if (mediator.isSubscribed(this)) {
86-
LOGGER.log(System.Logger.Level.DEBUG, "Producer timeout threshold exceeded. Unsubscribing observer.");
87-
mediator.unsubscribe(this);
88-
}
98+
if (Duration.between(producerLivenessInstant, Instant.now(producerLivenessClock)).toMillis() > timeoutThresholdMillis) {
99+
LOGGER.log(System.Logger.Level.DEBUG, "Producer timeout threshold exceeded. Unsubscribing observer.");
100+
mediator.unsubscribe(this);
89101
} else {
90102
LOGGER.log(System.Logger.Level.DEBUG, "Received response block " + blockResponse);
91-
consumerLivenessMillis = System.currentTimeMillis();
103+
consumerLivenessInstant = Instant.now(consumerLivenessClock);
92104
}
93105
}
94106

Lines changed: 105 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,105 @@
1+
/*
2+
* Copyright (C) 2024 Hedera Hashgraph, LLC
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package com.hedera.block.server.consumer;
18+
19+
import com.hedera.block.protos.BlockStreamServiceGrpcProto;
20+
import com.hedera.block.server.mediator.StreamMediator;
21+
import io.grpc.stub.StreamObserver;
22+
import org.junit.jupiter.api.Test;
23+
import org.junit.jupiter.api.extension.ExtendWith;
24+
import org.mockito.Mock;
25+
import org.mockito.junit.jupiter.MockitoExtension;
26+
27+
import java.time.Clock;
28+
29+
import static org.mockito.Mockito.*;
30+
31+
@ExtendWith(MockitoExtension.class)
32+
public class LiveStreamObserverImplTest {
33+
34+
@Mock
35+
private StreamMediator<BlockStreamServiceGrpcProto.Block, BlockStreamServiceGrpcProto.BlockResponse> streamMediator;
36+
37+
@Mock
38+
private StreamObserver<BlockStreamServiceGrpcProto.Block> responseStreamObserver;
39+
40+
41+
@Test
42+
public void testConsumerTimeoutWithinWindow() {
43+
final LiveStreamObserver<BlockStreamServiceGrpcProto.Block, BlockStreamServiceGrpcProto.BlockResponse> liveStreamObserver = new LiveStreamObserverImpl(
44+
50,
45+
Clock.systemDefaultZone(),
46+
Clock.systemDefaultZone(),
47+
streamMediator,
48+
responseStreamObserver);
49+
BlockStreamServiceGrpcProto.Block newBlock = BlockStreamServiceGrpcProto.Block.newBuilder().build();
50+
liveStreamObserver.notify(newBlock);
51+
52+
// verify the observer is called with the next
53+
// block and the stream mediator is not unsubscribed
54+
verify(responseStreamObserver).onNext(newBlock);
55+
verify(streamMediator, never()).unsubscribe(liveStreamObserver);
56+
}
57+
58+
@Test
59+
public void testConsumerTimeoutOutsideWindow() throws InterruptedException {
60+
final LiveStreamObserver<BlockStreamServiceGrpcProto.Block, BlockStreamServiceGrpcProto.BlockResponse> liveStreamObserver = new LiveStreamObserverImpl(
61+
50,
62+
Clock.systemDefaultZone(),
63+
Clock.systemDefaultZone(),
64+
streamMediator,
65+
responseStreamObserver);
66+
67+
Thread.sleep(51);
68+
BlockStreamServiceGrpcProto.Block newBlock = BlockStreamServiceGrpcProto.Block.newBuilder().build();
69+
when(streamMediator.isSubscribed(liveStreamObserver)).thenReturn(true);
70+
liveStreamObserver.notify(newBlock);
71+
verify(streamMediator).unsubscribe(liveStreamObserver);
72+
}
73+
74+
@Test
75+
public void testProducerTimeoutWithinWindow() {
76+
final LiveStreamObserver<BlockStreamServiceGrpcProto.Block, BlockStreamServiceGrpcProto.BlockResponse> liveStreamObserver = new LiveStreamObserverImpl(
77+
50,
78+
Clock.systemDefaultZone(),
79+
Clock.systemDefaultZone(),
80+
streamMediator,
81+
responseStreamObserver);
82+
83+
BlockStreamServiceGrpcProto.BlockResponse blockResponse = BlockStreamServiceGrpcProto.BlockResponse.newBuilder().build();
84+
liveStreamObserver.onNext(blockResponse);
85+
86+
// verify the mediator is NOT called to unsubscribe the observer
87+
verify(streamMediator, never()).unsubscribe(liveStreamObserver);
88+
}
89+
90+
@Test
91+
public void testProducerTimeoutOutsideWindow() throws InterruptedException {
92+
final LiveStreamObserver<BlockStreamServiceGrpcProto.Block, BlockStreamServiceGrpcProto.BlockResponse> liveStreamObserver = new LiveStreamObserverImpl(
93+
50,
94+
Clock.systemDefaultZone(),
95+
Clock.systemDefaultZone(),
96+
streamMediator,
97+
responseStreamObserver);
98+
99+
Thread.sleep(51);
100+
BlockStreamServiceGrpcProto.BlockResponse blockResponse = BlockStreamServiceGrpcProto.BlockResponse.newBuilder().build();
101+
liveStreamObserver.onNext(blockResponse);
102+
103+
verify(streamMediator).unsubscribe(liveStreamObserver);
104+
}
105+
}
Lines changed: 146 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,146 @@
1+
/*
2+
* Copyright (C) 2024 Hedera Hashgraph, LLC
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package com.hedera.block.server.mediator;
18+
19+
20+
import com.hedera.block.protos.BlockStreamServiceGrpcProto;
21+
import com.hedera.block.server.consumer.LiveStreamObserver;
22+
import com.hedera.block.server.persistence.WriteThroughCacheHandler;
23+
import com.hedera.block.server.persistence.cache.BlockCache;
24+
import com.hedera.block.server.persistence.storage.BlockStorage;
25+
import org.junit.jupiter.api.Test;
26+
import org.junit.jupiter.api.extension.ExtendWith;
27+
import org.mockito.Mock;
28+
import org.mockito.junit.jupiter.MockitoExtension;
29+
30+
import static org.junit.jupiter.api.Assertions.assertFalse;
31+
import static org.junit.jupiter.api.Assertions.assertTrue;
32+
import static org.mockito.Mockito.verify;
33+
34+
@ExtendWith(MockitoExtension.class)
35+
public class LiveStreamMediatorImplTest {
36+
37+
@Mock
38+
private LiveStreamObserver<BlockStreamServiceGrpcProto.Block, BlockStreamServiceGrpcProto.BlockResponse> liveStreamObserver1;
39+
40+
@Mock
41+
private LiveStreamObserver<BlockStreamServiceGrpcProto.Block, BlockStreamServiceGrpcProto.BlockResponse> liveStreamObserver2;
42+
43+
@Mock
44+
private LiveStreamObserver<BlockStreamServiceGrpcProto.Block, BlockStreamServiceGrpcProto.BlockResponse> liveStreamObserver3;
45+
46+
@Mock
47+
private BlockStorage<BlockStreamServiceGrpcProto.Block> blockStorage;
48+
49+
@Mock
50+
private BlockCache<BlockStreamServiceGrpcProto.Block> blockCache;
51+
52+
@Test
53+
public void testUnsubscribeAll() {
54+
55+
final StreamMediator<BlockStreamServiceGrpcProto.Block, BlockStreamServiceGrpcProto.BlockResponse> streamMediator =
56+
new LiveStreamMediatorImpl(new WriteThroughCacheHandler(blockStorage, blockCache));
57+
58+
// Set up the subscribers
59+
streamMediator.subscribe(liveStreamObserver1);
60+
streamMediator.subscribe(liveStreamObserver2);
61+
streamMediator.subscribe(liveStreamObserver3);
62+
63+
assertTrue(streamMediator.isSubscribed(liveStreamObserver1), "Expected the mediator to have liveStreamObserver1 subscribed");
64+
assertTrue(streamMediator.isSubscribed(liveStreamObserver2), "Expected the mediator to have liveStreamObserver2 subscribed");
65+
assertTrue(streamMediator.isSubscribed(liveStreamObserver3), "Expected the mediator to have liveStreamObserver3 subscribed");
66+
67+
streamMediator.unsubscribeAll();
68+
69+
assertFalse(streamMediator.isSubscribed(liveStreamObserver1), "Expected the mediator to have unsubscribed liveStreamObserver1");
70+
assertFalse(streamMediator.isSubscribed(liveStreamObserver2), "Expected the mediator to have unsubscribed liveStreamObserver2");
71+
assertFalse(streamMediator.isSubscribed(liveStreamObserver3), "Expected the mediator to have unsubscribed liveStreamObserver3");
72+
}
73+
74+
@Test
75+
public void testUnsubscribeEach() {
76+
77+
final StreamMediator<BlockStreamServiceGrpcProto.Block, BlockStreamServiceGrpcProto.BlockResponse> streamMediator =
78+
new LiveStreamMediatorImpl(new WriteThroughCacheHandler(blockStorage, blockCache));
79+
80+
// Set up the subscribers
81+
streamMediator.subscribe(liveStreamObserver1);
82+
streamMediator.subscribe(liveStreamObserver2);
83+
streamMediator.subscribe(liveStreamObserver3);
84+
85+
assertTrue(streamMediator.isSubscribed(liveStreamObserver1), "Expected the mediator to have liveStreamObserver1 subscribed");
86+
assertTrue(streamMediator.isSubscribed(liveStreamObserver2), "Expected the mediator to have liveStreamObserver2 subscribed");
87+
assertTrue(streamMediator.isSubscribed(liveStreamObserver3), "Expected the mediator to have liveStreamObserver3 subscribed");
88+
89+
streamMediator.unsubscribe(liveStreamObserver1);
90+
assertFalse(streamMediator.isSubscribed(liveStreamObserver1), "Expected the mediator to have unsubscribed liveStreamObserver1");
91+
92+
streamMediator.unsubscribe(liveStreamObserver2);
93+
assertFalse(streamMediator.isSubscribed(liveStreamObserver2), "Expected the mediator to have unsubscribed liveStreamObserver2");
94+
95+
streamMediator.unsubscribe(liveStreamObserver3);
96+
assertFalse(streamMediator.isSubscribed(liveStreamObserver3), "Expected the mediator to have unsubscribed liveStreamObserver3");
97+
}
98+
99+
@Test
100+
public void testMediatorPersistenceWithoutSubscribers() {
101+
102+
final StreamMediator<BlockStreamServiceGrpcProto.Block, BlockStreamServiceGrpcProto.BlockResponse> streamMediator =
103+
new LiveStreamMediatorImpl(new WriteThroughCacheHandler(blockStorage, blockCache));
104+
105+
final BlockStreamServiceGrpcProto.Block newBlock = BlockStreamServiceGrpcProto.Block.newBuilder().build();
106+
107+
// Acting as a producer, notify the mediator of a new block
108+
streamMediator.notifyAll(newBlock);
109+
110+
// Confirm the block was persisted to storage and cache
111+
// even though there are no subscribers
112+
verify(blockStorage).write(newBlock);
113+
verify(blockCache).insert(newBlock);
114+
}
115+
116+
@Test
117+
public void testMediatorNotifyAll() {
118+
119+
final StreamMediator<BlockStreamServiceGrpcProto.Block, BlockStreamServiceGrpcProto.BlockResponse> streamMediator =
120+
new LiveStreamMediatorImpl(new WriteThroughCacheHandler(blockStorage, blockCache));
121+
122+
// Set up the subscribers
123+
streamMediator.subscribe(liveStreamObserver1);
124+
streamMediator.subscribe(liveStreamObserver2);
125+
streamMediator.subscribe(liveStreamObserver3);
126+
127+
assertTrue(streamMediator.isSubscribed(liveStreamObserver1), "Expected the mediator to have liveStreamObserver1 subscribed");
128+
assertTrue(streamMediator.isSubscribed(liveStreamObserver2), "Expected the mediator to have liveStreamObserver2 subscribed");
129+
assertTrue(streamMediator.isSubscribed(liveStreamObserver3), "Expected the mediator to have liveStreamObserver3 subscribed");
130+
131+
final BlockStreamServiceGrpcProto.Block newBlock = BlockStreamServiceGrpcProto.Block.newBuilder().build();
132+
133+
// Acting as a producer, notify the mediator of a new block
134+
streamMediator.notifyAll(newBlock);
135+
136+
// Confirm each subscriber was notified of the new block
137+
verify(liveStreamObserver1).notify(newBlock);
138+
verify(liveStreamObserver2).notify(newBlock);
139+
verify(liveStreamObserver3).notify(newBlock);
140+
141+
// Confirm the block was persisted to storage and cache
142+
verify(blockStorage).write(newBlock);
143+
verify(blockCache).insert(newBlock);
144+
}
145+
146+
}

settings.gradle.kts

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,9 @@ dependencyResolutionManagement {
4040

4141
// Testing only versions
4242
version("org.assertj.core", "3.23.1")
43-
version("org.junit.jupiter.api", "5.10.0")
43+
version("org.junit.jupiter.api", "5.10.2")
44+
version("org.mockito", "5.8.0")
45+
version("org.mockito.junit.jupiter", "5.8.0")
4446
}
4547
}
4648
}

0 commit comments

Comments
 (0)