-
Notifications
You must be signed in to change notification settings - Fork 2
/
Copy pathsubscriber.go
74 lines (57 loc) · 1.88 KB
/
subscriber.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
package cloudpubsub
import (
"context"
"cloud.google.com/go/pubsub"
"github.com/pkg/errors"
"github.com/wantedly/subee"
"google.golang.org/api/option"
)
type subscriberImpl struct {
*Config
subscription *pubsub.Subscription
}
// CreateSubscriber returns Subscriber implementation.
func CreateSubscriber(ctx context.Context, projectID, subscriptionID string, opts ...Option) (subee.Subscriber, error) {
cfg := new(Config)
cfg.apply(opts)
sub := &subscriberImpl{
Config: cfg,
}
c, err := createPubSubClient(ctx, projectID, sub.ClientOpts...)
if err != nil {
return nil, errors.Wrap(err, "failed to create Google Cloud Pub/Sub client")
}
pubsubSub, err := createPubSubSubscription(ctx, subscriptionID, c)
if err != nil {
return nil, errors.Wrap(err, "failed to create Google Cloud Pub/Sub subscription")
}
sub.subscription = pubsubSub
return sub, nil
}
func createPubSubClient(ctx context.Context, projectID string, ops ...option.ClientOption) (*pubsub.Client, error) {
if len(projectID) == 0 {
return nil, errors.New("missing google project id")
}
c, err := pubsub.NewClient(ctx, projectID, ops...)
if err != nil {
return nil, errors.Wrap(err, "failed to create pub/sub client")
}
return c, nil
}
func createPubSubSubscription(ctx context.Context, subscriptionID string, c *pubsub.Client) (*pubsub.Subscription, error) {
pubsubSub := c.Subscription(subscriptionID)
ok, err := pubsubSub.Exists(ctx)
if err != nil {
return nil, errors.Wrap(err, "failed to report whether the subscription exists on the server")
}
if !ok {
return nil, errors.New("failed to get pub/sub subscription. Check subscription presence")
}
return pubsubSub, nil
}
func (r *subscriberImpl) Subscribe(ctx context.Context, f func(subee.Message)) error {
err := r.subscription.Receive(ctx, func(ctx context.Context, m *pubsub.Message) {
f(&Message{m})
})
return errors.WithStack(err)
}