Skip to content

Commit ae707a0

Browse files
ktosoviktorklang
authored andcommitted
=tck Improve Promise to be race-free and avoiding queue if not needed
1 parent 3c82229 commit ae707a0

File tree

1 file changed

+15
-11
lines changed

1 file changed

+15
-11
lines changed

tck/src/main/java/org/reactivestreams/tck/TestEnvironment.java

Lines changed: 15 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import java.util.concurrent.CountDownLatch;
2626
import java.util.concurrent.TimeUnit;
2727
import java.util.concurrent.atomic.AtomicBoolean;
28+
import java.util.concurrent.atomic.AtomicReference;
2829

2930
import static org.testng.Assert.assertTrue;
3031
import static org.testng.Assert.fail;
@@ -192,7 +193,7 @@ public void flop(Throwable thr, String msg) {
192193
asyncErrors.add(thr);
193194
}
194195
}
195-
196+
196197
/**
197198
* To flop means to "fail asynchronously", either by onErroring or by failing some TCK check triggered asynchronously.
198199
* This method does *NOT* fail the test - it's up to inspections of the error to fail the test if required.
@@ -886,34 +887,39 @@ public Promise(TestEnvironment env) {
886887
}
887888

888889
private ArrayBlockingQueue<T> abq = new ArrayBlockingQueue<T>(1);
889-
private volatile T _value = null;
890+
private AtomicReference<T> _value = new AtomicReference<T>();
890891

891892
public T value() {
892-
if (isCompleted()) {
893-
return _value;
893+
final T value = _value.get();
894+
if (value != null) {
895+
return value;
894896
} else {
895897
env.flop("Cannot access promise value before completion");
896898
return null;
897899
}
898900
}
899901

900902
public boolean isCompleted() {
901-
return _value != null;
903+
return _value.get() != null;
902904
}
903905

904906
/**
905907
* Allows using expectCompletion to await for completion of the value and complete it _then_
906908
*/
907909
public void complete(T value) {
908-
abq.add(value);
910+
if (_value.compareAndSet(null, value)) {
911+
// we add the value to the queue such to wake up any expectCompletion which was triggered before complete() was called
912+
abq.add(value);
913+
}
909914
}
910915

911916
/**
912-
* Completes the promise right away, it is not possible to expectCompletion on a Promise completed this way
917+
* Same as complete.
918+
*
919+
* Keeping this method for binary compatibility.
913920
*/
914921
public void completeImmediatly(T value) {
915-
complete(value); // complete!
916-
_value = value; // immediatly!
922+
complete(value);
917923
}
918924

919925
public void expectCompletion(long timeoutMillis, String errorMsg) throws InterruptedException {
@@ -922,8 +928,6 @@ public void expectCompletion(long timeoutMillis, String errorMsg) throws Interru
922928

923929
if (val == null) {
924930
env.flop(String.format("%s within %d ms", errorMsg, timeoutMillis));
925-
} else {
926-
_value = val;
927931
}
928932
}
929933
}

0 commit comments

Comments
 (0)