Skip to content

Commit 68e4c4c

Browse files
Add defaults for PollerBehaviorAutoscaling (#2574)
Add defaults for PollerBehaviorAutoscaling
1 parent 40d148c commit 68e4c4c

File tree

7 files changed

+128
-27
lines changed

7 files changed

+128
-27
lines changed

temporal-sdk/src/main/java/io/temporal/internal/worker/AsyncPoller.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -103,12 +103,12 @@ public boolean start() {
103103
for (PollTaskAsync<T> asyncTaskPoller : asyncTaskPollers) {
104104
log.info("Starting async poller: {}", asyncTaskPoller.getLabel());
105105
AdjustableSemaphore pollerSemaphore =
106-
new AdjustableSemaphore(pollerBehavior.getInitialMaxConcurrentTaskPollers());
106+
new AdjustableSemaphore(pollerBehavior.getInitialConcurrentTaskPollers());
107107
PollScaleReportHandle<T> pollScaleReportHandle =
108108
new PollScaleReportHandle<>(
109109
pollerBehavior.getMinConcurrentTaskPollers(),
110110
pollerBehavior.getMaxConcurrentTaskPollers(),
111-
pollerBehavior.getInitialMaxConcurrentTaskPollers(),
111+
pollerBehavior.getInitialConcurrentTaskPollers(),
112112
(newTarget) -> {
113113
log.debug(
114114
"Updating maximum number of pollers for {} to: {}",

temporal-sdk/src/main/java/io/temporal/worker/tuning/PollerBehaviorAutoscaling.java

Lines changed: 31 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
import io.temporal.common.Experimental;
44
import java.util.Objects;
5+
import javax.annotation.Nullable;
56

67
/**
78
* A poller behavior that will automatically scale the number of pollers based on feedback from the
@@ -16,17 +17,40 @@ public final class PollerBehaviorAutoscaling implements PollerBehavior {
1617
private final int maxConcurrentTaskPollers;
1718
private final int initialConcurrentTaskPollers;
1819

20+
/**
21+
* Creates a new PollerBehaviorAutoscaling with default parameters.
22+
*
23+
* <p>Default parameters are:
24+
*
25+
* <ul>
26+
* <li>minConcurrentTaskPollers = 1
27+
* <li>maxConcurrentTaskPollers = 100
28+
* <li>initialConcurrentTaskPollers = 5
29+
*/
30+
public PollerBehaviorAutoscaling() {
31+
this(null, null, null);
32+
}
33+
1934
/**
2035
* Creates a new PollerBehaviorAutoscaling with the specified parameters.
2136
*
22-
* @param minConcurrentTaskPollers Minimum number of concurrent task pollers.
23-
* @param maxConcurrentTaskPollers Maximum number of concurrent task pollers.
24-
* @param initialConcurrentTaskPollers Initial number of concurrent task pollers.
37+
* @param minConcurrentTaskPollers Minimum number of concurrent task pollers. Default is 1.
38+
* @param maxConcurrentTaskPollers Maximum number of concurrent task pollers. Default is 100.
39+
* @param initialConcurrentTaskPollers Initial number of concurrent task pollers. Default is 5.
2540
*/
2641
public PollerBehaviorAutoscaling(
27-
int minConcurrentTaskPollers,
28-
int maxConcurrentTaskPollers,
29-
int initialConcurrentTaskPollers) {
42+
@Nullable Integer minConcurrentTaskPollers,
43+
@Nullable Integer maxConcurrentTaskPollers,
44+
@Nullable Integer initialConcurrentTaskPollers) {
45+
if (minConcurrentTaskPollers == null) {
46+
minConcurrentTaskPollers = 1;
47+
}
48+
if (maxConcurrentTaskPollers == null) {
49+
maxConcurrentTaskPollers = 100;
50+
}
51+
if (initialConcurrentTaskPollers == null) {
52+
initialConcurrentTaskPollers = 5;
53+
}
3054
if (minConcurrentTaskPollers < 1) {
3155
throw new IllegalArgumentException("minConcurrentTaskPollers must be at least 1");
3256
}
@@ -67,7 +91,7 @@ public int getMaxConcurrentTaskPollers() {
6791
*
6892
* @return Initial number of concurrent task pollers.
6993
*/
70-
public int getInitialMaxConcurrentTaskPollers() {
94+
public int getInitialConcurrentTaskPollers() {
7195
return initialConcurrentTaskPollers;
7296
}
7397

temporal-sdk/src/test/java/io/temporal/worker/shutdown/CleanNexusWorkerShutdownTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@ public class CleanNexusWorkerShutdownTest {
4747
public static Collection<PollerBehavior> data() {
4848
return Arrays.asList(
4949
new PollerBehavior[] {
50-
new PollerBehaviorSimpleMaximum(10), new PollerBehaviorAutoscaling(1, 10, 5),
50+
new PollerBehaviorSimpleMaximum(10), new PollerBehaviorAutoscaling(),
5151
});
5252
}
5353

temporal-spring-boot-autoconfigure/src/main/java/io/temporal/spring/boot/autoconfigure/properties/WorkerProperties.java

Lines changed: 46 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@
44
import io.temporal.common.WorkerDeploymentVersion;
55
import io.temporal.worker.WorkerDeploymentOptions;
66
import io.temporal.worker.tuning.PollerBehavior;
7-
import io.temporal.worker.tuning.PollerBehaviorAutoscaling;
87
import java.util.Collection;
98
import javax.annotation.Nonnull;
109
import javax.annotation.Nullable;
@@ -97,19 +96,62 @@ public WorkerDeploymentConfigurationProperties getDeploymentProperties() {
9796
}
9897

9998
public static class PollerConfigurationProperties {
100-
private final @Nullable PollerBehaviorAutoscaling pollerBehaviorAutoscaling;
99+
public static class PollerBehaviorAutoscalingConfiguration {
100+
private final Boolean enabled;
101+
private final Integer minConcurrentTaskPollers;
102+
private final Integer maxConcurrentTaskPollers;
103+
private final Integer initialConcurrentTaskPollers;
104+
105+
@ConstructorBinding
106+
public PollerBehaviorAutoscalingConfiguration(
107+
@Nullable Boolean enabled,
108+
@Nullable Integer minConcurrentTaskPollers,
109+
@Nullable Integer maxConcurrentTaskPollers,
110+
@Nullable Integer initialConcurrentTaskPollers) {
111+
this.enabled = enabled;
112+
this.minConcurrentTaskPollers = minConcurrentTaskPollers;
113+
this.maxConcurrentTaskPollers = maxConcurrentTaskPollers;
114+
this.initialConcurrentTaskPollers = initialConcurrentTaskPollers;
115+
}
116+
117+
@Nullable
118+
public Boolean isEnabled() {
119+
// If enabled is true or any of the other parameters are set, then autoscaling is enabled.
120+
return Boolean.TRUE.equals(enabled)
121+
|| minConcurrentTaskPollers != null
122+
|| maxConcurrentTaskPollers != null
123+
|| initialConcurrentTaskPollers != null;
124+
}
125+
126+
@Nullable
127+
public Integer getMinConcurrentTaskPollers() {
128+
return minConcurrentTaskPollers;
129+
}
130+
131+
@Nullable
132+
public Integer getMaxConcurrentTaskPollers() {
133+
return maxConcurrentTaskPollers;
134+
}
135+
136+
@Nullable
137+
public Integer getInitialConcurrentTaskPollers() {
138+
return initialConcurrentTaskPollers;
139+
}
140+
}
141+
142+
private final @Nullable PollerBehaviorAutoscalingConfiguration pollerBehaviorAutoscaling;
101143

102144
/**
103145
* @param pollerBehaviorAutoscaling defines poller behavior for autoscaling
104146
*/
105147
@ConstructorBinding
106148
public PollerConfigurationProperties(
107-
@Nullable PollerBehaviorAutoscaling pollerBehaviorAutoscaling) {
149+
@Nullable PollerBehaviorAutoscalingConfiguration pollerBehaviorAutoscaling) {
108150
this.pollerBehaviorAutoscaling = pollerBehaviorAutoscaling;
109151
}
110152

111153
@Nullable
112-
public PollerBehaviorAutoscaling getPollerBehaviorAutoscaling() {
154+
public PollerBehaviorAutoscalingConfiguration getPollerBehaviorAutoscaling() {
113155
return pollerBehaviorAutoscaling;
114156
}
115157
}

temporal-spring-boot-autoconfigure/src/main/java/io/temporal/spring/boot/autoconfigure/template/WorkerOptionsTemplate.java

Lines changed: 31 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
import io.temporal.spring.boot.autoconfigure.properties.WorkerProperties;
77
import io.temporal.worker.WorkerDeploymentOptions;
88
import io.temporal.worker.WorkerOptions;
9+
import io.temporal.worker.tuning.PollerBehaviorAutoscaling;
910
import java.util.Optional;
1011
import javax.annotation.Nonnull;
1112
import javax.annotation.Nullable;
@@ -50,25 +51,46 @@ WorkerOptions createWorkerOptions() {
5051
Optional.ofNullable(threadsConfiguration.getMaxConcurrentNexusTaskPollers())
5152
.ifPresent(options::setMaxConcurrentNexusTaskPollers);
5253
if (threadsConfiguration.getWorkflowTaskPollersConfiguration() != null) {
53-
Optional.ofNullable(
54+
WorkerProperties.PollerConfigurationProperties.PollerBehaviorAutoscalingConfiguration
55+
pollerBehaviorAutoscaling =
5456
threadsConfiguration
5557
.getWorkflowTaskPollersConfiguration()
56-
.getPollerBehaviorAutoscaling())
57-
.ifPresent(options::setWorkflowTaskPollersBehavior);
58+
.getPollerBehaviorAutoscaling();
59+
if (pollerBehaviorAutoscaling != null && pollerBehaviorAutoscaling.isEnabled()) {
60+
options.setWorkflowTaskPollersBehavior(
61+
new PollerBehaviorAutoscaling(
62+
pollerBehaviorAutoscaling.getMinConcurrentTaskPollers(),
63+
pollerBehaviorAutoscaling.getMaxConcurrentTaskPollers(),
64+
pollerBehaviorAutoscaling.getInitialConcurrentTaskPollers()));
65+
}
5866
}
5967
if (threadsConfiguration.getActivityTaskPollersConfiguration() != null) {
60-
Optional.ofNullable(
68+
WorkerProperties.PollerConfigurationProperties.PollerBehaviorAutoscalingConfiguration
69+
pollerBehaviorAutoscaling =
6170
threadsConfiguration
6271
.getActivityTaskPollersConfiguration()
63-
.getPollerBehaviorAutoscaling())
64-
.ifPresent(options::setActivityTaskPollersBehavior);
72+
.getPollerBehaviorAutoscaling();
73+
if (pollerBehaviorAutoscaling != null && pollerBehaviorAutoscaling.isEnabled()) {
74+
options.setActivityTaskPollersBehavior(
75+
new PollerBehaviorAutoscaling(
76+
pollerBehaviorAutoscaling.getMinConcurrentTaskPollers(),
77+
pollerBehaviorAutoscaling.getMaxConcurrentTaskPollers(),
78+
pollerBehaviorAutoscaling.getInitialConcurrentTaskPollers()));
79+
}
6580
}
6681
if (threadsConfiguration.getNexusTaskPollersConfiguration() != null) {
67-
Optional.ofNullable(
82+
WorkerProperties.PollerConfigurationProperties.PollerBehaviorAutoscalingConfiguration
83+
pollerBehaviorAutoscaling =
6884
threadsConfiguration
6985
.getNexusTaskPollersConfiguration()
70-
.getPollerBehaviorAutoscaling())
71-
.ifPresent(options::setNexusTaskPollersBehavior);
86+
.getPollerBehaviorAutoscaling();
87+
if (pollerBehaviorAutoscaling != null && pollerBehaviorAutoscaling.isEnabled()) {
88+
options.setNexusTaskPollersBehavior(
89+
new PollerBehaviorAutoscaling(
90+
pollerBehaviorAutoscaling.getMinConcurrentTaskPollers(),
91+
pollerBehaviorAutoscaling.getMaxConcurrentTaskPollers(),
92+
pollerBehaviorAutoscaling.getInitialConcurrentTaskPollers()));
93+
}
7294
}
7395
}
7496

temporal-spring-boot-autoconfigure/src/test/java/io/temporal/spring/boot/autoconfigure/OptionalWorkerOptionsTest.java

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -114,11 +114,23 @@ public TemporalOptionsCustomizer<WorkerOptions.Builder> workerCustomizer() {
114114
"Values from the Spring Config should be respected");
115115
assertEquals(
116116
5,
117-
autoscaling.getInitialMaxConcurrentTaskPollers(),
117+
autoscaling.getInitialConcurrentTaskPollers(),
118118
"Values from the Spring Config should be respected");
119+
assertNotNull(options.getActivityTaskPollersBehavior());
120+
assertInstanceOf(
121+
PollerBehaviorAutoscaling.class, options.getActivityTaskPollersBehavior());
122+
autoscaling = (PollerBehaviorAutoscaling) options.getActivityTaskPollersBehavior();
119123
assertEquals(
120124
1,
121-
options.getMaxConcurrentActivityTaskPollers(),
125+
autoscaling.getMinConcurrentTaskPollers(),
126+
"Values from the Spring Config should be respected");
127+
assertEquals(
128+
100,
129+
autoscaling.getMaxConcurrentTaskPollers(),
130+
"Values from the Spring Config should be respected");
131+
assertEquals(
132+
5,
133+
autoscaling.getInitialConcurrentTaskPollers(),
122134
"Values from the Spring Config should be respected");
123135
assertEquals(
124136
1,

temporal-spring-boot-autoconfigure/src/test/resources/application.yml

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -115,13 +115,14 @@ spring:
115115
max-concurrent-nexus-task-executors: 1
116116
max-concurrent-activity-executors: 1
117117
max-concurrent-local-activity-executors: 1
118-
max-concurrent-activity-task-pollers: 1
119118
max-concurrent-nexus-task-pollers: 1
120119
workflow-task-pollers-configuration:
121120
poller-behavior-autoscaling:
122121
min-concurrent-task-pollers: 1
123122
max-concurrent-task-pollers: 10
124-
initial-concurrent-task-pollers: 5
123+
activity-task-pollers-configuration:
124+
poller-behavior-autoscaling:
125+
enabled: true
125126
rate-limits:
126127
max-worker-activities-per-second: 1.0
127128
max-task-queue-activities-per-second: 1.0

0 commit comments

Comments
 (0)