@@ -3,6 +3,7 @@ package relay
3
3
import (
4
4
"context"
5
5
"log/slog"
6
+ "strconv"
6
7
"sync"
7
8
"time"
8
9
@@ -22,7 +23,7 @@ type Relay struct {
22
23
target * Target
23
24
log * slog.Logger
24
25
25
- topics Topics
26
+ topic Topic
26
27
27
28
// signalCh is used to signal when the relay poll loop should look for a new healthy server.
28
29
signalCh chan struct {}
@@ -33,21 +34,21 @@ type Relay struct {
33
34
targetOffsets TopicOffsets
34
35
35
36
// Live topic offsets from source.
36
- srcOffsets map [string ] map [ int32 ]int64
37
+ srcOffsets map [int32 ]int64
37
38
38
39
// list of filter implementations for skipping messages
39
40
filters map [string ]filter.Provider
40
41
}
41
42
42
- func NewRelay (cfg RelayCfg , src * SourcePool , target * Target , topics Topics , filters map [string ]filter.Provider , log * slog.Logger ) (* Relay , error ) {
43
+ func NewRelay (cfg RelayCfg , src * SourcePool , target * Target , topic Topic , filters map [string ]filter.Provider , log * slog.Logger ) (* Relay , error ) {
43
44
// If stop-at-end is set, fetch and cache the offsets to determine
44
45
// when end is reached.
45
46
var offsets TopicOffsets
46
47
if cfg .StopAtEnd {
47
48
if o , err := target .GetHighWatermark (); err != nil {
48
49
return nil , err
49
50
} else {
50
- offsets = o .KOffsets ()
51
+ offsets = o .KOffsets ()[ topic . TargetTopic ]
51
52
}
52
53
}
53
54
@@ -57,10 +58,10 @@ func NewRelay(cfg RelayCfg, src *SourcePool, target *Target, topics Topics, filt
57
58
target : target ,
58
59
log : log ,
59
60
60
- topics : topics ,
61
+ topic : topic ,
61
62
signalCh : make (chan struct {}, 1 ),
62
63
63
- srcOffsets : make (map [string ] map [ int32 ]int64 ),
64
+ srcOffsets : make (map [int32 ]int64 ),
64
65
targetOffsets : offsets ,
65
66
filters : filters ,
66
67
}
@@ -116,9 +117,6 @@ func (re *Relay) Start(globalCtx context.Context) error {
116
117
defer wg .Done ()
117
118
// Wait till main ctx is cancelled.
118
119
<- ctx .Done ()
119
-
120
- // Stop consumer group.
121
- re .source .Close ()
122
120
}()
123
121
124
122
// Start the indefinite poll that asks for new connections
@@ -195,7 +193,12 @@ loop:
195
193
continue loop
196
194
}
197
195
198
- re .cacheSrcOffsets (of )
196
+ srcOffsets := make (map [int32 ]int64 )
197
+ of .Each (func (lo kadm.ListedOffset ) {
198
+ srcOffsets [lo .Partition ] = lo .Offset
199
+ })
200
+
201
+ re .cacheSrcOffsets (srcOffsets )
199
202
firstPoll = false
200
203
}
201
204
@@ -237,7 +240,6 @@ loop:
237
240
}
238
241
239
242
re .log .Debug ("processed fetches" )
240
- server .Client .AllowRebalance ()
241
243
}
242
244
}
243
245
}
@@ -246,7 +248,7 @@ loop:
246
248
func (re * Relay ) processMessage (ctx context.Context , rec * kgo.Record ) error {
247
249
// Decrement the end offsets for the given topic and partition till we reach 0
248
250
if re .cfg .StopAtEnd {
249
- re .decrementSourceOffset (rec .Topic , rec . Partition )
251
+ re .decrementSourceOffset (rec .Partition )
250
252
}
251
253
252
254
// If there are filters, run the message through them to decide whether
@@ -266,17 +268,17 @@ func (re *Relay) processMessage(ctx context.Context, rec *kgo.Record) error {
266
268
}
267
269
}
268
270
269
- t , ok := re .topics [rec .Topic ]
270
- if ! ok {
271
- return nil
272
- }
273
-
271
+ // Add the src message time as a meta header to the target.
272
+ // The target consumer can check the lag between the src and target message time if required.
274
273
// Repurpose &kgo.Record and forward it to producer to reduce allocs.
275
- rec .Headers = nil
274
+ rec .Headers = append (rec .Headers , kgo.RecordHeader {
275
+ Key : "_t" ,
276
+ Value : nsToBytes (rec .Timestamp .UnixNano ()),
277
+ })
276
278
rec .Timestamp = time.Time {}
277
- rec .Topic = t .TargetTopic
278
- if ! t .AutoTargetPartition {
279
- rec .Partition = int32 (t .TargetPartition )
279
+ rec .Topic = re . topic .TargetTopic
280
+ if ! re . topic .AutoTargetPartition {
281
+ rec .Partition = int32 (re . topic .TargetPartition )
280
282
}
281
283
rec .Attrs = kgo.RecordAttrs {}
282
284
rec .ProducerEpoch = 0
@@ -299,47 +301,35 @@ func (re *Relay) processMessage(ctx context.Context, rec *kgo.Record) error {
299
301
}
300
302
301
303
// decrementSourceOffset decrements the offset count for the given topic and partition in the source offsets map.
302
- func (re * Relay ) decrementSourceOffset (topic string , partition int32 ) {
303
- if o , ok := re .srcOffsets [topic ]; ok {
304
- if offset , found := o [partition ]; found && offset > 0 {
305
- o [partition ]--
306
- re .srcOffsets [topic ] = o
307
- }
304
+ func (re * Relay ) decrementSourceOffset (partition int32 ) {
305
+ if offset , found := re .srcOffsets [partition ]; found && offset > 0 {
306
+ re .srcOffsets [partition ] -= 1
308
307
}
309
308
}
310
309
311
310
// cacheSrcOffsets sets the end offsets of the consumer during bootup to exit on consuming everything.
312
- func (re * Relay ) cacheSrcOffsets (of kadm.ListedOffsets ) {
313
- of .Each (func (lo kadm.ListedOffset ) {
314
- ct , ok := re .srcOffsets [lo .Topic ]
315
- if ! ok {
316
- ct = make (map [int32 ]int64 )
317
- }
318
- ct [lo .Partition ] = lo .Offset
319
- re .srcOffsets [lo .Topic ] = ct
320
- })
321
-
311
+ func (re * Relay ) cacheSrcOffsets (of map [int32 ]int64 ) {
312
+ re .srcOffsets = of
322
313
// Read till the destination offsets and reduce it from the target weight.
323
- for t , po := range re .targetOffsets {
324
- for p , o := range po {
325
- if ct , ok := re .srcOffsets [t ]; ok {
326
- if _ , found := ct [p ]; found {
327
- re.srcOffsets [t ][p ] -= o .EpochOffset ().Offset
328
- }
329
- }
314
+ for p , o := range re .targetOffsets {
315
+ if _ , ok := re .srcOffsets [p ]; ok {
316
+ re .srcOffsets [p ] -= o .EpochOffset ().Offset
330
317
}
331
318
}
332
319
}
333
320
334
321
// hasReachedEnd reports if there is any pending messages in given topic-partition.
335
322
func (re * Relay ) hasReachedEnd () bool {
336
- for _ , p := range re .srcOffsets {
337
- for _ , o := range p {
338
- if o > 0 {
339
- return false
340
- }
323
+ for _ , o := range re .srcOffsets {
324
+ if o > 0 {
325
+ return false
341
326
}
342
327
}
343
-
344
328
return true
345
329
}
330
+
331
+ func nsToBytes (ns int64 ) []byte {
332
+ // Preallocate a buffer for the byte slice
333
+ buf := make ([]byte , 0 , 10 ) // 10 is enough for most integers
334
+ return strconv .AppendInt (buf , ns , 10 )
335
+ }
0 commit comments