Skip to content

Commit 2b89d17

Browse files
committed
object/searchv2: Forward requests to container nodes in parallel
This adds non-blocking routine pool with static capacity = 100 used to forward `ObjectService.SearchV2` request to remote storage nodes. Refs #3058. Signed-off-by: Leonard Lyubich <[email protected]>
1 parent f38f0f8 commit 2b89d17

File tree

1 file changed

+64
-47
lines changed

1 file changed

+64
-47
lines changed

pkg/services/object/server.go

Lines changed: 64 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@ import (
4444
"github.com/nspcc-dev/neofs-sdk-go/user"
4545
"github.com/nspcc-dev/neofs-sdk-go/version"
4646
"github.com/nspcc-dev/tzhash/tz"
47+
"github.com/panjf2000/ants/v2"
4748
"google.golang.org/grpc"
4849
)
4950

@@ -174,29 +175,36 @@ const (
174175
)
175176

176177
type server struct {
177-
handlers Handlers
178-
fsChain FSChain
179-
storage Storage
180-
signer ecdsa.PrivateKey
181-
mNumber uint32
182-
metrics MetricCollector
183-
aclChecker aclsvc.ACLChecker
184-
reqInfoProc ACLInfoExtractor
185-
nodeClients searchsvc.ClientConstructor
178+
handlers Handlers
179+
fsChain FSChain
180+
storage Storage
181+
signer ecdsa.PrivateKey
182+
mNumber uint32
183+
metrics MetricCollector
184+
aclChecker aclsvc.ACLChecker
185+
reqInfoProc ACLInfoExtractor
186+
nodeClients searchsvc.ClientConstructor
187+
searchWorkers *ants.Pool
186188
}
187189

188190
// New provides protoobject.ObjectServiceServer for the given parameters.
189191
func New(hs Handlers, magicNumber uint32, fsChain FSChain, st Storage, signer ecdsa.PrivateKey, m MetricCollector, ac aclsvc.ACLChecker, rp ACLInfoExtractor, cs searchsvc.ClientConstructor) protoobject.ObjectServiceServer {
192+
// TODO: configurable capacity
193+
sp, err := ants.NewPool(100, ants.WithNonblocking(true))
194+
if err != nil {
195+
panic(fmt.Errorf("create ants pool: %w", err)) // fails on invalid input only
196+
}
190197
return &server{
191-
handlers: hs,
192-
fsChain: fsChain,
193-
storage: st,
194-
signer: signer,
195-
mNumber: magicNumber,
196-
metrics: m,
197-
aclChecker: ac,
198-
reqInfoProc: rp,
199-
nodeClients: cs,
198+
handlers: hs,
199+
fsChain: fsChain,
200+
storage: st,
201+
signer: signer,
202+
mNumber: magicNumber,
203+
metrics: m,
204+
aclChecker: ac,
205+
reqInfoProc: rp,
206+
nodeClients: cs,
207+
searchWorkers: sp,
200208
}
201209
}
202210

@@ -1999,47 +2007,56 @@ func (s *server) processSearchRequest(ctx context.Context, req *protoobject.Sear
19992007
}
20002008
} else {
20012009
var signed bool
2002-
var signErr error
2010+
var resErr error
20032011
mProcessedNodes := make(map[string]struct{})
20042012
var sets [][]sdkclient.SearchResultItem
20052013
var mores []bool
2006-
if err = s.fsChain.ForEachContainerNode(cnr, func(node sdknetmap.NodeInfo) bool {
2014+
var mtx sync.Mutex
2015+
var wg sync.WaitGroup
2016+
add := func(set []sdkclient.SearchResultItem, more bool) {
2017+
mtx.Lock()
2018+
sets, mores = append(sets, set), append(mores, more)
2019+
mtx.Unlock()
2020+
}
2021+
err = s.fsChain.ForEachContainerNode(cnr, func(node sdknetmap.NodeInfo) bool {
20072022
nodePub := node.PublicKey()
20082023
strKey := string(nodePub)
20092024
if _, ok := mProcessedNodes[strKey]; ok {
20102025
return true
20112026
}
20122027
mProcessedNodes[strKey] = struct{}{}
2013-
2014-
var set []sdkclient.SearchResultItem
2015-
var more bool
2016-
var err error
20172028
if s.fsChain.IsOwnPublicKey(nodePub) {
2018-
var crsr *meta.SearchCursor
2019-
if set, crsr, err = s.storage.SearchObjects(cnr, fs, body.Attributes, cursor, count); err != nil {
2020-
// TODO: log error
2021-
return true
2022-
}
2023-
more = crsr != nil
2024-
} else {
2025-
if !signed {
2026-
req.MetaHeader = &protosession.RequestMetaHeader{Ttl: 1, Origin: req.MetaHeader}
2027-
if req.VerifyHeader, err = neofscrypto.SignRequestWithBuffer(neofsecdsa.Signer(s.signer), req, nil); err != nil {
2028-
signErr = fmt.Errorf("sign request: %w", err)
2029-
return false
2030-
}
2031-
signed = true
2032-
}
2033-
// TODO: consider parallelism
2034-
if set, more, err = s.searchOnRemoteNode(ctx, node, req); err != nil {
2035-
// TODO: log error
2036-
return true
2029+
wg.Add(1)
2030+
go func() {
2031+
defer wg.Done()
2032+
if set, crsr, err := s.storage.SearchObjects(cnr, fs, body.Attributes, cursor, count); err == nil {
2033+
add(set, crsr != nil)
2034+
} // TODO: else log error
2035+
}()
2036+
return true
2037+
}
2038+
if !signed {
2039+
req.MetaHeader = &protosession.RequestMetaHeader{Ttl: 1, Origin: req.MetaHeader}
2040+
if req.VerifyHeader, err = neofscrypto.SignRequestWithBuffer(neofsecdsa.Signer(s.signer), req, nil); err != nil {
2041+
resErr = fmt.Errorf("sign request: %w", err)
2042+
return false
20372043
}
2044+
signed = true
20382045
}
2039-
sets, mores = append(sets, set), append(mores, more)
2040-
return true
2041-
}); err == nil {
2042-
err = signErr
2046+
wg.Add(1)
2047+
if resErr = s.searchWorkers.Submit(func() {
2048+
defer wg.Done()
2049+
if set, more, err := s.searchOnRemoteNode(ctx, node, req); err == nil {
2050+
add(set, more)
2051+
} // TODO: else log error
2052+
}); resErr != nil {
2053+
wg.Done()
2054+
}
2055+
return resErr == nil
2056+
})
2057+
wg.Wait()
2058+
if err == nil {
2059+
err = resErr
20432060
}
20442061
if err != nil {
20452062
return nil, err

0 commit comments

Comments
 (0)