Skip to content

Commit 89676d3

Browse files
authored
added worker.NewV2 with validation on decision poller count (#1370)
What changed? added a worker.NewV2 interface that will return error and a deprecation message on New api. added validation on WorkerOptions -- decision poller should be larger than 1 if value is set (not zero) and sticky execution is enabled. Why? #1369 How did you test it? Unit Test Potential risks Low
1 parent 4d4c09f commit 89676d3

10 files changed

+136
-26
lines changed

Diff for: CHANGELOG.md

+7-4
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,9 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
55
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
66

77
## [Unreleased]
8+
### Added
9+
- Added worker.NewV2 with validation on decision poller count (#1370)
10+
811
## [v1.2.10] - 2024-07-10
912
### Added
1013
- Revert "Handle panics while polling for tasks (#1352)" (#1357)
@@ -83,16 +86,16 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
8386
- Fixed in TestEnv workflow interceptor is not propagated correctly for child workflows #1289
8487

8588
## [v1.0.2] - 2023-09-25
86-
### Added
89+
### Added
8790
- Add a structured error for non-determinism failures
8891

89-
### Changed
90-
- Do not log when automatic heart beating fails due to cancellations
92+
### Changed
93+
- Do not log when automatic heart beating fails due to cancellations
9194

9295
## [v1.0.1] - 2023-08-14
9396
### Added
9497
- Emit cadence worker's hardware utilization inside worker once per host by @timl3136 in #1260
95-
### Changed
98+
### Changed
9699
- Updated supported Go version to 1.19
97100
- Log when the automatic heartbeating fails
98101
- Updated golang.org/x/net and github.com/prometheus/client_golang

Diff for: evictiontest/workflow_cache_eviction_test.go

+3-1
Original file line numberDiff line numberDiff line change
@@ -174,11 +174,13 @@ func (s *CacheEvictionSuite) TestResetStickyOnEviction() {
174174
// so if our worker puts *cacheSize* entries in the cache, it should evict exactly one
175175
s.service.EXPECT().ResetStickyTaskList(gomock.Any(), gomock.Any(), callOptions()...).DoAndReturn(mockResetStickyTaskList).Times(1)
176176

177-
workflowWorker := internal.NewWorker(s.service, "test-domain", "tasklist", worker.Options{
177+
workflowWorker, err := internal.NewWorker(s.service, "test-domain", "tasklist", worker.Options{
178178
DisableActivityWorker: true,
179179
Logger: zaptest.NewLogger(s.T()),
180180
IsolationGroup: "zone-1",
181181
})
182+
s.Require().NoError(err)
183+
182184
// this is an arbitrary workflow we use for this test
183185
// NOTE: a simple helloworld that doesn't execute an activity
184186
// won't work because the workflow will simply just complete

Diff for: internal/internal_poller_autoscaler.go

+2-1
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,8 @@ import (
3737
const (
3838
defaultPollerAutoScalerCooldown = time.Minute
3939
defaultPollerAutoScalerTargetUtilization = 0.6
40-
defaultMinConcurrentPollerSize = 1
40+
defaultMinConcurrentActivityPollerSize = 1
41+
defaultMinConcurrentDecisionPollerSize = 2
4142
)
4243

4344
var (

Diff for: internal/internal_poller_autoscaler_test.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,7 @@ func Test_pollerAutoscaler(t *testing.T) {
6161
taskPoll: 0,
6262
unrelated: 0,
6363
initialPollerCount: 10,
64-
minPollerCount: 1,
64+
minPollerCount: 2,
6565
maxPollerCount: 10,
6666
targetMilliUsage: 500,
6767
cooldownTime: coolDownTime,

Diff for: internal/internal_worker.go

+8-4
Original file line numberDiff line numberDiff line change
@@ -1025,8 +1025,12 @@ func newAggregatedWorker(
10251025
domain string,
10261026
taskList string,
10271027
options WorkerOptions,
1028-
) (worker *aggregatedWorker) {
1028+
) (worker *aggregatedWorker, err error) {
10291029
wOptions := AugmentWorkerOptions(options)
1030+
if err := options.Validate(); err != nil {
1031+
return nil, fmt.Errorf("worker options validation error: %w", err)
1032+
}
1033+
10301034
ctx := wOptions.BackgroundActivityContext
10311035
if ctx == nil {
10321036
ctx = context.Background()
@@ -1156,7 +1160,7 @@ func newAggregatedWorker(
11561160
logger: logger,
11571161
registry: registry,
11581162
workerstats: workerParams.WorkerStats,
1159-
}
1163+
}, nil
11601164
}
11611165

11621166
// tagScope with one or multiple tags, like
@@ -1286,10 +1290,10 @@ func AugmentWorkerOptions(options WorkerOptions) WorkerOptions {
12861290
options.MaxConcurrentSessionExecutionSize = defaultMaxConcurrentSessionExecutionSize
12871291
}
12881292
if options.MinConcurrentActivityTaskPollers == 0 {
1289-
options.MinConcurrentActivityTaskPollers = defaultMinConcurrentPollerSize
1293+
options.MinConcurrentActivityTaskPollers = defaultMinConcurrentActivityPollerSize
12901294
}
12911295
if options.MinConcurrentDecisionTaskPollers == 0 {
1292-
options.MinConcurrentDecisionTaskPollers = defaultMinConcurrentPollerSize
1296+
options.MinConcurrentDecisionTaskPollers = defaultMinConcurrentDecisionPollerSize
12931297
}
12941298
if options.PollerAutoScalerCooldown == 0 {
12951299
options.PollerAutoScalerCooldown = defaultPollerAutoScalerCooldown

Diff for: internal/internal_worker_test.go

+7-4
Original file line numberDiff line numberDiff line change
@@ -414,11 +414,12 @@ func createWorkerWithThrottle(
414414
workerOptions.EnableSessionWorker = true
415415

416416
// Start Worker.
417-
worker := NewWorker(
417+
worker, err := NewWorker(
418418
service,
419419
domain,
420420
"testGroupName2",
421421
workerOptions)
422+
require.NoError(t, err)
422423
return worker
423424
}
424425

@@ -1075,7 +1076,8 @@ func TestActivityNilArgs(t *testing.T) {
10751076
func TestWorkerOptionDefaults(t *testing.T) {
10761077
domain := "worker-options-test"
10771078
taskList := "worker-options-tl"
1078-
aggWorker := newAggregatedWorker(nil, domain, taskList, WorkerOptions{})
1079+
aggWorker, err := newAggregatedWorker(nil, domain, taskList, WorkerOptions{})
1080+
require.NoError(t, err)
10791081
decisionWorker := aggWorker.workflowWorker
10801082
require.True(t, decisionWorker.executionParameters.Identity != "")
10811083
require.NotNil(t, decisionWorker.executionParameters.Logger)
@@ -1143,7 +1145,8 @@ func TestWorkerOptionNonDefaults(t *testing.T) {
11431145
Tracer: opentracing.NoopTracer{},
11441146
}
11451147

1146-
aggWorker := newAggregatedWorker(nil, domain, taskList, options)
1148+
aggWorker, err := newAggregatedWorker(nil, domain, taskList, options)
1149+
require.NoError(t, err)
11471150
decisionWorker := aggWorker.workflowWorker
11481151
require.True(t, len(decisionWorker.executionParameters.ContextPropagators) > 0)
11491152

@@ -1388,7 +1391,7 @@ func Test_augmentWorkerOptions(t *testing.T) {
13881391
MaxConcurrentDecisionTaskExecutionSize: 1000,
13891392
WorkerDecisionTasksPerSecond: 100000,
13901393
MaxConcurrentDecisionTaskPollers: 2,
1391-
MinConcurrentDecisionTaskPollers: 1,
1394+
MinConcurrentDecisionTaskPollers: 2,
13921395
PollerAutoScalerCooldown: time.Minute,
13931396
PollerAutoScalerTargetUtilization: 0.6,
13941397
PollerAutoScalerDryRun: false,

Diff for: internal/internal_workers_test.go

+11-6
Original file line numberDiff line numberDiff line change
@@ -340,7 +340,8 @@ func (s *WorkersTestSuite) TestLongRunningDecisionTask() {
340340
DisableActivityWorker: true,
341341
Identity: "test-worker-identity",
342342
}
343-
worker := newAggregatedWorker(s.service, domain, taskList, options)
343+
worker, err := newAggregatedWorker(s.service, domain, taskList, options)
344+
s.Require().NoError(err)
344345
worker.RegisterWorkflowWithOptions(
345346
longDecisionWorkflowFn,
346347
RegisterWorkflowOptions{Name: "long-running-decision-workflow-type"},
@@ -515,7 +516,8 @@ func (s *WorkersTestSuite) TestQueryTask_WorkflowCacheEvicted() {
515516
// See the mock function for the second PollForDecisionTask call above.
516517
MaxConcurrentDecisionTaskExecutionSize: 1,
517518
}
518-
worker := newAggregatedWorker(s.service, domain, taskList, options)
519+
worker, err := newAggregatedWorker(s.service, domain, taskList, options)
520+
s.Require().NoError(err)
519521
worker.RegisterWorkflowWithOptions(
520522
queryWorkflowFn,
521523
RegisterWorkflowOptions{Name: workflowType},
@@ -638,7 +640,8 @@ func (s *WorkersTestSuite) TestMultipleLocalActivities() {
638640
DisableActivityWorker: true,
639641
Identity: "test-worker-identity",
640642
}
641-
worker := newAggregatedWorker(s.service, domain, taskList, options)
643+
worker, err := newAggregatedWorker(s.service, domain, taskList, options)
644+
s.Require().NoError(err)
642645
worker.RegisterWorkflowWithOptions(
643646
longDecisionWorkflowFn,
644647
RegisterWorkflowOptions{Name: "multiple-local-activities-workflow-type"},
@@ -748,7 +751,8 @@ func (s *WorkersTestSuite) TestLocallyDispatchedActivity() {
748751
Logger: zaptest.NewLogger(s.T()),
749752
Identity: "test-worker-identity",
750753
}
751-
worker := newAggregatedWorker(s.service, domain, taskList, options)
754+
worker, err := newAggregatedWorker(s.service, domain, taskList, options)
755+
s.Require().NoError(err)
752756
worker.RegisterWorkflowWithOptions(
753757
workflowFn,
754758
RegisterWorkflowOptions{Name: workflowType},
@@ -859,14 +863,15 @@ func (s *WorkersTestSuite) TestMultipleLocallyDispatchedActivity() {
859863
return nil
860864
}).MinTimes(1)
861865

862-
worker := newAggregatedWorker(s.service, domain, taskList, options)
866+
worker, err := newAggregatedWorker(s.service, domain, taskList, options)
867+
s.Require().NoError(err)
863868
worker.RegisterWorkflowWithOptions(
864869
workflowFn,
865870
RegisterWorkflowOptions{Name: workflowType},
866871
)
867872
worker.RegisterActivityWithOptions(activitySleep, RegisterActivityOptions{Name: "activitySleep"})
868873
s.NotNil(worker.locallyDispatchedActivityWorker)
869-
err := worker.Start()
874+
err = worker.Start()
870875
s.NoError(err, "worker failed to start")
871876

872877
// wait for test to complete

Diff for: internal/worker.go

+12-2
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ package internal
2323

2424
import (
2525
"context"
26+
"fmt"
2627
"time"
2728

2829
"go.uber.org/cadence/internal/common/debug"
@@ -108,7 +109,7 @@ type (
108109
// optional: Sets the minimum number of goroutines that will concurrently poll the
109110
// cadence-server to retrieve decision tasks. If FeatureFlags.PollerAutoScalerEnabled is set to true,
110111
// changing this value will NOT affect the rate at which the worker is able to consume tasks from a task list.
111-
// Default value is 1
112+
// Default value is 2
112113
MinConcurrentDecisionTaskPollers int
113114

114115
// optional: Sets the interval of poller autoscaling, between which poller autoscaler changes the poller count
@@ -333,7 +334,7 @@ func NewWorker(
333334
domain string,
334335
taskList string,
335336
options WorkerOptions,
336-
) *aggregatedWorker {
337+
) (*aggregatedWorker, error) {
337338
return newAggregatedWorker(service, domain, taskList, options)
338339
}
339340

@@ -383,3 +384,12 @@ func ReplayPartialWorkflowHistoryFromJSONFile(logger *zap.Logger, jsonfileName s
383384
r := NewWorkflowReplayer()
384385
return r.ReplayPartialWorkflowHistoryFromJSONFile(logger, jsonfileName, lastEventID)
385386
}
387+
388+
// Validate sanity validation of WorkerOptions
389+
func (o WorkerOptions) Validate() error {
390+
// decision task pollers must be >= 2 or unset if sticky tasklist is enabled https://github.com/uber-go/cadence-client/issues/1369
391+
if !o.DisableStickyExecution && (o.MaxConcurrentDecisionTaskPollers == 1 || o.MinConcurrentDecisionTaskPollers == 1) {
392+
return fmt.Errorf("DecisionTaskPollers must be >= 2 or use default value")
393+
}
394+
return nil
395+
}

Diff for: internal/worker_test.go

+68
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,68 @@
1+
// Copyright (c) 2017-2021 Uber Technologies Inc.
2+
//
3+
// Permission is hereby granted, free of charge, to any person obtaining a copy
4+
// of this software and associated documentation files (the "Software"), to deal
5+
// in the Software without restriction, including without limitation the rights
6+
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
7+
// copies of the Software, and to permit persons to whom the Software is
8+
// furnished to do so, subject to the following conditions:
9+
//
10+
// The above copyright notice and this permission notice shall be included in
11+
// all copies or substantial portions of the Software.
12+
//
13+
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
14+
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
15+
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
16+
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
17+
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
18+
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
19+
// THE SOFTWARE.
20+
21+
package internal
22+
23+
import (
24+
"testing"
25+
26+
"github.com/stretchr/testify/assert"
27+
)
28+
29+
func Test_NewWorker(t *testing.T) {
30+
tests := []struct {
31+
name string
32+
options WorkerOptions
33+
expectErr string
34+
}{
35+
{
36+
name: "happy with default value",
37+
options: WorkerOptions{},
38+
expectErr: "",
39+
},
40+
{
41+
name: "happy with explicit decision task poller set to 1 if sticky task list is disabled",
42+
options: WorkerOptions{
43+
MaxConcurrentDecisionTaskPollers: 1,
44+
DisableStickyExecution: true,
45+
},
46+
expectErr: "",
47+
},
48+
{
49+
name: "invalid worker with explicit decision task poller set to 1",
50+
options: WorkerOptions{
51+
MaxConcurrentDecisionTaskPollers: 1,
52+
},
53+
expectErr: "DecisionTaskPollers must be >= 2 or use default value",
54+
},
55+
}
56+
for _, tt := range tests {
57+
t.Run(tt.name, func(t *testing.T) {
58+
w, err := NewWorker(nil, "test-domain", "test-tasklist", tt.options)
59+
if tt.expectErr != "" {
60+
assert.ErrorContains(t, err, tt.expectErr)
61+
assert.Nil(t, w)
62+
} else {
63+
assert.NoError(t, err)
64+
assert.NotNil(t, w)
65+
}
66+
})
67+
}
68+
}

Diff for: worker/worker.go

+17-3
Original file line numberDiff line numberDiff line change
@@ -269,20 +269,34 @@ const (
269269
ShadowModeContinuous = internal.ShadowModeContinuous
270270
)
271271

272-
// New creates an instance of worker for managing workflow and activity executions.
272+
// Deprecated: use NewV2 instead since this implementation will panic on error
273+
func New(
274+
service workflowserviceclient.Interface,
275+
domain string,
276+
taskList string,
277+
options Options,
278+
) Worker {
279+
w, err := internal.NewWorker(service, domain, taskList, options)
280+
if err != nil {
281+
panic(err)
282+
}
283+
return w
284+
}
285+
286+
// NewV2 returns an instance of worker for managing workflow and activity executions and an error.
273287
//
274288
// service - thrift connection to the cadence server
275289
// domain - the name of the cadence domain
276290
// taskList - is the task list name you use to identify your client worker, also
277291
// identifies group of workflow and activity implementations that are
278292
// hosted by a single worker process
279293
// options - configure any worker specific options like logger, metrics, identity
280-
func New(
294+
func NewV2(
281295
service workflowserviceclient.Interface,
282296
domain string,
283297
taskList string,
284298
options Options,
285-
) Worker {
299+
) (Worker, error) {
286300
return internal.NewWorker(service, domain, taskList, options)
287301
}
288302

0 commit comments

Comments
 (0)