17
17
18
18
package org .apache .seatunnel .translation .flink .source ;
19
19
20
+ import org .apache .seatunnel .shade .com .google .common .util .concurrent .ThreadFactoryBuilder ;
20
21
import org .apache .seatunnel .shade .com .typesafe .config .Config ;
21
22
22
23
import org .apache .seatunnel .api .source .SourceSplit ;
34
35
35
36
import java .util .List ;
36
37
import java .util .concurrent .CompletableFuture ;
38
+ import java .util .concurrent .Executors ;
39
+ import java .util .concurrent .ScheduledExecutorService ;
40
+ import java .util .concurrent .TimeUnit ;
37
41
import java .util .stream .Collectors ;
38
42
39
43
/**
@@ -55,10 +59,25 @@ public class FlinkSourceReader<SplitT extends SourceSplit>
55
59
56
60
private InputStatus inputStatus = InputStatus .MORE_AVAILABLE ;
57
61
62
+ private volatile CompletableFuture <Void > availabilityFuture ;
63
+
64
+ private static final long DEFAULT_WAIT_TIME_MILLIS = 1000L ;
65
+
66
+ private final ScheduledExecutorService scheduledExecutor ;
67
+
58
68
public FlinkSourceReader (
59
69
org .apache .seatunnel .api .source .SourceReader <SeaTunnelRow , SplitT > sourceReader ,
60
70
org .apache .seatunnel .api .source .SourceReader .Context context ,
61
71
Config envConfig ) {
72
+ this .scheduledExecutor =
73
+ Executors .newSingleThreadScheduledExecutor (
74
+ new ThreadFactoryBuilder ()
75
+ .setDaemon (true )
76
+ .setNameFormat (
77
+ String .format (
78
+ "source-reader-scheduler-%d" ,
79
+ context .getIndexOfSubtask ()))
80
+ .build ());
62
81
this .sourceReader = sourceReader ;
63
82
this .context = context ;
64
83
this .flinkRowCollector = new FlinkRowCollector (envConfig , context .getMetricsContext ());
@@ -78,9 +97,19 @@ public void start() {
78
97
public InputStatus pollNext (ReaderOutput <SeaTunnelRow > output ) throws Exception {
79
98
if (!((FlinkSourceReaderContext ) context ).isSendNoMoreElementEvent ()) {
80
99
sourceReader .pollNext (flinkRowCollector .withReaderOutput (output ));
100
+ if (flinkRowCollector .isEmptyThisPollNext ()) {
101
+ synchronized (this ) {
102
+ if (availabilityFuture == null || availabilityFuture .isDone ()) {
103
+ availabilityFuture = new CompletableFuture <>();
104
+ scheduleComplete (availabilityFuture );
105
+ LOGGER .debug ("No data available, wait for next poll." );
106
+ }
107
+ }
108
+ return InputStatus .NOTHING_AVAILABLE ;
109
+ }
81
110
} else {
82
111
// reduce CPU idle
83
- Thread .sleep (1000L );
112
+ Thread .sleep (DEFAULT_WAIT_TIME_MILLIS );
84
113
}
85
114
return inputStatus ;
86
115
}
@@ -97,7 +126,8 @@ public List<SplitWrapper<SplitT>> snapshotState(long checkpointId) {
97
126
98
127
@ Override
99
128
public CompletableFuture <Void > isAvailable () {
100
- return CompletableFuture .completedFuture (null );
129
+ CompletableFuture <Void > future = availabilityFuture ;
130
+ return future != null ? future : CompletableFuture .completedFuture (null );
101
131
}
102
132
103
133
@ Override
@@ -123,8 +153,13 @@ public void handleSourceEvents(SourceEvent sourceEvent) {
123
153
124
154
@ Override
125
155
public void close () throws Exception {
156
+ CompletableFuture <Void > future = availabilityFuture ;
157
+ if (future != null && !future .isDone ()) {
158
+ future .complete (null );
159
+ }
126
160
sourceReader .close ();
127
161
context .getEventListener ().onEvent (new ReaderCloseEvent ());
162
+ scheduledExecutor .shutdown ();
128
163
}
129
164
130
165
@ Override
@@ -136,4 +171,9 @@ public void notifyCheckpointComplete(long checkpointId) throws Exception {
136
171
public void notifyCheckpointAborted (long checkpointId ) throws Exception {
137
172
sourceReader .notifyCheckpointAborted (checkpointId );
138
173
}
174
+
175
+ private void scheduleComplete (CompletableFuture <Void > future ) {
176
+ scheduledExecutor .schedule (
177
+ () -> future .complete (null ), DEFAULT_WAIT_TIME_MILLIS , TimeUnit .MILLISECONDS );
178
+ }
139
179
}
0 commit comments