Skip to content

Commit 61356f4

Browse files
committed
feat: initial workflow
Signed-off-by: mikeee <[email protected]>
1 parent ae8becf commit 61356f4

File tree

6 files changed

+251
-0
lines changed

6 files changed

+251
-0
lines changed

go.mod

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,17 +8,23 @@ require (
88
github.com/golang/mock v1.6.0
99
github.com/golang/protobuf v1.5.3
1010
github.com/google/uuid v1.3.1
11+
github.com/microsoft/durabletask-go v0.3.1
1112
github.com/stretchr/testify v1.8.4
1213
google.golang.org/grpc v1.57.0
1314
google.golang.org/protobuf v1.31.0
1415
gopkg.in/yaml.v3 v3.0.1
1516
)
1617

1718
require (
19+
github.com/cenkalti/backoff/v4 v4.2.1 // indirect
1820
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect
21+
github.com/go-logr/logr v1.2.4 // indirect
22+
github.com/go-logr/stdr v1.2.2 // indirect
1923
github.com/kr/text v0.2.0 // indirect
24+
github.com/marusama/semaphore/v2 v2.5.0 // indirect
2025
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect
2126
go.opentelemetry.io/otel v1.16.0 // indirect
27+
go.opentelemetry.io/otel/metric v1.16.0 // indirect
2228
go.opentelemetry.io/otel/trace v1.16.0 // indirect
2329
golang.org/x/net v0.15.0 // indirect
2430
golang.org/x/sys v0.12.0 // indirect

go.sum

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,17 @@
1+
github.com/cenkalti/backoff/v4 v4.2.1 h1:y4OZtCnogmCPw98Zjyt5a6+QwPLGkiQsYW5oUqylYbM=
2+
github.com/cenkalti/backoff/v4 v4.2.1/go.mod h1:Y3VNntkOUPxTVeUxJ/G5vcM//AlwfmyYozVcomhLiZE=
13
github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E=
24
github.com/dapr/dapr v1.12.1-0.20231030205344-441017b888c5 h1:IlC2/2TemJw3dC1P8DsFZ4/ANl6IojDr50B7B8dIGIk=
35
github.com/dapr/dapr v1.12.1-0.20231030205344-441017b888c5/go.mod h1:zHcMel+UwYnMWfvJwpaDr43p95JteXyvBsSjXNnPU+c=
46
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc h1:U9qPSI2PIWSS1VwoXQT9A3Wy9MM3WgvqSxFWenqJduM=
57
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
68
github.com/go-chi/chi/v5 v5.0.10 h1:rLz5avzKpjqxrYwXNfmjkrYYXOyLJd37pz53UFHC6vk=
79
github.com/go-chi/chi/v5 v5.0.10/go.mod h1:DslCQbL2OYiznFReuXYUmQ2hGd1aDpCnlMNITLSKoi8=
10+
github.com/go-logr/logr v1.2.2/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A=
11+
github.com/go-logr/logr v1.2.4 h1:g01GSCwiDw2xSZfjJ2/T9M+S6pFdcNtFYsp+Y43HYDQ=
12+
github.com/go-logr/logr v1.2.4/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A=
13+
github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag=
14+
github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre4VKE=
815
github.com/golang/mock v1.6.0 h1:ErTB+efbowRARo13NNdxyJji2egdxLGQhRaY+DUumQc=
916
github.com/golang/mock v1.6.0/go.mod h1:p6yTPP+5HYm5mzsMV8JkE6ZKdX+/wYM6Hr+LicevLPs=
1017
github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk=
@@ -20,13 +27,19 @@ github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
2027
github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
2128
github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
2229
github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE=
30+
github.com/marusama/semaphore/v2 v2.5.0 h1:o/1QJD9DBYOWRnDhPwDVAXQn6mQYD0gZaS1Tpx6DJGM=
31+
github.com/marusama/semaphore/v2 v2.5.0/go.mod h1:z9nMiNUekt/LTpTUQdpp+4sJeYqUGpwMHfW0Z8V8fnQ=
32+
github.com/microsoft/durabletask-go v0.3.1 h1:Y7RrPefd4cz5GMxjMx/Zvf9r5INombNlzI0DaQd994k=
33+
github.com/microsoft/durabletask-go v0.3.1/go.mod h1:t3u0iRvIadT1y4MD5cUG0mbTOqgANT6IFcLogv7o0M0=
2334
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 h1:Jamvg5psRIccs7FGNTlIRMkT8wgtp5eCXdBlqhYGL6U=
2435
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
2536
github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk=
2637
github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo=
2738
github.com/yuin/goldmark v1.3.5/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1Zlc8k=
2839
go.opentelemetry.io/otel v1.16.0 h1:Z7GVAX/UkAXPKsy94IU+i6thsQS4nb7LviLpnaNeW8s=
2940
go.opentelemetry.io/otel v1.16.0/go.mod h1:vl0h9NUa1D5s1nv3A5vZOYWn8av4K8Ml6JDeHrT/bx4=
41+
go.opentelemetry.io/otel/metric v1.16.0 h1:RbrpwVG1Hfv85LgnZ7+txXioPDoh6EdbZHo26Q3hqOo=
42+
go.opentelemetry.io/otel/metric v1.16.0/go.mod h1:QE47cpOmkwipPiefDwo2wDzwJrlfxxNYodqc4xnGCo4=
3043
go.opentelemetry.io/otel/trace v1.16.0 h1:8JRpaObFoW0pxuVPapkgH8UhHQj+bJW8jJsCZEu5MQs=
3144
go.opentelemetry.io/otel/trace v1.16.0/go.mod h1:Yt9vYq1SdNz3xdjZZK7wcXv1qv2pwLkqr2QVwea0ef0=
3245
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=

workflow/activity_context.go

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
package workflow
2+
3+
import (
4+
"github.com/microsoft/durabletask-go/task"
5+
)
6+
7+
type ActivityContext struct {
8+
ctx task.ActivityContext
9+
}
10+
11+
func (wfac *ActivityContext) GetInput(v interface{}) error {
12+
return wfac.ctx.GetInput(&v)
13+
}

workflow/context.go

Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,60 @@
1+
package workflow
2+
3+
import (
4+
"fmt"
5+
"log"
6+
"time"
7+
8+
"github.com/microsoft/durabletask-go/task"
9+
)
10+
11+
type Context struct {
12+
orchestrationContext *task.OrchestrationContext
13+
}
14+
15+
func (wfc *Context) GetInput(v interface{}) error {
16+
return wfc.orchestrationContext.GetInput(&v)
17+
}
18+
19+
func (wfc *Context) Name() string {
20+
return wfc.orchestrationContext.Name
21+
}
22+
23+
func (wfc *Context) InstanceID() string {
24+
return fmt.Sprintf("%v", wfc.orchestrationContext.ID)
25+
}
26+
27+
func (wfc *Context) CurrentUTCDateTime() time.Time {
28+
return wfc.orchestrationContext.CurrentTimeUtc
29+
}
30+
31+
func (wfc *Context) IsReplaying() bool {
32+
return wfc.orchestrationContext.IsReplaying
33+
}
34+
35+
func (wfc *Context) CallActivity(activity interface{}) task.Task {
36+
var inp string
37+
if err := wfc.GetInput(&inp); err != nil {
38+
log.Printf("unable to get activity input: %v", err)
39+
}
40+
// the call should continue despite being unable to obtain an input
41+
42+
return wfc.orchestrationContext.CallActivity(activity, task.WithActivityInput(inp))
43+
}
44+
45+
func (wfc *Context) CallChildWorkflow() {
46+
// TODO: implement
47+
// call suborchestrator
48+
}
49+
50+
func (wfc *Context) CreateTimer() {
51+
// TODO: implement
52+
}
53+
54+
func (wfc *Context) WaitForExternalEvent() {
55+
// TODO: implement
56+
}
57+
58+
func (wfc *Context) ContinueAsNew() {
59+
// TODO: implement
60+
}

workflow/runtime.go

Lines changed: 131 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,131 @@
1+
package workflow
2+
3+
import (
4+
"context"
5+
"errors"
6+
"fmt"
7+
"log"
8+
"reflect"
9+
"runtime"
10+
"strings"
11+
"sync"
12+
"time"
13+
14+
"github.com/microsoft/durabletask-go/backend"
15+
"github.com/microsoft/durabletask-go/client"
16+
"github.com/microsoft/durabletask-go/task"
17+
"google.golang.org/grpc"
18+
"google.golang.org/grpc/credentials/insecure"
19+
)
20+
21+
type WorkflowRuntime struct {
22+
tasks *task.TaskRegistry
23+
client *client.TaskHubGrpcClient
24+
25+
mutex sync.Mutex // TODO: implement
26+
quit chan bool
27+
cancel context.CancelFunc
28+
}
29+
30+
type Workflow func(ctx *Context) (any, error)
31+
32+
type Activity func(ctx ActivityContext) (any, error)
33+
34+
func NewRuntime(host string, port string) (*WorkflowRuntime, error) {
35+
ctx, canc := context.WithTimeout(context.Background(), time.Second*10)
36+
defer canc()
37+
38+
address := fmt.Sprintf("%s:%s", host, port)
39+
40+
clientConn, err := grpc.DialContext(
41+
ctx,
42+
address,
43+
grpc.WithTransportCredentials(insecure.NewCredentials()),
44+
grpc.WithBlock(), // TODO: config
45+
)
46+
if err != nil {
47+
return nil, fmt.Errorf("failed to create runtime - grpc connection failed: %v", err)
48+
}
49+
50+
return &WorkflowRuntime{
51+
tasks: task.NewTaskRegistry(),
52+
client: client.NewTaskHubGrpcClient(clientConn, backend.DefaultLogger()),
53+
quit: make(chan bool),
54+
cancel: canc,
55+
}, nil
56+
}
57+
58+
func getDecorator(f interface{}) (string, error) {
59+
if f == nil {
60+
return "", errors.New("nil function name")
61+
}
62+
63+
callSplit := strings.Split(runtime.FuncForPC(reflect.ValueOf(f).Pointer()).Name(), ".")
64+
65+
funcName := callSplit[len(callSplit)-1]
66+
67+
return funcName, nil
68+
}
69+
70+
func (wr *WorkflowRuntime) RegisterWorkflow(w Workflow) error {
71+
wrappedOrchestration := func(ctx *task.OrchestrationContext) (any, error) {
72+
wfCtx := &Context{orchestrationContext: ctx}
73+
74+
return w(wfCtx)
75+
}
76+
77+
// getdecorator for workflow
78+
name, err := getDecorator(w)
79+
if err != nil {
80+
return fmt.Errorf("failed to get workflow decorator: %v", err)
81+
}
82+
83+
err = wr.tasks.AddOrchestratorN(name, wrappedOrchestration)
84+
return err
85+
}
86+
87+
func (wr *WorkflowRuntime) RegisterActivity(a Activity) error {
88+
wrappedActivity := func(ctx task.ActivityContext) (any, error) {
89+
ac := ActivityContext{ctx: ctx}
90+
91+
return a(ac)
92+
}
93+
94+
// getdecorator for activity
95+
name, err := getDecorator(a)
96+
if err != nil {
97+
return fmt.Errorf("failed to get activity decorator: %v", err)
98+
}
99+
100+
err = wr.tasks.AddActivityN(name, wrappedActivity)
101+
return err
102+
}
103+
104+
func (wr *WorkflowRuntime) Start() error {
105+
// go func start
106+
go func() {
107+
err := wr.client.StartWorkItemListener(context.Background(), wr.tasks)
108+
if err != nil {
109+
log.Fatalf("failed to start work stream: %v", err)
110+
}
111+
for {
112+
select {
113+
case <-wr.quit:
114+
return
115+
default:
116+
// continue serving
117+
}
118+
}
119+
}()
120+
121+
return nil
122+
}
123+
124+
func (wr *WorkflowRuntime) Shutdown() error {
125+
// cancel grpc context
126+
wr.cancel()
127+
// send close signal
128+
wr.quit <- true
129+
130+
return nil
131+
}

workflow/runtime_test.go

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
package workflow
2+
3+
import (
4+
"testing"
5+
6+
"github.com/stretchr/testify/assert"
7+
"github.com/stretchr/testify/require"
8+
)
9+
10+
func TestWorkflowRuntime(t *testing.T) {
11+
// TODO: Mock grpc conn - currently requires dapr to be available
12+
t.Run("test workflow name is correct", func(t *testing.T) {
13+
wr, err := NewRuntime("localhost", "50001")
14+
require.NoError(t, err)
15+
err = wr.RegisterWorkflow(testOrchestrator)
16+
require.NoError(t, err)
17+
})
18+
}
19+
20+
func TestGetDecorator(t *testing.T) {
21+
name, err := getDecorator(testOrchestrator)
22+
require.NoError(t, err)
23+
assert.Equal(t, "testOrchestrator", name)
24+
}
25+
26+
func testOrchestrator(ctx *Context) (any, error) {
27+
return nil, nil
28+
}

0 commit comments

Comments
 (0)