-
Notifications
You must be signed in to change notification settings - Fork 106
/
Copy pathinputnsq.go
135 lines (123 loc) · 3.54 KB
/
inputnsq.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
package nsq
import (
"context"
"errors"
"github.com/tsaikd/KDGoLib/version"
"github.com/nsqio/go-nsq"
"github.com/tsaikd/gogstash/config"
"github.com/tsaikd/gogstash/config/goglog"
"github.com/tsaikd/gogstash/config/logevent"
)
// ModuleName is the name used in the config file for this module.
// DefaultInputConfig also uses it as the default Type of the input.
const ModuleName = "nsq"
// InputConfig holds the configuration json fields and internal objects of the
// NSQ input.
//
// InitHandler requires at least one of NSQ or Lookupd to be set, and both
// Topic and Channel to be non-empty.
type InputConfig struct {
config.InputConfig
NSQ string `json:"nsq" yaml:"nsq"` // NSQd address to connect to; handed to ConnectToNSQD by Start when non-empty
Lookupd string `json:"lookupd" yaml:"lookupd"` // lookupd address handed to ConnectToNSQLookupd by Start when non-empty; original note says it can be a NSQd or lookupd — TODO confirm
Topic string `json:"topic" yaml:"topic"` // topic to listen from
Channel string `json:"channel" yaml:"channel"` // channel to subscribe to
InFlight uint `json:"max_inflight" yaml:"max_inflight"` // max number of messages in flight; DefaultInputConfig sets 75, and Start restores this value on resume
control config.Control // backpressure control; Start reads its pause and resume signals
}
// DefaultInputConfig builds an InputConfig carrying the plugin defaults:
// the module name as the config Type and 75 messages in flight. Every other
// field is left at its zero value.
func DefaultInputConfig() InputConfig {
	var defaults InputConfig
	defaults.InputConfig.CommonConfig.Type = ModuleName
	defaults.InFlight = 75
	return defaults
}
// InitHandler initializes the input plugin from its raw configuration.
//
// It starts from DefaultInputConfig, attaches the backpressure control,
// overlays the values found in raw, and then validates the result: at least
// one of nsq/lookupd must be given, and topic and channel are mandatory.
// Finally the codec is resolved from raw["codec"]. Any failure along the way
// is returned together with a nil config.
func InitHandler(
	ctx context.Context,
	raw config.ConfigRaw,
	control config.Control,
) (config.TypeInputConfig, error) {
	conf := DefaultInputConfig()
	conf.control = control
	if err := config.ReflectConfig(raw, &conf); err != nil {
		return nil, err
	}

	// Validate the mandatory settings; the first violation wins.
	switch {
	case conf.Lookupd == "" && conf.NSQ == "":
		return nil, errors.New("nsq: you need to specify nsq or lookupd")
	case conf.Topic == "":
		return nil, errors.New("nsq: missing topic")
	case conf.Channel == "":
		return nil, errors.New("nsq: missing channel")
	}

	codec, err := config.GetCodecOrDefault(ctx, raw["codec"])
	if err != nil {
		return nil, err
	}
	conf.Codec = codec

	return &conf, nil
}
// Start creates an NSQ consumer for the configured topic and channel, connects
// it to the configured NSQd and/or lookupd address, and then blocks until ctx
// is cancelled. Messages are delivered to an nsqhandler that decodes them with
// the configured codec onto msgChan.
//
// While running it reacts to the backpressure control: a pause signal sets the
// consumer's max-in-flight to 0, a resume signal restores it to t.InFlight.
//
// It returns the error from creating the consumer or from connecting, and nil
// after a shutdown triggered by ctx. Once the consumer has been created it is
// stopped (and its StopChan awaited) on every exit path, including connect
// failures, so that a failed Start does not leave a consumer running.
func (t *InputConfig) Start(ctx context.Context, msgChan chan<- logevent.LogEvent) error {
	// setup the handler
	handler := nsqhandler{
		msgChan: msgChan,
		ctx:     ctx,
		i:       t,
	}

	// setup the consumer
	conf := nsq.NewConfig()
	conf.MaxInFlight = int(t.InFlight)
	conf.UserAgent = "gogstash/" + version.VERSION
	consumer, err := nsq.NewConsumer(t.Topic, t.Channel, conf)
	if err != nil {
		goglog.Logger.Errorf("nsq: %s", err.Error())
		return err
	}
	consumer.AddHandler(&handler)

	// From here on every return path must shut the consumer down. Previously a
	// failed connect returned without calling Stop, leaving the consumer (and a
	// possibly already established NSQd connection) alive.
	defer func() {
		consumer.Stop()
		<-consumer.StopChan
		goglog.Logger.Info("nsq stopped")
	}()

	if len(t.NSQ) > 0 {
		if err := consumer.ConnectToNSQD(t.NSQ); err != nil {
			return err
		}
	}
	if len(t.Lookupd) > 0 {
		if err := consumer.ConnectToNSQLookupd(t.Lookupd); err != nil {
			return err
		}
	}

	// wait for stop signal and exit
	for {
		select {
		case <-ctx.Done():
			return nil
		case <-t.control.PauseSignal():
			goglog.Logger.Info("nsq: received pause")
			consumer.ChangeMaxInFlight(0)
		case <-t.control.ResumeSignal():
			goglog.Logger.Info("nsq: received resume")
			consumer.ChangeMaxInFlight(int(t.InFlight))
		}
	}
}
// nsqhandler implements a handler to receive messages.
// Start builds one per consumer and registers it with consumer.AddHandler.
type nsqhandler struct {
// msgChan is the channel passed to the codec's Decode call.
msgChan chan<- logevent.LogEvent
// ctx is the context given to Start; it is forwarded to the codec's Decode call.
ctx context.Context
// i is the owning input configuration; its Codec decodes message bodies.
i *InputConfig
}
// HandleMessage receives a message from NSQ and passes its body to the
// configured codec, which is given msgChan as its output channel.
//
// A decode that reports not-ok, or that returns an error, is logged. The
// codec's error (nil on success) is returned unchanged to the caller.
func (h *nsqhandler) HandleMessage(m *nsq.Message) error {
	ok, err := h.i.Codec.Decode(h.ctx, m.Body, nil, []string{}, h.msgChan)
	switch {
	case !ok:
		// err is not guaranteed to be non-nil when ok is false; format it with
		// %v rather than calling err.Error(), which would panic on a nil error.
		goglog.Logger.Errorf("nsq: not ok, error %v", err)
	case err != nil:
		goglog.Logger.Errorf("nsq: %s", err.Error())
	}
	return err
}