-
Notifications
You must be signed in to change notification settings - Fork 3
/
Copy pathnsq_consumer.go
59 lines (54 loc) · 1.59 KB
/
nsq_consumer.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
package main
import (
"encoding/json"
"log"
"time"
"github.com/astaxie/beego/orm"
_ "github.com/go-sql-driver/mysql" // import your used driver
"github.com/nsqio/go-nsq"
)
// ActBody is the JSON payload carried by each consumed message. Every field is
// a string and is written to the identically named column of the
// openid_active table.
type ActBody struct {
	// Openid is decoded from the "openid" key.
	// NOTE(review): the original comment described this as the "main content"; confirm the intended meaning.
	Openid string `json:"openid"`
	// Acttime is the activity time, decoded from the "acttime" key.
	Acttime string `json:"acttime"`
	// Type is the record type, decoded from the "type" key.
	Type string `json:"type"`
	// Note is a free-form remark, decoded from the "note" key.
	Note string `json:"note"`
	// Uuid is the record UUID, decoded from the "uuid" key.
	Uuid string `json:"uuid"`
}
// init registers the "default" MySQL database alias with the beego ORM and
// disables ORM debug output. The process exits if registration fails.
//
// NOTE(review): the DSN, including the credentials, is hard-coded here;
// consider loading it from configuration or the environment.
func init() {
	// The trailing 30 is the first optional integer parameter of
	// RegisterDataBase (presumably the max idle connection count; confirm
	// against the beego ORM documentation).
	if err := orm.RegisterDataBase("default", "mysql", "root:root@tcp(127.0.0.1:3306)/shtelecom?charset=utf8", 30); err != nil {
		// Nothing can be persisted without a registered database, so fail fast
		// instead of discarding the error.
		log.Fatal(err)
	}
	orm.Debug = false
}
// main consumes messages from the "shtelecom-active" NSQ topic on the "mohoo"
// channel, decodes each body as a JSON ActBody and inserts it into the
// openid_active table. It blocks until the consumer's StopChan is signalled.
func main() {
	o := orm.NewOrm()
	// "default" is the alias registered in init; another registered alias
	// could be selected here instead.
	if err := o.Using("default"); err != nil {
		log.Fatal(err)
	}

	cfg := nsq.NewConfig()
	consumer, err := nsq.NewConsumer("shtelecom-active", "mohoo", cfg)
	if err != nil {
		log.Fatal(err)
	}

	// Register the message handler.
	consumer.AddHandler(nsq.HandlerFunc(func(message *nsq.Message) error {
		var actBody ActBody
		if err := json.Unmarshal(message.Body, &actBody); err != nil {
			// Return the decode error itself so a malformed message is
			// reported to the consumer rather than acknowledged as success.
			return err
		}

		if _, err := o.Raw("INSERT INTO openid_active (openid, acttime, type, note, uuid) VALUES(?, ?, ?, ?, ?); ", actBody.Openid, actBody.Acttime, actBody.Type, actBody.Note, actBody.Uuid).Exec(); err != nil {
			// Hand the failure back to the consumer (go-nsq requeues a message
			// whose handler returns a non-nil error) instead of terminating
			// the whole process on a single failed insert.
			return err
		}
		log.Println("id:", actBody.Openid, " time:", actBody.Acttime)

		// Short pause after every successful insert (presumably to throttle
		// the write rate; confirm with the author).
		time.Sleep(5 * time.Millisecond)
		return nil
	}))

	// Connect directly to a single nsqd instance.
	if err := consumer.ConnectToNSQD("172.16.50.143:4150"); err != nil {
		log.Fatal(err)
	}
	// Alternative local nsqd address, kept for reference:
	//if err := consumer.ConnectToNSQD("127.0.0.1:4150"); err != nil {
	//	log.Fatal(err)
	//}

	// Block until the consumer has stopped.
	<-consumer.StopChan
}