Skip to content

Commit 9ef82a8

Browse files
authored
fix: record offset for src topic instead of target (#36)
1 parent 4273fc4 commit 9ef82a8

File tree

2 files changed

+19
-5
lines changed

2 files changed

+19
-5
lines changed

internal/relay/relay.go

+4-1
Original file line numberDiff line numberDiff line change
@@ -225,7 +225,10 @@ loop:
225225
// Always record the latest offsets before the messages are processed for new connections and
226226
// retries to consume from where it was left off.
227227
// TODO: What if the next step fails? The messages won't be read again?
228-
re.source.RecordOffsets(rec)
228+
if err := re.source.RecordOffsets(rec); err != nil {
229+
re.log.Error("error recording offset", "err", err)
230+
return err
231+
}
229232

230233
if err := re.processMessage(ctx, rec); err != nil {
231234
re.log.Error("error processing message", "err", err)

internal/relay/source_pool.go

+15-4
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,7 @@ type SourcePool struct {
5454
log *slog.Logger
5555
metrics *metrics.Set
5656
targetToSrc map[string]string
57+
srcToTarget map[string]string
5758
srcTopics []string
5859

5960
// targetOffsets is initialized with current topic high watermarks from target.
@@ -103,17 +104,20 @@ func NewSourcePool(cfg SourcePoolCfg, serverCfgs []ConsumerCfg, topics Topics, t
103104
}
104105

105106
var (
107+
srcToTarg = make(map[string]string, len(topics))
106108
targToSrc = make(map[string]string, len(topics))
107109
srcTopics = make([]string, 0, len(topics))
108110
)
109111
for src, targ := range topics {
110112
srcTopics = append(srcTopics, src)
111113
targToSrc[targ.TargetTopic] = src
114+
srcToTarg[src] = targ.TargetTopic
112115
}
113116

114117
sp := &SourcePool{
115118
cfg: cfg,
116119
targetToSrc: targToSrc,
120+
srcToTarget: srcToTarg,
117121
srcTopics: srcTopics,
118122
servers: servers,
119123
log: log,
@@ -223,21 +227,28 @@ func (sp *SourcePool) GetFetches(s *Server) (kgo.Fetches, error) {
223227

224228
// RecordOffsets records the offsets of the latest fetched records per topic.
225229
// This is used to resume consumption on new connections/reconnections from the source during runtime.
226-
func (sp *SourcePool) RecordOffsets(rec *kgo.Record) {
230+
func (sp *SourcePool) RecordOffsets(rec *kgo.Record) error {
227231
if sp.targetOffsets == nil {
228232
sp.targetOffsets = make(TopicOffsets)
229233
}
230234

231-
if o, ok := sp.targetOffsets[rec.Topic]; ok {
235+
topic, ok := sp.srcToTarget[rec.Topic]
236+
if !ok {
237+
return fmt.Errorf("target topic not found for src topic %s", rec.Topic)
238+
}
239+
240+
if o, ok := sp.targetOffsets[topic]; ok {
232241
// If the topic already exists, update the offset for the partition.
233242
o[rec.Partition] = kgo.NewOffset().At(rec.Offset + 1)
234-
sp.targetOffsets[rec.Topic] = o
243+
sp.targetOffsets[topic] = o
235244
} else {
236245
// If the topic does not exist, create a new map for the topic.
237246
o := make(map[int32]kgo.Offset)
238247
o[rec.Partition] = kgo.NewOffset().At(rec.Offset + 1)
239-
sp.targetOffsets[rec.Topic] = o
248+
sp.targetOffsets[topic] = o
240249
}
250+
251+
return nil
241252
}
242253

243254
func (sp *SourcePool) GetHighWatermark(ctx context.Context, cl *kgo.Client) (kadm.ListedOffsets, error) {

0 commit comments

Comments
 (0)