-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathworker.go
116 lines (95 loc) · 2.75 KB
/
worker.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
package gogo_kafka
import (
"context"
"log"
"os"
"os/signal"
"sync"
"syscall"
"github.com/Shopify/sarama"
)
type WorkerHandler func(key string, data []byte) error
type PanicHandler func(err interface{}, topic, key string, data []byte) error
type Worker struct {
topicNames []string
consumer *consumerSarama
client sarama.ConsumerGroup
ctx context.Context
cancel context.CancelFunc
config *Config
}
func New(config *Config, consumerGroup sarama.ConsumerGroup, retryManager RetryProcess) (*Worker, error) {
consumer := &consumerSarama{
ready: make(chan bool),
retryManager: retryManager,
handlers: map[string]WorkerHandler{},
}
ctx, cancel := context.WithCancel(context.Background())
return &Worker{
topicNames: []string{},
consumer: consumer,
ctx: ctx,
client: consumerGroup,
cancel: cancel,
config: config,
}, nil
}
func (worker *Worker) SetLogger(logger *log.Logger) {
sarama.Logger = logger
}
func (worker *Worker) SetPanicHandler(recover PanicHandler) {
worker.consumer.panicHandler = recover
}
func (worker *Worker) RegisterHandler(topicName string, handler WorkerHandler) {
worker.topicNames = append(worker.topicNames, topicName)
worker.consumer.handlers[topicName] = handler
}
func (worker *Worker) Start() {
wg := &sync.WaitGroup{}
wg.Add(1)
go func() {
defer wg.Done()
for {
if err := worker.client.Consume(worker.ctx, worker.topicNames, worker.consumer); err != nil {
log.Panicf("Error from consumer: %v", err)
}
if worker.ctx.Err() != nil {
return
}
worker.consumer.ready = make(chan bool)
}
}()
<-worker.consumer.ready // Await till the consumer has been set up
log.Printf("[Worker: %s]: start subscribe topics %v", worker.config.GroupName, worker.topicNames)
sigterm := make(chan os.Signal, 1)
signal.Notify(sigterm, syscall.SIGINT, syscall.SIGTERM)
select {
case <-worker.ctx.Done():
log.Println("terminating: context cancelled")
case <-sigterm:
log.Println("terminating: via signal")
}
worker.cancel()
wg.Wait()
if err := worker.client.Close(); err != nil {
log.Panicf("Error closing client: %v", err)
}
}
// Create sarama consumer by custom config
func NewSaramaConsumerWithConfig(config *Config) (sarama.ConsumerGroup, error) {
if config.Debug {
sarama.Logger = log.New(os.Stdout, "[sarama] ", log.LstdFlags)
}
saramaVersion, err := sarama.ParseKafkaVersion(config.KafkaVersion)
if err != nil {
return nil, err
}
saramaConfig := sarama.NewConfig()
saramaConfig.Version = saramaVersion
saramaConfig.Consumer.Group.Rebalance.Strategy = sarama.BalanceStrategyRoundRobin
client, err := sarama.NewConsumerGroup(config.BrokerEndpoints, config.GroupName, saramaConfig)
if err != nil {
return nil, err
}
return client, nil
}