Skip to content

Commit

Permalink
cli: add upload-state command
Browse files Browse the repository at this point in the history
Close #3782

Signed-off-by: Ekaterina Pavlova <[email protected]>
  • Loading branch information
AliceInHunterland committed Feb 11, 2025
1 parent 4d5ef3d commit 06e0312
Show file tree
Hide file tree
Showing 5 changed files with 321 additions and 6 deletions.
8 changes: 4 additions & 4 deletions cli/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ func newGraceContext() context.Context {
}

func initBCWithMetrics(cfg config.Config, log *zap.Logger) (*core.Blockchain, *metrics.Service, *metrics.Service, error) {
chain, _, err := initBlockChain(cfg, log)
chain, _, err := InitBlockChain(cfg, log)
if err != nil {
return nil, nil, nil, cli.Exit(err, 1)
}
Expand Down Expand Up @@ -351,7 +351,7 @@ func resetDB(ctx *cli.Context) error {
if logCloser != nil {
defer func() { _ = logCloser() }()
}
chain, store, err := initBlockChain(cfg, log)
chain, store, err := InitBlockChain(cfg, log)
if err != nil {
return cli.Exit(fmt.Errorf("failed to create Blockchain instance: %w", err), 1)
}
Expand Down Expand Up @@ -655,8 +655,8 @@ Main:
return nil
}

// initBlockChain initializes BlockChain with preselected DB.
func initBlockChain(cfg config.Config, log *zap.Logger) (*core.Blockchain, storage.Store, error) {
// InitBlockChain initializes BlockChain with preselected DB.
func InitBlockChain(cfg config.Config, log *zap.Logger) (*core.Blockchain, storage.Store, error) {
store, err := storage.NewStore(cfg.ApplicationConfiguration.DBConfiguration)
if err != nil {
return nil, nil, cli.Exit(fmt.Errorf("could not initialize storage: %w", err), 1)
Expand Down
4 changes: 2 additions & 2 deletions cli/server/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -360,12 +360,12 @@ func TestRestoreDB(t *testing.T) {

func TestInitBlockChain(t *testing.T) {
t.Run("bad storage", func(t *testing.T) {
_, _, err := initBlockChain(config.Config{}, nil)
_, _, err := InitBlockChain(config.Config{}, nil)
require.Error(t, err)
})

t.Run("empty logger", func(t *testing.T) {
_, _, err := initBlockChain(config.Config{
_, _, err := InitBlockChain(config.Config{
ApplicationConfiguration: config.ApplicationConfiguration{
DBConfiguration: dbconfig.DBConfiguration{
Type: dbconfig.InMemoryDB,
Expand Down
63 changes: 63 additions & 0 deletions cli/util/convert.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,62 @@ func NewCommands() []*cli.Command {
options.Debug,
}, options.RPC...)
uploadBinFlags = append(uploadBinFlags, options.Wallet...)

uploadStateFlags := append([]cli.Flag{
&cli.StringSliceFlag{
Name: "fs-rpc-endpoint",
Aliases: []string{"fsr"},
Usage: "List of NeoFS storage node RPC addresses (comma-separated or multiple --fs-rpc-endpoint flags)",
Required: true,
Action: func(ctx *cli.Context, fsRpcEndpoints []string) error {
for _, endpoint := range fsRpcEndpoints {
if endpoint == "" {
return cli.Exit("NeoFS RPC endpoint cannot contain empty values", 1)
}

Check warning on line 115 in cli/util/convert.go

View check run for this annotation

Codecov / codecov/patch

cli/util/convert.go#L112-L115

Added lines #L112 - L115 were not covered by tests
}
return nil

Check warning on line 117 in cli/util/convert.go

View check run for this annotation

Codecov / codecov/patch

cli/util/convert.go#L117

Added line #L117 was not covered by tests
},
},
&cli.StringFlag{
Name: "container",
Aliases: []string{"cid"},
Usage: "NeoFS container ID to upload blocks to",
Required: true,
Action: cmdargs.EnsureNotEmpty("container"),
},
&cli.StringFlag{
Name: "state-attribute",
Usage: "Attribute key of the state object",
Value: neofs.DefaultStateAttribute,
Action: cmdargs.EnsureNotEmpty("state-attribute"),
}, &cli.UintFlag{
Name: "workers",
Usage: "Number of workers to traverse and upload objects concurrently",
Value: 20,
},
&cli.UintFlag{
Name: "searchers",
Usage: "Number of concurrent searches for objects",
Value: 100,
},
&cli.UintFlag{
Name: "retries",
Usage: "Maximum number of Neo/NeoFS node request retries",
Value: neofs.MaxRetries,
Action: func(context *cli.Context, u uint) error {
if u < 1 {
return cli.Exit("retries should be greater than 0", 1)
}
return nil

Check warning on line 150 in cli/util/convert.go

View check run for this annotation

Codecov / codecov/patch

cli/util/convert.go#L146-L150

Added lines #L146 - L150 were not covered by tests
},
},
&flags.AddressFlag{
Name: "address",
Usage: "Address to use for signing the uploading and searching transactions in NeoFS",
},
options.Debug, options.Config, options.ConfigFile, options.RelativePath,
}, options.Wallet...)
uploadStateFlags = append(uploadStateFlags, options.Network...)
return []*cli.Command{
{
Name: "util",
Expand Down Expand Up @@ -185,6 +241,13 @@ func NewCommands() []*cli.Command {
Action: uploadBin,
Flags: uploadBinFlags,
},
{
Name: "upload-state",
Usage: "Traverse the MPT and upload them to the NeoFS container",
UsageText: "neo-go util upload-state --fs-rpc-endpoint <address1>[,<address2>[...]] --container <cid> --state-attribute state --wallet <wallet> [--wallet-config <config>] [--address <address>] [--workers <num>] [--searchers <num>][--retries <num>] [--debug]",
Action: traverseMPT,
Flags: uploadStateFlags,
},
},
},
}
Expand Down
250 changes: 250 additions & 0 deletions cli/util/upload_state.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,250 @@
package util

import (
"context"
"fmt"
"strconv"
"sync"
"time"

"github.com/nspcc-dev/neo-go/cli/cmdargs"
"github.com/nspcc-dev/neo-go/cli/options"
"github.com/nspcc-dev/neo-go/cli/server"
"github.com/nspcc-dev/neo-go/pkg/core/mpt"
"github.com/nspcc-dev/neo-go/pkg/core/storage"
"github.com/nspcc-dev/neo-go/pkg/crypto/keys"
"github.com/nspcc-dev/neo-go/pkg/io"
"github.com/nspcc-dev/neo-go/pkg/services/helpers/neofs"
"github.com/nspcc-dev/neo-go/pkg/util"
"github.com/nspcc-dev/neofs-sdk-go/client"
"github.com/nspcc-dev/neofs-sdk-go/container"
cid "github.com/nspcc-dev/neofs-sdk-go/container/id"
"github.com/nspcc-dev/neofs-sdk-go/object"
oid "github.com/nspcc-dev/neofs-sdk-go/object/id"
"github.com/nspcc-dev/neofs-sdk-go/pool"
"github.com/nspcc-dev/neofs-sdk-go/user"
"github.com/urfave/cli/v2"
"go.uber.org/zap"
)

const (
// defaultBatchUploadSize is the default size of the batch of state objects to upload.
defaultBatchUploadSize = 1000
)

func traverseMPT(ctx *cli.Context) error {
if err := cmdargs.EnsureNone(ctx); err != nil {
return err
}
rpcNeoFS := ctx.StringSlice("fs-rpc-endpoint")
containerIDStr := ctx.String("container")
attr := ctx.String("state-attribute")
maxRetries := ctx.Uint("retries")
debug := ctx.Bool("debug")
numWorkers := ctx.Int("workers")

acc, _, err := options.GetAccFromContext(ctx)
if err != nil {
return cli.Exit(fmt.Sprintf("failed to load account: %v", err), 1)
}

Check warning on line 49 in cli/util/upload_state.go

View check run for this annotation

Codecov / codecov/patch

cli/util/upload_state.go#L35-L49

Added lines #L35 - L49 were not covered by tests

var containerID cid.ID
if err = containerID.DecodeString(containerIDStr); err != nil {
return cli.Exit(fmt.Sprintf("failed to decode container ID: %v", err), 1)
}
signer := user.NewAutoIDSignerRFC6979(acc.PrivateKey().PrivateKey)

params := pool.DefaultOptions()
params.SetHealthcheckTimeout(neofs.DefaultHealthcheckTimeout)
params.SetNodeDialTimeout(neofs.DefaultDialTimeout)
params.SetNodeStreamTimeout(neofs.DefaultStreamTimeout)
p, err := pool.New(pool.NewFlatNodeParams(rpcNeoFS), signer, params)
if err != nil {
return cli.Exit(fmt.Sprintf("failed to create NeoFS pool: %v", err), 1)
}
pWrapper := poolWrapper{p}
if err = pWrapper.Dial(context.Background()); err != nil {
return cli.Exit(fmt.Sprintf("failed to dial NeoFS pool: %v", err), 1)
}
defer p.Close()

var containerObj container.Container
err = retry(func() error {
containerObj, err = p.ContainerGet(ctx.Context, containerID, client.PrmContainerGet{})
return err
}, maxRetries, debug)
if err != nil {
return cli.Exit(fmt.Errorf("failed to get container with ID %s: %w", containerID, err), 1)
}
containerMagic := containerObj.Attribute("Magic")

logger := zap.NewExample()
cfg, err := options.GetConfigFromContext(ctx)
if err != nil {
return cli.Exit(err, 1)
}

Check warning on line 85 in cli/util/upload_state.go

View check run for this annotation

Codecov / codecov/patch

cli/util/upload_state.go#L51-L85

Added lines #L51 - L85 were not covered by tests

chain, store, err := server.InitBlockChain(cfg, logger)
if err != nil {
return cli.Exit(err, 1)
}

Check warning on line 90 in cli/util/upload_state.go

View check run for this annotation

Codecov / codecov/patch

cli/util/upload_state.go#L87-L90

Added lines #L87 - L90 were not covered by tests

magic := strconv.Itoa(int(chain.GetConfig().Magic))
if containerMagic != magic {
return cli.Exit(fmt.Sprintf("container magic %s does not match the network magic %s", containerMagic, magic), 1)
}

Check warning on line 95 in cli/util/upload_state.go

View check run for this annotation

Codecov / codecov/patch

cli/util/upload_state.go#L92-L95

Added lines #L92 - L95 were not covered by tests

lastUploadedStateBatch, err := searchStateLastBatch(ctx, pWrapper, containerID, acc.PrivateKey(), attr, maxRetries, debug)

stateModule := chain.GetStateModule()
currentHeight := stateModule.CurrentLocalHeight()
if currentHeight <= lastUploadedStateBatch*defaultBatchUploadSize {
fmt.Fprintf(ctx.App.Writer, "No new states to upload. Need to upload starting from %d, current height %d\n", lastUploadedStateBatch*defaultBatchUploadSize, currentHeight)
return nil
}

Check warning on line 104 in cli/util/upload_state.go

View check run for this annotation

Codecov / codecov/patch

cli/util/upload_state.go#L97-L104

Added lines #L97 - L104 were not covered by tests

fmt.Fprintf(ctx.App.Writer, "Latest state root found at height %d, current height %d\n", lastUploadedStateBatch*defaultBatchUploadSize, currentHeight)
for batchStart := lastUploadedStateBatch * defaultBatchUploadSize; batchStart < currentHeight; batchStart += defaultBatchUploadSize {
var (
batchEnd = min(batchStart+defaultBatchUploadSize, currentHeight)
errCh = make(chan error)
doneCh = make(chan struct{})
wg sync.WaitGroup
)
fmt.Fprintf(ctx.App.Writer, "Processing batch from %d to %d\n", batchStart, batchEnd-1)
wg.Add(numWorkers)
for i := range numWorkers {
go func(i uint32) {
defer wg.Done()
for blockIndex := batchStart + i; blockIndex < batchEnd; blockIndex += uint32(numWorkers) {
stateRoot, err := stateModule.GetStateRoot(blockIndex)
if err != nil {
select {
case errCh <- err:
default:

Check warning on line 124 in cli/util/upload_state.go

View check run for this annotation

Codecov / codecov/patch

cli/util/upload_state.go#L106-L124

Added lines #L106 - L124 were not covered by tests
}
return

Check warning on line 126 in cli/util/upload_state.go

View check run for this annotation

Codecov / codecov/patch

cli/util/upload_state.go#L126

Added line #L126 was not covered by tests
}

nodes, err := traverseRawMPT(stateRoot.Root, store, mpt.ModeLatest)
if err != nil {
select {
case errCh <- err:
default:

Check warning on line 133 in cli/util/upload_state.go

View check run for this annotation

Codecov / codecov/patch

cli/util/upload_state.go#L129-L133

Added lines #L129 - L133 were not covered by tests
}
return

Check warning on line 135 in cli/util/upload_state.go

View check run for this annotation

Codecov / codecov/patch

cli/util/upload_state.go#L135

Added line #L135 was not covered by tests
}
objBytes, err := EncodeMPTNodes(nodes)
if err != nil {
select {
case errCh <- err:
default:

Check warning on line 141 in cli/util/upload_state.go

View check run for this annotation

Codecov / codecov/patch

cli/util/upload_state.go#L137-L141

Added lines #L137 - L141 were not covered by tests
}
return

Check warning on line 143 in cli/util/upload_state.go

View check run for this annotation

Codecov / codecov/patch

cli/util/upload_state.go#L143

Added line #L143 was not covered by tests
}
//h, err := chain.GetHeader(stateRoot.Hash())
//if err != nil {
// select {
// case errCh <- err:
// default:
// }
// return
//}
attrs := []object.Attribute{
*object.NewAttribute(attr, strconv.Itoa(int(blockIndex))),
*object.NewAttribute("Timestamp", strconv.FormatInt(time.Now().Unix(), 10)),
*object.NewAttribute("StateRoot", stateRoot.Root.StringLE()),
//*object.NewAttribute("BlockTime", strconv.FormatUint(h.Timestamp, 10)),
}
_, err = uploadObj(ctx.Context, pWrapper, signer, containerID, objBytes, attrs)
if err != nil {
select {
case errCh <- err:
default:

Check warning on line 163 in cli/util/upload_state.go

View check run for this annotation

Codecov / codecov/patch

cli/util/upload_state.go#L153-L163

Added lines #L153 - L163 were not covered by tests
}
return

Check warning on line 165 in cli/util/upload_state.go

View check run for this annotation

Codecov / codecov/patch

cli/util/upload_state.go#L165

Added line #L165 was not covered by tests
}
}
}(uint32(i))
}
go func() {
wg.Wait()
close(doneCh)
}()
select {
case err := <-errCh:
return cli.Exit(fmt.Sprintf("failed to process batch: %v", err), 1)
case <-doneCh:

Check warning on line 177 in cli/util/upload_state.go

View check run for this annotation

Codecov / codecov/patch

cli/util/upload_state.go#L170-L177

Added lines #L170 - L177 were not covered by tests
}
}
err = store.Close()
if err != nil {
return cli.Exit(fmt.Errorf("failed to close the DB: %w", err), 1)
}
return nil

Check warning on line 184 in cli/util/upload_state.go

View check run for this annotation

Codecov / codecov/patch

cli/util/upload_state.go#L180-L184

Added lines #L180 - L184 were not covered by tests
}

func traverseRawMPT(root util.Uint256, store storage.Store, mode mpt.TrieMode) ([][]byte, error) {
cache := storage.NewMemCachedStore(store)
billet := mpt.NewBillet(root, mode, 0, cache)
var nodes [][]byte

err := billet.Traverse(func(pathToNode []byte, node mpt.Node, nodeBytes []byte) bool {
nodes = append(nodes, nodeBytes)
return false
}, false)

Check warning on line 195 in cli/util/upload_state.go

View check run for this annotation

Codecov / codecov/patch

cli/util/upload_state.go#L187-L195

Added lines #L187 - L195 were not covered by tests

if err != nil {
return nil, fmt.Errorf("failed to traverse MPT: %w", err)
}
return nodes, nil

Check warning on line 200 in cli/util/upload_state.go

View check run for this annotation

Codecov / codecov/patch

cli/util/upload_state.go#L197-L200

Added lines #L197 - L200 were not covered by tests
}

// searchStateLastBatch searches for the last not empty batch (defaultBatchUploadSize) of state objects in the container.
func searchStateLastBatch(ctx *cli.Context, p poolWrapper, containerID cid.ID, privKeys *keys.PrivateKey, attributeKey string, maxRetries uint, debug bool) (uint32, error) {
var (
doneCh = make(chan struct{})
errCh = make(chan error)

existingBatchStateCount = uint32(0)
)
go func() {
defer close(doneCh)
for i := 0; ; i++ {
indexIDs := searchObjects(ctx.Context, p, containerID, privKeys, attributeKey, uint(i*defaultBatchUploadSize), uint(i+1)*defaultBatchUploadSize, 100, maxRetries, debug, errCh)
resOIDs := make([]oid.ID, 0, 1)
for id := range indexIDs {
resOIDs = append(resOIDs, id)
break

Check warning on line 218 in cli/util/upload_state.go

View check run for this annotation

Codecov / codecov/patch

cli/util/upload_state.go#L204-L218

Added lines #L204 - L218 were not covered by tests
}
if len(resOIDs) == 0 {
break

Check warning on line 221 in cli/util/upload_state.go

View check run for this annotation

Codecov / codecov/patch

cli/util/upload_state.go#L220-L221

Added lines #L220 - L221 were not covered by tests
}
existingBatchStateCount++

Check warning on line 223 in cli/util/upload_state.go

View check run for this annotation

Codecov / codecov/patch

cli/util/upload_state.go#L223

Added line #L223 was not covered by tests
}
}()
select {
case err := <-errCh:
return existingBatchStateCount, err
case <-doneCh:
if existingBatchStateCount > 0 {
return existingBatchStateCount - 1, nil
}
return 0, nil

Check warning on line 233 in cli/util/upload_state.go

View check run for this annotation

Codecov / codecov/patch

cli/util/upload_state.go#L226-L233

Added lines #L226 - L233 were not covered by tests
}
}

func EncodeMPTNodes(nodes [][]byte) ([]byte, error) {
bw := io.NewBufBinWriter()
bw.BinWriter.WriteVarUint(uint64(len(nodes)))
if bw.Err != nil {
return nil, fmt.Errorf("failed to encode node count: %w", bw.Err)
}
for _, n := range nodes {
bw.BinWriter.WriteVarBytes(n) // Encodes length + data.
if bw.Err != nil {
return nil, fmt.Errorf("failed to encode MPT node: %w", bw.Err)
}

Check warning on line 247 in cli/util/upload_state.go

View check run for this annotation

Codecov / codecov/patch

cli/util/upload_state.go#L237-L247

Added lines #L237 - L247 were not covered by tests
}
return bw.Bytes(), nil

Check warning on line 249 in cli/util/upload_state.go

View check run for this annotation

Codecov / codecov/patch

cli/util/upload_state.go#L249

Added line #L249 was not covered by tests
}
2 changes: 2 additions & 0 deletions pkg/services/helpers/neofs/blockstorage.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ const (
DefaultBlockAttribute = "Block"
// DefaultIndexFileAttribute is the default attribute name for index file objects.
DefaultIndexFileAttribute = "Index"
// DefaultStateAttribute is the default attribute name for state objects.
DefaultStateAttribute = "State"

// DefaultSearchBatchSize is a number of objects to search in a batch. We need to
// search with EQ filter to avoid partially-completed SEARCH responses. If EQ search
Expand Down

0 comments on commit 06e0312

Please sign in to comment.