Skip to content

Commit 51fefac

Browse files
authored
Use new API for background replication (#2674)
2 parents c6a3201 + 5ba2dea commit 51fefac

File tree

13 files changed

+1250
-21
lines changed

13 files changed

+1250
-21
lines changed

CHANGELOG.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ Changelog for NeoFS Node
1010
- CLI now allows to create and print eACL with numeric filters (#2742)
1111
- gRPC connection limits per endpoint (#1240)
1212
- `neofs-lens object link` command for the new link object inspection (#2799)
13+
- Storage nodes serve new `ObjectService.Replicate` RPC (#2674)
1314

1415
### Fixed
1516
- Access to `PUT` objects no longer grants `DELETE` rights (#2261)
@@ -24,6 +25,7 @@ Changelog for NeoFS Node
2425
- Storage nodes no longer accept objects with header larger than 16KB (#2749)
2526
- IR sends NeoFS chain GAS to netmap nodes every epoch, not per a configurable blocks number (#2777)
2627
- Big objects are split with the new split scheme (#2667)
28+
- Background replicator transfers objects using new `ObjectService.Replicate` RPC (#2317)
2729

2830
### Removed
2931
- Object notifications incl. NATS (#2750)

cmd/neofs-node/grpc.go

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,22 +4,63 @@ import (
44
"crypto/tls"
55
"errors"
66
"fmt"
7+
"math"
78
"net"
89
"time"
910

1011
grpcconfig "github.com/nspcc-dev/neofs-node/cmd/neofs-node/config/grpc"
12+
"github.com/nspcc-dev/neofs-sdk-go/object"
1113
"go.uber.org/zap"
1214
"golang.org/x/net/netutil"
1315
"google.golang.org/grpc"
1416
"google.golang.org/grpc/credentials"
1517
)
1618

1719
func initGRPC(c *cfg) {
20+
if c.cfgMorph.client == nil {
21+
initMorphComponents(c)
22+
}
23+
24+
// limit max size of single messages received by the gRPC servers up to max
25+
// object size setting of the NeoFS network: this is needed to serve
26+
// ObjectService.Replicate RPC transmitting the entire stored object in one
27+
// message
28+
maxObjSize, err := c.nCli.MaxObjectSize()
29+
fatalOnErrDetails("read max object size network setting to determine gRPC recv message limit", err)
30+
31+
maxRecvSize := maxObjSize
32+
// don't forget about meta fields
33+
if maxRecvSize < uint64(math.MaxUint64-object.MaxHeaderLen) { // just in case, always true in practice
34+
maxRecvSize += object.MaxHeaderLen
35+
} else {
36+
maxRecvSize = math.MaxUint64
37+
}
38+
39+
var maxRecvMsgSizeOpt grpc.ServerOption
40+
if maxRecvSize > maxMsgSize { // do not decrease default value
41+
if maxRecvSize > math.MaxInt {
42+
// ^2GB for 32-bit systems which is currently enough in practice. If at some
43+
// point this is not enough, we'll need to expand the option
44+
fatalOnErr(fmt.Errorf("cannot serve NeoFS API over gRPC: object of max size is bigger than gRPC server is able to support %d>%d",
45+
maxRecvSize, math.MaxInt))
46+
}
47+
maxRecvMsgSizeOpt = grpc.MaxRecvMsgSize(int(maxRecvSize))
48+
c.log.Debug("limit max recv gRPC message size to fit max stored objects",
49+
zap.Uint64("max object size", maxObjSize), zap.Uint64("max recv msg", maxRecvSize))
50+
}
51+
1852
var successCount int
1953
grpcconfig.IterateEndpoints(c.cfgReader, func(sc *grpcconfig.Config) {
2054
serverOpts := []grpc.ServerOption{
2155
grpc.MaxSendMsgSize(maxMsgSize),
2256
}
57+
if maxRecvMsgSizeOpt != nil {
58+
// TODO(@cthulhu-rider): the setting can be server-global only now, support
59+
// per-RPC limits
60+
// TODO(@cthulhu-rider): max object size setting may change in general,
61+
// but server configuration is static now
62+
serverOpts = append(serverOpts, maxRecvMsgSizeOpt)
63+
}
2364

2465
tlsCfg := sc.TLS()
2566

cmd/neofs-node/object.go

Lines changed: 48 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -359,7 +359,10 @@ func initObjectService(c *cfg) {
359359
firstSvc = objectService.NewMetricCollector(signSvc, c.metricsCollector)
360360
}
361361

362-
server := objectTransportGRPC.New(firstSvc)
362+
objNode, err := newNodeForObjects(c.cfgObject.cnrSource, c.netMapSource, sPut, c.IsLocalKey)
363+
fatalOnErr(err)
364+
365+
server := objectTransportGRPC.New(firstSvc, objNode)
363366

364367
for _, srv := range c.cfgGRPC.servers {
365368
objectGRPC.RegisterObjectServiceServer(srv, server)
@@ -601,3 +604,47 @@ func (h headerSource) Head(address oid.Address) (*objectSDK.Object, error) {
601604

602605
return hw.h, nil
603606
}
607+
608+
// nodeForObjects represents NeoFS storage node for object storage.
609+
type nodeForObjects struct {
610+
putObjectService *putsvc.Service
611+
containerNodes *containerNodes
612+
isLocalPubKey func([]byte) bool
613+
}
614+
615+
func newNodeForObjects(containers containercore.Source, network netmap.Source, putObjectService *putsvc.Service, isLocalPubKey func([]byte) bool) (*nodeForObjects, error) {
616+
cnrNodes, err := newContainerNodes(containers, network)
617+
if err != nil {
618+
return nil, err
619+
}
620+
return &nodeForObjects{
621+
putObjectService: putObjectService,
622+
containerNodes: cnrNodes,
623+
isLocalPubKey: isLocalPubKey,
624+
}, nil
625+
}
626+
627+
// ForEachContainerNodePublicKeyInLastTwoEpochs passes binary-encoded public key
628+
// of each node match the referenced container's storage policy at two latest
629+
// epochs into f. When f returns false, nil is returned instantly.
630+
//
631+
// Implements [object.Node] interface.
632+
func (x *nodeForObjects) ForEachContainerNodePublicKeyInLastTwoEpochs(id cid.ID, f func(pubKey []byte) bool) error {
633+
return x.containerNodes.forEachContainerNodePublicKeyInLastTwoEpochs(id, f)
634+
}
635+
636+
// IsOwnPublicKey checks whether given binary-encoded public key is assigned to
637+
// local storage node in the network map.
638+
//
639+
// Implements [object.Node] interface.
640+
func (x *nodeForObjects) IsOwnPublicKey(pubKey []byte) bool {
641+
return x.isLocalPubKey(pubKey)
642+
}
643+
644+
// VerifyAndStoreObject checks given object's format and, if it is correct,
645+
// saves the object in the node's local object storage.
646+
//
647+
// Implements [object.Node] interface.
648+
func (x *nodeForObjects) VerifyAndStoreObject(obj objectSDK.Object) error {
649+
return x.putObjectService.ValidateAndStoreObjectLocally(obj)
650+
}

cmd/neofs-node/policy.go

Lines changed: 83 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,83 @@
1+
package main
2+
3+
import (
4+
"fmt"
5+
6+
"github.com/nspcc-dev/neofs-node/pkg/core/container"
7+
"github.com/nspcc-dev/neofs-node/pkg/core/netmap"
8+
cid "github.com/nspcc-dev/neofs-sdk-go/container/id"
9+
netmapsdk "github.com/nspcc-dev/neofs-sdk-go/netmap"
10+
)
11+
12+
// containerNodes wraps NeoFS network state to apply container storage policies.
13+
type containerNodes struct {
14+
containers container.Source
15+
network netmap.Source
16+
}
17+
18+
func newContainerNodes(containers container.Source, network netmap.Source) (*containerNodes, error) {
19+
return &containerNodes{
20+
containers: containers,
21+
network: network,
22+
}, nil
23+
}
24+
25+
// forEachNodePubKeyInSets passes binary-encoded public key of each node into f.
26+
// When f returns false, forEachNodePubKeyInSets returns false instantly.
27+
// Otherwise, true is returned.
28+
func forEachNodePubKeyInSets(nodeSets [][]netmapsdk.NodeInfo, f func(pubKey []byte) bool) bool {
29+
for i := range nodeSets {
30+
for j := range nodeSets[i] {
31+
if !f(nodeSets[i][j].PublicKey()) {
32+
return false
33+
}
34+
}
35+
}
36+
return true
37+
}
38+
39+
// forEachContainerNodePublicKeyInLastTwoEpochs passes binary-encoded public key
40+
// of each node match the referenced container's storage policy at two latest
41+
// epochs into f. When f returns false, nil is returned instantly.
42+
func (x *containerNodes) forEachContainerNodePublicKeyInLastTwoEpochs(cnrID cid.ID, f func(pubKey []byte) bool) error {
43+
epoch, err := x.network.Epoch()
44+
if err != nil {
45+
return fmt.Errorf("read current NeoFS epoch: %w", err)
46+
}
47+
48+
cnr, err := x.containers.Get(cnrID)
49+
if err != nil {
50+
return fmt.Errorf("read container by ID: %w", err)
51+
}
52+
53+
networkMap, err := x.network.GetNetMapByEpoch(epoch)
54+
if err != nil {
55+
return fmt.Errorf("read network map at epoch #%d: %w", epoch, err)
56+
}
57+
// TODO(#2692): node sets remain unchanged for fixed container and network map,
58+
// so recently calculated results worth caching
59+
ns, err := networkMap.ContainerNodes(cnr.Value.PlacementPolicy(), cnrID)
60+
if err != nil {
61+
return fmt.Errorf("apply container's storage policy to the network map at epoch #%d: %w", epoch, err)
62+
}
63+
64+
if !forEachNodePubKeyInSets(ns, f) || epoch == 0 {
65+
return nil
66+
}
67+
68+
epoch--
69+
70+
networkMap, err = x.network.GetNetMapByEpoch(epoch)
71+
if err != nil {
72+
return fmt.Errorf("read network map at epoch #%d: %w", epoch, err)
73+
}
74+
75+
ns, err = networkMap.ContainerNodes(cnr.Value.PlacementPolicy(), cnrID)
76+
if err != nil {
77+
return fmt.Errorf("apply container's storage policy to the network map at epoch #%d: %w", epoch, err)
78+
}
79+
80+
forEachNodePubKeyInSets(ns, f)
81+
82+
return nil
83+
}

0 commit comments

Comments
 (0)