Skip to content

Commit 0136b4a

Browse files
authored
Merge branch 'main' into feat-dist-scheduler
Signed-off-by: Mike Nguyen <[email protected]>
2 parents 2d1814e + b7b90e3 commit 0136b4a

File tree

10 files changed

+473
-6
lines changed

10 files changed

+473
-6
lines changed

.github/workflows/validate_examples.yaml

+1
Original file line numberDiff line numberDiff line change
@@ -166,6 +166,7 @@ jobs:
166166
"grpc-service",
167167
"hello-world",
168168
"pubsub",
169+
"bidipubsub",
169170
"service",
170171
"socket",
171172
"workflow",

client/client.go

+8
Original file line numberDiff line numberDiff line change
@@ -162,6 +162,14 @@ type Client interface {
162162
// UnsubscribeConfigurationItems can stop the subscription with target store's and id
163163
UnsubscribeConfigurationItems(ctx context.Context, storeName string, id string, opts ...ConfigurationOpt) error
164164

165+
// Subscribe subscribes to a pubsub topic and streams messages to the returned Subscription.
166+
// Subscription must be closed after finishing with subscribing.
167+
Subscribe(ctx context.Context, opts SubscriptionOptions) (*Subscription, error)
168+
169+
// SubscribeWithHandler subscribes to a pubsub topic and calls the given handler on topic events.
170+
// The returned cancel function must be called after finishing with subscribing.
171+
SubscribeWithHandler(ctx context.Context, opts SubscriptionOptions, handler SubscriptionHandleFunction) (func() error, error)
172+
165173
// DeleteBulkState deletes content for multiple keys from store.
166174
DeleteBulkState(ctx context.Context, storeName string, keys []string, meta map[string]string) error
167175

client/state.go

+2-1
Original file line numberDiff line numberDiff line change
@@ -382,7 +382,8 @@ func (c *GRPCClient) GetBulkState(ctx context.Context, storeName string, keys []
382382

383383
// GetState retrieves state from specific store using default consistency option.
384384
func (c *GRPCClient) GetState(ctx context.Context, storeName, key string, meta map[string]string) (item *StateItem, err error) {
385-
return c.GetStateWithConsistency(ctx, storeName, key, meta, StateConsistencyStrong)
385+
i, err := c.GetStateWithConsistency(ctx, storeName, key, meta, StateConsistencyStrong)
386+
return i, err
386387
}
387388

388389
// GetStateWithConsistency retrieves state from specific store using provided state consistency.

client/subscribe.go

+220
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,220 @@
1+
/*
2+
Copyright 2024 The Dapr Authors
3+
Licensed under the Apache License, Version 2.0 (the "License");
4+
you may not use this file except in compliance with the License.
5+
You may obtain a copy of the License at
6+
http://www.apache.org/licenses/LICENSE-2.0
7+
Unless required by applicable law or agreed to in writing, software
8+
distributed under the License is distributed on an "AS IS" BASIS,
9+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
10+
See the License for the specific language governing permissions and
11+
limitations under the License.
12+
*/
13+
14+
package client
15+
16+
import (
17+
"context"
18+
"encoding/json"
19+
"errors"
20+
"fmt"
21+
"mime"
22+
"strings"
23+
"sync"
24+
"sync/atomic"
25+
26+
pb "github.com/dapr/dapr/pkg/proto/runtime/v1"
27+
"github.com/dapr/go-sdk/service/common"
28+
)
29+
30+
type SubscriptionHandleFunction func(event *common.TopicEvent) common.SubscriptionResponseStatus
31+
32+
type SubscriptionOptions struct {
33+
PubsubName string
34+
Topic string
35+
DeadLetterTopic *string
36+
Metadata map[string]string
37+
}
38+
39+
type Subscription struct {
40+
stream pb.Dapr_SubscribeTopicEventsAlpha1Client
41+
// lock locks concurrent writes to subscription stream.
42+
lock sync.Mutex
43+
closed atomic.Bool
44+
}
45+
46+
type SubscriptionMessage struct {
47+
*common.TopicEvent
48+
sub *Subscription
49+
}
50+
51+
func (c *GRPCClient) Subscribe(ctx context.Context, opts SubscriptionOptions) (*Subscription, error) {
52+
stream, err := c.subscribeInitialRequest(ctx, opts)
53+
if err != nil {
54+
return nil, err
55+
}
56+
57+
return &Subscription{
58+
stream: stream,
59+
}, nil
60+
}
61+
62+
func (c *GRPCClient) SubscribeWithHandler(ctx context.Context, opts SubscriptionOptions, handler SubscriptionHandleFunction) (func() error, error) {
63+
s, err := c.Subscribe(ctx, opts)
64+
if err != nil {
65+
return nil, err
66+
}
67+
68+
go func() {
69+
defer s.Close()
70+
71+
for {
72+
msg, err := s.Receive()
73+
if err != nil {
74+
if !s.closed.Load() {
75+
logger.Printf("Error receiving messages from subscription pubsub=%s topic=%s, closing subscription: %s",
76+
opts.PubsubName, opts.Topic, err)
77+
}
78+
return
79+
}
80+
81+
go func() {
82+
if err := msg.respondStatus(handler(msg.TopicEvent)); err != nil {
83+
logger.Printf("Error responding to topic with event status pubsub=%s topic=%s message_id=%s: %s",
84+
opts.PubsubName, opts.Topic, msg.ID, err)
85+
}
86+
}()
87+
}
88+
}()
89+
90+
return s.Close, nil
91+
}
92+
93+
func (s *Subscription) Close() error {
94+
if !s.closed.CompareAndSwap(false, true) {
95+
return errors.New("subscription already closed")
96+
}
97+
98+
return s.stream.CloseSend()
99+
}
100+
101+
func (s *Subscription) Receive() (*SubscriptionMessage, error) {
102+
event, err := s.stream.Recv()
103+
if err != nil {
104+
return nil, err
105+
}
106+
107+
data := any(event.GetData())
108+
if len(event.GetData()) > 0 {
109+
mediaType, _, err := mime.ParseMediaType(event.GetDataContentType())
110+
if err == nil {
111+
var v interface{}
112+
switch mediaType {
113+
case "application/json":
114+
if err := json.Unmarshal(event.GetData(), &v); err == nil {
115+
data = v
116+
}
117+
case "text/plain":
118+
// Assume UTF-8 encoded string.
119+
data = string(event.GetData())
120+
default:
121+
if strings.HasPrefix(mediaType, "application/") &&
122+
strings.HasSuffix(mediaType, "+json") {
123+
if err := json.Unmarshal(event.GetData(), &v); err == nil {
124+
data = v
125+
}
126+
}
127+
}
128+
}
129+
}
130+
131+
topicEvent := &common.TopicEvent{
132+
ID: event.GetId(),
133+
Source: event.GetSource(),
134+
Type: event.GetType(),
135+
SpecVersion: event.GetSpecVersion(),
136+
DataContentType: event.GetDataContentType(),
137+
Data: data,
138+
RawData: event.GetData(),
139+
Topic: event.GetTopic(),
140+
PubsubName: event.GetPubsubName(),
141+
}
142+
143+
return &SubscriptionMessage{
144+
sub: s,
145+
TopicEvent: topicEvent,
146+
}, nil
147+
}
148+
149+
func (s *SubscriptionMessage) Success() error {
150+
return s.respond(pb.TopicEventResponse_SUCCESS)
151+
}
152+
153+
func (s *SubscriptionMessage) Retry() error {
154+
return s.respond(pb.TopicEventResponse_RETRY)
155+
}
156+
157+
func (s *SubscriptionMessage) Drop() error {
158+
return s.respond(pb.TopicEventResponse_DROP)
159+
}
160+
161+
func (s *SubscriptionMessage) respondStatus(status common.SubscriptionResponseStatus) error {
162+
var statuspb pb.TopicEventResponse_TopicEventResponseStatus
163+
switch status {
164+
case common.SubscriptionResponseStatusSuccess:
165+
statuspb = pb.TopicEventResponse_SUCCESS
166+
case common.SubscriptionResponseStatusRetry:
167+
statuspb = pb.TopicEventResponse_RETRY
168+
case common.SubscriptionResponseStatusDrop:
169+
statuspb = pb.TopicEventResponse_DROP
170+
default:
171+
return fmt.Errorf("unknown status, expected one of %s, %s, %s: %s",
172+
common.SubscriptionResponseStatusSuccess, common.SubscriptionResponseStatusRetry,
173+
common.SubscriptionResponseStatusDrop, status)
174+
}
175+
176+
return s.respond(statuspb)
177+
}
178+
179+
func (s *SubscriptionMessage) respond(status pb.TopicEventResponse_TopicEventResponseStatus) error {
180+
s.sub.lock.Lock()
181+
defer s.sub.lock.Unlock()
182+
183+
return s.sub.stream.Send(&pb.SubscribeTopicEventsRequestAlpha1{
184+
SubscribeTopicEventsRequestType: &pb.SubscribeTopicEventsRequestAlpha1_EventResponse{
185+
EventResponse: &pb.SubscribeTopicEventsResponseAlpha1{
186+
Id: s.ID,
187+
Status: &pb.TopicEventResponse{Status: status},
188+
},
189+
},
190+
})
191+
}
192+
193+
func (c *GRPCClient) subscribeInitialRequest(ctx context.Context, opts SubscriptionOptions) (pb.Dapr_SubscribeTopicEventsAlpha1Client, error) {
194+
if len(opts.PubsubName) == 0 {
195+
return nil, errors.New("pubsub name required")
196+
}
197+
198+
if len(opts.Topic) == 0 {
199+
return nil, errors.New("topic required")
200+
}
201+
202+
stream, err := c.protoClient.SubscribeTopicEventsAlpha1(ctx)
203+
if err != nil {
204+
return nil, err
205+
}
206+
207+
err = stream.Send(&pb.SubscribeTopicEventsRequestAlpha1{
208+
SubscribeTopicEventsRequestType: &pb.SubscribeTopicEventsRequestAlpha1_InitialRequest{
209+
InitialRequest: &pb.SubscribeTopicEventsInitialRequestAlpha1{
210+
PubsubName: opts.PubsubName, Topic: opts.Topic,
211+
Metadata: opts.Metadata, DeadLetterTopic: opts.DeadLetterTopic,
212+
},
213+
},
214+
})
215+
if err != nil {
216+
return nil, errors.Join(err, stream.CloseSend())
217+
}
218+
219+
return stream, nil
220+
}

examples/bidipubsub/README.md

+71
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,71 @@
1+
# Dapr PubSub Example with go-sdk
2+
3+
This folder contains two Go files that use the Go SDK to invoke the Dapr Pub/Sub API.
4+
5+
## Diagram
6+
7+
![](https://i.loli.net/2020/08/23/5MBYgwqCZcXNUf2.jpg)
8+
9+
## Step
10+
11+
### Prepare
12+
13+
- Dapr installed
14+
15+
### Run Subscriber Server
16+
17+
<!-- STEP
18+
name: Run Subscriber Server
19+
output_match_mode: substring
20+
match_order: none
21+
expected_stdout_lines:
22+
- 'event - PubsubName: messages, Topic: neworder'
23+
- 'event - PubsubName: messages, Topic: neworder'
24+
- 'event - PubsubName: messages, Topic: neworder'
25+
- 'event - PubsubName: messages, Topic: sendorder'
26+
- 'event - PubsubName: messages, Topic: sendorder'
27+
- 'event - PubsubName: messages, Topic: sendorder'
28+
expected_stderr_lines:
29+
background: true
30+
sleep: 15
31+
-->
32+
33+
```bash
34+
dapr run --app-id sub \
35+
--dapr-http-port 3500 \
36+
--log-level debug \
37+
--resources-path ./config \
38+
go run bidisub/bidisub.go
39+
```
40+
41+
<!-- END_STEP -->
42+
43+
### Run Publisher
44+
45+
<!-- STEP
46+
name: Run publisher
47+
output_match_mode: substring
48+
expected_stdout_lines:
49+
- 'sending message'
50+
- 'message published'
51+
- 'sending multiple messages'
52+
- 'multiple messages published'
53+
expected_stderr_lines:
54+
background: true
55+
sleep: 15
56+
-->
57+
58+
```bash
59+
dapr run --app-id pub \
60+
--log-level debug \
61+
--resources-path ./config \
62+
go run pub/pub.go
63+
```
64+
65+
<!-- END_STEP -->
66+
67+
## Result
68+
69+
```shell
70+
== APP == 2023/03/29 21:36:07 event - PubsubName: messages, Topic: neworder, ID: 82427280-1c18-4fab-b901-c7e68d295d31, Data: ping123
71+
```

0 commit comments

Comments
 (0)