diff --git a/go.mod b/go.mod index 62144e815..d24d93c32 100644 --- a/go.mod +++ b/go.mod @@ -34,6 +34,7 @@ require ( github.com/agiledragon/gomonkey v2.0.2+incompatible github.com/agiledragon/gomonkey/v2 v2.9.0 github.com/mattn/go-sqlite3 v1.14.19 + google.golang.org/protobuf v1.30.0 ) require ( @@ -86,7 +87,6 @@ require ( go.uber.org/multierr v1.8.0 // indirect golang.org/x/arch v0.3.0 // indirect golang.org/x/text v0.14.0 // indirect - google.golang.org/protobuf v1.30.0 // indirect gopkg.in/natefinch/lumberjack.v2 v2.0.0 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect ) diff --git a/pkg/saga/statemachine/engine/invoker/grpc_invoker.go b/pkg/saga/statemachine/engine/invoker/grpc_invoker.go new file mode 100644 index 000000000..e83e798bd --- /dev/null +++ b/pkg/saga/statemachine/engine/invoker/grpc_invoker.go @@ -0,0 +1,244 @@ +package invoker + +import ( + "context" + "errors" + "fmt" + "github.com/seata/seata-go/pkg/saga/statemachine/statelang/state" + "github.com/seata/seata-go/pkg/util/log" + "google.golang.org/grpc" + "reflect" + "strings" + "sync" + "time" +) + +type GRPCInvoker struct { + clients map[string]GRPCClient + clientsMapLock sync.Mutex + needClose bool +} + +func NewGRPCInvoker() *GRPCInvoker { + return &GRPCInvoker{ + clients: make(map[string]GRPCClient), + } +} + +func (g *GRPCInvoker) NeedClose() bool { + return g.needClose +} + +func (g *GRPCInvoker) SetNeedClose(needClose bool) { + g.needClose = needClose +} + +func (g *GRPCInvoker) RegisterClient(serviceName string, client GRPCClient) { + g.clientsMapLock.Lock() + defer g.clientsMapLock.Unlock() + + g.clients[serviceName] = client +} + +func (g *GRPCInvoker) GetClient(serviceName string) GRPCClient { + g.clientsMapLock.Lock() + defer g.clientsMapLock.Unlock() + + if client, ok := g.clients[serviceName]; ok { + return client + } + + return nil +} + +func (g *GRPCInvoker) Invoke(ctx context.Context, input []any, service state.ServiceTaskState) (output []reflect.Value, err error) { + serviceTaskStateImpl := service.(*state.ServiceTaskStateImpl) + client := g.GetClient(serviceTaskStateImpl.ServiceName()) + if client == nil { + return nil, errors.New(fmt.Sprintf("no grpc client %s for service task state", serviceTaskStateImpl.ServiceName())) + } + + // context is the first arg in grpc client method + input = append([]any{ctx}, input...) + if serviceTaskStateImpl.IsAsync() { + go func() { + _, err := client.CallMethod(serviceTaskStateImpl, input) + if err != nil { + log.Errorf("invoke Service[%s].%s failed, err is %s", serviceTaskStateImpl.ServiceName(), + serviceTaskStateImpl.ServiceMethod(), err) + } + }() + return nil, nil + } else { + return client.CallMethod(serviceTaskStateImpl, input) + } +} + +func (g *GRPCInvoker) Close(ctx context.Context) error { + if g.needClose { + for _, client := range g.clients { + err := client.CloseConnection() + if err != nil { + return err + } + } + } + return nil +} + +type GRPCClient interface { + CallMethod(serviceTaskStateImpl *state.ServiceTaskStateImpl, input []any) ([]reflect.Value, error) + + CloseConnection() error +} + +type GPRCClientImpl struct { + serviceName string + client any + connection *grpc.ClientConn + methodLock sync.Mutex +} + +func NewGRPCClient(serviceName string, client any, connection *grpc.ClientConn) *GPRCClientImpl { + return &GPRCClientImpl{ + serviceName: serviceName, + client: client, + connection: connection, + } +} + +func (g *GPRCClientImpl) CallMethod(serviceTaskStateImpl *state.ServiceTaskStateImpl, input []any) ([]reflect.Value, error) { + + if serviceTaskStateImpl.Method() == nil { + err := g.initMethod(serviceTaskStateImpl) + if err != nil { + return nil, err + } + } + method := serviceTaskStateImpl.Method() + + args := make([]reflect.Value, 0, len(input)) + for _, arg := range input { + args = append(args, reflect.ValueOf(arg)) + } + + retryCountMap := make(map[state.Retry]int) + for { + res, err, shouldRetry := func() (res []reflect.Value, resErr error, shouldRetry bool) { + defer func() { + // err may happen in the method invoke (panic) and method return, we try to find err and use it to decide retry by + // whether contains exception or not + if r := recover(); r != nil { + errStr := fmt.Sprintf("%v", r) + retry := g.matchRetry(serviceTaskStateImpl, errStr) + res = nil + resErr = errors.New(errStr) + + if retry == nil { + return + } + shouldRetry = g.needRetry(serviceTaskStateImpl, retryCountMap, retry, resErr) + return + } + }() + + outs := method.Call(args) + // err is the last arg in grpc client method + if err, ok := outs[len(outs)-1].Interface().(error); ok { + errStr := err.Error() + retry := g.matchRetry(serviceTaskStateImpl, errStr) + res = nil + resErr = err + + if retry == nil { + return + } + shouldRetry = g.needRetry(serviceTaskStateImpl, retryCountMap, retry, resErr) + return + } + + // invoke success + res = outs + resErr = nil + shouldRetry = false + return + }() + + if !shouldRetry { + if err != nil { + return nil, errors.New(fmt.Sprintf("invoke Service[%s] failed, not satisfy retry config, the last err is %s", + serviceTaskStateImpl.ServiceName(), err)) + } + return res, nil + } + } +} + +func (g *GPRCClientImpl) CloseConnection() error { + if g.connection != nil { + err := g.connection.Close() + if err != nil { + return err + } + } + return nil +} + +func (g *GPRCClientImpl) initMethod(serviceTaskStateImpl *state.ServiceTaskStateImpl) error { + methodName := serviceTaskStateImpl.ServiceMethod() + g.methodLock.Lock() + defer g.methodLock.Unlock() + clientValue := reflect.ValueOf(g.client) + if clientValue.IsZero() { + return errors.New(fmt.Sprintf("invalid client value when grpc client call, serviceName: %s", g.serviceName)) + } + method := clientValue.MethodByName(methodName) + if method.IsZero() { + return errors.New(fmt.Sprintf("invalid client method when grpc client call, serviceName: %s, serviceMethod: %s", + g.serviceName, methodName)) + } + serviceTaskStateImpl.SetMethod(&method) + return nil +} + +func (g *GPRCClientImpl) matchRetry(impl *state.ServiceTaskStateImpl, str string) state.Retry { + if impl.Retry() != nil { + for _, retry := range impl.Retry() { + if retry.Exceptions() != nil { + for _, exception := range retry.Exceptions() { + if strings.Contains(str, exception) { + return retry + } + } + } + } + } + return nil +} + +func (g *GPRCClientImpl) needRetry(impl *state.ServiceTaskStateImpl, countMap map[state.Retry]int, retry state.Retry, err error) bool { + attempt, exist := countMap[retry] + if !exist { + countMap[retry] = 0 + } + + if attempt >= retry.MaxAttempt() { + return false + } + + intervalSecond := retry.IntervalSecond() + backoffRate := retry.BackoffRate() + var currentInterval int64 + if attempt == 0 { + currentInterval = int64(intervalSecond * 1000) + } else { + currentInterval = int64(intervalSecond * backoffRate * float64(attempt) * 1000) + } + + log.Warnf("invoke service[%s.%s] failed, will retry after %s millis, current retry count: %s, current err: %s", + impl.ServiceName(), impl.ServiceMethod(), currentInterval, attempt, err) + + time.Sleep(time.Duration(currentInterval) * time.Millisecond) + countMap[retry] = attempt + 1 + return true +} diff --git a/pkg/saga/statemachine/engine/invoker/grpc_invoker_test.go b/pkg/saga/statemachine/engine/invoker/grpc_invoker_test.go new file mode 100644 index 000000000..09d6745f9 --- /dev/null +++ b/pkg/saga/statemachine/engine/invoker/grpc_invoker_test.go @@ -0,0 +1,168 @@ +package invoker + +import ( + "context" + "errors" + "fmt" + "github.com/seata/seata-go/pkg/saga/statemachine/statelang/state" + "github.com/stretchr/testify/assert" + "google.golang.org/grpc" + "google.golang.org/grpc/credentials/insecure" + "testing" + "time" + + pb "github.com/seata/seata-go/testdata/saga/engine/invoker/grpc" +) + +type MockGPRCClientImpl struct { + GPRCClientImpl +} + +type mockClientImpl struct { + invokeCount int +} + +func (m *mockClientImpl) SayHelloRight(ctx context.Context, word string) (string, error) { + m.invokeCount++ + fmt.Println("invoke right") + return word, nil +} + +func (m *mockClientImpl) SayHelloRightLater(ctx context.Context, word string, delay int) (string, error) { + m.invokeCount++ + if delay == m.invokeCount { + fmt.Println("invoke right") + return word, nil + } + fmt.Println("invoke fail") + return "", errors.New("invoke failed") +} + +func TestGRPCInvokerInvokeSucceedWithOutRetry(t *testing.T) { + ctx := context.Background() + invoker := newGRPCServiceInvoker() + values, err := invoker.Invoke(ctx, []any{"hello"}, newHelloServiceTaskState()) + if err != nil { + t.Error(err) + return + } + if values == nil || len(values) == 0 { + t.Error("no value in values") + return + } + if values[0].Interface().(string) != "hello" { + t.Errorf("expect hello, but got %v", values[0].Interface()) + } + if _, ok := values[1].Interface().(error); ok { + t.Errorf("expect nil, but got %v", values[1].Interface()) + } +} + +func TestGRPCInvokerInvokeSucceedInRetry(t *testing.T) { + ctx := context.Background() + invoker := newGRPCServiceInvoker() + values, err := invoker.Invoke(ctx, []any{"hello", 2}, newHelloServiceTaskStateWithRetry()) + if err != nil { + t.Error(err) + return + } + if values == nil || len(values) == 0 { + t.Error("no value in values") + return + } + if values[0].Interface().(string) != "hello" { + t.Errorf("expect hello, but got %v", values[0].Interface()) + } + if _, ok := values[1].Interface().(error); ok { + t.Errorf("expect nil, but got %v", values[1].Interface()) + } +} + +func TestGRPCInvokerInvokeFailedInRetry(t *testing.T) { + ctx := context.Background() + invoker := newGRPCServiceInvoker() + _, err := invoker.Invoke(ctx, []any{"hello", 5}, newHelloServiceTaskStateWithRetry()) + if err != nil { + assert.Error(t, err) + } +} + +func TestGRPCInvokerInvokeE2E(t *testing.T) { + go func() { + pb.StartProductServer() + }() + time.Sleep(3000 * time.Millisecond) + conn, err := grpc.Dial("localhost:8080", grpc.WithTransportCredentials(insecure.NewCredentials())) + if err != nil { + t.Fatalf("did not connect: %v", err) + } + c := pb.NewProductInfoClient(conn) + grpcClient := NewGRPCClient("product", c, conn) + + invoker := NewGRPCInvoker() + invoker.RegisterClient("product", grpcClient) + ctx := context.Background() + values, err := invoker.Invoke(ctx, []any{&pb.Product{Id: "123"}}, newProductServiceTaskState()) + if err != nil { + t.Error(err) + return + } + t.Log(values) + err = invoker.Close(ctx) + if err != nil { + t.Error(err) + return + } +} + +func newGRPCServiceInvoker() ServiceInvoker { + mockGRPCInvoker := NewGRPCInvoker() + mockGRPCClient := &mockClientImpl{} + mockClient := NewGRPCClient("hello", mockGRPCClient, &grpc.ClientConn{}) + mockGRPCInvoker.RegisterClient("hello", mockClient) + return mockGRPCInvoker +} + +func newProductServiceTaskState() state.ServiceTaskState { + serviceTaskStateImpl := state.NewServiceTaskStateImpl() + serviceTaskStateImpl.SetName("product") + serviceTaskStateImpl.SetIsAsync(false) + serviceTaskStateImpl.SetServiceName("product") + serviceTaskStateImpl.SetServiceType("GRPC") + serviceTaskStateImpl.SetServiceMethod("AddProduct") + + retryImpl := &state.RetryImpl{} + retryImpl.SetExceptions([]string{"fail"}) + retryImpl.SetIntervalSecond(1) + retryImpl.SetMaxAttempt(3) + retryImpl.SetBackoffRate(0.9) + serviceTaskStateImpl.SetRetry([]state.Retry{retryImpl}) + return serviceTaskStateImpl +} + +func newHelloServiceTaskState() state.ServiceTaskState { + serviceTaskStateImpl := state.NewServiceTaskStateImpl() + serviceTaskStateImpl.SetName("hello") + serviceTaskStateImpl.SetIsAsync(false) + serviceTaskStateImpl.SetServiceName("hello") + serviceTaskStateImpl.SetServiceType("GRPC") + serviceTaskStateImpl.SetServiceMethod("SayHelloRight") + return serviceTaskStateImpl +} + +func newHelloServiceTaskStateWithRetry() state.ServiceTaskState { + serviceTaskStateImpl := state.NewServiceTaskStateImpl() + serviceTaskStateImpl.SetName("hello") + serviceTaskStateImpl.SetIsAsync(false) + serviceTaskStateImpl.SetServiceName("hello") + serviceTaskStateImpl.SetServiceType("GRPC") + serviceTaskStateImpl.SetServiceMethod("SayHelloRightLater") + + retryImpl := &state.RetryImpl{} + retryImpl.SetExceptions([]string{"fail"}) + retryImpl.SetIntervalSecond(1) + retryImpl.SetMaxAttempt(3) + retryImpl.SetBackoffRate(0.9) + serviceTaskStateImpl.SetRetry([]state.Retry{retryImpl}) + return serviceTaskStateImpl +} diff --git a/pkg/saga/statemachine/engine/invoker/invoker.go b/pkg/saga/statemachine/engine/invoker/invoker.go index 8c2ec0698..e1dd5dce2 100644 --- a/pkg/saga/statemachine/engine/invoker/invoker.go +++ b/pkg/saga/statemachine/engine/invoker/invoker.go @@ -1,5 +1,12 @@ package invoker +import ( + "context" + "github.com/seata/seata-go/pkg/saga/statemachine/statelang/state" + "reflect" + "sync" +) + type ScriptInvokerManager interface { } @@ -7,8 +14,32 @@ type ScriptInvoker interface { } type ServiceInvokerManager interface { + ServiceInvoker(serviceType string) ServiceInvoker + PutServiceInvoker(serviceType string, invoker ServiceInvoker) } type ServiceInvoker interface { - Invoke() + Invoke(ctx context.Context, input []any, service state.ServiceTaskState) (output []reflect.Value, err error) + Close(ctx context.Context) error +} + +type ServiceInvokerManagerImpl struct { + invokers map[string]ServiceInvoker + mutex sync.Mutex +} + +func NewServiceInvokerManagerImpl() *ServiceInvokerManagerImpl { + return &ServiceInvokerManagerImpl{ + invokers: make(map[string]ServiceInvoker), + } +} + +func (manager *ServiceInvokerManagerImpl) ServiceInvoker(serviceType string) ServiceInvoker { + return manager.invokers[serviceType] +} + +func (manager *ServiceInvokerManagerImpl) PutServiceInvoker(serviceType string, invoker ServiceInvoker) { + manager.mutex.Lock() + defer manager.mutex.Unlock() + manager.invokers[serviceType] = invoker } diff --git a/pkg/saga/statemachine/statelang/parser/statemachine_parser.go b/pkg/saga/statemachine/statelang/parser/statemachine_parser.go index a74c79ff7..690ad278f 100644 --- a/pkg/saga/statemachine/statelang/parser/statemachine_parser.go +++ b/pkg/saga/statemachine/statelang/parser/statemachine_parser.go @@ -136,7 +136,7 @@ func (b BaseStateParser) GetBoolOrDefault(stateName string, stateMap map[string] valueAsBool, ok := value.(bool) if !ok { - return false, errors.New("State [" + stateName + "] " + key + " illegal, required bool") + return defaultValue, errors.New("State [" + stateName + "] " + key + " illegal, required bool") } return valueAsBool, nil } diff --git a/pkg/saga/statemachine/statelang/parser/task_state_json_parser.go b/pkg/saga/statemachine/statelang/parser/task_state_json_parser.go index 5e702487c..8dcfff5c5 100644 --- a/pkg/saga/statemachine/statelang/parser/task_state_json_parser.go +++ b/pkg/saga/statemachine/statelang/parser/task_state_json_parser.go @@ -157,16 +157,16 @@ func (a *AbstractTaskStateParser) parseRetries(stateName string, retryInterfaces return nil, errors.New("State [" + stateName + "] " + "Retry illegal, require map[string]interface{}") } retry := &state.RetryImpl{} - errorTypes, err := a.GetSliceOrDefault(stateName, retryMap, "Exceptions", nil) + exceptions, err := a.GetSliceOrDefault(stateName, retryMap, "Exceptions", nil) if err != nil { return nil, err } - if errorTypes != nil { - errorTypeNames := make([]string, 0) - for _, errorType := range errorTypes { - errorTypeNames = append(errorTypeNames, errorType.(string)) + if exceptions != nil { + errors := make([]string, 0) + for _, errorType := range exceptions { + errors = append(errors, errorType.(string)) } - retry.SetErrorTypeNames(errorTypeNames) + retry.SetExceptions(errors) } maxAttempts, err := a.GetIntOrDefault(stateName, retryMap, "MaxAttempts", 0) @@ -191,14 +191,14 @@ func (a *AbstractTaskStateParser) parseRetries(stateName string, retryInterfaces return retries, nil } -func (a *AbstractTaskStateParser) parseCatches(stateName string, catchInterfaces []interface{}) ([]state.ErrorMatch, error) { - errorMatches := make([]state.ErrorMatch, 0, len(catchInterfaces)) +func (a *AbstractTaskStateParser) parseCatches(stateName string, catchInterfaces []interface{}) ([]state.ExceptionMatch, error) { + errorMatches := make([]state.ExceptionMatch, 0, len(catchInterfaces)) for _, catchInterface := range catchInterfaces { catchMap, ok := catchInterface.(map[string]interface{}) if !ok { return nil, errors.New("State [" + stateName + "] " + "Catch illegal, require map[string]interface{}") } - errorMatch := &state.ErrorMatchImpl{} + errorMatch := &state.ExceptionMatchImpl{} errorInterfaces, err := a.GetSliceOrDefault(stateName, catchMap, "Exceptions", nil) if err != nil { return nil, err @@ -208,7 +208,7 @@ func (a *AbstractTaskStateParser) parseCatches(stateName string, catchInterfaces for _, errorType := range errorInterfaces { errorNames = append(errorNames, errorType.(string)) } - errorMatch.SetErrors(errorNames) + errorMatch.SetExceptions(errorNames) } next, err := a.GetStringOrDefault(stateName, catchMap, "Next", "") if err != nil { @@ -242,7 +242,7 @@ func (s ServiceTaskStateParser) Parse(stateName string, stateMap map[string]inte return nil, err } - serviceName, err := s.GetString(stateName, stateMap, "ServiceName") + serviceName, err := s.GetString(stateName, stateMap, "serviceName") if err != nil { return nil, err } diff --git a/pkg/saga/statemachine/statelang/state/task_state.go b/pkg/saga/statemachine/statelang/state/task_state.go index 63e33f3ca..734680ce4 100644 --- a/pkg/saga/statemachine/statelang/state/task_state.go +++ b/pkg/saga/statemachine/statelang/state/task_state.go @@ -17,7 +17,7 @@ type TaskState interface { Retry() []Retry - Catches() []ErrorMatch + Catches() []ExceptionMatch Status() map[string]string @@ -36,18 +36,18 @@ type Loop interface { CompletionCondition() string } -type ErrorMatch interface { - Errors() []string +type ExceptionMatch interface { + Exceptions() []string - ErrorTypes() []reflect.Type + ExceptionTypes() []reflect.Type - SetErrorTypes(errorTypes []reflect.Type) + SetExceptionTypes(ExceptionTypes []reflect.Type) Next() string } type Retry interface { - ErrorTypeNames() []string + Exceptions() []string IntervalSecond() float64 @@ -77,7 +77,7 @@ type ServiceTaskState interface { type AbstractTaskState struct { *statelang.BaseState loop Loop - catches []ErrorMatch + catches []ExceptionMatch input []interface{} output map[string]interface{} compensatePersistModeUpdate bool @@ -140,7 +140,7 @@ func (a *AbstractTaskState) SetLoop(loop Loop) { a.loop = loop } -func (a *AbstractTaskState) SetCatches(catches []ErrorMatch) { +func (a *AbstractTaskState) SetCatches(catches []ExceptionMatch) { a.catches = catches } @@ -172,7 +172,7 @@ func (a *AbstractTaskState) ForUpdate() bool { return a.forUpdate } -func (a *AbstractTaskState) Catches() []ErrorMatch { +func (a *AbstractTaskState) Catches() []ExceptionMatch { return a.catches } @@ -198,6 +198,7 @@ type ServiceTaskStateImpl struct { serviceName string serviceMethod string parameterTypes []string + method *reflect.Value persist bool retryPersistModeUpdate bool compensatePersistModeUpdate bool @@ -210,6 +211,14 @@ func NewServiceTaskStateImpl() *ServiceTaskStateImpl { } } +func (s *ServiceTaskStateImpl) Method() *reflect.Value { + return s.method +} + +func (s *ServiceTaskStateImpl) SetMethod(method *reflect.Value) { + s.method = method +} + func (s *ServiceTaskStateImpl) IsAsync() bool { return s.isAsync } @@ -327,14 +336,14 @@ func (l *LoopImpl) CompletionCondition() string { } type RetryImpl struct { - errorTypeNames []string + exceptions []string intervalSecond float64 maxAttempt int backoffRate float64 } -func (r *RetryImpl) SetErrorTypeNames(errorTypeNames []string) { - r.errorTypeNames = errorTypeNames +func (r *RetryImpl) SetExceptions(exceptions []string) { + r.exceptions = exceptions } func (r *RetryImpl) SetIntervalSecond(intervalSecond float64) { @@ -349,8 +358,8 @@ func (r *RetryImpl) SetBackoffRate(backoffRate float64) { r.backoffRate = backoffRate } -func (r *RetryImpl) ErrorTypeNames() []string { - return r.errorTypeNames +func (r *RetryImpl) Exceptions() []string { + return r.exceptions } func (r *RetryImpl) IntervalSecond() float64 { @@ -365,33 +374,33 @@ func (r *RetryImpl) BackoffRate() float64 { return r.backoffRate } -type ErrorMatchImpl struct { - errors []string - errorTypes []reflect.Type - next string +type ExceptionMatchImpl struct { + exceptions []string + exceptionTypes []reflect.Type + next string } -func (e *ErrorMatchImpl) SetErrors(errors []string) { - e.errors = errors +func (e *ExceptionMatchImpl) SetExceptions(errors []string) { + e.exceptions = errors } -func (e *ErrorMatchImpl) SetNext(next string) { +func (e *ExceptionMatchImpl) SetNext(next string) { e.next = next } -func (e *ErrorMatchImpl) Errors() []string { - return e.errors +func (e *ExceptionMatchImpl) Exceptions() []string { + return e.exceptions } -func (e *ErrorMatchImpl) ErrorTypes() []reflect.Type { - return e.errorTypes +func (e *ExceptionMatchImpl) ExceptionTypes() []reflect.Type { + return e.exceptionTypes } -func (e *ErrorMatchImpl) SetErrorTypes(errorTypes []reflect.Type) { - e.errorTypes = errorTypes +func (e *ExceptionMatchImpl) SetExceptionTypes(exceptionTypes []reflect.Type) { + e.exceptionTypes = exceptionTypes } -func (e *ErrorMatchImpl) Next() string { +func (e *ExceptionMatchImpl) Next() string { return e.next } diff --git a/testdata/saga/engine/invoker/grpc/product.go b/testdata/saga/engine/invoker/grpc/product.go new file mode 100644 index 000000000..e240a9e8d --- /dev/null +++ b/testdata/saga/engine/invoker/grpc/product.go @@ -0,0 +1,35 @@ +package product + +import ( + "context" + "log" + "net" + + "google.golang.org/grpc" +) + +type server struct { + UnimplementedProductInfoServer +} + +func (*server) AddProduct(context.Context, *Product) (*ProductId, error) { + log.Println("add product success") + return &ProductId{Value: "1"}, nil +} +func (*server) GetProduct(context.Context, *ProductId) (*Product, error) { + log.Println("get product success") + return &Product{Id: "1"}, nil +} + +func StartProductServer() { + lis, err := net.Listen("tcp", ":8080") + if err != nil { + log.Fatalf("failed to listen: %v", err) + } + s := grpc.NewServer() + RegisterProductInfoServer(s, &server{}) + log.Printf("server listening at %v", lis.Addr()) + if err := s.Serve(lis); err != nil { + log.Fatalf("failed to serve: %v", err) + } +} diff --git a/testdata/saga/engine/invoker/grpc/product.pb.go b/testdata/saga/engine/invoker/grpc/product.pb.go new file mode 100644 index 000000000..663af14dc --- /dev/null +++ b/testdata/saga/engine/invoker/grpc/product.pb.go @@ -0,0 +1,219 @@ +// Code generated by protoc-gen-go. DO NOT EDIT. +// versions: +// protoc-gen-go v1.32.0 +// protoc v4.25.3 +// source: product.proto + +package product + +import ( + protoreflect "google.golang.org/protobuf/reflect/protoreflect" + protoimpl "google.golang.org/protobuf/runtime/protoimpl" + reflect "reflect" + sync "sync" +) + +const ( + // Verify that this generated code is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(20 - protoimpl.MinVersion) + // Verify that runtime/protoimpl is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20) +) + +type Product struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + Id string `protobuf:"bytes,1,opt,name=id,proto3" json:"id,omitempty"` +} + +func (x *Product) Reset() { + *x = Product{} + if protoimpl.UnsafeEnabled { + mi := &file_product_proto_msgTypes[0] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *Product) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*Product) ProtoMessage() {} + +func (x *Product) ProtoReflect() protoreflect.Message { + mi := &file_product_proto_msgTypes[0] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use Product.ProtoReflect.Descriptor instead. +func (*Product) Descriptor() ([]byte, []int) { + return file_product_proto_rawDescGZIP(), []int{0} +} + +func (x *Product) GetId() string { + if x != nil { + return x.Id + } + return "" +} + +type ProductId struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + Value string `protobuf:"bytes,1,opt,name=value,proto3" json:"value,omitempty"` +} + +func (x *ProductId) Reset() { + *x = ProductId{} + if protoimpl.UnsafeEnabled { + mi := &file_product_proto_msgTypes[1] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *ProductId) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*ProductId) ProtoMessage() {} + +func (x *ProductId) ProtoReflect() protoreflect.Message { + mi := &file_product_proto_msgTypes[1] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use ProductId.ProtoReflect.Descriptor instead. +func (*ProductId) Descriptor() ([]byte, []int) { + return file_product_proto_rawDescGZIP(), []int{1} +} + +func (x *ProductId) GetValue() string { + if x != nil { + return x.Value + } + return "" +} + +var File_product_proto protoreflect.FileDescriptor + +var file_product_proto_rawDesc = []byte{ + 0x0a, 0x0d, 0x70, 0x72, 0x6f, 0x64, 0x75, 0x63, 0x74, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x12, + 0x07, 0x70, 0x72, 0x6f, 0x64, 0x75, 0x63, 0x74, 0x22, 0x19, 0x0a, 0x07, 0x50, 0x72, 0x6f, 0x64, + 0x75, 0x63, 0x74, 0x12, 0x0e, 0x0a, 0x02, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, + 0x02, 0x69, 0x64, 0x22, 0x21, 0x0a, 0x09, 0x50, 0x72, 0x6f, 0x64, 0x75, 0x63, 0x74, 0x49, 0x64, + 0x12, 0x14, 0x0a, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, + 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x32, 0x75, 0x0a, 0x0b, 0x50, 0x72, 0x6f, 0x64, 0x75, 0x63, + 0x74, 0x49, 0x6e, 0x66, 0x6f, 0x12, 0x32, 0x0a, 0x0a, 0x61, 0x64, 0x64, 0x50, 0x72, 0x6f, 0x64, + 0x75, 0x63, 0x74, 0x12, 0x10, 0x2e, 0x70, 0x72, 0x6f, 0x64, 0x75, 0x63, 0x74, 0x2e, 0x50, 0x72, + 0x6f, 0x64, 0x75, 0x63, 0x74, 0x1a, 0x12, 0x2e, 0x70, 0x72, 0x6f, 0x64, 0x75, 0x63, 0x74, 0x2e, + 0x50, 0x72, 0x6f, 0x64, 0x75, 0x63, 0x74, 0x49, 0x64, 0x12, 0x32, 0x0a, 0x0a, 0x67, 0x65, 0x74, + 0x50, 0x72, 0x6f, 0x64, 0x75, 0x63, 0x74, 0x12, 0x12, 0x2e, 0x70, 0x72, 0x6f, 0x64, 0x75, 0x63, + 0x74, 0x2e, 0x50, 0x72, 0x6f, 0x64, 0x75, 0x63, 0x74, 0x49, 0x64, 0x1a, 0x10, 0x2e, 0x70, 0x72, + 0x6f, 0x64, 0x75, 0x63, 0x74, 0x2e, 0x50, 0x72, 0x6f, 0x64, 0x75, 0x63, 0x74, 0x42, 0x56, 0x0a, + 0x1b, 0x69, 0x6f, 0x2e, 0x67, 0x72, 0x70, 0x63, 0x2e, 0x65, 0x78, 0x61, 0x6d, 0x70, 0x6c, 0x65, + 0x73, 0x2e, 0x68, 0x65, 0x6c, 0x6c, 0x6f, 0x77, 0x6f, 0x72, 0x6c, 0x64, 0x42, 0x0f, 0x48, 0x65, + 0x6c, 0x6c, 0x6f, 0x57, 0x6f, 0x72, 0x6c, 0x64, 0x50, 0x72, 0x6f, 0x74, 0x6f, 0x50, 0x01, 0x5a, + 0x24, 0x74, 0x65, 0x73, 0x74, 0x64, 0x61, 0x74, 0x61, 0x2f, 0x73, 0x61, 0x67, 0x61, 0x2f, 0x65, + 0x6e, 0x67, 0x69, 0x6e, 0x65, 0x2f, 0x69, 0x6e, 0x76, 0x6f, 0x6b, 0x65, 0x72, 0x2f, 0x70, 0x72, + 0x6f, 0x64, 0x75, 0x63, 0x74, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, +} + +var ( + file_product_proto_rawDescOnce sync.Once + file_product_proto_rawDescData = file_product_proto_rawDesc +) + +func file_product_proto_rawDescGZIP() []byte { + file_product_proto_rawDescOnce.Do(func() { + file_product_proto_rawDescData = protoimpl.X.CompressGZIP(file_product_proto_rawDescData) + }) + return file_product_proto_rawDescData +} + +var file_product_proto_msgTypes = make([]protoimpl.MessageInfo, 2) +var file_product_proto_goTypes = []interface{}{ + (*Product)(nil), // 0: product.Product + (*ProductId)(nil), // 1: product.ProductId +} +var file_product_proto_depIdxs = []int32{ + 0, // 0: product.ProductInfo.addProduct:input_type -> product.Product + 1, // 1: product.ProductInfo.getProduct:input_type -> product.ProductId + 1, // 2: product.ProductInfo.addProduct:output_type -> product.ProductId + 0, // 3: product.ProductInfo.getProduct:output_type -> product.Product + 2, // [2:4] is the sub-list for method output_type + 0, // [0:2] is the sub-list for method input_type + 0, // [0:0] is the sub-list for extension type_name + 0, // [0:0] is the sub-list for extension extendee + 0, // [0:0] is the sub-list for field type_name +} + +func init() { file_product_proto_init() } +func file_product_proto_init() { + if File_product_proto != nil { + return + } + if !protoimpl.UnsafeEnabled { + file_product_proto_msgTypes[0].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*Product); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_product_proto_msgTypes[1].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*ProductId); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + } + type x struct{} + out := protoimpl.TypeBuilder{ + File: protoimpl.DescBuilder{ + GoPackagePath: reflect.TypeOf(x{}).PkgPath(), + RawDescriptor: file_product_proto_rawDesc, + NumEnums: 0, + NumMessages: 2, + NumExtensions: 0, + NumServices: 1, + }, + GoTypes: file_product_proto_goTypes, + DependencyIndexes: file_product_proto_depIdxs, + MessageInfos: file_product_proto_msgTypes, + }.Build() + File_product_proto = out.File + file_product_proto_rawDesc = nil + file_product_proto_goTypes = nil + file_product_proto_depIdxs = nil +} diff --git a/testdata/saga/engine/invoker/grpc/product.proto b/testdata/saga/engine/invoker/grpc/product.proto new file mode 100644 index 000000000..230f70f6d --- /dev/null +++ b/testdata/saga/engine/invoker/grpc/product.proto @@ -0,0 +1,20 @@ +syntax = "proto3"; +package product; + +option go_package = "testdata/saga/engine/invoker/product"; +option java_multiple_files = true; +option java_package = "io.grpc.examples.helloworld"; +option java_outer_classname = "HelloWorldProto"; + +service ProductInfo { + rpc addProduct(Product) returns (ProductId); + rpc getProduct(ProductId) returns (Product); +} + +message Product { + string id = 1; +} + +message ProductId { + string value = 1; +} \ No newline at end of file diff --git a/testdata/saga/engine/invoker/grpc/product_grpc.pb.go b/testdata/saga/engine/invoker/grpc/product_grpc.pb.go new file mode 100644 index 000000000..9ea290fc9 --- /dev/null +++ b/testdata/saga/engine/invoker/grpc/product_grpc.pb.go @@ -0,0 +1,150 @@ +// Code generated by protoc-gen-go-grpc. DO NOT EDIT. +// versions: +// - protoc-gen-go-grpc v1.3.0 +// - protoc v4.25.3 +// source: product.proto + +package product + +import ( + context "context" + grpc "google.golang.org/grpc" + codes "google.golang.org/grpc/codes" + status "google.golang.org/grpc/status" +) + +// This is a compile-time assertion to ensure that this generated file +// is compatible with the grpc package it is being compiled against. +// Requires gRPC-Go v1.32.0 or later. +const _ = grpc.SupportPackageIsVersion7 + +const ( + ProductInfo_AddProduct_FullMethodName = "/product.ProductInfo/addProduct" + ProductInfo_GetProduct_FullMethodName = "/product.ProductInfo/getProduct" +) + +// ProductInfoClient is the client API for ProductInfo service. +// +// For semantics around ctx use and closing/ending streaming RPCs, please refer to https://pkg.go.dev/google.golang.org/grpc/?tab=doc#ClientConn.NewStream. +type ProductInfoClient interface { + // 添加商品 + AddProduct(ctx context.Context, in *Product, opts ...grpc.CallOption) (*ProductId, error) + // 获取商品 + GetProduct(ctx context.Context, in *ProductId, opts ...grpc.CallOption) (*Product, error) +} + +type productInfoClient struct { + cc grpc.ClientConnInterface +} + +func NewProductInfoClient(cc grpc.ClientConnInterface) ProductInfoClient { + return &productInfoClient{cc} +} + +func (c *productInfoClient) AddProduct(ctx context.Context, in *Product, opts ...grpc.CallOption) (*ProductId, error) { + out := new(ProductId) + err := c.cc.Invoke(ctx, ProductInfo_AddProduct_FullMethodName, in, out, opts...) + if err != nil { + return nil, err + } + return out, nil +} + +func (c *productInfoClient) GetProduct(ctx context.Context, in *ProductId, opts ...grpc.CallOption) (*Product, error) { + out := new(Product) + err := c.cc.Invoke(ctx, ProductInfo_GetProduct_FullMethodName, in, out, opts...) + if err != nil { + return nil, err + } + return out, nil +} + +// ProductInfoServer is the server API for ProductInfo service. +// All implementations must embed UnimplementedProductInfoServer +// for forward compatibility +type ProductInfoServer interface { + // 添加商品 + AddProduct(context.Context, *Product) (*ProductId, error) + // 获取商品 + GetProduct(context.Context, *ProductId) (*Product, error) + mustEmbedUnimplementedProductInfoServer() +} + +// UnimplementedProductInfoServer must be embedded to have forward compatible implementations. +type UnimplementedProductInfoServer struct { +} + +func (UnimplementedProductInfoServer) AddProduct(context.Context, *Product) (*ProductId, error) { + return nil, status.Errorf(codes.Unimplemented, "method AddProduct not implemented") +} +func (UnimplementedProductInfoServer) GetProduct(context.Context, *ProductId) (*Product, error) { + return nil, status.Errorf(codes.Unimplemented, "method GetProduct not implemented") +} +func (UnimplementedProductInfoServer) mustEmbedUnimplementedProductInfoServer() {} + +// UnsafeProductInfoServer may be embedded to opt out of forward compatibility for this service. +// Use of this interface is not recommended, as added methods to ProductInfoServer will +// result in compilation errors. +type UnsafeProductInfoServer interface { + mustEmbedUnimplementedProductInfoServer() +} + +func RegisterProductInfoServer(s grpc.ServiceRegistrar, srv ProductInfoServer) { + s.RegisterService(&ProductInfo_ServiceDesc, srv) +} + +func _ProductInfo_AddProduct_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(Product) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(ProductInfoServer).AddProduct(ctx, in) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: ProductInfo_AddProduct_FullMethodName, + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(ProductInfoServer).AddProduct(ctx, req.(*Product)) + } + return interceptor(ctx, in, info, handler) +} + +func _ProductInfo_GetProduct_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(ProductId) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(ProductInfoServer).GetProduct(ctx, in) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: ProductInfo_GetProduct_FullMethodName, + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(ProductInfoServer).GetProduct(ctx, req.(*ProductId)) + } + return interceptor(ctx, in, info, handler) +} + +// ProductInfo_ServiceDesc is the grpc.ServiceDesc for ProductInfo service. +// It's only intended for direct use with grpc.RegisterService, +// and not to be introspected or modified (even as a copy) +var ProductInfo_ServiceDesc = grpc.ServiceDesc{ + ServiceName: "product.ProductInfo", + HandlerType: (*ProductInfoServer)(nil), + Methods: []grpc.MethodDesc{ + { + MethodName: "addProduct", + Handler: _ProductInfo_AddProduct_Handler, + }, + { + MethodName: "getProduct", + Handler: _ProductInfo_GetProduct_Handler, + }, + }, + Streams: []grpc.StreamDesc{}, + Metadata: "product.proto", +}