20
20
import static org .awaitility .Awaitility .await ;
21
21
22
22
import java .util .concurrent .CompletableFuture ;
23
- import java .util .concurrent .Executors ;
24
- import java .util .concurrent .ScheduledExecutorService ;
25
23
import java .util .concurrent .atomic .AtomicBoolean ;
26
24
import java .util .concurrent .atomic .AtomicReference ;
27
25
33
31
import com .linecorp .armeria .client .grpc .GrpcClients ;
34
32
import com .linecorp .armeria .common .FilteredHttpRequest ;
35
33
import com .linecorp .armeria .common .HttpObject ;
36
- import com .linecorp .armeria .common .util .BlockingTaskExecutor ;
37
34
import com .linecorp .armeria .server .ServerBuilder ;
38
35
import com .linecorp .armeria .server .grpc .GrpcService ;
39
36
import com .linecorp .armeria .testing .junit5 .server .ServerExtension ;
@@ -57,12 +54,6 @@ class AbstractServerCallTest {
57
54
static final ServerExtension server = new ServerExtension () {
58
55
@ Override
59
56
protected void configure (ServerBuilder sb ) throws Exception {
60
- // Use a single-threaded executor to run the blocking task once at a time without accessing
61
- // the sequential executor.
62
- final ScheduledExecutorService scheduledExecutorService =
63
- Executors .newSingleThreadScheduledExecutor ();
64
- sb .blockingTaskExecutor (BlockingTaskExecutor .of (scheduledExecutorService ), true );
65
-
66
57
final AtomicReference <ServerCall <?, ?>> serverCallCaptor = new AtomicReference <>();
67
58
final GrpcService grpcService =
68
59
GrpcService .builder ()
@@ -88,12 +79,12 @@ public <T, U> Listener<T> interceptCall(
88
79
protected void beforeSubscribe (Subscriber <? super HttpObject > subscriber ,
89
80
Subscription subscription ) {
90
81
// This is called right before
91
- // blockingExecutor.execute(() -> invokeOnMessage(request, endOfStream)); is called
82
+ // blockingExecutor.execute(() -> invokeOnMessage(request, endOfStream));
92
83
// in AbstractServerCall.
93
84
// https://github.com/line/armeria/blob/0960d091bfc7f350c17e68f57cc627de584b9705/grpc/src/main/java/com/linecorp/armeria/internal/server/grpc/AbstractServerCall.java#L363
94
- scheduledExecutorService . execute (() -> {
95
- final ServerCall <?, ?> serverCall = serverCallCaptor . get ( );
96
- assertThat ( serverCall ).isNotNull ();
85
+ final ServerCall <?, ?> serverCall = serverCallCaptor . get ();
86
+ assertThat ( serverCall ). isInstanceOf ( AbstractServerCall . class );
87
+ (( AbstractServerCall <?, ?>) serverCall ).blockingExecutor . execute (() -> {
97
88
// invokeOnMessage is not called until the request is cancelled.
98
89
await ().until (serverCall ::isCancelled );
99
90
// Now, AbstractServerCall.invokeOnMessage() is called and it doesn't call
0 commit comments