Skip to content

Commit fdd425a

Browse files
committed
bulksub success
Signed-off-by: sadath-12 <[email protected]>
1 parent f19a505 commit fdd425a

File tree

5 files changed

+135
-92
lines changed

5 files changed

+135
-92
lines changed

Diff for: examples/pubsub/pub/pub.go

+4-4
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ var (
2929

3030
func main() {
3131
ctx := context.Background()
32-
// publishEventData := []byte("ping")
32+
publishEventData := []byte("ping")
3333
publishEventsData := []interface{}{"multi-ping", "multi-pong"}
3434

3535
client, err := dapr.NewClient()
@@ -39,9 +39,9 @@ func main() {
3939
defer client.Close()
4040

4141
// Publish a single event
42-
// if err := client.PublishEvent(ctx, pubsubName, topicName, publishEventData); err != nil {
43-
// panic(err)
44-
// }
42+
if err := client.PublishEvent(ctx, pubsubName, topicName, publishEventData); err != nil {
43+
panic(err)
44+
}
4545

4646
// Publish multiple events
4747
if res := client.PublishEvents(ctx, pubsubName, topicName, publishEventsData); res.Error != nil {

Diff for: examples/pubsub/sub/bulksub.go

+62
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,62 @@
1+
/*
2+
Copyright 2021 The Dapr Authors
3+
Licensed under the Apache License, Version 2.0 (the "License");
4+
you may not use this file except in compliance with the License.
5+
You may obtain a copy of the License at
6+
http://www.apache.org/licenses/LICENSE-2.0
7+
Unless required by applicable law or agreed to in writing, software
8+
distributed under the License is distributed on an "AS IS" BASIS,
9+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
10+
See the License for the specific language governing permissions and
11+
limitations under the License.
12+
*/
13+
14+
package main
15+
16+
import (
17+
"context"
18+
"log"
19+
"net/http"
20+
21+
"github.com/dapr/go-sdk/service/common"
22+
daprd "github.com/dapr/go-sdk/service/http"
23+
)
24+
25+
26+
var defaultSubscription = &common.Subscription{
27+
PubsubName: "messages",
28+
Topic: "neworder",
29+
Route: "/orders",
30+
}
31+
32+
var importantSubscription = &common.Subscription{
33+
PubsubName: "messages",
34+
Topic: "neworder",
35+
Route: "/important",
36+
Match: `event.type == "important"`,
37+
Priority: 1,
38+
}
39+
40+
func main() {
41+
s := daprd.NewService(":8080")
42+
43+
if err := s.AddBulkTopicEventHandler(defaultSubscription, bulkeventHandler,10,1000); err != nil {
44+
log.Fatalf("error adding topic subscription: %v", err)
45+
}
46+
47+
if err := s.Start(); err != nil && err != http.ErrServerClosed {
48+
log.Fatalf("error listenning: %v", err)
49+
}
50+
}
51+
52+
func bulkeventHandler(ctx context.Context, e []common.TopicEvent) (retry bool, err error) {
53+
for _, event := range e {
54+
log.Printf("event - PubsubName: %s, Topic: %s, ID: %s, Data: %s", event.PubsubName, event.Topic, event.ID, event.Data)
55+
}
56+
return false, nil
57+
}
58+
59+
func importantEventHandler(ctx context.Context, e *common.TopicEvent) (retry bool, err error) {
60+
log.Printf("important event - PubsubName: %s, Topic: %s, ID: %s, Data: %s", e.PubsubName, e.Topic, e.ID, e.Data)
61+
return false, nil
62+
}

Diff for: examples/pubsub/sub/sub.go

+1-7
Original file line numberDiff line numberDiff line change
@@ -46,13 +46,7 @@ var importantSubscription = &common.Subscription{
4646
func main() {
4747
s := daprd.NewService(":8080")
4848

49-
// if err := s.AddTopicEventHandler(defaultSubscription, eventHandler); err != nil {
50-
// log.Fatalf("error adding topic subscription: %v", err)
51-
// }
52-
53-
54-
55-
if err := s.AddBulkTopicEventHandler(defaultSubscription, bulkeventHandler,10,1000); err != nil {
49+
if err := s.AddTopicEventHandler(defaultSubscription, eventHandler); err != nil {
5650
log.Fatalf("error adding topic subscription: %v", err)
5751
}
5852

Diff for: service/common/type.go

-19
Original file line numberDiff line numberDiff line change
@@ -47,25 +47,6 @@ type TopicEvent struct {
4747
PubsubName string `json:"pubsubname"`
4848
}
4949

50-
type BulkTopic struct {
51-
ContentType string `json:"contentType"`
52-
EntryID string `json:"entryId"`
53-
Event map[string]string `json:"event"`
54-
RawData []byte `json:"-"`
55-
Data interface{} `json:"data"`
56-
DataContentType string `json:"datacontenttype"`
57-
ID string `json:"id"`
58-
PubsubName string `json:"pubsubname"`
59-
Source string `json:"source"`
60-
SpecVersion string `json:"specversion"`
61-
Time string `json:"time"`
62-
Topic string `json:"topic"`
63-
TraceID string `json:"traceid"`
64-
TraceParent string `json:"traceparent"`
65-
TraceState string `json:"tracestate"`
66-
Metadata interface{} `json:"metadata"`
67-
}
68-
6950
func (e *TopicEvent) Struct(target interface{}) error {
7051
// TODO: Enhance to inspect DataContentType for the best
7152
// deserialization method.

Diff for: service/http/topic.go

+68-62
Original file line numberDiff line numberDiff line change
@@ -68,23 +68,73 @@ type topicEventJSON struct {
6868
PubsubName string `json:"pubsubname"`
6969
}
7070

71-
type status string
71+
func (in topicEventJSON) getData() (data any, rawData []byte) {
72+
var (
73+
err error
74+
v any
75+
)
76+
if len(in.Data) > 0 {
77+
rawData = []byte(in.Data)
78+
data = rawData
79+
// We can assume that rawData is valid JSON
80+
// without checking in.DataContentType == "application/json".
81+
if err = json.Unmarshal(rawData, &v); err == nil {
82+
data = v
83+
// Handling of JSON base64 encoded or escaped in a string.
84+
if str, ok := v.(string); ok {
85+
// This is the path that will most likely succeed.
86+
var (
87+
vString any
88+
decoded []byte
89+
)
90+
if err = json.Unmarshal([]byte(str), &vString); err == nil {
91+
data = vString
92+
} else if decoded, err = base64.StdEncoding.DecodeString(str); err == nil {
93+
// Decoded Base64 encoded JSON does not seem to be in the spec
94+
// but it is in existing unit tests so this handles that case.
95+
var vBase64 any
96+
if err = json.Unmarshal(decoded, &vBase64); err == nil {
97+
data = vBase64
98+
}
99+
}
100+
}
101+
}
102+
} else if in.DataBase64 != "" {
103+
rawData, err = base64.StdEncoding.DecodeString(in.DataBase64)
104+
if err == nil {
105+
data = rawData
106+
if in.DataContentType == "application/json" {
107+
if err = json.Unmarshal(rawData, &v); err == nil {
108+
data = v
109+
}
110+
}
111+
}
112+
}
113+
114+
return data, rawData
115+
}
116+
117+
type AppResponseStatus string
72118

73119
const (
74-
// SubscriptionResponseStatusSuccess indicates that the subscription event was processed successfully.
75-
SubscriptionResponseStatusSuccess status = "SUCCESS"
120+
// Success means the message is received and processed correctly.
121+
Success AppResponseStatus = "SUCCESS"
122+
// Retry means the message is received but could not be processed and must be retried.
123+
Retry AppResponseStatus = "RETRY"
124+
// Drop means the message is received but should not be processed.
125+
Drop AppResponseStatus = "DROP"
76126
)
77127

78128
type BulkSubscribeResponseEntry struct {
79129
// The id of the bulk subscribe entry
80-
entryId string
130+
EntryId string `json:"entryId"`
81131

82132
// The response status of the bulk subscribe entry
83-
status status
133+
Status AppResponseStatus `json:"status"`
84134
}
85135

86136
type BulkSubscribeResponse struct {
87-
statuses []BulkSubscribeResponseEntry
137+
Statuses []BulkSubscribeResponseEntry `json:"statuses"`
88138
}
89139

90140
func (s *Server) registerBaseHandler() {
@@ -275,52 +325,6 @@ func (s *Server) AddTopicEventHandler(sub *common.Subscription, fn common.TopicE
275325
return nil
276326
}
277327

278-
func (in topicEventJSON) getData() (data any, rawData []byte) {
279-
var (
280-
err error
281-
v any
282-
)
283-
if len(in.Data) > 0 {
284-
rawData = []byte(in.Data)
285-
data = rawData
286-
// We can assume that rawData is valid JSON
287-
// without checking in.DataContentType == "application/json".
288-
if err = json.Unmarshal(rawData, &v); err == nil {
289-
data = v
290-
// Handling of JSON base64 encoded or escaped in a string.
291-
if str, ok := v.(string); ok {
292-
// This is the path that will most likely succeed.
293-
var (
294-
vString any
295-
decoded []byte
296-
)
297-
if err = json.Unmarshal([]byte(str), &vString); err == nil {
298-
data = vString
299-
} else if decoded, err = base64.StdEncoding.DecodeString(str); err == nil {
300-
// Decoded Base64 encoded JSON does not seem to be in the spec
301-
// but it is in existing unit tests so this handles that case.
302-
var vBase64 any
303-
if err = json.Unmarshal(decoded, &vBase64); err == nil {
304-
data = vBase64
305-
}
306-
}
307-
}
308-
}
309-
} else if in.DataBase64 != "" {
310-
rawData, err = base64.StdEncoding.DecodeString(in.DataBase64)
311-
if err == nil {
312-
data = rawData
313-
if in.DataContentType == "application/json" {
314-
if err = json.Unmarshal(rawData, &v); err == nil {
315-
data = v
316-
}
317-
}
318-
}
319-
}
320-
321-
return data, rawData
322-
}
323-
324328
type BulkSubscribeMessageItem struct {
325329
EntryId string `json:"entryId"` //nolint:stylecheck
326330
Event interface{} `json:"event"`
@@ -402,8 +406,8 @@ func (s *Server) AddBulkTopicEventHandler(sub *common.Subscription, fn common.Bu
402406
data, rawData := in.getData()
403407

404408
statuses = append(statuses, BulkSubscribeResponseEntry{
405-
entryId: in.ID,
406-
status: SubscriptionResponseStatusSuccess,
409+
EntryId: entry.EntryId,
410+
Status: Success,
407411
},
408412
)
409413

@@ -424,30 +428,26 @@ func (s *Server) AddBulkTopicEventHandler(sub *common.Subscription, fn common.Bu
424428
messages = append(messages, te)
425429
}
426430
resp := BulkSubscribeResponse{
427-
statuses: statuses,
431+
Statuses: statuses,
428432
}
429433
if err != nil {
430434
http.Error(w, err.Error(), PubSubHandlerDropStatusCode)
431435
return
432436
}
433437
w.Header().Add("Content-Type", "application/json")
434-
if err := json.NewEncoder(w).Encode(resp); err != nil {
435-
http.Error(w, err.Error(), http.StatusInternalServerError)
436-
return
437-
}
438438
w.WriteHeader(http.StatusOK)
439439

440440
retry, err := fn(r.Context(), messages)
441441
if err == nil {
442-
writeStatus(w, common.SubscriptionResponseStatusSuccess)
442+
writeBulkStatus(w, resp)
443443
return
444444
}
445445

446446
if retry {
447-
writeStatus(w, common.SubscriptionResponseStatusRetry)
447+
writeBulkStatus(w, resp)
448448
return
449449
}
450-
writeStatus(w, common.SubscriptionResponseStatusDrop)
450+
writeBulkStatus(w, resp)
451451
})))
452452

453453
return nil
@@ -459,3 +459,9 @@ func writeStatus(w http.ResponseWriter, s string) {
459459
http.Error(w, err.Error(), PubSubHandlerRetryStatusCode)
460460
}
461461
}
462+
463+
func writeBulkStatus(w http.ResponseWriter, s BulkSubscribeResponse) {
464+
if err := json.NewEncoder(w).Encode(s); err != nil {
465+
http.Error(w, err.Error(), PubSubHandlerRetryStatusCode)
466+
}
467+
}

0 commit comments

Comments
 (0)