Skip to content

Commit aa6a8db

Browse files
committed
[hotfix] Rename the buffer config options for async state processing
1 parent 4facda6 commit aa6a8db

File tree

6 files changed

+50
-49
lines changed

6 files changed

+50
-49
lines changed

Diff for: flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java

+12-12
Original file line numberDiff line numberDiff line change
@@ -481,35 +481,35 @@ public void setUseSnapshotCompression(boolean useSnapshotCompression) {
481481
// --------------------------------------------------------------------------------------------
482482

483483
@Experimental
484-
public int getAsyncInflightRecordsLimit() {
485-
return configuration.get(ExecutionOptions.ASYNC_INFLIGHT_RECORDS_LIMIT);
484+
public int getAsyncStateTotalBufferSize() {
485+
return configuration.get(ExecutionOptions.ASYNC_STATE_TOTAL_BUFFER_SIZE);
486486
}
487487

488488
@Experimental
489-
public ExecutionConfig setAsyncInflightRecordsLimit(int limit) {
490-
configuration.set(ExecutionOptions.ASYNC_INFLIGHT_RECORDS_LIMIT, limit);
489+
public ExecutionConfig setAsyncStateTotalBufferSize(int limit) {
490+
configuration.set(ExecutionOptions.ASYNC_STATE_TOTAL_BUFFER_SIZE, limit);
491491
return this;
492492
}
493493

494494
@Experimental
495-
public int getAsyncStateBufferSize() {
496-
return configuration.get(ExecutionOptions.ASYNC_STATE_BUFFER_SIZE);
495+
public int getAsyncStateActiveBufferSize() {
496+
return configuration.get(ExecutionOptions.ASYNC_STATE_ACTIVE_BUFFER_SIZE);
497497
}
498498

499499
@Experimental
500-
public ExecutionConfig setAsyncStateBufferSize(int bufferSize) {
501-
configuration.set(ExecutionOptions.ASYNC_STATE_BUFFER_SIZE, bufferSize);
500+
public ExecutionConfig setAsyncStateActiveBufferSize(int bufferSize) {
501+
configuration.set(ExecutionOptions.ASYNC_STATE_ACTIVE_BUFFER_SIZE, bufferSize);
502502
return this;
503503
}
504504

505505
@Experimental
506-
public long getAsyncStateBufferTimeout() {
507-
return configuration.get(ExecutionOptions.ASYNC_STATE_BUFFER_TIMEOUT);
506+
public long getAsyncStateActiveBufferTimeout() {
507+
return configuration.get(ExecutionOptions.ASYNC_STATE_ACTIVE_BUFFER_TIMEOUT);
508508
}
509509

510510
@Experimental
511-
public ExecutionConfig setAsyncStateBufferTimeout(long timeout) {
512-
configuration.set(ExecutionOptions.ASYNC_STATE_BUFFER_TIMEOUT, timeout);
511+
public ExecutionConfig setAsyncStateActiveBufferTimeout(long timeout) {
512+
configuration.set(ExecutionOptions.ASYNC_STATE_ACTIVE_BUFFER_TIMEOUT, timeout);
513513
return this;
514514
}
515515

Diff for: flink-core/src/main/java/org/apache/flink/configuration/ExecutionOptions.java

+11-11
Original file line numberDiff line numberDiff line change
@@ -194,8 +194,8 @@ public class ExecutionOptions {
194194
@Experimental
195195
@Documentation.ExcludeFromDocumentation(
196196
"This is an experimental option, internal use only for now.")
197-
public static final ConfigOption<Integer> ASYNC_INFLIGHT_RECORDS_LIMIT =
198-
ConfigOptions.key("execution.async-state.in-flight-records-limit")
197+
public static final ConfigOption<Integer> ASYNC_STATE_TOTAL_BUFFER_SIZE =
198+
ConfigOptions.key("execution.async-state.total-buffer-size")
199199
.intType()
200200
.defaultValue(6000)
201201
.withDescription(
@@ -208,36 +208,36 @@ public class ExecutionOptions {
208208
* The size of buffer under async state execution. Async state execution provides a buffer
209209
* mechanism to reduce state access. When the number of state requests in the buffer exceeds the
210210
* batch size, a batched state execution would be triggered. Larger batch sizes will bring
211-
* higher end-to-end latency, this option works with {@link #ASYNC_STATE_BUFFER_TIMEOUT} to
212-
* control the frequency of triggering.
211+
* higher end-to-end latency, this option works with {@link #ASYNC_STATE_ACTIVE_BUFFER_TIMEOUT}
212+
* to control the frequency of triggering.
213213
*/
214214
@Experimental
215215
@Documentation.ExcludeFromDocumentation(
216216
"This is an experimental option, internal use only for now.")
217-
public static final ConfigOption<Integer> ASYNC_STATE_BUFFER_SIZE =
218-
ConfigOptions.key("execution.async-state.buffer-size")
217+
public static final ConfigOption<Integer> ASYNC_STATE_ACTIVE_BUFFER_SIZE =
218+
ConfigOptions.key("execution.async-state.active-buffer-size")
219219
.intType()
220220
.defaultValue(1000)
221221
.withDescription(
222222
"The size of buffer under async state execution. Async state execution provides a buffer mechanism to reduce state access."
223223
+ " When the number of state requests in the active buffer exceeds the batch size,"
224224
+ " a batched state execution would be triggered. Larger batch sizes will bring higher end-to-end latency,"
225-
+ " this option works with 'execution.async-state.buffer-timeout' to control the frequency of triggering.");
225+
+ " this option works with 'execution.async-state.active-buffer-timeout' to control the frequency of triggering.");
226226

227227
/**
228228
* The timeout of buffer triggering in milliseconds. If the buffer has not reached the {@link
229-
* #ASYNC_STATE_BUFFER_SIZE} within 'buffer-timeout' milliseconds, a trigger will perform
229+
* #ASYNC_STATE_ACTIVE_BUFFER_SIZE} within 'buffer-timeout' milliseconds, a trigger will perform
230230
* actively.
231231
*/
232232
@Experimental
233233
@Documentation.ExcludeFromDocumentation(
234234
"This is an experimental option, internal use only for now.")
235-
public static final ConfigOption<Long> ASYNC_STATE_BUFFER_TIMEOUT =
236-
ConfigOptions.key("execution.async-state.buffer-timeout")
235+
public static final ConfigOption<Long> ASYNC_STATE_ACTIVE_BUFFER_TIMEOUT =
236+
ConfigOptions.key("execution.async-state.active-buffer-timeout")
237237
.longType()
238238
.defaultValue(1000L)
239239
.withDescription(
240240
"The timeout of buffer triggering in milliseconds. If the buffer has not reached the"
241-
+ " 'execution.async-state.buffer-size' within 'buffer-timeout' milliseconds,"
241+
+ " 'execution.async-state.active-buffer-size' within 'buffer-timeout' milliseconds,"
242242
+ " a trigger will perform actively.");
243243
}

Diff for: flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/operators/AbstractAsyncStateStreamOperator.java

+4-3
Original file line numberDiff line numberDiff line change
@@ -102,10 +102,11 @@ public final void beforeInitializeStateHandler() {
102102
final MailboxExecutor mailboxExecutor = environment.getMainMailboxExecutor();
103103
final int maxParallelism = environment.getTaskInfo().getMaxNumberOfParallelSubtasks();
104104
final int inFlightRecordsLimit =
105-
environment.getExecutionConfig().getAsyncInflightRecordsLimit();
106-
final int asyncBufferSize = environment.getExecutionConfig().getAsyncStateBufferSize();
105+
environment.getExecutionConfig().getAsyncStateTotalBufferSize();
106+
final int asyncBufferSize =
107+
environment.getExecutionConfig().getAsyncStateActiveBufferSize();
107108
final long asyncBufferTimeout =
108-
environment.getExecutionConfig().getAsyncStateBufferTimeout();
109+
environment.getExecutionConfig().getAsyncStateActiveBufferTimeout();
109110

110111
this.declarationManager = new DeclarationManager();
111112
if (isAsyncStateProcessingEnabled()) {

Diff for: flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/operators/AbstractAsyncStateStreamOperatorV2.java

+3-3
Original file line numberDiff line numberDiff line change
@@ -99,9 +99,9 @@ public final void beforeInitializeStateHandler() {
9999
((DefaultKeyedStateStore) stateStore).setSupportKeyedStateApiSetV2();
100100
}
101101

102-
final int inFlightRecordsLimit = getExecutionConfig().getAsyncInflightRecordsLimit();
103-
final int asyncBufferSize = getExecutionConfig().getAsyncStateBufferSize();
104-
final long asyncBufferTimeout = getExecutionConfig().getAsyncStateBufferTimeout();
102+
final int inFlightRecordsLimit = getExecutionConfig().getAsyncStateTotalBufferSize();
103+
final int asyncBufferSize = getExecutionConfig().getAsyncStateActiveBufferSize();
104+
final long asyncBufferTimeout = getExecutionConfig().getAsyncStateActiveBufferTimeout();
105105
int maxParallelism = getExecutionConfig().getMaxParallelism();
106106

107107
this.declarationManager = new DeclarationManager();

Diff for: flink-runtime/src/test/java/org/apache/flink/runtime/asyncprocessing/operators/AbstractAsyncStateStreamOperatorTest.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -198,7 +198,7 @@ void testAsyncProcessWithKey() throws Exception {
198198
void testManyAsyncProcessWithKey() throws Exception {
199199
// This test is for verifying AsyncExecutionController could avoid deadlock for derived
200200
// processing requests.
201-
int requests = ExecutionOptions.ASYNC_INFLIGHT_RECORDS_LIMIT.defaultValue() + 1;
201+
int requests = ExecutionOptions.ASYNC_STATE_TOTAL_BUFFER_SIZE.defaultValue() + 1;
202202
TestOperatorWithMultipleDirectAsyncProcess testOperator =
203203
new TestOperatorWithMultipleDirectAsyncProcess(ElementOrder.RECORD_ORDER, requests);
204204
AsyncKeyedOneInputStreamOperatorTestHarness<Integer, Tuple2<Integer, String>, String>

Diff for: flink-streaming-java/src/test/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironmentTest.java

+19-19
Original file line numberDiff line numberDiff line change
@@ -468,28 +468,28 @@ void testAsyncExecutionConfigurations() {
468468
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
469469
env.configure(config, this.getClass().getClassLoader());
470470

471-
assertThat(env.getConfig().getAsyncInflightRecordsLimit())
472-
.isEqualTo(ExecutionOptions.ASYNC_INFLIGHT_RECORDS_LIMIT.defaultValue());
473-
assertThat(env.getConfig().getAsyncStateBufferSize())
474-
.isEqualTo(ExecutionOptions.ASYNC_STATE_BUFFER_SIZE.defaultValue());
475-
assertThat(env.getConfig().getAsyncStateBufferTimeout())
476-
.isEqualTo(ExecutionOptions.ASYNC_STATE_BUFFER_TIMEOUT.defaultValue());
477-
478-
config.set(ExecutionOptions.ASYNC_INFLIGHT_RECORDS_LIMIT, 3);
479-
config.set(ExecutionOptions.ASYNC_STATE_BUFFER_SIZE, 2);
480-
config.set(ExecutionOptions.ASYNC_STATE_BUFFER_TIMEOUT, 1L);
471+
assertThat(env.getConfig().getAsyncStateTotalBufferSize())
472+
.isEqualTo(ExecutionOptions.ASYNC_STATE_TOTAL_BUFFER_SIZE.defaultValue());
473+
assertThat(env.getConfig().getAsyncStateActiveBufferSize())
474+
.isEqualTo(ExecutionOptions.ASYNC_STATE_ACTIVE_BUFFER_SIZE.defaultValue());
475+
assertThat(env.getConfig().getAsyncStateActiveBufferTimeout())
476+
.isEqualTo(ExecutionOptions.ASYNC_STATE_ACTIVE_BUFFER_TIMEOUT.defaultValue());
477+
478+
config.set(ExecutionOptions.ASYNC_STATE_TOTAL_BUFFER_SIZE, 3);
479+
config.set(ExecutionOptions.ASYNC_STATE_ACTIVE_BUFFER_SIZE, 2);
480+
config.set(ExecutionOptions.ASYNC_STATE_ACTIVE_BUFFER_TIMEOUT, 1L);
481481
env.configure(config, this.getClass().getClassLoader());
482-
assertThat(env.getConfig().getAsyncInflightRecordsLimit()).isEqualTo(3);
483-
assertThat(env.getConfig().getAsyncStateBufferSize()).isEqualTo(2);
484-
assertThat(env.getConfig().getAsyncStateBufferTimeout()).isEqualTo(1);
482+
assertThat(env.getConfig().getAsyncStateTotalBufferSize()).isEqualTo(3);
483+
assertThat(env.getConfig().getAsyncStateActiveBufferSize()).isEqualTo(2);
484+
assertThat(env.getConfig().getAsyncStateActiveBufferTimeout()).isEqualTo(1);
485485

486486
env.getConfig()
487-
.setAsyncInflightRecordsLimit(6)
488-
.setAsyncStateBufferSize(5)
489-
.setAsyncStateBufferTimeout(4);
490-
assertThat(env.getConfig().getAsyncInflightRecordsLimit()).isEqualTo(6);
491-
assertThat(env.getConfig().getAsyncStateBufferSize()).isEqualTo(5);
492-
assertThat(env.getConfig().getAsyncStateBufferTimeout()).isEqualTo(4);
487+
.setAsyncStateTotalBufferSize(6)
488+
.setAsyncStateActiveBufferSize(5)
489+
.setAsyncStateActiveBufferTimeout(4);
490+
assertThat(env.getConfig().getAsyncStateTotalBufferSize()).isEqualTo(6);
491+
assertThat(env.getConfig().getAsyncStateActiveBufferSize()).isEqualTo(5);
492+
assertThat(env.getConfig().getAsyncStateActiveBufferTimeout()).isEqualTo(4);
493493
}
494494

495495
private void testBufferTimeout(Configuration config, StreamExecutionEnvironment env) {

0 commit comments

Comments
 (0)