@@ -62,6 +62,7 @@ public class BulkIngester<Context> implements AutoCloseable {
62
62
63
63
private @ Nullable ScheduledFuture <?> flushTask ;
64
64
private @ Nullable ScheduledExecutorService scheduler ;
65
+ private boolean isExternalScheduler = false ;
65
66
66
67
// Current state
67
68
private List <BulkOperation > operations = new ArrayList <>();
@@ -101,25 +102,20 @@ private BulkIngester(Builder<Context> builder) {
101
102
this .listener = builder .listener ;
102
103
this .flushIntervalMillis = builder .flushIntervalMillis ;
103
104
104
- // Create a scheduler if needed
105
- ScheduledExecutorService scheduler = null ;
106
105
if (flushIntervalMillis != null || listener != null ) {
107
-
106
+ // Create a scheduler if needed
108
107
if (builder .scheduler == null ) {
109
- scheduler = Executors .newScheduledThreadPool (maxRequests + 1 , (r ) -> {
108
+ this . scheduler = Executors .newScheduledThreadPool (maxRequests + 1 , (r ) -> {
110
109
Thread t = Executors .defaultThreadFactory ().newThread (r );
111
110
t .setName ("bulk-ingester-executor#" + ingesterId + "#" + t .getId ());
112
111
t .setDaemon (true );
113
112
return t ;
114
113
});
115
-
116
- // Keep it, we'll have to close it.
117
- this .scheduler = scheduler ;
118
114
} else {
119
115
// It's not ours, we will not close it.
120
- scheduler = builder .scheduler ;
116
+ this .scheduler = builder .scheduler ;
117
+ this .isExternalScheduler = true ;
121
118
}
122
-
123
119
}
124
120
125
121
if (flushIntervalMillis != null ) {
@@ -398,7 +394,7 @@ public void close() {
398
394
flushTask .cancel (false );
399
395
}
400
396
401
- if (scheduler != null ) {
397
+ if (scheduler != null && ! isExternalScheduler ) {
402
398
scheduler .shutdownNow ();
403
399
}
404
400
}
0 commit comments