Skip to content

Commit 2d22d8b

Browse files
committed
Adding a queue implementation using RedisStreams
- queue implementation still implements a pubsub interface, but uses RedisStreams with groups instead, which enables one subscriber in a group to receive each message, rather than all subscribers - Added some comments for pubsub
1 parent b53ffcc commit 2d22d8b

File tree

4 files changed

+114
-0
lines changed

4 files changed

+114
-0
lines changed

go.mod

+1
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ require (
2727
golang.org/x/crypto v0.0.0-20190422183909-d864b10871cd
2828
golang.org/x/net v0.0.0-20190424024845-afe8014c977f // indirect
2929
golang.org/x/sys v0.0.0-20190516110030-61b9204099cb // indirect
30+
golang.org/x/xerrors v0.0.0-20190513163551-3ee3066db522
3031
google.golang.org/appengine v1.5.0 // indirect
3132
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127 // indirect
3233
)

go.sum

+2
Original file line numberDiff line numberDiff line change
@@ -111,6 +111,8 @@ golang.org/x/sys v0.0.0-20190422165155-953cdadca894/go.mod h1:h1NjWce9XRLGQEsW7w
111111
golang.org/x/sys v0.0.0-20190516110030-61b9204099cb h1:k07iPOt0d6nEnwXF+kHB+iEg+WSuKe/SOQuFM2QoD+E=
112112
golang.org/x/sys v0.0.0-20190516110030-61b9204099cb/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
113113
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
114+
golang.org/x/xerrors v0.0.0-20190513163551-3ee3066db522 h1:bhOzK9QyoD0ogCnFro1m2mz41+Ib0oOhfJnBp5MR4K4=
115+
golang.org/x/xerrors v0.0.0-20190513163551-3ee3066db522/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
114116
google.golang.org/appengine v1.5.0 h1:KxkO13IPW4Lslp2bz+KHP2E3gtFlrIGNThxkZQ3g+4c=
115117
google.golang.org/appengine v1.5.0/go.mod h1:xpcJRLb0r/rnEns0DIKYYv+WjYCduHsrkT7/EB5XEv4=
116118
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=

stores/redis/pubsub.go

+2
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,8 @@ package redis
22

33
import "github.com/go-redis/redis"
44

5+
// RedisPubSub implements the PublishSubscribe interface, but publishes to all
6+
// subscribers. All subscribers will receive the message.
57
type RedisPubSub struct {
68
client *redis.Client
79
}

stores/redis/queue.go

+109
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,109 @@
1+
package redis
2+
3+
import (
4+
"bytes"
5+
"encoding/gob"
6+
"fmt"
7+
"os"
8+
9+
"github.com/go-redis/redis"
10+
)
11+
12+
const defaultKey = "value" // a default key needed to use RedisStreams
13+
14+
// RedisQueue implements the PublishSubscribe interface, but only publishes to
15+
// one subscriber. Implemented by having Publishers push into a queue, and
16+
// Subscribers read from the queue and pop off from the top.
17+
type RedisQueue struct {
18+
client *redis.Client
19+
}
20+
21+
// NewRedisQueue creates a new queue and a group associated with that queue.
22+
// Underlying mecahnism uses Redis Streams.
23+
func NewRedisQueue(client *redis.Client, queueID, groupID string) *RedisQueue {
24+
_, err := client.XGroupCreateMkStream(queueID, groupID, "$").Result()
25+
if err != nil {
26+
fmt.Println("Error creating redis group stream")
27+
os.Exit(1)
28+
}
29+
return &RedisQueue{
30+
client: client,
31+
}
32+
}
33+
34+
// Subscribe creates a goroutine that subscribes to a RedisStream, based on
35+
// queueID, groupID, consumerID. Sends data values to a msg []byte channel.
36+
func (rq *RedisQueue) Subscribe(queueID, groupID, consumerID string, msg chan []byte) error {
37+
// Create Subscription
38+
39+
// Read from stram (do in loop)
40+
// XREADGROUP GROUP queueGROUP ConsumerID COUNT 1 STREAMS queueID >
41+
42+
// Acknowledge that message was processed
43+
// XACK queueID queueGROUP MSGID
44+
45+
go func() {
46+
args := &redis.XReadGroupArgs{
47+
Group: groupID,
48+
Consumer: consumerID,
49+
Streams: []string{queueID},
50+
Count: 1,
51+
Block: 0,
52+
NoAck: false,
53+
}
54+
55+
for {
56+
xstreams, err := rq.client.XReadGroup(args).Result()
57+
if err != nil {
58+
// handle error, prob by logging
59+
}
60+
xstream := xstreams[0] // only asking for one stream
61+
message := xstream.Messages[0] // asking for one message
62+
msgID := message.ID // retrieve message ID
63+
value, err := getBytes(message.Values["value"]) // retrieve value using defaultKey, transform to bytes
64+
if err != nil {
65+
// log gob decoding error
66+
}
67+
68+
msg <- value // send the value
69+
70+
// Ack the read
71+
_, err = rq.client.XAck(queueID, groupID, msgID).Result()
72+
if err != nil {
73+
// log ack error
74+
}
75+
}
76+
}()
77+
78+
return nil
79+
}
80+
81+
// Publish value to a queue, based on queueID.
82+
func (rq *RedisQueue) Publish(queueID string, value []byte) error {
83+
// XADD queueID * field value
84+
var m map[string]interface{}
85+
m["value"] = value
86+
args := &redis.XAddArgs{
87+
Stream: queueID,
88+
MaxLen: 1, // MAXLEN N
89+
MaxLenApprox: 1, // MAXLEN ~ N
90+
ID: "*",
91+
Values: m,
92+
}
93+
94+
err := rq.client.XAdd(args).Err()
95+
if err != nil {
96+
return err
97+
}
98+
return nil
99+
}
100+
101+
func getBytes(data interface{}) ([]byte, error) {
102+
var buf bytes.Buffer
103+
enc := gob.NewEncoder(&buf)
104+
err := enc.Encode(data)
105+
if err != nil {
106+
return nil, err
107+
}
108+
return buf.Bytes(), nil
109+
}

0 commit comments

Comments
 (0)