Skip to content

Commit 77111f7

Browse files
authored
[FLINK-35410][state] Avoid sync waiting in coordinator thread of ForSt executor (apache#24819)
1 parent d43aeaf commit 77111f7

File tree

2 files changed

+19
-22
lines changed

2 files changed

+19
-22
lines changed

flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStStateExecutor.java

Lines changed: 16 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@
2121
import org.apache.flink.runtime.asyncprocessing.StateExecutor;
2222
import org.apache.flink.runtime.asyncprocessing.StateRequest;
2323
import org.apache.flink.runtime.asyncprocessing.StateRequestContainer;
24-
import org.apache.flink.util.FlinkRuntimeException;
2524
import org.apache.flink.util.Preconditions;
2625
import org.apache.flink.util.concurrent.ExecutorThreadFactory;
2726
import org.apache.flink.util.concurrent.FutureUtils;
@@ -34,7 +33,6 @@
3433
import java.util.ArrayList;
3534
import java.util.List;
3635
import java.util.concurrent.CompletableFuture;
37-
import java.util.concurrent.ExecutionException;
3836
import java.util.concurrent.ExecutorService;
3937
import java.util.concurrent.Executors;
4038

@@ -77,7 +75,8 @@ public CompletableFuture<Void> executeBatchRequests(
7775
Preconditions.checkArgument(stateRequestContainer instanceof ForStStateRequestClassifier);
7876
ForStStateRequestClassifier stateRequestClassifier =
7977
(ForStStateRequestClassifier) stateRequestContainer;
80-
return CompletableFuture.supplyAsync(
78+
CompletableFuture<Void> resultFuture = new CompletableFuture<>();
79+
coordinatorThread.execute(
8180
() -> {
8281
long startTime = System.currentTimeMillis();
8382
List<CompletableFuture<Void>> futures = new ArrayList<>(2);
@@ -98,22 +97,20 @@ public CompletableFuture<Void> executeBatchRequests(
9897
futures.add(getOperations.process());
9998
}
10099

101-
try {
102-
FutureUtils.waitForAll(futures).get();
103-
} catch (InterruptedException | ExecutionException e) {
104-
throw new FlinkRuntimeException(
105-
"Exception when executing ForSt writeBatch or multiGet operation",
106-
e);
107-
}
108-
long duration = System.currentTimeMillis() - startTime;
109-
LOG.debug(
110-
"Complete executing a batch of state requests, putRequest size {}, getRequest size {}, duration {} ms",
111-
putRequests.size(),
112-
getRequests.size(),
113-
duration);
114-
return null;
115-
},
116-
coordinatorThread);
100+
FutureUtils.combineAll(futures)
101+
.thenAcceptAsync(
102+
(e) -> {
103+
long duration = System.currentTimeMillis() - startTime;
104+
LOG.debug(
105+
"Complete executing a batch of state requests, putRequest size {}, getRequest size {}, duration {} ms",
106+
putRequests.size(),
107+
getRequests.size(),
108+
duration);
109+
resultFuture.complete(null);
110+
},
111+
coordinatorThread);
112+
});
113+
return resultFuture;
117114
}
118115

119116
@Override

flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/ForStStateExecutorTest.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,7 @@ public void testExecuteValueStateRequest() throws Exception {
5757
buildStateRequest(state, StateRequestType.VALUE_UPDATE, i, "test-" + i, i * 2));
5858
}
5959

60-
forStStateExecutor.executeBatchRequests(stateRequestContainer);
60+
forStStateExecutor.executeBatchRequests(stateRequestContainer).get();
6161

6262
List<StateRequest<?, ?, ?>> checkList = new ArrayList<>();
6363
stateRequestContainer = forStStateExecutor.createStateRequestContainer();
@@ -99,7 +99,7 @@ public void testExecuteValueStateRequest() throws Exception {
9999
stateRequestContainer.offer(
100100
buildStateRequest(state, StateRequestType.VALUE_UPDATE, i, null, i * 2));
101101
}
102-
forStStateExecutor.executeBatchRequests(stateRequestContainer);
102+
forStStateExecutor.executeBatchRequests(stateRequestContainer).get();
103103

104104
// 5. Check that the deleted value is null : keyRange [keyNum - 100, keyNum + 100)
105105
stateRequestContainer = forStStateExecutor.createStateRequestContainer();
@@ -111,7 +111,7 @@ public void testExecuteValueStateRequest() throws Exception {
111111
stateRequestContainer.offer(getRequest);
112112
checkList.add(getRequest);
113113
}
114-
forStStateExecutor.executeBatchRequests(stateRequestContainer);
114+
forStStateExecutor.executeBatchRequests(stateRequestContainer).get();
115115
for (StateRequest<?, ?, ?> getRequest : checkList) {
116116
assertThat(getRequest.getRequestType()).isEqualTo(StateRequestType.VALUE_GET);
117117
assertThat(((TestStateFuture<String>) getRequest.getFuture()).getCompletedResult())

0 commit comments

Comments
 (0)