Skip to content

Commit 5ba2dea

Browse files
committed
replicator: Send local objects using new replication API
It's more lightweight and supports binary copying without additional decode-encode round. Based on the fact that if the object is fixed, the request remains unchanged. According to this, transport message is encoded once and sent to all nodes. Closes #2317. Refs #2316. Signed-off-by: Leonard Lyubich <[email protected]>
1 parent 5b1d6ef commit 5ba2dea

File tree

5 files changed

+77
-8
lines changed

5 files changed

+77
-8
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ Changelog for NeoFS Node
2525
- Storage nodes no longer accept objects with header larger than 16KB (#2749)
2626
- IR sends NeoFS chain GAS to netmap nodes every epoch, not per a configurable blocks number (#2777)
2727
- Big objects are split with the new split scheme (#2667)
28+
- Background replicator transfers objects using new `ObjectService.Replicate` RPC (#2317)
2829

2930
### Removed
3031
- Object notifications incl. NATS (#2750)

pkg/core/client/client.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,12 +2,14 @@ package client
22

33
import (
44
"context"
5+
"io"
56

67
rawclient "github.com/nspcc-dev/neofs-api-go/v2/rpc/client"
78
"github.com/nspcc-dev/neofs-node/pkg/network"
89
"github.com/nspcc-dev/neofs-sdk-go/client"
910
"github.com/nspcc-dev/neofs-sdk-go/container"
1011
cid "github.com/nspcc-dev/neofs-sdk-go/container/id"
12+
neofscrypto "github.com/nspcc-dev/neofs-sdk-go/crypto"
1113
"github.com/nspcc-dev/neofs-sdk-go/object"
1214
oid "github.com/nspcc-dev/neofs-sdk-go/object/id"
1315
reputationSDK "github.com/nspcc-dev/neofs-sdk-go/reputation"
@@ -19,6 +21,7 @@ import (
1921
type Client interface {
2022
ContainerAnnounceUsedSpace(ctx context.Context, announcements []container.SizeEstimation, prm client.PrmAnnounceSpace) error
2123
ObjectPutInit(ctx context.Context, header object.Object, signer user.Signer, prm client.PrmObjectPutInit) (client.ObjectWriter, error)
24+
ReplicateObject(ctx context.Context, id oid.ID, src io.ReadSeeker, signer neofscrypto.Signer) error
2225
ObjectDelete(ctx context.Context, containerID cid.ID, objectID oid.ID, signer user.Signer, prm client.PrmObjectDelete) (oid.ID, error)
2326
ObjectGetInit(ctx context.Context, containerID cid.ID, objectID oid.ID, signer user.Signer, prm client.PrmObjectGet) (object.Object, *client.PayloadReader, error)
2427
ObjectHead(ctx context.Context, containerID cid.ID, objectID oid.ID, signer user.Signer, prm client.PrmObjectHead) (*object.Object, error)

pkg/network/cache/multi.go

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import (
44
"context"
55
"errors"
66
"fmt"
7+
"io"
78
"sync"
89
"time"
910

@@ -14,6 +15,7 @@ import (
1415
apistatus "github.com/nspcc-dev/neofs-sdk-go/client/status"
1516
"github.com/nspcc-dev/neofs-sdk-go/container"
1617
cid "github.com/nspcc-dev/neofs-sdk-go/container/id"
18+
neofscrypto "github.com/nspcc-dev/neofs-sdk-go/crypto"
1719
objectSDK "github.com/nspcc-dev/neofs-sdk-go/object"
1820
oid "github.com/nspcc-dev/neofs-sdk-go/object/id"
1921
reputationSDK "github.com/nspcc-dev/neofs-sdk-go/reputation"
@@ -237,6 +239,24 @@ func (x *multiClient) ObjectPutInit(ctx context.Context, header objectSDK.Object
237239
return
238240
}
239241

242+
func (x *multiClient) ReplicateObject(ctx context.Context, id oid.ID, src io.ReadSeeker, signer neofscrypto.Signer) error {
243+
var errSeek error
244+
err := x.iterateClients(ctx, func(c clientcore.Client) error {
245+
err := c.ReplicateObject(ctx, id, src, signer)
246+
if err != nil {
247+
_, errSeek = src.Seek(0, io.SeekStart)
248+
if errSeek != nil {
249+
return nil // to break the iterator
250+
}
251+
}
252+
return err
253+
})
254+
if err != nil {
255+
return err
256+
}
257+
return errSeek
258+
}
259+
240260
func (x *multiClient) ContainerAnnounceUsedSpace(ctx context.Context, announcements []container.SizeEstimation, prm client.PrmAnnounceSpace) error {
241261
return x.iterateClients(ctx, func(c clientcore.Client) error {
242262
return c.ContainerAnnounceUsedSpace(ctx, announcements, prm)

pkg/services/object/put/remote.go

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,12 +3,14 @@ package putsvc
33
import (
44
"context"
55
"fmt"
6+
"io"
67

78
clientcore "github.com/nspcc-dev/neofs-node/pkg/core/client"
89
netmapCore "github.com/nspcc-dev/neofs-node/pkg/core/netmap"
910
objectcore "github.com/nspcc-dev/neofs-node/pkg/core/object"
1011
internalclient "github.com/nspcc-dev/neofs-node/pkg/services/object/internal/client"
1112
"github.com/nspcc-dev/neofs-node/pkg/services/object/util"
13+
neofsecdsa "github.com/nspcc-dev/neofs-sdk-go/crypto/ecdsa"
1214
"github.com/nspcc-dev/neofs-sdk-go/netmap"
1315
"github.com/nspcc-dev/neofs-sdk-go/object"
1416
oid "github.com/nspcc-dev/neofs-sdk-go/object/id"
@@ -134,3 +136,32 @@ func (s *RemoteSender) PutObject(ctx context.Context, p *RemotePutPrm) error {
134136

135137
return nil
136138
}
139+
140+
// ReplicateObjectToNode copies binary-encoded NeoFS object from the given
141+
// [io.ReadSeeker] into local storage of the node described by specified
142+
// [netmap.NodeInfo].
143+
func (s *RemoteSender) ReplicateObjectToNode(ctx context.Context, id oid.ID, src io.ReadSeeker, nodeInfo netmap.NodeInfo) error {
144+
var nodeInfoForCons clientcore.NodeInfo
145+
146+
err := clientcore.NodeInfoFromRawNetmapElement(&nodeInfoForCons, netmapCore.Node(nodeInfo))
147+
if err != nil {
148+
return fmt.Errorf("parse remote node info: %w", err)
149+
}
150+
151+
key, err := s.keyStorage.GetKey(nil)
152+
if err != nil {
153+
return fmt.Errorf("fetch local node's private key: %w", err)
154+
}
155+
156+
c, err := s.clientConstructor.Get(nodeInfoForCons)
157+
if err != nil {
158+
return fmt.Errorf("init NeoFS API client of the remote node: %w", err)
159+
}
160+
161+
err = c.ReplicateObject(ctx, id, src, (*neofsecdsa.Signer)(key))
162+
if err != nil {
163+
return fmt.Errorf("copy object using NeoFS API client of the remote node: %w", err)
164+
}
165+
166+
return nil
167+
}

pkg/services/replicator/process.go

Lines changed: 22 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,12 @@
11
package replicator
22

33
import (
4+
"bytes"
45
"context"
6+
"io"
57

6-
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/engine"
78
putsvc "github.com/nspcc-dev/neofs-node/pkg/services/object/put"
9+
"github.com/nspcc-dev/neofs-sdk-go/client"
810
"github.com/nspcc-dev/neofs-sdk-go/netmap"
911
"go.uber.org/zap"
1012
)
@@ -25,21 +27,27 @@ func (p *Replicator) HandleTask(ctx context.Context, task Task, res TaskResult)
2527
)
2628
}()
2729

28-
if task.obj == nil {
29-
var err error
30-
task.obj, err = engine.Get(p.localStorage, task.addr)
30+
var err error
31+
var prm *putsvc.RemotePutPrm
32+
var stream io.ReadSeeker
33+
binReplication := task.obj == nil
34+
if binReplication {
35+
b, err := p.localStorage.GetBytes(task.addr)
3136
if err != nil {
3237
p.log.Error("could not get object from local storage",
3338
zap.Stringer("object", task.addr),
3439
zap.Error(err))
3540

3641
return
3742
}
43+
stream = bytes.NewReader(b)
44+
if len(task.nodes) > 1 {
45+
stream = client.DemuxReplicatedObject(stream)
46+
}
47+
} else {
48+
prm = new(putsvc.RemotePutPrm).WithObject(task.obj)
3849
}
3950

40-
prm := new(putsvc.RemotePutPrm).
41-
WithObject(task.obj)
42-
4351
for i := 0; task.quantity > 0 && i < len(task.nodes); i++ {
4452
select {
4553
case <-ctx.Done():
@@ -54,7 +62,13 @@ func (p *Replicator) HandleTask(ctx context.Context, task Task, res TaskResult)
5462

5563
callCtx, cancel := context.WithTimeout(ctx, p.putTimeout)
5664

57-
err := p.remoteSender.PutObject(callCtx, prm.WithNodeInfo(task.nodes[i]))
65+
if binReplication {
66+
err = p.remoteSender.ReplicateObjectToNode(callCtx, task.addr.Object(), stream, task.nodes[i])
67+
// note that we don't need to reset stream because it is used exactly once
68+
// according to the client.DemuxReplicatedObject above
69+
} else {
70+
err = p.remoteSender.PutObject(callCtx, prm.WithNodeInfo(task.nodes[i]))
71+
}
5872

5973
cancel()
6074

0 commit comments

Comments
 (0)