-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy patheb.go
68 lines (57 loc) · 1.76 KB
/
eb.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
package main
import (
"fmt"
"math/rand"
"sync"
"time"
)
// This EventBus was borrowed from https://levelup.gitconnected.com/lets-write-a-simple-event-bus-in-go-79b9480d8997
type DataEvent struct {
Data string
Topic string
}
// DataChannel is a channel which can accept an DataEvent
type DataChannel chan DataEvent
// DataChannelSlice is a slice of DataChannels
type DataChannelSlice []DataChannel
// EventBus stores the information about subscribers interested for a particular topic
type EventBus struct {
subscribers map[string]DataChannelSlice
rm sync.RWMutex
}
func (eb *EventBus) Publish(topic string, data string) {
eb.rm.RLock()
if chans, found := eb.subscribers[topic]; found {
// this is done because the slices refer to same array even though they are passed by value
// thus we are creating a new slice with our elements thus preserve locking correctly.
// special thanks for /u/freesid who pointed it out
channels := append(DataChannelSlice{}, chans...)
go func(data DataEvent, dataChannelSlices DataChannelSlice) {
for _, ch := range dataChannelSlices {
ch <- data
}
}(DataEvent{Data: data, Topic: topic}, channels)
}
eb.rm.RUnlock()
}
func (eb *EventBus) Subscribe(topic string, ch DataChannel) {
eb.rm.Lock()
if prev, found := eb.subscribers[topic]; found {
eb.subscribers[topic] = append(prev, ch)
} else {
eb.subscribers[topic] = append([]DataChannel{}, ch)
}
eb.rm.Unlock()
}
var eb = &EventBus{
subscribers: map[string]DataChannelSlice{},
}
func printDataEvent(ch string, data DataEvent) {
fmt.Printf("Channel: %s; Topic: %s; DataEvent: %v\n", ch, data.Topic, data.Data)
}
func publishTo(topic string, data string) {
for {
eb.Publish(topic, data)
time.Sleep(time.Duration(rand.Intn(1000)) * time.Millisecond)
}
}