Skip to content

Commit cd0af0e

Browse files
nurzhan-saktaganov authored and KaymeKaydex committed
Router.Route: handle outdated *Replicaset object (resolve #11)
- Router.Call bugfix: set destinationName := rs.info.Name if destination exists - Router.Route bugfix: handle outdated *Replicaset object - make comments more go-style - Router.cronDiscovery: log panic in another goroutine - copy maps using copyMap generic
1 parent 3ecf433 commit cd0af0e

File tree

5 files changed

+67
-43
lines changed

5 files changed

+67
-43
lines changed

CHANGELOG.md

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,16 @@
11
## Unreleased
22

3+
BUG FIXES:
4+
* Router.Call bugfix: set destinationName := rs.info.Name if destination exists.
5+
* Router.Route bugfix: handle outdated *Replicaset object (resolve issue #11).
6+
37
FEATURES:
48
* Now when calling RemoveInstance, if an empty replicaset name is passed, the replicaset will be calculated automatically.
59

610
CHANGES:
711
* Bump go-tarantool from v2.2.1 to v2.3.0.
12+
* Make comments more go-style.
13+
* Router.cronDiscovery: log panic in another goroutine.
814

915
TESTS:
1016
* Fixed etcd overlapping ports.

api.go

Lines changed: 17 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -311,35 +311,43 @@ func (r *Router) Call(ctx context.Context, bucketID uint64, mode CallMode,
311311
// We reproduce here behavior in https://github.com/tarantool/vshard/blob/b6fdbe950a2e4557f05b83bd8b846b126ec3724e/vshard/router/init.lua#L663
312312
r.BucketReset(bucketID)
313313

314-
if destination := vshardError.Destination; destination != "" {
314+
destination := vshardError.Destination
315+
if destination != "" {
315316
var loggedOnce bool
316317
for {
317318
nameToReplicasetRef := r.getNameToReplicaset()
318319

319-
_, destinationExists := nameToReplicasetRef[destination]
320+
// In some cases destination contains UUID (prior to tnt 3.x), in some cases it contains replicaset name.
321+
// So, at this point we don't know what destination is: a name or a UUID.
322+
// But we need a name to access values in nameToReplicasetRef map, so let's find it out.
323+
var destinationName string
320324

321-
if !destinationExists {
325+
_, destinationExists := nameToReplicasetRef[destination]
326+
if destinationExists {
327+
destinationName = destination
328+
} else {
322329
// for older logic with uuid we must support backward compatibility
323330
// if destination is a uuid and not a name, let's find it too
324-
for _, rsRef := range nameToReplicasetRef {
325-
if rsRef.info.UUID.String() == destination {
331+
for rsName, rs := range nameToReplicasetRef {
332+
if rs.info.UUID.String() == destination {
326333
destinationExists = true
334+
destinationName = rsName
327335
break
328336
}
329337
}
330338
}
331339

332340
if destinationExists {
333-
_, err := r.BucketSet(bucketID, destination)
341+
_, err := r.BucketSet(bucketID, destinationName)
334342
if err == nil {
335343
break // breaks loop
336344
}
337-
r.log().Warnf(ctx, "Failed set bucket %d to %v (possible race): %v", bucketID, destination, err)
345+
r.log().Warnf(ctx, "Failed set bucket %d to %v (possible race): %v", bucketID, destinationName, err)
338346
}
339347

340348
if !loggedOnce {
341349
r.log().Warnf(ctx, "Replicaset '%v' was not found, but received from storage as destination - please "+
342-
"update configuration", destination)
350+
"update configuration", destinationName)
343351
loggedOnce = true
344352
}
345353

@@ -662,11 +670,5 @@ func (r *Router) RouteAll() map[string]*Replicaset {
662670
nameToReplicasetRef := r.getNameToReplicaset()
663671

664672
// Do not expose the original map to prevent unauthorized modification.
665-
nameToReplicasetCopy := make(map[string]*Replicaset)
666-
667-
for k, v := range nameToReplicasetRef {
668-
nameToReplicasetCopy[k] = v
669-
}
670-
671-
return nameToReplicasetCopy
673+
return copyMap(nameToReplicasetRef)
672674
}

discovery.go

Lines changed: 28 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,20 @@ func (r *Router) Route(ctx context.Context, bucketID uint64) (*Replicaset, error
5555

5656
rs := routeMap[bucketID].Load()
5757
if rs != nil {
58-
return rs, nil
58+
nameToReplicasetRef := r.getNameToReplicaset()
59+
60+
actualRs := nameToReplicasetRef[rs.info.Name]
61+
switch {
62+
case actualRs == nil:
63+
// rs is outdated, can't use it -- let's discover bucket again
64+
r.BucketReset(bucketID)
65+
case actualRs == rs:
66+
return rs, nil
67+
default: // actualRs != rs
68+
// update rs -> actualRs for this bucket
69+
_, _ = r.BucketSet(bucketID, actualRs.info.Name)
70+
return actualRs, nil
71+
}
5972
}
6073

6174
// it's ok if we have a few active searches at the same time
@@ -106,14 +119,9 @@ func (r *Router) bucketSearchLegacy(ctx context.Context, bucketID uint64) (*Repl
106119
return rs, nil
107120
}
108121

109-
/*
110-
-- All replicasets were scanned, but a bucket was not
111-
-- found anywhere, so most likely it does not exist. It
112-
-- can be wrong, if rebalancing is in progress, and a
113-
-- bucket was found to be RECEIVING on one replicaset, and
114-
-- was not found on other replicasets (it was sent during
115-
-- discovery).
116-
*/
122+
// All replicasets were scanned, but a bucket was not found anywhere, so most likely it does not exist.
123+
// It can be wrong, if rebalancing is in progress, and a bucket was found to be RECEIVING on one replicaset,
124+
// and was not found on other replicasets (it was sent during discovery).
117125

118126
return nil, newVShardErrorNoRouteToBucket(bucketID)
119127
}
@@ -192,6 +200,8 @@ func (r *Router) DiscoveryHandleBuckets(ctx context.Context, rs *Replicaset, buc
192200
continue
193201
}
194202

203+
// NOTE: oldRs and rs might have the same name, we intentionally don't check this case to keep the logic simple
204+
195205
// We don't check oldRs for nil here, because it's a valid key too (if rs == nil, it means removed from unknown buckets set)
196206
removedFrom[oldRs]++
197207
}
@@ -294,9 +304,15 @@ func (r *Router) cronDiscovery(ctx context.Context) {
294304
// Another panic may happen due to the log function below (e.g. a bug in log().Errorf); in this case we have two options:
295305
// 1. recover again and log nothing: panic will be muted and lost
296306
// 2. don't try to recover, we hope that the second panic will be logged somehow by go runtime
297-
// So, we choose the second behavior
298-
r.log().Errorf(ctx, "[DISCOVERY] something unexpected has happened in cronDiscovery(%d): panic %v, stackstrace: %s",
299-
iterationCount, recovered, string(debug.Stack()))
307+
// So, we decided to combine them into a third behavior: log in another goroutine
308+
iterationCount := iterationCount
309+
// get stacktrace in the current goroutine
310+
debugStack := string(debug.Stack())
311+
312+
go func() {
313+
r.log().Errorf(ctx, "[DISCOVERY] something unexpected has happened in cronDiscovery(%d): panic %v, stacktrace: %s",
314+
iterationCount, recovered, debugStack)
315+
}()
300316
}
301317
}()
302318

topology.go

Lines changed: 14 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,14 @@ type TopologyController interface {
2525
AddReplicasets(ctx context.Context, replicasets map[ReplicasetInfo][]InstanceInfo) error
2626
}
2727

28+
// copyMap returns a shallow copy of m: a newly allocated map holding the same
// key/value pairs. Values are copied by assignment, so pointer values still
// refer to the same objects as in m.
//
// The result is always non-nil, even when m is nil, so callers may safely
// insert into or delete from it.
func copyMap[K comparable, V any](m map[K]V) map[K]V {
	// Pre-size to the known final length to avoid rehashing while filling.
	// The local is deliberately not named `copy`, which would shadow the builtin.
	dup := make(map[K]V, len(m))
	for k, v := range m {
		dup[k] = v
	}

	return dup
}
35+
2836
func (r *Router) getNameToReplicaset() map[string]*Replicaset {
2937
r.nameToReplicasetMutex.RLock()
3038
nameToReplicasetRef := r.nameToReplicaset
@@ -119,10 +127,6 @@ func (r *Router) AddReplicaset(ctx context.Context, rsInfo ReplicasetInfo, insta
119127
return ErrReplicasetExists
120128
}
121129

122-
replicaset := &Replicaset{
123-
info: rsInfo,
124-
}
125-
126130
rsInstances := make([]pool.Instance, 0, len(instances))
127131
for _, instance := range instances {
128132
rsInstances = append(rsInstances, pool.Instance{
@@ -158,13 +162,13 @@ func (r *Router) AddReplicaset(ctx context.Context, rsInfo ReplicasetInfo, insta
158162
r.log().Errorf(ctx, "got connected now as false to pool.RW")
159163
}
160164

161-
replicaset.conn = conn
165+
replicaset := &Replicaset{
166+
info: rsInfo,
167+
conn: conn,
168+
}
162169

163170
// Create an entirely new map object
164-
nameToReplicasetNew := make(map[string]*Replicaset)
165-
for k, v := range nameToReplicasetOld {
166-
nameToReplicasetNew[k] = v
167-
}
171+
nameToReplicasetNew := copyMap(nameToReplicasetOld)
168172
nameToReplicasetNew[rsInfo.Name] = replicaset // add when conn is ready
169173

170174
// We could detect concurrent access to the TopologyController interface
@@ -202,10 +206,7 @@ func (r *Router) RemoveReplicaset(ctx context.Context, rsName string) []error {
202206
}
203207

204208
// Create an entirely new map object
205-
nameToReplicasetNew := make(map[string]*Replicaset)
206-
for k, v := range nameToReplicasetOld {
207-
nameToReplicasetNew[k] = v
208-
}
209+
nameToReplicasetNew := copyMap(nameToReplicasetOld)
209210
delete(nameToReplicasetNew, rsName)
210211

211212
r.setNameToReplicaset(nameToReplicasetNew)

vshard.go

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -42,9 +42,8 @@ type Router struct {
4242

4343
routeMap atomic.Pointer[routeMap]
4444

45-
// ----------------------- Map-Reduce -----------------------
46-
// Storage Ref ID. It must be unique for each ref request
47-
// and therefore is global and monotonically growing.
45+
// refID is used for Map-Reduce operation. Since it must be unique (within connection) for each ref request,
46+
// we made it global and monotonically growing for each Router instance.
4847
refID atomic.Int64
4948

5049
cancelDiscovery func()

0 commit comments

Comments
 (0)