19
19
import org .elasticsearch .action .search .SearchResponse ;
20
20
import org .elasticsearch .action .search .TransportSearchAction ;
21
21
import org .elasticsearch .action .support .PlainActionFuture ;
22
+ import org .elasticsearch .action .support .SubscribableListener ;
22
23
import org .elasticsearch .client .internal .node .NodeClient ;
23
24
import org .elasticsearch .common .settings .Settings ;
24
25
import org .elasticsearch .common .util .set .Sets ;
44
45
import java .util .concurrent .atomic .AtomicBoolean ;
45
46
import java .util .concurrent .atomic .AtomicInteger ;
46
47
import java .util .concurrent .atomic .AtomicLong ;
47
- import java .util .concurrent .atomic .AtomicReference ;
48
48
import java .util .function .LongSupplier ;
49
49
50
50
public class RestCancellableNodeClientTests extends ESTestCase {
@@ -79,7 +79,9 @@ public void testCompletedTasks() throws Exception {
79
79
for (int j = 0 ; j < numTasks ; j ++) {
80
80
PlainActionFuture <SearchResponse > actionFuture = new PlainActionFuture <>();
81
81
RestCancellableNodeClient client = new RestCancellableNodeClient (testClient , channel );
82
- threadPool .generic ().submit (() -> client .execute (TransportSearchAction .TYPE , new SearchRequest (), actionFuture ));
82
+ futures .add (
83
+ threadPool .generic ().submit (() -> client .execute (TransportSearchAction .TYPE , new SearchRequest (), actionFuture ))
84
+ );
83
85
futures .add (actionFuture );
84
86
}
85
87
}
@@ -150,7 +152,7 @@ public void testChannelAlreadyClosed() {
150
152
assertEquals (totalSearches , testClient .cancelledTasks .size ());
151
153
}
152
154
153
- public void testConcurrentExecuteAndClose () throws Exception {
155
+ public void testConcurrentExecuteAndClose () {
154
156
final var testClient = new TestClient (Settings .EMPTY , threadPool , true );
155
157
int initialHttpChannels = RestCancellableNodeClient .getNumChannels ();
156
158
int numTasks = randomIntBetween (1 , 30 );
@@ -254,7 +256,7 @@ public String getLocalNodeId() {
254
256
255
257
private class TestHttpChannel implements HttpChannel {
256
258
private final AtomicBoolean open = new AtomicBoolean (true );
257
- private final AtomicReference <ActionListener <Void >> closeListener = new AtomicReference <>();
259
+ private final SubscribableListener <ActionListener <Void >> closeListener = new SubscribableListener <>();
258
260
private final CountDownLatch closeLatch = new CountDownLatch (1 );
259
261
260
262
@ Override
@@ -273,8 +275,7 @@ public InetSocketAddress getRemoteAddress() {
273
275
@ Override
274
276
public void close () {
275
277
assertTrue ("HttpChannel is already closed" , open .compareAndSet (true , false ));
276
- ActionListener <Void > listener = closeListener .get ();
277
- if (listener != null ) {
278
+ closeListener .andThenAccept (listener -> {
278
279
boolean failure = randomBoolean ();
279
280
threadPool .generic ().submit (() -> {
280
281
if (failure ) {
@@ -284,11 +285,10 @@ public void close() {
284
285
}
285
286
closeLatch .countDown ();
286
287
});
287
- }
288
+ });
288
289
}
289
290
290
291
private void awaitClose () throws InterruptedException {
291
- assertNotNull ("must set closeListener before calling awaitClose" , closeListener .get ());
292
292
close ();
293
293
closeLatch .await ();
294
294
}
@@ -304,9 +304,8 @@ public void addCloseListener(ActionListener<Void> listener) {
304
304
if (open .get () == false ) {
305
305
listener .onResponse (null );
306
306
} else {
307
- if (closeListener .compareAndSet (null , listener ) == false ) {
308
- throw new AssertionError ("close listener already set, only one is allowed!" );
309
- }
307
+ assertFalse ("close listener already set, only one is allowed!" , closeListener .isDone ());
308
+ closeListener .onResponse (ActionListener .assertOnce (listener ));
310
309
}
311
310
}
312
311
}
0 commit comments