25
25
import java .util .concurrent .Executors ;
26
26
import java .util .concurrent .LinkedBlockingQueue ;
27
27
import java .util .concurrent .TimeUnit ;
28
+ import java .util .function .Supplier ;
28
29
29
30
/** The default delaying queue implementation. */
30
31
public class DefaultDelayingQueue <T > extends DefaultWorkQueue <T > implements DelayingQueue <T > {
@@ -34,8 +35,10 @@ public class DefaultDelayingQueue<T> extends DefaultWorkQueue<T> implements Dela
34
35
private DelayQueue <WaitForEntry <T >> delayQueue ;
35
36
private ConcurrentMap <T , WaitForEntry <T >> waitingEntryByData ;
36
37
protected BlockingQueue <WaitForEntry <T >> waitingForAddQueue ;
38
+ private Supplier <Instant > timeSource ;
37
39
38
40
public DefaultDelayingQueue (ExecutorService waitingWorker ) {
41
+ this .timeSource = Instant ::now ;
39
42
this .delayQueue = new DelayQueue <>();
40
43
this .waitingEntryByData = new ConcurrentHashMap <>();
41
44
this .waitingForAddQueue = new LinkedBlockingQueue <>(1000 );
@@ -57,10 +60,16 @@ public void addAfter(T item, Duration duration) {
57
60
super .add (item );
58
61
return ;
59
62
}
60
- WaitForEntry <T > entry = new WaitForEntry <>(item , duration .addTo (Instant .now ()));
63
+ WaitForEntry <T > entry =
64
+ new WaitForEntry <>(item , duration .addTo (this .timeSource .get ()), this .timeSource );
61
65
this .waitingForAddQueue .offer (entry );
62
66
}
63
67
68
+ // Visible for testing
69
+ protected void injectTimeSource (Supplier <Instant > fn ) {
70
+ this .timeSource = fn ;
71
+ }
72
+
64
73
private void waitingLoop () {
65
74
try {
66
75
while (true ) {
@@ -78,7 +87,7 @@ private void waitingLoop() {
78
87
// a. if ready, remove it from the delay-queue and push it into underlying
79
88
// work-queue
80
89
// b. if not, refresh the next ready-at time.
81
- Instant now = Instant . now ();
90
+ Instant now = this . timeSource . get ();
82
91
if (!Duration .between (entry .readyAtMillis , now ).isNegative ()) {
83
92
delayQueue .remove (entry );
84
93
super .add (entry .data );
@@ -92,7 +101,7 @@ private void waitingLoop() {
92
101
WaitForEntry <T > waitForEntry =
93
102
waitingForAddQueue .poll (nextReadyAt .toMillis (), TimeUnit .MILLISECONDS );
94
103
if (waitForEntry != null ) {
95
- if (Duration .between (waitForEntry .readyAtMillis , Instant . now ()).isNegative ()) {
104
+ if (Duration .between (waitForEntry .readyAtMillis , this . timeSource . get ()).isNegative ()) {
96
105
// the item is not yet ready, insert it to the delay-queue
97
106
insert (this .delayQueue , this .waitingEntryByData , waitForEntry );
98
107
} else {
@@ -126,17 +135,19 @@ private void insert(
126
135
// WaitForEntry holds the data to add and the time it should be added.
127
136
private static class WaitForEntry <T > implements Delayed {
128
137
129
- private WaitForEntry (T data , Temporal readyAtMillis ) {
138
+ private WaitForEntry (T data , Temporal readyAtMillis , Supplier < Instant > timeSource ) {
130
139
this .data = data ;
131
140
this .readyAtMillis = readyAtMillis ;
141
+ this .timeSource = timeSource ;
132
142
}
133
143
134
144
private T data ;
135
145
private Temporal readyAtMillis ;
146
+ private Supplier <Instant > timeSource ;
136
147
137
148
@ Override
138
149
public long getDelay (TimeUnit unit ) {
139
- Duration duration = Duration .between (Instant . now (), readyAtMillis );
150
+ Duration duration = Duration .between (this . timeSource . get (), readyAtMillis );
140
151
return unit .convert (duration .toMillis (), TimeUnit .MILLISECONDS );
141
152
}
142
153
0 commit comments