diff --git a/go.mod b/go.mod index 79bfe789..c5e948f6 100644 --- a/go.mod +++ b/go.mod @@ -20,9 +20,9 @@ require ( github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect go.opentelemetry.io/otel v1.16.0 // indirect go.opentelemetry.io/otel/trace v1.16.0 // indirect - golang.org/x/net v0.15.0 // indirect - golang.org/x/sys v0.12.0 // indirect - golang.org/x/text v0.13.0 // indirect + golang.org/x/net v0.19.0 // indirect + golang.org/x/sys v0.15.0 // indirect + golang.org/x/text v0.14.0 // indirect google.golang.org/genproto/googleapis/rpc v0.0.0-20230807174057-1744710a1577 // indirect gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c // indirect ) diff --git a/go.sum b/go.sum index 05f510a7..9062f3bb 100644 --- a/go.sum +++ b/go.sum @@ -36,8 +36,8 @@ golang.org/x/mod v0.4.2/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.0.0-20210405180319-a5a99cb37ef4/go.mod h1:p54w0d4576C0XHj96bSt6lcn1PtDYWL6XObtHCRCNQM= -golang.org/x/net v0.15.0 h1:ugBLEUaxABaB5AJqW9enI0ACdci2RUd4eP51NTBvuJ8= -golang.org/x/net v0.15.0/go.mod h1:idbUs1IY1+zTqbi8yxTbhexhEEk5ur9LInksu6HrEpk= +golang.org/x/net v0.19.0 h1:zTwKpTd2XuCqf8huc7Fo2iSy+4RHPd10s4KzeTnVr1c= +golang.org/x/net v0.19.0/go.mod h1:CfAk/cbD4CthTvqiEl8NpboMuiuOYsAr/7NOjZJtv1U= golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= @@ -45,13 +45,13 @@ golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7w golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210330210617-4fbd30eecc44/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210510120138-977fb7262007/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.12.0 h1:CM0HF96J0hcLAwsHPJZjfdNzs0gftsLfgKt57wWHJ0o= -golang.org/x/sys v0.12.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.15.0 h1:h48lPFYpsTvQJZF4EKyI4aLHaev3CxivZmv7yZig9pc= +golang.org/x/sys v0.15.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= -golang.org/x/text v0.13.0 h1:ablQoSUd0tRdKxZewP80B+BaqeKJuVhuRxj/dkrun3k= -golang.org/x/text v0.13.0/go.mod h1:TvPlkZtksWOMsz7fbANvkp4WM8x/WCo/om8BMLbz+aE= +golang.org/x/text v0.14.0 h1:ScX5w1eTa3QqT8oi6+ziP7dTV1S2+ALU0bI+0zXKWiQ= +golang.org/x/text v0.14.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU= golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= golang.org/x/tools v0.1.1/go.mod h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk= diff --git a/service/common/type.go b/service/common/type.go index 1883b3c3..9ea00cad 100644 --- a/service/common/type.go +++ b/service/common/type.go @@ -45,6 +45,8 @@ type TopicEvent struct { Topic string `json:"topic"` // PubsubName is name of the pub/sub this message came from PubsubName string `json:"pubsubname"` + // Metadata is the custom metadata attached to the event. + Metadata map[string]string `json:"metadata,omitempty"` } func (e *TopicEvent) Struct(target interface{}) error { diff --git a/service/grpc/topic.go b/service/grpc/topic.go index f749de52..6c20d821 100644 --- a/service/grpc/topic.go +++ b/service/grpc/topic.go @@ -22,6 +22,7 @@ import ( "strings" "github.com/golang/protobuf/ptypes/empty" + "google.golang.org/grpc/metadata" runtimev1pb "github.com/dapr/dapr/pkg/proto/runtime/v1" "github.com/dapr/go-sdk/service/common" @@ -127,6 +128,7 @@ func (s *Server) OnTopicEvent(ctx context.Context, in *runtimev1pb.TopicEventReq RawData: in.GetData(), Topic: in.GetTopic(), PubsubName: in.GetPubsubName(), + Metadata: getCustomMetadataFromContext(ctx), } h := sub.DefaultHandler if in.GetPath() != "" { @@ -154,3 +156,16 @@ func (s *Server) OnTopicEvent(ctx context.Context, in *runtimev1pb.TopicEventReq in.GetPubsubName(), in.GetTopic(), ) } + +func getCustomMetadataFromContext(ctx context.Context) map[string]string { + md := make(map[string]string) + meta, ok := metadata.FromIncomingContext(ctx) + if ok { + for k, v := range meta { + if strings.HasPrefix(strings.ToLower(k), "metadata.") { + md[k[9:]] = v[0] + } + } + } + return md +} diff --git a/service/grpc/topic_test.go b/service/grpc/topic_test.go index 3c66c6c0..7e9175bc 100644 --- a/service/grpc/topic_test.go +++ b/service/grpc/topic_test.go @@ -19,6 +19,7 @@ import ( "testing" "github.com/stretchr/testify/require" + "google.golang.org/grpc/metadata" "github.com/golang/protobuf/ptypes/empty" "github.com/stretchr/testify/assert" @@ -136,6 +137,32 @@ func TestTopic(t *testing.T) { require.NoError(t, err) }) + t.Run("topic event for valid topic with metadata", func(t *testing.T) { + sub2 := &common.Subscription{ + PubsubName: "messages", + Topic: "test2", + } + err := server.AddTopicEventHandler(sub2, func(ctx context.Context, e *common.TopicEvent) (retry bool, err error) { + assert.Equal(t, "value1", e.Metadata["key1"]) + return false, nil + }) + require.NoError(t, err) + + in := &runtime.TopicEventRequest{ + Id: "a123", + Source: "test", + Type: "test", + SpecVersion: "v1.0", + DataContentType: "text/plain", + Data: []byte("test"), + Topic: sub2.Topic, + PubsubName: sub2.PubsubName, + } + ctx := metadata.NewIncomingContext(context.Background(), metadata.New(map[string]string{"Metadata.key1": "value1"})) + _, err = server.OnTopicEvent(ctx, in) + require.NoError(t, err) + }) + stopTestServer(t, server) } diff --git a/service/http/topic.go b/service/http/topic.go index 74376a89..fa6a85e5 100644 --- a/service/http/topic.go +++ b/service/http/topic.go @@ -19,6 +19,7 @@ import ( "errors" "io" "net/http" + "strings" "github.com/go-chi/chi/v5" @@ -278,6 +279,7 @@ func (s *Server) AddTopicEventHandler(sub *common.Subscription, fn common.TopicE Subject: in.Subject, PubsubName: in.PubsubName, Topic: in.Topic, + Metadata: getCustomMetdataFromHeaders(r), } w.Header().Add("Content-Type", "application/json") @@ -301,6 +303,16 @@ func (s *Server) AddTopicEventHandler(sub *common.Subscription, fn common.TopicE return nil } +func getCustomMetdataFromHeaders(r *http.Request) map[string]string { + md := make(map[string]string) + for k, v := range r.Header { + if strings.HasPrefix(strings.ToLower(k), "metadata.") { + md[k[9:]] = v[0] + } + } + return md +} + func writeStatus(w http.ResponseWriter, s string) { status := &common.SubscriptionResponse{Status: s} if err := json.NewEncoder(w).Encode(status); err != nil { diff --git a/service/http/topic_test.go b/service/http/topic_test.go index 6fa30a3c..596b67fc 100644 --- a/service/http/topic_test.go +++ b/service/http/topic_test.go @@ -148,8 +148,8 @@ func TestEventHandler(t *testing.T) { func TestEventDataHandling(t *testing.T) { tests := map[string]struct { - data string - result interface{} + data string + expectedData interface{} }{ "JSON nested": { data: `{ @@ -166,7 +166,7 @@ func TestEventDataHandling(t *testing.T) { "message":"hello" } }`, - result: map[string]interface{}{ + expectedData: map[string]interface{}{ "message": "hello", }, }, @@ -183,7 +183,7 @@ func TestEventDataHandling(t *testing.T) { "datacontenttype" : "application/json", "data" : "eyJtZXNzYWdlIjoiaGVsbG8ifQ==" }`, - result: map[string]interface{}{ + expectedData: map[string]interface{}{ "message": "hello", }, }, @@ -200,7 +200,7 @@ func TestEventDataHandling(t *testing.T) { "datacontenttype" : "application/json", "data_base64" : "eyJtZXNzYWdlIjoiaGVsbG8ifQ==" }`, - result: map[string]interface{}{ + expectedData: map[string]interface{}{ "message": "hello", }, }, @@ -217,7 +217,7 @@ func TestEventDataHandling(t *testing.T) { "datacontenttype" : "application/octet-stream", "data_base64" : "eyJtZXNzYWdlIjoiaGVsbG8ifQ==" }`, - result: []byte(`{"message":"hello"}`), + expectedData: []byte(`{"message":"hello"}`), }, "JSON string escaped": { data: `{ @@ -232,7 +232,7 @@ func TestEventDataHandling(t *testing.T) { "datacontenttype" : "application/json", "data" : "{\"message\":\"hello\"}" }`, - result: map[string]interface{}{ + expectedData: map[string]interface{}{ "message": "hello", }, }, @@ -264,7 +264,85 @@ func TestEventDataHandling(t *testing.T) { t.Run(name, func(t *testing.T) { makeEventRequest(t, s, "/test", tt.data, http.StatusOK) <-recv - assert.Equal(t, tt.result, topicEvent.Data) + assert.Equal(t, tt.expectedData, topicEvent.Data) + }) + } +} + +func TestEventMetadataHandling(t *testing.T) { + tests := map[string]struct { + metadata map[string]string + expectedMetadata map[string]string + }{ + "single key-value pair with prefix": { + metadata: map[string]string{ + "metadata.key1": "value1", + }, + expectedMetadata: map[string]string{ + "key1": "value1", + }, + }, + "multiple key-value pairs with prefix": { + metadata: map[string]string{ + "metadata.key1": "value1", + "metadata.key2": "value2", + }, + expectedMetadata: map[string]string{ + "key1": "value1", + "key2": "value2", + }, + }, + "some keys with prefix and some without": { + metadata: map[string]string{ + "metadata.key1": "value1", + "key2": "value2", + }, + expectedMetadata: map[string]string{ + "key1": "value1", + }, + }, + } + + s := newServer("", nil) + + sub := &common.Subscription{ + PubsubName: "messages", + Topic: "test", + Route: "/test", + Metadata: map[string]string{}, + } + + recv := make(chan struct{}, 1) + var topicEvent *common.TopicEvent + handler := func(ctx context.Context, e *common.TopicEvent) (retry bool, err error) { + topicEvent = e + recv <- struct{}{} + + return false, nil + } + err := s.AddTopicEventHandler(sub, handler) + require.NoErrorf(t, err, "error adding event handler") + + s.registerBaseHandler() + + for name, tt := range tests { + t.Run(name, func(t *testing.T) { + makeEventRequestWithMetadata(t, s, "/test", `{ + "specversion" : "1.0", + "type" : "com.github.pull.create", + "source" : "https://github.com/cloudevents/spec/pull", + "subject" : "123", + "id" : "A234-1234-1234", + "time" : "2018-04-05T17:31:00Z", + "comexampleextension1" : "value", + "comexampleothervalue" : 5, + "datacontenttype" : "application/json", + "data" : { + "message":"hello" + } + }`, http.StatusOK, tt.metadata) + <-recv + assert.Equal(t, tt.expectedMetadata, topicEvent.Metadata) }) } } @@ -357,6 +435,18 @@ func makeEventRequest(t *testing.T, s *Server, route, data string, expectedStatu testRequest(t, s, req, expectedStatusCode) } +func makeEventRequestWithMetadata(t *testing.T, s *Server, route, data string, expectedStatusCode int, metadata map[string]string) { + t.Helper() + + req, err := http.NewRequest(http.MethodPost, route, strings.NewReader(data)) + require.NoErrorf(t, err, "error creating request: %s", data) + req.Header.Set("Content-Type", "application/json") + for k, v := range metadata { + req.Header.Set(k, v) + } + testRequest(t, s, req, expectedStatusCode) +} + func TestAddingInvalidEventHandlers(t *testing.T) { s := newServer("", nil) err := s.AddTopicEventHandler(nil, testTopicFunc)