Skip to content

Commit 5ac7bde

Browse files
committed
BACKUP: node/object: Cache storage policy execution results
Signed-off-by: Leonard Lyubich <[email protected]>
1 parent 4439588 commit 5ac7bde

File tree

3 files changed

+362
-54
lines changed

3 files changed

+362
-54
lines changed

cmd/neofs-node/config.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -473,6 +473,8 @@ type cfgObject struct {
473473
cfgLocalStorage cfgLocalStorage
474474

475475
tombstoneLifetime uint64
476+
477+
containerNodesBuilder *containerNodesBuilder
476478
}
477479

478480
type cfgNotifications struct {

cmd/neofs-node/object.go

Lines changed: 33 additions & 54 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,8 @@ import (
1616
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/engine"
1717
morphClient "github.com/nspcc-dev/neofs-node/pkg/morph/client"
1818
cntClient "github.com/nspcc-dev/neofs-node/pkg/morph/client/container"
19+
"github.com/nspcc-dev/neofs-node/pkg/morph/event"
20+
netmapevents "github.com/nspcc-dev/neofs-node/pkg/morph/event/netmap"
1921
objectTransportGRPC "github.com/nspcc-dev/neofs-node/pkg/network/transport/object/grpc"
2022
objectService "github.com/nspcc-dev/neofs-node/pkg/services/object"
2123
"github.com/nspcc-dev/neofs-node/pkg/services/object/acl"
@@ -237,6 +239,20 @@ func initObjectService(c *cfg) {
237239
}
238240
}
239241

242+
var err error
243+
c.cfgObject.containerNodesBuilder, err = newContainerNodesBuilder(c.netMapSource, c.cfgObject.cnrSource, c.networkState.CurrentEpoch(), func(err error) {
244+
c.log.Warn("internal failure of the containers' storage policy processor, the cost of system resources for processing object operations may increase",
245+
zap.Error(err))
246+
})
247+
fatalOnErrDetails("init cached builder of the container nodes", err)
248+
249+
addNewEpochAsyncNotificationHandler(c, func(ev event.Event) {
250+
epoch := ev.(netmapevents.NewEpoch).EpochNumber()
251+
c.log.Debug("switching to new epoch in the container node builder", zap.Uint64("epoch", epoch))
252+
c.cfgObject.containerNodesBuilder.setCurrentEpoch(epoch)
253+
c.log.Debug("container node builder successfully switched to the new epoch", zap.Uint64("epoch", epoch))
254+
})
255+
240256
sPut := putsvc.NewService(c,
241257
putsvc.WithKeyStorage(keyStorage),
242258
putsvc.WithClientConstructor(putConstructor),
@@ -599,32 +615,15 @@ func (c *cfg) getContainerNodesFromNetworkMap(networkMap *netmapsdk.NetMap, cnrI
599615
return
600616
}
601617

602-
func (c *cfg) getContainerNodesAtEpoch(cnrID cid.ID, epoch uint64) (
603-
nodeSets [][]netmapsdk.NodeInfo,
604-
storagePolicy netmapsdk.PlacementPolicy,
605-
networkMap *netmapsdk.NetMap,
606-
err error,
607-
) {
608-
networkMap, err = c.netMapSource.GetNetMapByEpoch(epoch)
609-
if err != nil {
610-
err = fmt.Errorf("read network map by epoch: %w", err)
611-
return
612-
}
613-
614-
nodeSets, storagePolicy, err = c.getContainerNodesFromNetworkMap(networkMap, cnrID)
615-
616-
return
617-
}
618-
619618
// GetContainerNodesAtEpoch reads storage policy of the referenced container
620619
// from the underlying container storage, reads network map at the specified
621620
// epoch from the underlying storage, applies the storage policy to it and
622621
// returns sets of selected storage nodes.
623622
//
624623
// GetContainerNodesAtEpoch implements [searchsvc.Node].
625624
func (c *cfg) GetContainerNodesAtEpoch(cnr cid.ID, epoch uint64) ([][]netmapsdk.NodeInfo, error) {
626-
nodeSets, _, _, err := c.getContainerNodesAtEpoch(cnr, epoch)
627-
return nodeSets, err
625+
cnrNodes, _, err := c.cfgObject.containerNodesBuilder.getForEpoch(cnr, epoch)
626+
return cnrNodes.nodeSets, err
628627
}
629628

630629
// IsLocalPublicKey implements [searchsvc.Node], [getsvc.Node].
@@ -640,60 +639,40 @@ func (c *cfg) IsLocalPublicKey(bPubKey []byte) bool {
640639
//
641640
// GetObjectNodesAtEpoch implements [getsvc.Node].
642641
func (c *cfg) GetObjectNodesAtEpoch(addr oid.Address, epoch uint64) ([][]netmapsdk.NodeInfo, []uint32, error) {
643-
nodeLists, storagePolicy, networkMap, err := c.getContainerNodesAtEpoch(addr.Container(), epoch)
642+
cnrNodes, err := c.cfgObject.containerNodesBuilder.getForObjectAtEpoch(addr, epoch)
644643
if err != nil {
645644
return nil, nil, err
646645
}
647646

648-
nodeLists, err = networkMap.PlacementVectors(nodeLists, addr.Object())
649-
if err != nil {
650-
return nil, nil, fmt.Errorf("apply object's storage policy to the network map: %w", err)
651-
}
652-
653-
primaryNums := make([]uint32, storagePolicy.NumberOfReplicas())
654-
for i := range primaryNums {
655-
primaryNums[i] = storagePolicy.ReplicaNumberByIndex(i)
656-
}
657-
658-
return nodeLists, primaryNums, nil
647+
return cnrNodes.nodeSets, cnrNodes.primaryNodesNums, nil
659648
}
660649

661650
// TODO: docs
662651
func (c *cfg) ObjectStoragePolicyForContainer(cnrID cid.ID) (putsvc.ObjectStoragePolicy, error) {
663-
networkMap, err := netmap.GetLatestNetworkMap(c.netMapSource)
652+
cnrNodes, networkMap, cache, err := c.cfgObject.containerNodesBuilder._getForEpoch(cnrID, 0)
664653
if err != nil {
665-
return nil, fmt.Errorf("read current network map: %w", err)
666-
}
667-
668-
nodeSets, storagePolicy, err := c.getContainerNodesFromNetworkMap(networkMap, cnrID)
669-
if err != nil {
670-
return nil, fmt.Errorf("get storage nodes for the container in the current network map: %w", err)
671-
}
672-
673-
primaryNodeNums := make([]uint32, storagePolicy.NumberOfReplicas())
674-
for i := range primaryNodeNums {
675-
primaryNodeNums[i] = storagePolicy.ReplicaNumberByIndex(i)
654+
return nil, err
676655
}
677656

678-
return containerStoragePolicy{
679-
networkMap: networkMap,
680-
nodeSets: nodeSets,
681-
primaryNodeNums: primaryNodeNums,
657+
return &containerStoragePolicy{
658+
networkMap: networkMap,
659+
cnrNodes: cnrNodes,
660+
cache: cache,
682661
}, nil
683662
}
684663

685664
type containerStoragePolicy struct {
686-
networkMap *netmapsdk.NetMap
687-
nodeSets [][]netmapsdk.NodeInfo
688-
primaryNodeNums []uint32
665+
networkMap *netmapsdk.NetMap
666+
cnrNodes containerNodes
667+
cache *containerNodesCacheItem
689668
}
690669

691670
// TODO: docs
692-
func (x containerStoragePolicy) StorageNodesForObject(obj oid.ID) ([][]netmapsdk.NodeInfo, []uint32, error) {
693-
nodeLists, err := x.networkMap.PlacementVectors(x.nodeSets, obj)
671+
func (x *containerStoragePolicy) StorageNodesForObject(obj oid.ID) ([][]netmapsdk.NodeInfo, []uint32, error) {
672+
nodeLists, err := sortContainerNodesForObjectAtEpoch(x.cnrNodes.nodeSets, x.networkMap, x.cache, obj)
694673
if err != nil {
695-
return nil, nil, fmt.Errorf("sort container nodes for the object: %w", err)
674+
return nil, nil, err
696675
}
697676

698-
return nodeLists, x.primaryNodeNums, nil
677+
return nodeLists, x.cnrNodes.primaryNodesNums, nil
699678
}

0 commit comments

Comments
 (0)