Skip to content

Commit

Permalink
object/searchv2: Forward requests to container nodes in parallel
Browse files Browse the repository at this point in the history
Refs #3058.

Signed-off-by: Leonard Lyubich <[email protected]>
  • Loading branch information
cthulhu-rider committed Feb 14, 2025
1 parent dba93fa commit f1989c2
Showing 1 changed file with 64 additions and 47 deletions.
111 changes: 64 additions & 47 deletions pkg/services/object/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ import (
"github.com/nspcc-dev/neofs-sdk-go/user"
"github.com/nspcc-dev/neofs-sdk-go/version"
"github.com/nspcc-dev/tzhash/tz"
"github.com/panjf2000/ants/v2"
"google.golang.org/grpc"
)

Expand Down Expand Up @@ -174,29 +175,36 @@ const (
)

type server struct {
handlers Handlers
fsChain FSChain
storage Storage
signer ecdsa.PrivateKey
mNumber uint32
metrics MetricCollector
aclChecker aclsvc.ACLChecker
reqInfoProc ACLInfoExtractor
nodeClients searchsvc.ClientConstructor
handlers Handlers
fsChain FSChain
storage Storage
signer ecdsa.PrivateKey
mNumber uint32
metrics MetricCollector
aclChecker aclsvc.ACLChecker
reqInfoProc ACLInfoExtractor
nodeClients searchsvc.ClientConstructor
searchWorkers *ants.Pool
}

// New provides protoobject.ObjectServiceServer for the given parameters.
func New(hs Handlers, magicNumber uint32, fsChain FSChain, st Storage, signer ecdsa.PrivateKey, m MetricCollector, ac aclsvc.ACLChecker, rp ACLInfoExtractor, cs searchsvc.ClientConstructor) protoobject.ObjectServiceServer {
// TODO: configurable capacity
sp, err := ants.NewPool(100, ants.WithNonblocking(true))
if err != nil {
panic(fmt.Errorf("create ants pool: %w", err)) // fails on invalid input only

Check warning on line 195 in pkg/services/object/server.go

View check run for this annotation

Codecov / codecov/patch

pkg/services/object/server.go#L195

Added line #L195 was not covered by tests
}
return &server{
handlers: hs,
fsChain: fsChain,
storage: st,
signer: signer,
mNumber: magicNumber,
metrics: m,
aclChecker: ac,
reqInfoProc: rp,
nodeClients: cs,
handlers: hs,
fsChain: fsChain,
storage: st,
signer: signer,
mNumber: magicNumber,
metrics: m,
aclChecker: ac,
reqInfoProc: rp,
nodeClients: cs,
searchWorkers: sp,
}
}

Expand Down Expand Up @@ -1999,47 +2007,56 @@ func (s *server) processSearchRequest(ctx context.Context, req *protoobject.Sear
}
} else {
var signed bool
var signErr error
var resErr error
mProcessedNodes := make(map[string]struct{})
var sets [][]sdkclient.SearchResultItem
var mores []bool
if err = s.fsChain.ForEachContainerNode(cnr, func(node sdknetmap.NodeInfo) bool {
var mtx sync.Mutex
var wg sync.WaitGroup
add := func(set []sdkclient.SearchResultItem, more bool) {
mtx.Lock()
sets, mores = append(sets, set), append(mores, more)
mtx.Unlock()
}
err = s.fsChain.ForEachContainerNode(cnr, func(node sdknetmap.NodeInfo) bool {
nodePub := node.PublicKey()
strKey := string(nodePub)
if _, ok := mProcessedNodes[strKey]; ok {
return true
}
mProcessedNodes[strKey] = struct{}{}

var set []sdkclient.SearchResultItem
var more bool
var err error
if s.fsChain.IsOwnPublicKey(nodePub) {
var crsr *meta.SearchCursor
if set, crsr, err = s.storage.SearchObjects(cnr, fs, body.Attributes, cursor, count); err != nil {
// TODO: log error
return true
}
more = crsr != nil
} else {
if !signed {
req.MetaHeader = &protosession.RequestMetaHeader{Ttl: 1, Origin: req.MetaHeader}
if req.VerifyHeader, err = neofscrypto.SignRequestWithBuffer(neofsecdsa.Signer(s.signer), req, nil); err != nil {
signErr = fmt.Errorf("sign request: %w", err)
return false
}
signed = true
}
// TODO: consider parallelism
if set, more, err = s.searchOnRemoteNode(ctx, node, req); err != nil {
// TODO: log error
return true
wg.Add(1)
go func() {
defer wg.Done()
if set, crsr, err := s.storage.SearchObjects(cnr, fs, body.Attributes, cursor, count); err == nil {
add(set, crsr != nil)
} // TODO: else log error

Check warning on line 2034 in pkg/services/object/server.go

View check run for this annotation

Codecov / codecov/patch

pkg/services/object/server.go#L2001-L2034

Added lines #L2001 - L2034 were not covered by tests
}()
return true

Check warning on line 2036 in pkg/services/object/server.go

View check run for this annotation

Codecov / codecov/patch

pkg/services/object/server.go#L2036

Added line #L2036 was not covered by tests
}
if !signed {
req.MetaHeader = &protosession.RequestMetaHeader{Ttl: 1, Origin: req.MetaHeader}
if req.VerifyHeader, err = neofscrypto.SignRequestWithBuffer(neofsecdsa.Signer(s.signer), req, nil); err != nil {
resErr = fmt.Errorf("sign request: %w", err)
return false
}
signed = true

Check warning on line 2044 in pkg/services/object/server.go

View check run for this annotation

Codecov / codecov/patch

pkg/services/object/server.go#L2038-L2044

Added lines #L2038 - L2044 were not covered by tests
}
sets, mores = append(sets, set), append(mores, more)
return true
}); err == nil {
err = signErr
wg.Add(1)
if resErr = s.searchWorkers.Submit(func() {
defer wg.Done()
if set, more, err := s.searchOnRemoteNode(ctx, node, req); err == nil {
add(set, more)
} // TODO: else log error
}); resErr != nil {
wg.Done()
}
return resErr == nil

Check warning on line 2055 in pkg/services/object/server.go

View check run for this annotation

Codecov / codecov/patch

pkg/services/object/server.go#L2046-L2055

Added lines #L2046 - L2055 were not covered by tests
})
wg.Wait()
if err == nil {
err = resErr
}
if err != nil {
return nil, err
Expand Down

0 comments on commit f1989c2

Please sign in to comment.