Skip to content

Commit 2bd6237

Browse files
committed
fix: fix thread safety issue in engine invoke
1 parent 3ffd1aa commit 2bd6237

File tree

2 files changed

+18
-7
lines changed

2 files changed

+18
-7
lines changed

pkg/workflow/engine_test.go

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -205,8 +205,8 @@ func Test_Engine_ClonedNetworkAccess(t *testing.T) {
205205
}
206206

207207
func Test_EngineInvocationConcurrent(t *testing.T) {
208-
configuration := configuration.NewInMemory()
209-
engine := NewWorkFlowEngine(configuration)
208+
config := configuration.NewInMemory()
209+
engine := NewWorkFlowEngine(config)
210210

211211
flagset := pflag.NewFlagSet("1", pflag.ExitOnError)
212212
callback := func(invocation InvocationContext, input []Data) ([]Data, error) {
@@ -224,6 +224,9 @@ func Test_EngineInvocationConcurrent(t *testing.T) {
224224
stop := make(chan struct{}, N)
225225
for range N {
226226
go func() {
227+
logger := zerolog.Nop()
228+
engine.SetLogger(&logger)
229+
engine.SetConfiguration(configuration.NewWithOpts())
227230
_, invokeErr := engine.Invoke(workflowId)
228231
assert.NoError(t, invokeErr)
229232
stop <- struct{}{}

pkg/workflow/engineimpl.go

Lines changed: 13 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,8 @@ func (e *EngineImpl) GetLogger() *zerolog.Logger {
4141
}
4242

4343
func (e *EngineImpl) SetLogger(logger *zerolog.Logger) {
44+
e.mu.Lock()
45+
defer e.mu.Unlock()
4446
e.logger = logger
4547

4648
if e.networkAccess != nil {
@@ -49,6 +51,8 @@ func (e *EngineImpl) SetLogger(logger *zerolog.Logger) {
4951
}
5052

5153
func (e *EngineImpl) SetConfiguration(config configuration.Configuration) {
54+
e.mu.Lock()
55+
defer e.mu.Unlock()
5256
e.config = config
5357

5458
if e.networkAccess != nil {
@@ -247,21 +251,25 @@ func (e *EngineImpl) InvokeWithInputAndConfig(
247251

248252
// prepare logger
249253
prefix := fmt.Sprintf("%s:%d", id.Host, e.invocationCounter)
250-
e.mu.Unlock()
251-
252254
zlogger := e.logger.With().Str("ext", prefix).Logger()
253255

256+
localConfig := e.config
257+
localNetwork := e.networkAccess
258+
localUi := e.ui
259+
localAnalytics := e.analytics
260+
254261
// prepare configuration
255262
if config == nil {
256-
config = e.config.Clone()
263+
config = localConfig.Clone()
257264
}
258265

259266
// prepare networkAccess
260-
networkAccess := e.networkAccess.Clone()
267+
networkAccess := localNetwork.Clone()
261268
networkAccess.SetConfiguration(config)
269+
e.mu.Unlock()
262270

263271
// create a context object for the invocation
264-
context := NewInvocationContext(id, config, e, networkAccess, zlogger, e.analytics, e.ui)
272+
context := NewInvocationContext(id, config, e, networkAccess, zlogger, localAnalytics, localUi)
265273

266274
// invoke workflow through its callback
267275
zlogger.Printf("Workflow Start")

0 commit comments

Comments
 (0)