Skip to content

Commit 87710cc

Browse files
authored
Merge pull request #28 from kalbhor/main
refactor: upgrade franz-go and minor lint changes
2 parents 05fce61 + b31e40b commit 87710cc

File tree

4 files changed

+15
-22
lines changed

4 files changed

+15
-22
lines changed

go.mod

+1-1
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ require (
88
github.com/knadh/koanf/providers/posflag v0.1.0
99
github.com/knadh/koanf/v2 v2.0.1
1010
github.com/spf13/pflag v1.0.5
11-
github.com/twmb/franz-go v1.17.0
11+
github.com/twmb/franz-go v1.17.1
1212
github.com/twmb/franz-go/pkg/kmsg v1.8.0
1313
)
1414

go.sum

+2-2
Original file line numberDiff line numberDiff line change
@@ -32,8 +32,8 @@ github.com/spf13/pflag v1.0.5 h1:iy+VFUOCP1a+8yFto/drg2CJ5u0yRoB7fZw3DKv/JXA=
3232
github.com/spf13/pflag v1.0.5/go.mod h1:McXfInJRrz4CZXVZOBLb0bTZqETkiAhM9Iw0y3An2Bg=
3333
github.com/stretchr/testify v1.8.1 h1:w7B6lhMri9wdJUVmEZPGGhZzrYTPvgJArz7wNPgYKsk=
3434
github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4=
35-
github.com/twmb/franz-go v1.17.0 h1:hawgCx5ejDHkLe6IwAtFWwxi3OU4OztSTl7ZV5rwkYk=
36-
github.com/twmb/franz-go v1.17.0/go.mod h1:NreRdJ2F7dziDY/m6VyspWd6sNxHKXdMZI42UfQ3GXM=
35+
github.com/twmb/franz-go v1.17.1 h1:0LwPsbbJeJ9R91DPUHSEd4su82WJWcTY1Zzbgbg4CeQ=
36+
github.com/twmb/franz-go v1.17.1/go.mod h1:NreRdJ2F7dziDY/m6VyspWd6sNxHKXdMZI42UfQ3GXM=
3737
github.com/twmb/franz-go/pkg/kadm v1.8.1 h1:SrzL855I7gQTGdMtOYGTHhebs7TPgPN29FPtjusqwlE=
3838
github.com/twmb/franz-go/pkg/kadm v1.8.1/go.mod h1:qUSM7pxoMCU1UNu5H4USE64ODcVmeG9LS96mysv1nu8=
3939
github.com/twmb/franz-go/pkg/kmsg v1.8.0 h1:lAQB9Z3aMrIP9qF9288XcFf/ccaSxEitNA1CDTEIeTA=

init.go

+1-10
Original file line numberDiff line numberDiff line change
@@ -276,19 +276,10 @@ func initMetricsServer(metrics *metrics.Set, ko *koanf.Koanf) *http.Server {
276276
buf.WriteTo(w)
277277
})
278278

279-
srv := &http.Server{
279+
return &http.Server{
280280
Addr: ko.MustString("app.metrics_server_addr"),
281281
Handler: mux,
282282
ReadTimeout: 10 * time.Second,
283283
WriteTimeout: 10 * time.Second,
284284
}
285-
286-
go func() {
287-
err := srv.ListenAndServe()
288-
if err != nil && err != http.ErrServerClosed {
289-
log.Printf("error starting server: %v", err)
290-
}
291-
}()
292-
293-
return srv
294285
}

main.go

+11-9
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import (
44
"context"
55
"fmt"
66
"log"
7+
"net/http"
78
"os"
89
"os/signal"
910
"syscall"
@@ -39,13 +40,20 @@ func main() {
3940
log.Fatalf("error initializing filter provider: %v", err)
4041
}
4142

42-
// Initialize metrics.
43-
metr := metrics.NewSet()
44-
4543
// Create a global context with interrupts signals.
4644
globalCtx, cancel := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM)
4745
defer cancel()
4846

47+
// Initialize metrics set and start the HTTP server.
48+
metr := metrics.NewSet()
49+
metrSrv := initMetricsServer(metr, ko)
50+
go func() {
51+
if err := metrSrv.ListenAndServe(); err != nil && err != http.ErrServerClosed {
52+
log.Printf("error starting server: %v", err)
53+
}
54+
}()
55+
defer metrSrv.Shutdown(globalCtx)
56+
4957
// Initialize the source and target Kafka config.
5058
consumerCfgs, prodConfig := initKafkaConfig(ko)
5159

@@ -78,16 +86,10 @@ func main() {
7886
log.Fatalf("error initializing relay controller: %v", err)
7987
}
8088

81-
// Start the metrSrv HTTP server.
82-
metrSrv := initMetricsServer(metr, ko)
83-
8489
// Start the relay. This is an indefinitely blocking call.
8590
if err := relay.Start(globalCtx); err != nil {
8691
log.Fatalf("error starting relay controller: %v", err)
8792
}
8893

89-
if metrSrv != nil {
90-
metrSrv.Shutdown(globalCtx)
91-
}
9294
lo.Info("bye")
9395
}

0 commit comments

Comments
 (0)