1
1
package main
2
2
3
3
import (
4
+ "encoding/json"
5
+ "github.com/astaxie/beego/orm"
4
6
_ "github.com/go-sql-driver/mysql" // import your used driver
5
7
"github.com/nsqio/go-nsq"
6
8
"log"
7
- "github.com/astaxie/beego/orm"
8
- "fmt"
9
9
"time"
10
- "strconv"
11
10
)
12
11
13
- func init () {
14
- orm .RegisterDataBase ("default" , "mysql" , "root:@tcp(172.16.50.143:4000)/shtelecom?charset=utf8" , 30 )
15
- orm .Debug = true
12
+ type ActBody struct {
13
+ Openid string `json:"openid"` // 主体内容
14
+ Acttime string `json:"acttime"` // 活动时间
15
+ Type string `json:"type"` // 类型
16
+ Note string `json:"note"` // 备注
17
+ Uuid string `json:"uuid"` // UUID
18
+ }
16
19
20
+ func init () {
21
+ orm .RegisterDataBase ("default" , "mysql" , "mohoo:mohoo@tcp(172.16.50.143:4000)/shtelecom?charset=utf8" , 30 )
22
+ orm .Debug = false
17
23
}
18
24
19
25
func main () {
@@ -26,19 +32,27 @@ func main() {
26
32
}
27
33
// 设置消息处理函数
28
34
consumer .AddHandler (nsq .HandlerFunc (func (message * nsq.Message ) error {
29
- log .Println (string (message .Body ))
30
- i := time .Now ().Unix ()
31
- res , err := o .Raw (" INSERT INTO openid_active (openid, acttime, type, note) VALUES(?, ?, ?, ?); " , string (message .Body ), strconv .FormatInt (i , 10 ), "BIND" , "WX" ).Exec ()
35
+ var actBody ActBody
36
+ errJson := json .Unmarshal (message .Body , & actBody )
37
+ if errJson != nil {
38
+ return err
39
+ }
40
+ _ , err := o .Raw ("INSERT INTO openid_active (openid, acttime, type, note, uuid) VALUES(?, ?, ?, ?, ?); " , actBody .Openid , actBody .Acttime , actBody .Type , actBody .Note , actBody .Uuid ).Exec ()
32
41
if err == nil {
33
- num , _ := res .RowsAffected ()
34
- fmt .Println ("mysql row affected nums: " , num )
42
+ log .Println ("id:" , actBody .Openid , " time:" , actBody .Acttime )
43
+ } else {
44
+ log .Fatal (err .Error ())
45
+ panic (1 )
35
46
}
36
- time .Sleep (3 * time .Millisecond )
47
+ time .Sleep (5 * time .Millisecond )
37
48
return nil
38
49
}))
39
50
// 连接到单例nsqd
40
51
if err := consumer .ConnectToNSQD ("172.16.50.143:4150" ); err != nil {
41
52
log .Fatal (err )
42
53
}
54
+ //if err := consumer.ConnectToNSQD("127.0.0.1:4150"); err != nil {
55
+ // log.Fatal(err)
56
+ //}
43
57
<- consumer .StopChan
44
58
}
0 commit comments