Commit 05fce61

Merge pull request #27 from kalbhor/main
feat: add error metrics to target and source
2 parents 5fac087 + 9167e66 commit 05fce61

4 files changed: +30 additions, −8 deletions

internal/relay/common.go (+10 −1)
@@ -20,8 +20,17 @@ import (
 	"github.com/twmb/franz-go/pkg/sasl/scram"
 )
 
+const relayMetricPrefix = "kafka_relay_"
+
 var (
-	RelayMetric = "kafka_relay_msg_count{source=\"%s\", destination=\"%s\", partition=\"%d\"}"
+	SrcNetworkErrMetric = relayMetricPrefix + "source_errors_total{source=\"%d\", error=\"%s\"}"
+	SrcsUnhealthyMetric = relayMetricPrefix + "sources_unhealthy_total"
+	SrcKafkaErrMetric   = relayMetricPrefix + "source_kafka_errors_total{source=\"%d\", error=\"%s\"}"
+	SrcHealthMetric     = relayMetricPrefix + "source_highwatermark{source=\"%d\"}"
+
+	TargetNetworkErrMetric = relayMetricPrefix + "target_errors_total{error=\"%s\"}"
+	TargetKafkaErrMetric   = relayMetricPrefix + "target_kafka_errors_total{error=\"%s\"}"
+	RelayedMsgsMetric      = relayMetricPrefix + "msgs_total{source=\"%s\", destination=\"%s\", partition=\"%d\"}"
 
 	ErrLaggingBehind = fmt.Errorf("topic end offset is lagging behind")
 )
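
The metric names above are Prometheus series templates with fmt.Sprintf verbs standing in for label values. A minimal sketch of how one expands into a concrete series, assuming the VictoriaMetrics metrics package that the rest of this diff wires in (the source ID and error string are illustrative):

package main

import (
	"fmt"

	"github.com/VictoriaMetrics/metrics"
)

// Template mirroring SrcKafkaErrMetric from common.go above.
const srcKafkaErrMetric = "kafka_relay_source_kafka_errors_total{source=\"%d\", error=\"%s\"}"

func main() {
	set := metrics.NewSet()

	// Creates (or reuses) the series
	// kafka_relay_source_kafka_errors_total{source="0", error="fetches error"}
	// and increments it by one.
	set.GetOrCreateCounter(fmt.Sprintf(srcKafkaErrMetric, 0, "fetches error")).Inc()
}

GetOrCreateCounter makes the call sites idempotent: the first call registers the series, and every later call with the same expanded name returns the same counter.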

internal/relay/source_pool.go (+13 −5)
@@ -9,6 +9,7 @@ import (
 	"sync"
 	"time"
 
+	"github.com/VictoriaMetrics/metrics"
 	"github.com/twmb/franz-go/pkg/kadm"
 	"github.com/twmb/franz-go/pkg/kgo"
 )
@@ -48,10 +49,11 @@ type Server struct {
 
 // SourcePool manages the source Kafka instances and consumption.
 type SourcePool struct {
-	cfg    SourcePoolCfg
-	client *kgo.Client
-	log    *slog.Logger
-	topics []string
+	cfg     SourcePoolCfg
+	client  *kgo.Client
+	log     *slog.Logger
+	metrics *metrics.Set
+	topics  []string
 
 	offsets map[string]map[int32]kgo.Offset
 
@@ -82,7 +84,7 @@ var (
 
 // NewSourcePool returns a controller instance that manages the lifecycle of a pool of N source (consumer)
 // servers. The pool always attempts to find one healthy node for the relay to consume from.
-func NewSourcePool(cfg SourcePoolCfg, serverCfgs []ConsumerGroupCfg, topics Topics, log *slog.Logger) (*SourcePool, error) {
+func NewSourcePool(cfg SourcePoolCfg, serverCfgs []ConsumerGroupCfg, topics Topics, m *metrics.Set, log *slog.Logger) (*SourcePool, error) {
 	servers := make([]Server, 0, len(serverCfgs))
 
 	// Initially mark all servers as unhealthy.
@@ -105,6 +107,7 @@ func NewSourcePool(cfg SourcePoolCfg, serverCfgs []ConsumerGroupCfg, topics Topi
 		topics:    topicNames,
 		servers:   servers,
 		log:       log,
+		metrics:   m,
 		backoffFn: getBackoffFn(cfg.EnableBackoff, cfg.BackoffMin, cfg.BackoffMax),
 	}, nil
 }
@@ -154,6 +157,7 @@ loop:
 			conn, err := sp.newConn(globalCtx, s)
 			if err != nil {
 				retries++
+				sp.metrics.GetOrCreateCounter(fmt.Sprintf(SrcNetworkErrMetric, s.ID, "new connection failed")).Inc()
 				sp.log.Error("new source connection failed", "id", s.ID, "broker", s.Config.BootstrapBrokers, "error", err, "retries", retries)
 				waitTries(globalCtx, sp.backoffFn(retries))
 				continue loop
@@ -170,6 +174,7 @@ loop:
 		}
 
 		retries++
+		sp.metrics.GetOrCreateCounter(SrcsUnhealthyMetric).Inc()
 		sp.log.Error("no healthy server found. waiting and retrying", "retries", retries)
 		waitTries(globalCtx, sp.backoffFn(retries))
 	}
@@ -183,6 +188,7 @@ func (sp *SourcePool) GetFetches(s *Server) (kgo.Fetches, error) {
 
 	// There's no connection.
 	if fetches.IsClientClosed() {
+		sp.metrics.GetOrCreateCounter(fmt.Sprintf(SrcKafkaErrMetric, s.ID, "client closed")).Inc()
 		sp.log.Debug("retrieving fetches failed. client closed.", "id", s.ID, "broker", s.Config.BootstrapBrokers)
 		sp.setWeight(s.ID, unhealthyWeight)
 
@@ -191,6 +197,7 @@ func (sp *SourcePool) GetFetches(s *Server) (kgo.Fetches, error) {
 
 	// If there are errors in the fetches, handle them.
 	for _, err := range fetches.Errors() {
+		sp.metrics.GetOrCreateCounter(fmt.Sprintf(SrcKafkaErrMetric, s.ID, "fetches error")).Inc()
 		sp.log.Error("found error in fetches", "server", s.ID, "error", err.Err)
 		sp.setWeight(s.ID, unhealthyWeight)
 
@@ -513,6 +520,7 @@ func (sp *SourcePool) setWeight(id int, weight int64) {
 			sp.curCandidate = s
 		}
 
+		sp.metrics.GetOrCreateCounter(fmt.Sprintf(SrcHealthMetric, id)).Set(uint64(weight))
 		sp.log.Debug("setting candidate weight", "id", id, "weight", weight, "curr", sp.curCandidate)
 		sp.servers[id] = s
 		break
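
One detail in setWeight: SrcHealthMetric is written with Counter.Set, which stores an absolute value, so the series behaves as a gauge of the server's current weight rather than a monotonic counter (and if weight can be negative, as an unhealthy sentinel might be, the uint64 conversion would wrap; that isn't visible in this hunk). A minimal sketch with a hypothetical server ID and weight:

package main

import (
	"fmt"

	"github.com/VictoriaMetrics/metrics"
)

// Template mirroring SrcHealthMetric from common.go above.
const srcHealthMetric = "kafka_relay_source_highwatermark{source=\"%d\"}"

func main() {
	set := metrics.NewSet()

	// Counter.Set records an absolute value: the series for source 3
	// now reads 42, acting as a gauge of that server's current weight.
	set.GetOrCreateCounter(fmt.Sprintf(srcHealthMetric, 3)).Set(42)
}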

internal/relay/target.go (+6 −1)
@@ -164,6 +164,7 @@ func (tg *Target) initProducer(top Topics) (*kgo.Client, error) {
 	} else {
 		tlsOpt, err := getTLSConfig(tg.pCfg.CACertPath, tg.pCfg.ClientCertPath, tg.pCfg.ClientKeyPath)
 		if err != nil {
+			tg.metrics.GetOrCreateCounter(fmt.Sprintf(TargetKafkaErrMetric, "tls config error")).Inc()
 			return nil, err
 		}
 
@@ -188,6 +189,7 @@ outerLoop:
 		default:
 			cl, err = kgo.NewClient(opts...)
 			if err != nil {
+				tg.metrics.GetOrCreateCounter(fmt.Sprintf(TargetNetworkErrMetric, "error creating producer client")).Inc()
 				tg.log.Error("error creating producer client", "error", err)
 				retries++
 				waitTries(tg.ctx, backoff(retries))
@@ -208,6 +210,7 @@ outerLoop:
 
 			// Test connectivity and ensure destination topics exists.
 			if err := testConnection(cl, tg.pCfg.SessionTimeout, topics, partitions); err != nil {
+				tg.metrics.GetOrCreateCounter(fmt.Sprintf(TargetNetworkErrMetric, "error connecting to producer")).Inc()
 				tg.log.Error("error connecting to producer", "err", err)
 				retries++
 				waitTries(tg.ctx, backoff(retries))
@@ -295,13 +298,14 @@ retry:
 				destTopic = tg.targetTopics[res.Record.Topic]
 				part      = res.Record.Partition
 			)
-			tg.metrics.GetOrCreateCounter(fmt.Sprintf(RelayMetric, srcTopic, destTopic, part)).Inc()
+			tg.metrics.GetOrCreateCounter(fmt.Sprintf(RelayedMsgsMetric, srcTopic, destTopic, part)).Inc()
 		}
 
 		tg.log.Debug("produced last offset", "offset", results[len(results)-1].Record.Offset, "batch", batchLen, "retry", retries)
 
 		// retry if there is an error
 		if err != nil {
+			tg.metrics.GetOrCreateCounter(fmt.Sprintf(TargetKafkaErrMetric, "error producing message")).Inc()
 			tg.log.Error("error producing message", "err", err, "failed_count", batchLen, "retry", retries)
 
 			bufRecs := tg.client.BufferedProduceRecords()
@@ -333,6 +337,7 @@ retry:
 	}
 
 	if !sent {
+		tg.metrics.GetOrCreateCounter(fmt.Sprintf(TargetKafkaErrMetric, "error producing message after retries")).Inc()
 		return fmt.Errorf("error producing message; exhausted retries (%v)", tg.pCfg.MaxRetries)
 	}
 
main.go (+1 −1)
@@ -64,7 +64,7 @@ func main() {
 	}
 
 	// Initialize the source Kafka (consumer) relay.
-	srcPool, err := relay.NewSourcePool(initSourcePoolConfig(ko), consumerCfgs, topics, lo)
+	srcPool, err := relay.NewSourcePool(initSourcePoolConfig(ko), consumerCfgs, topics, metr, lo)
 	if err != nil {
 		log.Fatalf("error initializing source pool controller: %v", err)
 	}
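
Here metr is the shared *metrics.Set now threaded into both the source pool and the target. Its construction isn't part of this diff; a sketch of how such a set is typically created and exposed (the /metrics route and listen address are assumptions):

package main

import (
	"log"
	"net/http"

	"github.com/VictoriaMetrics/metrics"
)

func main() {
	// Shared set, analogous to the `metr` argument in the diff above.
	metr := metrics.NewSet()

	// Serve everything recorded in the set in Prometheus text format.
	http.HandleFunc("/metrics", func(w http.ResponseWriter, r *http.Request) {
		metr.WritePrometheus(w)
	})
	log.Fatal(http.ListenAndServe(":7081", nil))
}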
