Skip to content

Commit fde14f8

Browse files
oneslashSardorbek Pulatov
and
Sardorbek Pulatov
authored
some cleanup and use waitingGroups for the graceful shutdown (#1)
* some cleanup and use waitingGroups for the graceful shutdown * remove listener which is not used * fix deferring wg * wg.Add(1) - cr fix Co-authored-by: Sardorbek Pulatov <[email protected]>
1 parent 56b9a9c commit fde14f8

File tree

1 file changed

+11
-43
lines changed

1 file changed

+11
-43
lines changed

sqs2http.go

+11-43
Original file line numberDiff line numberDiff line change
@@ -5,22 +5,16 @@ import (
55
"errors"
66
"flag"
77
"fmt"
8+
"net/http"
9+
"sync"
10+
811
"github.com/chaseisabelle/backoff/expbo"
912
"github.com/chaseisabelle/flagz"
1013
"github.com/chaseisabelle/sqsc"
1114
"github.com/chaseisabelle/stop"
1215
"github.com/g3n/engine/util/logger"
13-
"net/http"
1416
)
1517

16-
// listening for workers to shutdown
17-
var listener chan struct{}
18-
19-
// func is called before main()
20-
func init() {
21-
listener = make(chan struct{})
22-
}
23-
2418
// main process
2519
func main() {
2620
// init the configs for everything
@@ -42,9 +36,7 @@ func main() {
4236
boMax := flag.Int("backoff-max", 10, "max sleep time for the exponential backoff")
4337

4438
var flags flagz.Flagz
45-
4639
flag.Var(&flags, "requeue", "the http status code timeout requeue a message for")
47-
4840
flag.Parse()
4941

5042
if *verbose {
@@ -77,7 +69,6 @@ func main() {
7769

7870
if *workers < 1 {
7971
err := errors.New("need at least 1 worker")
80-
8172
die("failed to init workers", err)
8273
}
8374

@@ -106,13 +97,16 @@ func main() {
10697
// listen for kill/term/stop signals from user/os
10798
stop.Listen()
10899

100+
var wg sync.WaitGroup
101+
109102
// create the workers
110103
for tmp > 0 {
111104
// spawn a goroutine for each worker
112105
go func() {
113-
empties := uint64(0) //<< keeps track of number of subsequent empty replies from queue
114-
bo, err := expbo.New(uint64(1000), uint64(*boMax) * 1000, 2) //<< exponential backoff
115-
106+
wg.Add(1)
107+
defer wg.Done()
108+
empties := uint64(0) //<< keeps track of number of subsequent empty replies from queue
109+
bo, err := expbo.New(uint64(1000), uint64(*boMax)*1000, 2) //<< exponential backoff
116110
if err != nil {
117111
die("failed to init backoff", err)
118112
}
@@ -122,16 +116,13 @@ func main() {
122116
// check if user/os has stopped the program and exit gracefully
123117
if stop.Stopped() {
124118
debug("graceful exit", "worker")
125-
126119
break
127120
}
128121

129122
// attempt to consume a message from the queue
130123
bod, rh, err := sqs.Consume()
131-
132124
if err != nil {
133125
fail("consumer failure", err)
134-
135126
continue
136127
}
137128

@@ -142,7 +133,6 @@ func main() {
142133
// back off if necessary
143134
if *boAfter != 0 && empties >= uint64(*boAfter) {
144135
debug("sleeping", bo)
145-
146136
bo.Backoff()
147137
}
148138

@@ -160,72 +150,51 @@ func main() {
160150

161151
// create new http request
162152
req, err := http.NewRequest(*method, *to, bytes.NewBuffer([]byte(bod)))
163-
164153
if err != nil {
165154
fail("http request failure", err)
166-
167155
continue
168156
}
169157

170158
// execute http request
171159
res, err := cli.Do(req)
172-
173160
if err != nil {
174161
fail("http query failure", err)
175-
176162
continue
177163
}
178164

179165
// check the http response
180166
if res == nil {
181167
fail("http response failure", errors.New("received nil response"))
182-
183168
continue
184169
}
185170

186171
err = res.Body.Close()
187-
188172
if err != nil {
189173
fail("failed to close http response body", err)
190174
}
191175

192176
sc := res.StatusCode
193-
194177
debug("received http status code", sc)
195178

196179
// if the http code is in the requeue codes the requeue the message
197180
if has(sc, statuses) {
198181
info("requeue due to http response code", sc)
199-
200182
continue
201183
}
202184

203185
// elsewise delete the message from the queue
204186
_, err = sqs.Delete(rh)
205-
206187
if err != nil {
207188
fail("sqs delete failure", err)
208189
}
209190
}
210-
211-
// if we have broken out of the forever loop then we need to exit gracefully
212-
listener <- struct{}{}
213191
}()
214192

215193
tmp--
216194
}
217195

218-
tmp = 0
219-
220-
// listen for workers to exit gracefully
221-
for range listener {
222-
tmp++
223-
224-
if tmp >= *workers {
225-
break
226-
}
227-
}
228-
196+
// wait for the workers to finish
197+
wg.Wait()
229198
info("graceful exit", "bye bye")
230199
}
231200

@@ -251,6 +220,5 @@ func has(num int, arr []int) bool {
251220
return true
252221
}
253222
}
254-
255223
return false
256224
}

0 commit comments

Comments
 (0)