diff --git a/src/main/java/com/uber/cadence/client/WorkflowClientOptions.java b/src/main/java/com/uber/cadence/client/WorkflowClientOptions.java index 00eebf808..7ac77ba09 100644 --- a/src/main/java/com/uber/cadence/client/WorkflowClientOptions.java +++ b/src/main/java/com/uber/cadence/client/WorkflowClientOptions.java @@ -26,6 +26,7 @@ import com.uber.m3.tally.Scope; import com.uber.m3.util.ImmutableMap; import java.lang.management.ManagementFactory; +import java.time.Duration; import java.util.Arrays; import java.util.List; import java.util.Objects; @@ -37,6 +38,7 @@ public final class WorkflowClientOptions { private static final WorkflowClientInterceptor[] EMPTY_INTERCEPTOR_ARRAY = new WorkflowClientInterceptor[0]; private static final List EMPTY_CONTEXT_PROPAGATORS = Arrays.asList(); + private static final Duration DEFAULT_WORKER_SHUTDOWN_TIME = Duration.ofSeconds(10); static { DEFAULT_INSTANCE = new Builder().build(); @@ -62,6 +64,7 @@ public static final class Builder { private String identity = ManagementFactory.getRuntimeMXBean().getName();; private List contextPropagators = EMPTY_CONTEXT_PROPAGATORS; private QueryRejectCondition queryRejectCondition; + private Duration timeForWorkerShutdown = DEFAULT_WORKER_SHUTDOWN_TIME; private Builder() {} @@ -72,6 +75,7 @@ private Builder(WorkflowClientOptions options) { metricsScope = options.getMetricsScope(); identity = options.getIdentity(); queryRejectCondition = options.getQueryRejectCondition(); + timeForWorkerShutdown = options.getTimeForWorkerShutdown(); } public Builder setDomain(String domain) { @@ -139,6 +143,17 @@ public Builder setQueryRejectCondition(QueryRejectCondition queryRejectCondition return this; } + /** + * Time for worker shutdown is an optional field used as the amount of time alloted for handling + * worker shutdown after SIGTERM signal. Default is 10 seconds. + * + * @param timeForWorkerShutdown + */ + public Builder setTimeForWorkerShutdown(Duration timeForWorkerShutdown) { + this.timeForWorkerShutdown = timeForWorkerShutdown; + return this; + } + public WorkflowClientOptions build() { metricsScope = metricsScope.tagged(ImmutableMap.of(MetricsTag.DOMAIN, domain)); return new WorkflowClientOptions( @@ -148,7 +163,8 @@ public WorkflowClientOptions build() { metricsScope, identity, contextPropagators, - queryRejectCondition); + queryRejectCondition, + timeForWorkerShutdown); } } @@ -159,6 +175,7 @@ public WorkflowClientOptions build() { private final String identity; private final List contextPropagators; private final QueryRejectCondition queryRejectCondition; + private final Duration timeForWorkerShutdown; private WorkflowClientOptions( String domain, @@ -167,7 +184,8 @@ private WorkflowClientOptions( Scope metricsScope, String identity, List contextPropagators, - QueryRejectCondition queryRejectCondition) { + QueryRejectCondition queryRejectCondition, + Duration timeForWorkerShutdown) { this.domain = domain; this.dataConverter = dataConverter; this.interceptors = interceptors; @@ -175,6 +193,7 @@ private WorkflowClientOptions( this.identity = identity; this.contextPropagators = contextPropagators; this.queryRejectCondition = queryRejectCondition; + this.timeForWorkerShutdown = timeForWorkerShutdown; } public String getDomain() { @@ -205,6 +224,10 @@ public QueryRejectCondition getQueryRejectCondition() { return queryRejectCondition; } + public Duration getTimeForWorkerShutdown() { + return timeForWorkerShutdown; + } + @Override public String toString() { return "WorkflowClientOptions{" @@ -222,6 +245,8 @@ public String toString() { + contextPropagators + ", queryRejectCondition=" + queryRejectCondition + + ", timeForWorkerShutdown=" + + timeForWorkerShutdown + '}'; } @@ -235,7 +260,8 @@ public boolean equals(Object o) { && Arrays.equals(interceptors, that.interceptors) && com.google.common.base.Objects.equal(identity, that.identity) && com.google.common.base.Objects.equal(contextPropagators, that.contextPropagators) - && queryRejectCondition == that.queryRejectCondition; + && queryRejectCondition == that.queryRejectCondition + && com.google.common.base.Objects.equal(timeForWorkerShutdown, that.timeForWorkerShutdown); } @Override @@ -246,6 +272,7 @@ public int hashCode() { Arrays.hashCode(interceptors), identity, contextPropagators, - queryRejectCondition); + queryRejectCondition, + timeForWorkerShutdown); } } diff --git a/src/main/java/com/uber/cadence/internal/worker/WorkerShutDownHandler.java b/src/main/java/com/uber/cadence/internal/worker/WorkerShutDownHandler.java index da4f09924..d632bf3b4 100644 --- a/src/main/java/com/uber/cadence/internal/worker/WorkerShutDownHandler.java +++ b/src/main/java/com/uber/cadence/internal/worker/WorkerShutDownHandler.java @@ -18,6 +18,7 @@ import com.uber.cadence.internal.common.InternalUtils; import com.uber.cadence.worker.WorkerFactory; +import java.time.Duration; import java.util.ArrayList; import java.util.List; import java.util.concurrent.TimeUnit; @@ -27,7 +28,7 @@ public class WorkerShutDownHandler { private static final List workerFactories = new ArrayList<>(); private static Thread registeredHandler; - public static void registerHandler() { + public static void registerHandler(Duration workerShutdownTimeout) { if (registeredHandler != null) { return; } @@ -44,10 +45,13 @@ public void run() { workerFactory.shutdownNow(); } - long remainingTimeout = 10000; + long remainingTimeoutMillis = + TimeUnit.SECONDS.toMillis(workerShutdownTimeout.getSeconds()) + + TimeUnit.NANOSECONDS.toMillis(workerShutdownTimeout.getNano()); + for (WorkerFactory workerFactory : workerFactories) { - final long timeoutMillis = remainingTimeout; - remainingTimeout = + final long timeoutMillis = remainingTimeoutMillis; + remainingTimeoutMillis = InternalUtils.awaitTermination( timeoutMillis, () -> workerFactory.awaitTermination(timeoutMillis, TimeUnit.MILLISECONDS)); diff --git a/src/main/java/com/uber/cadence/internal/worker/autoscaler/PollerAutoScaler.java b/src/main/java/com/uber/cadence/internal/worker/autoscaler/PollerAutoScaler.java new file mode 100644 index 000000000..9fee8c938 --- /dev/null +++ b/src/main/java/com/uber/cadence/internal/worker/autoscaler/PollerAutoScaler.java @@ -0,0 +1,111 @@ +/* + * Modifications copyright (C) 2017 Uber Technologies, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"). You may not + * use this file except in compliance with the License. A copy of the License is + * located at + * + * http://aws.amazon.com/apache2.0 + * + * or in the "license" file accompanying this file. This file is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package com.uber.cadence.internal.worker.autoscaler; + +import java.time.Duration; +import java.util.concurrent.Executors; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class PollerAutoScaler { + + private static final Logger LOGGER = LoggerFactory.getLogger(PollerAutoScaler.class); + + private Duration coolDownTime; + private PollerUsageEstimator pollerUsageEstimator; + private Recommender recommender; + private ResizableSemaphore semaphore; + private int semaphoreSize; + private boolean shutingDown; + + public PollerAutoScaler( + Duration coolDownTime, PollerUsageEstimator pollerUsageEstimator, Recommender recommender) { + this.coolDownTime = coolDownTime; + this.pollerUsageEstimator = pollerUsageEstimator; + this.recommender = recommender; + this.semaphore = new ResizableSemaphore(recommender.getUpperValue()); + this.semaphoreSize = recommender.getUpperValue(); + } + + public void start() { + Executors.newSingleThreadExecutor() + .submit( + new Runnable() { + @Override + public void run() { + while (!shutingDown) { + try { + Thread.sleep(coolDownTime.toMillis()); + if (!shutingDown) { + resizePollers(); + } + } catch (InterruptedException e) { + } + } + } + }); + } + + public void stop() { + LOGGER.info("shutting down poller autoscaler"); + shutingDown = true; + } + + protected void resizePollers() { + PollerUsage pollerUsage = pollerUsageEstimator.estimate(); + int pollerCount = + recommender.recommend(this.semaphoreSize, pollerUsage.getPollerUtilizationRate()); + + int diff = this.semaphoreSize - pollerCount; + if (diff < 0) { + semaphore.release(diff * -1); + } else { + semaphore.decreasePermits(diff); + } + + LOGGER.info(String.format("resized pollers to: %d", pollerCount)); + this.semaphoreSize = pollerCount; + } + + public int getLowerPollerAmount() { + return recommender.getLowerValue(); + } + + public int getUpperPollerAmount() { + return recommender.getUpperValue(); + } + + public PollerUsageEstimator getPollerUsageEstimator() { + return pollerUsageEstimator; + } + + public Recommender getRecommender() { + return recommender; + } + + public void acquire() throws InterruptedException { + semaphore.acquire(); + } + + public void release() { + semaphore.release(); + } + + // For testing + protected int getSemaphoreSize() { + return semaphoreSize; + } +} diff --git a/src/main/java/com/uber/cadence/internal/worker/autoscaler/PollerUsage.java b/src/main/java/com/uber/cadence/internal/worker/autoscaler/PollerUsage.java new file mode 100644 index 000000000..9cf56bd65 --- /dev/null +++ b/src/main/java/com/uber/cadence/internal/worker/autoscaler/PollerUsage.java @@ -0,0 +1,29 @@ +/* + * Modifications copyright (C) 2017 Uber Technologies, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"). You may not + * use this file except in compliance with the License. A copy of the License is + * located at + * + * http://aws.amazon.com/apache2.0 + * + * or in the "license" file accompanying this file. This file is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package com.uber.cadence.internal.worker.autoscaler; + +public class PollerUsage { + + private final float pollerUtilizationRate; + + public PollerUsage(float pollerUtilizationRate) { + this.pollerUtilizationRate = pollerUtilizationRate; + } + + public float getPollerUtilizationRate() { + return pollerUtilizationRate; + } +} diff --git a/src/main/java/com/uber/cadence/internal/worker/autoscaler/PollerUsageEstimator.java b/src/main/java/com/uber/cadence/internal/worker/autoscaler/PollerUsageEstimator.java new file mode 100644 index 000000000..4b3a9d0dc --- /dev/null +++ b/src/main/java/com/uber/cadence/internal/worker/autoscaler/PollerUsageEstimator.java @@ -0,0 +1,45 @@ +/* + * Modifications copyright (C) 2017 Uber Technologies, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"). You may not + * use this file except in compliance with the License. A copy of the License is + * located at + * + * http://aws.amazon.com/apache2.0 + * + * or in the "license" file accompanying this file. This file is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package com.uber.cadence.internal.worker.autoscaler; + +public class PollerUsageEstimator { + + private int noopTaskCount; + private int actionableTaskCount; + + public void increaseNoopTaskCount() { + noopTaskCount += 1; + } + + public void increaseActionableTaskCount() { + actionableTaskCount += 1; + } + + public PollerUsage estimate() { + if (noopTaskCount + actionableTaskCount == 0) { + return new PollerUsage(0); + } + PollerUsage result = + new PollerUsage((actionableTaskCount * 1f) / (noopTaskCount + actionableTaskCount)); + reset(); + return result; + } + + public void reset() { + noopTaskCount = 0; + actionableTaskCount = 0; + } +} diff --git a/src/main/java/com/uber/cadence/internal/worker/autoscaler/Recommender.java b/src/main/java/com/uber/cadence/internal/worker/autoscaler/Recommender.java new file mode 100644 index 000000000..2352e1979 --- /dev/null +++ b/src/main/java/com/uber/cadence/internal/worker/autoscaler/Recommender.java @@ -0,0 +1,50 @@ +/* + * Modifications copyright (C) 2017 Uber Technologies, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"). You may not + * use this file except in compliance with the License. A copy of the License is + * located at + * + * http://aws.amazon.com/apache2.0 + * + * or in the "license" file accompanying this file. This file is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package com.uber.cadence.internal.worker.autoscaler; + +public class Recommender { + + private final float targetPollerUtilRate; + private final int upperValue; + private final int lowerValue; + + public Recommender(float targetPollerUtilRate, int upperValue, int lowerValue) { + this.targetPollerUtilRate = targetPollerUtilRate; + this.upperValue = upperValue; + this.lowerValue = lowerValue; + } + + public int recommend(int currentPollers, float pollerUtilizationRate) { + int recommended = 0; + + if (pollerUtilizationRate == 1) { + return upperValue; + } + + float r = currentPollers * pollerUtilizationRate / targetPollerUtilRate; + r = Math.min(upperValue, Math.max(lowerValue, r)); + recommended += r; + return recommended; + } + + public int getUpperValue() { + return upperValue; + } + + public int getLowerValue() { + return lowerValue; + } +} diff --git a/src/main/java/com/uber/cadence/internal/worker/autoscaler/ResizableSemaphore.java b/src/main/java/com/uber/cadence/internal/worker/autoscaler/ResizableSemaphore.java new file mode 100644 index 000000000..80a87089d --- /dev/null +++ b/src/main/java/com/uber/cadence/internal/worker/autoscaler/ResizableSemaphore.java @@ -0,0 +1,28 @@ +/* + * Modifications copyright (C) 2017 Uber Technologies, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"). You may not + * use this file except in compliance with the License. A copy of the License is + * located at + * + * http://aws.amazon.com/apache2.0 + * + * or in the "license" file accompanying this file. This file is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package com.uber.cadence.internal.worker.autoscaler; + +import java.util.concurrent.Semaphore; + +public class ResizableSemaphore extends Semaphore { + public ResizableSemaphore(int permits) { + super(permits); + } + + public void decreasePermits(int reduction) { + reducePermits(reduction); + } +} diff --git a/src/main/java/com/uber/cadence/worker/WorkerFactory.java b/src/main/java/com/uber/cadence/worker/WorkerFactory.java index 372d26384..e36131387 100644 --- a/src/main/java/com/uber/cadence/worker/WorkerFactory.java +++ b/src/main/java/com/uber/cadence/worker/WorkerFactory.java @@ -58,7 +58,7 @@ public static WorkerFactory newInstance(WorkflowClient workflowClient) { public static WorkerFactory newInstance( WorkflowClient workflowClient, WorkerFactoryOptions options) { - WorkerShutDownHandler.registerHandler(); + WorkerShutDownHandler.registerHandler(workflowClient.getOptions().getTimeForWorkerShutdown()); WorkerFactory workerFactory = new WorkerFactory(workflowClient, options); WorkerShutDownHandler.registerWorkerFactory(workerFactory); return workerFactory; diff --git a/src/test/java/com/uber/cadence/internal/worker/autoscaler/PollerAutoScalerTest.java b/src/test/java/com/uber/cadence/internal/worker/autoscaler/PollerAutoScalerTest.java new file mode 100644 index 000000000..ce28b3b54 --- /dev/null +++ b/src/test/java/com/uber/cadence/internal/worker/autoscaler/PollerAutoScalerTest.java @@ -0,0 +1,33 @@ +package com.uber.cadence.internal.worker.autoscaler; + +import static org.junit.Assert.*; + +import java.time.Duration; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.runners.MockitoJUnitRunner; + +@RunWith(MockitoJUnitRunner.class) +public class PollerAutoScalerTest { + + @Test + public void testAutoScalerScalesPollers() { + PollerUsageEstimator pollerUsageEstimator = new PollerUsageEstimator(); + Recommender recommender = new Recommender(0.5f, 100, 10, "test"); + PollerAutoScaler pollerAutoScaler = + new PollerAutoScaler(Duration.ofSeconds(1), pollerUsageEstimator, recommender); + + assertEquals(100, pollerAutoScaler.getSemaphoreSize()); + + pollerUsageEstimator.increaseActionableTaskCount(); + pollerAutoScaler.resizePollers(); + + assertEquals(100, pollerAutoScaler.getSemaphoreSize()); + + pollerUsageEstimator.increaseNoopTaskCount(); + + pollerAutoScaler.resizePollers(); + + assertEquals(10, pollerAutoScaler.getSemaphoreSize()); + } +} diff --git a/src/test/java/com/uber/cadence/internal/worker/autoscaler/PollerUsageEstimatorTest.java b/src/test/java/com/uber/cadence/internal/worker/autoscaler/PollerUsageEstimatorTest.java new file mode 100644 index 000000000..82c8f34a6 --- /dev/null +++ b/src/test/java/com/uber/cadence/internal/worker/autoscaler/PollerUsageEstimatorTest.java @@ -0,0 +1,51 @@ +package com.uber.cadence.internal.worker.autoscaler; + +import static org.junit.Assert.*; + +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.runners.MockitoJUnitRunner; + +@RunWith(MockitoJUnitRunner.class) +public class PollerUsageEstimatorTest { + + @Test + public void testUsageIsZero() { + PollerUsageEstimator pollerUsageEstimator = new PollerUsageEstimator(); + PollerUsage pollerUsage = pollerUsageEstimator.estimate(); + assertEquals(0f, pollerUsage.getPollerUtilizationRate(), 0); + } + + @Test + public void testUsagesIs100Percent() { + PollerUsageEstimator pollerUsageEstimator = new PollerUsageEstimator(); + pollerUsageEstimator.increaseActionableTaskCount(); + pollerUsageEstimator.increaseActionableTaskCount(); + PollerUsage pollerUsage = pollerUsageEstimator.estimate(); + assertEquals(1f, pollerUsage.getPollerUtilizationRate(), 0); + } + + @Test + public void testUsageCalculatedCorrectly() { + PollerUsageEstimator pollerUsageEstimator = new PollerUsageEstimator(); + pollerUsageEstimator.increaseNoopTaskCount(); + pollerUsageEstimator.increaseActionableTaskCount(); + pollerUsageEstimator.increaseNoopTaskCount(); + pollerUsageEstimator.increaseNoopTaskCount(); + + PollerUsage pollerUsage = pollerUsageEstimator.estimate(); + assertEquals(0.25f, pollerUsage.getPollerUtilizationRate(), 0); + } + + @Test + public void estimationIsReset() { + PollerUsageEstimator pollerUsageEstimator = new PollerUsageEstimator(); + pollerUsageEstimator.increaseActionableTaskCount(); + + PollerUsage pollerUsage = pollerUsageEstimator.estimate(); + assertEquals(1f, pollerUsage.getPollerUtilizationRate(), 0); + + pollerUsage = pollerUsageEstimator.estimate(); + assertEquals(0f, pollerUsage.getPollerUtilizationRate(), 0); + } +} diff --git a/src/test/java/com/uber/cadence/internal/worker/autoscaler/RecommenderTest.java b/src/test/java/com/uber/cadence/internal/worker/autoscaler/RecommenderTest.java new file mode 100644 index 000000000..72b6b6101 --- /dev/null +++ b/src/test/java/com/uber/cadence/internal/worker/autoscaler/RecommenderTest.java @@ -0,0 +1,57 @@ +package com.uber.cadence.internal.worker.autoscaler; + +import static org.junit.Assert.*; + +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.runners.MockitoJUnitRunner; + +@RunWith(MockitoJUnitRunner.class) +public class RecommenderTest { + + @Test + public void pollerUnderutilizedShouldReduceToLowerBound() { + Recommender recommender = new Recommender(0.5f, 100, 1); + + int recommendedPollerCount = recommender.recommend(100, 0); + assertEquals(1, recommendedPollerCount); + } + + @Test + public void pollerUnderUtilsedShouldReduce() { + Recommender recommender = new Recommender(0.5f, 100, 1); + + int recommendedPollerCount = recommender.recommend(100, 0.1f); + assertEquals(20, recommendedPollerCount); + } + + @Test + public void polleratTargetRateShouldRemainUnchanged() { + Recommender recommender = new Recommender(0.5f, 100, 1); + int recommendedPollerCount = recommender.recommend(25, 0.5f); + assertEquals(25, recommendedPollerCount); + } + + @Test + public void pollerOverUtilised100PercentShouldAddPollersToMax() { + Recommender recommender = new Recommender(0.5f, 100, 1); + + int recommendedPollerCount = recommender.recommend(5, 1); + assertEquals(100, recommendedPollerCount); + } + + @Test + public void pollerOverUtilisedShouldAddPollers() { + Recommender recommender = new Recommender(0.4f, 100, 1); + int recommendedPollerCount = recommender.recommend(10, 0.8f); + assertEquals(20, recommendedPollerCount); + } + + @Test + public void pollerOverUtilisedUpperBound() { + Recommender recommender = new Recommender(0.5f, 100, 1); + + int recommendedPollerCount = recommender.recommend(99, 1); + assertEquals(100, recommendedPollerCount); + } +} diff --git a/src/test/java/com/uber/cadence/internal/worker/autoscaler/ResizableSemaphoreTest.java b/src/test/java/com/uber/cadence/internal/worker/autoscaler/ResizableSemaphoreTest.java new file mode 100644 index 000000000..b3e0d1dd8 --- /dev/null +++ b/src/test/java/com/uber/cadence/internal/worker/autoscaler/ResizableSemaphoreTest.java @@ -0,0 +1,29 @@ +package com.uber.cadence.internal.worker.autoscaler; + +import static org.junit.Assert.*; + +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.runners.MockitoJUnitRunner; + +@RunWith(MockitoJUnitRunner.class) +public class ResizableSemaphoreTest { + + @Test + public void releaseIncreasesNumberOfAvailablePermits() { + ResizableSemaphore resizableSemaphore = new ResizableSemaphore(10); + + resizableSemaphore.release(2); + + assertEquals(12, resizableSemaphore.availablePermits()); + } + + @Test + public void decreasePermitsDecreasesNumberOfAvailablePermits() { + ResizableSemaphore resizableSemaphore = new ResizableSemaphore(10); + + resizableSemaphore.decreasePermits(2); + + assertEquals(8, resizableSemaphore.availablePermits()); + } +} diff --git a/src/test/java/com/uber/cadence/workflow/WorkflowTest.java b/src/test/java/com/uber/cadence/workflow/WorkflowTest.java index 4127be6fc..dce3f03b1 100644 --- a/src/test/java/com/uber/cadence/workflow/WorkflowTest.java +++ b/src/test/java/com/uber/cadence/workflow/WorkflowTest.java @@ -2729,6 +2729,7 @@ public void testQueryRejectionConditionNotOpen() { WorkflowClientOptions clientOptions = WorkflowClientOptions.newBuilder(workflowClient.getOptions()) .setQueryRejectCondition(QueryRejectCondition.NOT_OPEN) + .setTimeForWorkerShutdown(Duration.ofSeconds(1)) .build(); WorkflowClient wc; if (useExternalService) {