-
Notifications
You must be signed in to change notification settings - Fork 5
/
Copy pathrtb_consumer.go
245 lines (216 loc) · 7.52 KB
/
rtb_consumer.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
//
// This is a sample program to read RTB4FREE event data that is published on Kafka.
// The bid, win, pixel and click events are aggregated and printed every 5 minutues.
// See rtb4free.com for complete information on the OpenRTB bidding platform.
//
package main
import (
"fmt"
"os"
"os/signal"
"strconv"
"strings"
"sync"
"syscall"
"time"
cluster "github.com/bsm/sarama-cluster"
log "github.com/go-ozzo/ozzo-log"
kingpin "gopkg.in/alecthomas/kingpin.v2"
)
//
// Define command line options and flags
//
var (
brokerList = kingpin.Flag("brokerList", "List of brokers to connect").Default("kafka:9092").String()
partition = kingpin.Flag("partition", "Partition number").Default("0").String()
offsetType = kingpin.Flag("offsetType", "Offset Type (OffsetNewest | OffsetOldest)").Default("-1").Int()
messageCountStart = kingpin.Flag("messageCountStart", "Message counter start from:").Int()
// MySQL parameters for accessing campaign manager database
mysqlHost = kingpin.Flag("mysqlHost", "MySQL database server host name.").Default("web_db").String()
mysqlDbname = kingpin.Flag("mysqlDbname", "MySQL database name.").Default("rtb4free").String()
mysqlUser = kingpin.Flag("mysqlUser", "MySQL database user id.").Default("ben").String()
mysqlPassword = kingpin.Flag("mysqlPassword", "MySQL database password.").Default("test").String()
debug = kingpin.Flag("debug", "Output debug messages.").Bool()
)
// CountFields - stores message count and time interval of message topic.
type CountFields struct {
lock *sync.Mutex // Mutex lock to prevent concurrent writes on this counter
count int64 // Count field, increments for each occurrence of the bid, win, pixel, click
intervalTs int64 // Epoch time in milleseconds timestamp for the interval.
intervalTm time.Time // Time object timestamp for the interval.
}
// RecordKey - key values to be used as a map key. Will map to CountFields
// This is the unique identified for the count aggregation - ie, count for each campaign/creative's time interval.
type RecordKey struct {
CampaignID int64
CreativeID int64
IntervalStr string
IntervalTs string
}
// OutputCounts - Map of unique interval keys to counts
// Within each interval, this will be created and the count incremented for the unique recordkey
type OutputCounts map[RecordKey]CountFields
// TIme interval for the aggregated record is every 5 minutes
const intervalStr string = "5m"
const intervalSecs int64 = 300
// RTB Counters - we want to count bids, wins, pixels and clicks
var (
aggBids OutputCounts
aggWins OutputCounts
aggPixels OutputCounts
aggClicks OutputCounts
)
// logger - Create custom logger for each function. Helps debug concurrency.
var logger *log.Logger
func main() {
logger = log.NewLogger()
t1 := log.NewConsoleTarget()
logger.Targets = append(logger.Targets, t1)
logger.Open()
log1 := logger.GetLogger("main") // Set the logger "app" field to this functions.
defer logger.Close()
// Set variables from command line
kingpin.Parse()
// Override if environment variable defined
// We need to override command line if deploying with Docker compose environment variables.
// env variable format: RTBAGG_<uppercase key>, ie RTBAGG_BROKERLIST
if v := getEnvValue("brokerList"); v != "" {
*brokerList = v
}
if v := getEnvValue("partition"); v != "" {
*partition = v
}
if v := getEnvValue("offsetType"); v != "" {
val, err := strconv.Atoi(v)
if err == nil {
*offsetType = val
}
}
if v := getEnvValue("messageCountStart"); v != "" {
val, err := strconv.Atoi(v)
if err == nil {
*messageCountStart = val
}
}
if v := getEnvValue("mysqlHost"); v != "" {
*mysqlHost = v
}
if v := getEnvValue("mysqlDbname"); v != "" {
*mysqlDbname = v
}
if v := getEnvValue("mysqlUser"); v != "" {
*mysqlUser = v
}
if v := getEnvValue("mysqlPassword"); v != "" {
*mysqlPassword = v
}
if v := getEnvValue("debug"); v != "" {
if v == "true" || v == "TRUE" {
*debug = true
} else {
*debug = false
}
}
brokers := strings.Split(*brokerList, ",")
if *debug {
logger.MaxLevel = log.LevelDebug
} else {
logger.MaxLevel = log.LevelInfo
}
log1.Info("Console output level is " + logger.MaxLevel.String())
log1.Info(fmt.Sprintf("Looking for kafka brokers: %s", brokers))
log1.Info(fmt.Sprintf("Read from db host: %s", *mysqlHost))
// Initialize the counter maps
aggBids = OutputCounts{}
aggWins = OutputCounts{}
aggPixels = OutputCounts{}
aggClicks = OutputCounts{}
// Read the MySQL table to get campaign and creative attributes
err := readMySQLTables(*mysqlHost, *mysqlDbname, *mysqlUser, *mysqlPassword)
if err {
log1.Alert("MySQL error on initial read.")
panic("MySQL error on initial read.") // Let docker restart to reread. Need initial db to be set.
}
config := cluster.NewConfig()
config.Group.Mode = cluster.ConsumerModePartitions
// Set up CTL-C to break program
signals := make(chan os.Signal, 1)
signal.Notify(signals, os.Interrupt, syscall.SIGTERM)
//
//
// Channel to catch CTL-C
doneCh := make(chan struct{})
ticker := time.NewTicker(time.Duration(intervalSecs) * time.Second)
//Subscribe to Kafka topics
go getTopic(config, brokers, []string{"bids"})
go getTopic(config, brokers, []string{"wins"})
go getTopic(config, brokers, []string{"pixels"})
go getTopic(config, brokers, []string{"clicks"})
// Wait for CTL-C. Will kill all go getTopic routines
go func() {
log1 := logger.GetLogger("main go")
for {
select {
case <-signals:
log1.Alert("Interrupt detected")
// Drain remaining writes
allkeys := make(map[RecordKey]struct{})
var tsMs int64
setMapKeys(&allkeys, &aggBids, tsMs, true)
setMapKeys(&allkeys, &aggWins, tsMs, true) // cost keys are included in win keys
setMapKeys(&allkeys, &aggPixels, tsMs, true)
setMapKeys(&allkeys, &aggClicks, tsMs, true)
writeAggregatedRecords(&allkeys)
log1.Info("Finished sending remaining writes.")
doneCh <- struct{}{}
case <-ticker.C:
log1.Info(fmt.Sprintf("\nTicker at %s.", time.Now()))
writeLastInterval()
}
}
}()
<-doneCh
log1.Info("End main")
}
//
// Calculate the aggregated records to be printed by examining the recordKeys.
// Then print only those records that have expired.
func writeLastInterval() {
log1 := logger.GetLogger("writeLastInterval")
// Set time value for history
tsStr, tsMs, _ := intervalTimestamp(int64(int64(time.Now().Unix())*1000), intervalSecs)
log1.Info(fmt.Sprintf("Interval Time stamp string: %s, %d", tsStr, tsMs))
allkeys := make(map[RecordKey]struct{})
tsMs -= intervalSecs * 1000 // Get the previous interval
// Get all counters that are ready to print
setMapKeys(&allkeys, &aggBids, tsMs, false)
setMapKeys(&allkeys, &aggWins, tsMs, false)
setMapKeys(&allkeys, &aggPixels, tsMs, false)
setMapKeys(&allkeys, &aggClicks, tsMs, false)
writeAggregatedRecords(&allkeys)
}
//
// Look at the recordKey for counters
//
func setMapKeys(set *map[RecordKey]struct{}, mymaps *OutputCounts, tsMs int64, sendAll bool) {
for k, fields := range *mymaps {
if sendAll || (fields.intervalTs <= tsMs) {
(*set)[k] = struct{}{} // only set if timestamp >tsMs
}
}
return
}
//
// Read the command line variables.
// Check if there is an ENV variable, and override if exists.
// This way we can set options in Docker compose files.
//
func getEnvValue(key string) string {
log1 := logger.GetLogger("getEnvValue")
envkey := "RTBAGG_" + strings.ToUpper(string(key))
if v := os.Getenv(envkey); v != "" {
log1.Info(fmt.Sprintf("Setting %s to env key %s value: %s", key, envkey, v))
return v
}
return ""
}