Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(dot/sync): Implement warp sync strategy #4275

Merged
merged 44 commits into from
Jan 29, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
44 commits
Select commit Hold shift + click to select a range
3973e40
feat(dot/sync): Implement warp sync strategy
dimartiro Oct 22, 2024
14e80be
Add missing license
dimartiro Oct 22, 2024
cfa416b
Add result method in strategies and switch logic
dimartiro Oct 24, 2024
48bddbc
Fix tests
dimartiro Oct 24, 2024
8e2d855
Small fixes and first tests
dimartiro Oct 28, 2024
f4d620b
Modify current strategy selection
dimartiro Oct 28, 2024
36bb217
Share peer view set between strategies
dimartiro Oct 28, 2024
54d17d2
Add block announce tests
dimartiro Oct 28, 2024
a78a043
Add block announce handshake tests
dimartiro Oct 28, 2024
43dc227
Change comment
dimartiro Oct 28, 2024
6f4f281
Fix lint
dimartiro Oct 29, 2024
fd18587
Add CLI flag for sync mode
dimartiro Oct 29, 2024
8e26aac
Lint
dimartiro Oct 29, 2024
e0963cc
Lint
dimartiro Oct 29, 2024
cb029c2
Use right procotol id for sync
dimartiro Oct 31, 2024
b82723f
Fix comment
dimartiro Oct 31, 2024
1a1cd10
Add warp sync message decode test
dimartiro Oct 31, 2024
8a5f3cc
Add missing license
dimartiro Oct 31, 2024
ea6eee9
Remove import
dimartiro Oct 31, 2024
99cae82
Use right justification number for generic
dimartiro Nov 1, 2024
fa300a8
Disable linting lll
dimartiro Nov 1, 2024
1c66b1b
Fix types
dimartiro Nov 1, 2024
7154b0f
Small fixes
dimartiro Nov 1, 2024
b62b3fa
Create warp sync provider interface
dimartiro Nov 1, 2024
0b212ac
Remove unnecesary logs
dimartiro Nov 1, 2024
439dc21
Clean up, remove logs and local tests
dimartiro Nov 1, 2024
ff29217
Lint
dimartiro Nov 1, 2024
bad0935
Lint
dimartiro Nov 1, 2024
d73af92
Fix Test_PeerSupportsProtocol test
dimartiro Nov 4, 2024
840a340
Remove TODO
dimartiro Nov 4, 2024
484a28d
Fix findScheduledChange
dimartiro Nov 4, 2024
d0aa461
Improve logs and remove tests
dimartiro Nov 4, 2024
3dfbe2e
Improve logs
dimartiro Nov 4, 2024
2e5fdfa
Fix get target block
dimartiro Nov 5, 2024
b878685
Update comment
dimartiro Nov 5, 2024
ddedb34
Remove comment
dimartiro Nov 5, 2024
514d3bb
Refactor
dimartiro Nov 5, 2024
c744293
Update comment
dimartiro Nov 5, 2024
676e7c7
Add error message in panic
dimartiro Nov 5, 2024
d4a5d8e
Fix get target block
dimartiro Nov 9, 2024
bfbc17d
Add warp sync proof fixture
dimartiro Dec 5, 2024
20b303d
Move types to warpsync package
dimartiro Dec 5, 2024
8743d1f
Remove unnused linter comment
dimartiro Dec 16, 2024
c5d6815
Improve logs
dimartiro Jan 7, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions chain/kusama/defaults.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ func DefaultConfig() *cfg.Config {
config.Core.GrandpaAuthority = false
config.Core.Role = 1
config.Network.NoMDNS = false
config.Core.Sync = "full"

return config
}
1 change: 1 addition & 0 deletions chain/paseo/defaults.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ func DefaultConfig() *cfg.Config {
config.Core.GrandpaAuthority = false
config.Core.Role = 1
config.Network.NoMDNS = false
config.Core.Sync = "full"

return config
}
1 change: 1 addition & 0 deletions chain/polkadot/defaults.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ func DefaultConfig() *cfg.Config {
config.Core.GrandpaAuthority = false
config.Core.Role = 1
config.Network.NoMDNS = false
config.Core.Sync = "full"

return config
}
1 change: 1 addition & 0 deletions chain/westend-dev/defaults.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ func DefaultConfig() *cfg.Config {
config.RPC.UnsafeRPC = true
config.RPC.WSExternal = true
config.RPC.UnsafeWSExternal = true
config.Core.Sync = "full"

return config
}
1 change: 1 addition & 0 deletions chain/westend-local/default.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ func DefaultConfig() *cfg.Config {
config.RPC.UnsafeRPC = true
config.RPC.WSExternal = true
config.RPC.UnsafeWSExternal = true
config.Core.Sync = "full"

return config
}
Expand Down
1 change: 1 addition & 0 deletions chain/westend/defaults.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ func DefaultConfig() *cfg.Config {
config.Core.GrandpaAuthority = false
config.Core.Role = 1
config.Network.NoMDNS = false
config.Core.Sync = "full"

return config
}
8 changes: 8 additions & 0 deletions cmd/gossamer/commands/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -529,6 +529,14 @@ func addCoreFlags(cmd *cobra.Command) error {
return fmt.Errorf("failed to add --grandpa-interval flag: %s", err)
}

if err := addStringFlagBindViper(cmd,
"sync",
config.Core.Sync,
"sync mode [warp | full]",
"core.sync"); err != nil {
return fmt.Errorf("failed to add --sync flag: %s", err)
}

return nil
}

Expand Down
7 changes: 7 additions & 0 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,9 @@ const (
DefaultSystemName = "Gossamer"
// DefaultSystemVersion is the default system version
DefaultSystemVersion = "0.0.0"

// DefaultSyncMode is the default block sync mode
DefaultSyncMode = "full"
)

// DefaultRPCModules the default RPC modules
Expand Down Expand Up @@ -188,6 +191,7 @@ type CoreConfig struct {
GrandpaAuthority bool `mapstructure:"grandpa-authority"`
WasmInterpreter string `mapstructure:"wasm-interpreter,omitempty"`
GrandpaInterval time.Duration `mapstructure:"grandpa-interval,omitempty"`
Sync string `mapstructure:"sync,omitempty"`
}

// StateConfig contains the configuration for the state.
Expand Down Expand Up @@ -363,6 +367,7 @@ func DefaultConfig() *Config {
GrandpaAuthority: true,
WasmInterpreter: DefaultWasmInterpreter,
GrandpaInterval: DefaultDiscoveryInterval,
Sync: DefaultSyncMode,
},
Network: &NetworkConfig{
Port: DefaultNetworkPort,
Expand Down Expand Up @@ -444,6 +449,7 @@ func DefaultConfigFromSpec(nodeSpec *genesis.Genesis) *Config {
GrandpaAuthority: true,
WasmInterpreter: DefaultWasmInterpreter,
GrandpaInterval: DefaultDiscoveryInterval,
Sync: DefaultSyncMode,
},
Network: &NetworkConfig{
Port: DefaultNetworkPort,
Expand Down Expand Up @@ -525,6 +531,7 @@ func Copy(c *Config) Config {
GrandpaAuthority: c.Core.GrandpaAuthority,
WasmInterpreter: c.Core.WasmInterpreter,
GrandpaInterval: c.Core.GrandpaInterval,
Sync: c.Core.Sync,
},
Network: &NetworkConfig{
Port: c.Network.Port,
Expand Down
12 changes: 11 additions & 1 deletion dot/network/host_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ package network

import (
"fmt"
"strings"
"testing"
"time"

Expand Down Expand Up @@ -396,12 +397,21 @@ func Test_PeerSupportsProtocol(t *testing.T) {
}
require.NoError(t, err)

genesisHash := nodeA.blockState.GenesisHash().String()
genesisHash = strings.TrimPrefix(genesisHash, "0x")
fullSyncProtocolId := fmt.Sprintf("/%s%s", genesisHash, SyncID)
warpSyncProtocolId := fmt.Sprintf("/%s%s", genesisHash, WarpSyncID)

tests := []struct {
protocol protocol.ID
expect bool
}{
{
protocol: protocol.ID("/gossamer/test/0/sync/2"),
protocol: protocol.ID(fullSyncProtocolId),
expect: true,
},
{
protocol: protocol.ID(warpSyncProtocolId),
expect: true,
},
{
Expand Down
6 changes: 6 additions & 0 deletions dot/network/messages/warp_sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,12 @@ type WarpProofRequest struct {
Begin common.Hash
}

func NewWarpProofRequest(from common.Hash) *WarpProofRequest {
return &WarpProofRequest{
Begin: from,
}
}

// Decode decodes the message into a WarpProofRequest
func (wpr *WarpProofRequest) Decode(in []byte) error {
return scale.Unmarshal(in, wpr)
Expand Down
5 changes: 3 additions & 2 deletions dot/network/mock_warp_sync_provider_test.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

16 changes: 11 additions & 5 deletions dot/network/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -259,11 +259,14 @@ func (s *Service) Start() error {
s.ctx, s.cancel = context.WithCancel(context.Background())
}

genesisHashProtocolId := protocol.ID(s.cfg.BlockState.GenesisHash().String())
genesisHash := s.blockState.GenesisHash().String()
genesisHash = strings.TrimPrefix(genesisHash, "0x")
fullSyncProtocolId := fmt.Sprintf("/%s%s", genesisHash, SyncID)
warpSyncProtocolId := fmt.Sprintf("/%s%s", genesisHash, WarpSyncID)

s.host.registerStreamHandler(s.host.protocolID+SyncID, s.handleSyncStream)
s.host.registerStreamHandler(protocol.ID(fullSyncProtocolId), s.handleSyncStream)
s.host.registerStreamHandler(s.host.protocolID+lightID, s.handleLightStream)
s.host.registerStreamHandler(genesisHashProtocolId+WarpSyncID, s.handleWarpSyncStream)
s.host.registerStreamHandler(protocol.ID(warpSyncProtocolId), s.handleWarpSyncStream)

// register block announce protocol
err := s.RegisterNotificationsProtocol(
Expand Down Expand Up @@ -622,13 +625,16 @@ func (s *Service) SendMessage(to peer.ID, msg NotificationsMessage) error {
func (s *Service) GetRequestResponseProtocol(subprotocol string, requestTimeout time.Duration,
maxResponseSize uint64) *RequestResponseProtocol {

protocolID := s.host.protocolID + protocol.ID(subprotocol)
genesisHash := s.blockState.GenesisHash().String()
genesisHash = strings.TrimPrefix(genesisHash, "0x")
protocolId := fmt.Sprintf("/%s%s", genesisHash, subprotocol)

return &RequestResponseProtocol{
ctx: s.ctx,
host: s.host,
requestTimeout: requestTimeout,
maxResponseSize: maxResponseSize,
protocolID: protocolID,
protocolID: protocol.ID(protocolId),
responseBuf: make([]byte, maxResponseSize),
responseBufMu: sync.Mutex{},
}
Expand Down
11 changes: 2 additions & 9 deletions dot/network/warp_sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,23 +8,16 @@ import (
"fmt"

"github.com/ChainSafe/gossamer/dot/network/messages"
"github.com/ChainSafe/gossamer/dot/types"
"github.com/ChainSafe/gossamer/internal/primitives/consensus/grandpa"
primitives "github.com/ChainSafe/gossamer/internal/primitives/consensus/grandpa"
"github.com/ChainSafe/gossamer/lib/common"
"github.com/ChainSafe/gossamer/lib/grandpa/warpsync"
libp2pnetwork "github.com/libp2p/go-libp2p/core/network"
"github.com/libp2p/go-libp2p/core/peer"
)

const MaxAllowedSameRequestPerPeer = 5

type WarpSyncVerificationResult struct {
SetId grandpa.SetID
AuthorityList primitives.AuthorityList
Header types.Header
Completed bool
}

// WarpSyncProvider is an interface for generating warp sync proofs
type WarpSyncProvider interface {
// Generate proof starting at given block hash. The proof is accumulated until maximum proof
Expand All @@ -34,7 +27,7 @@ type WarpSyncProvider interface {
encodedProof []byte,
setId grandpa.SetID,
authorities primitives.AuthorityList,
) (*WarpSyncVerificationResult, error)
) (*warpsync.WarpSyncVerificationResult, error)
}

func (s *Service) handleWarpSyncRequest(req messages.WarpProofRequest) ([]byte, error) {
Expand Down
10 changes: 10 additions & 0 deletions dot/peerset/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,4 +76,14 @@ const (
// SameBlockSyncRequest used when a peer send us more than the max number of the same request.
SameBlockSyncRequest Reputation = math.MinInt32
SameBlockSyncRequestReason = "same block sync request"

// UnexpectedResponseValue is used when peer send an unexpected response.
UnexpectedResponseValue Reputation = -(1 << 29)
// UnexpectedResponseReason is used when peer send an unexpected response.
UnexpectedResponseReason = "Unexpected response"

// BadWarpProofValue is used when peer send invalid warp sync proof.
BadWarpProofValue Reputation = -(1 << 29)
// BadWarpProofReason is used when peer send invalid warp sync proof.
BadWarpProofReason = "Bad warp proof"
)
34 changes: 29 additions & 5 deletions dot/services.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
"github.com/ChainSafe/gossamer/lib/crypto/sr25519"
"github.com/ChainSafe/gossamer/lib/genesis"
"github.com/ChainSafe/gossamer/lib/grandpa"
"github.com/ChainSafe/gossamer/lib/grandpa/warpsync"
"github.com/ChainSafe/gossamer/lib/keystore"
"github.com/ChainSafe/gossamer/lib/runtime"
rtstorage "github.com/ChainSafe/gossamer/lib/runtime/storage"
Expand Down Expand Up @@ -349,7 +350,7 @@ func (nodeBuilder) createNetworkService(config *cfg.Config, stateSrvc *state.Ser
return nil, fmt.Errorf("failed to parse network log level: %w", err)
}

warpSyncProvider := grandpa.NewWarpSyncProofProvider(
warpSyncProvider := warpsync.NewWarpSyncProofProvider(
stateSrvc.Block, stateSrvc.Grandpa,
)

Expand Down Expand Up @@ -523,8 +524,28 @@ func (nodeBuilder) newSyncService(config *cfg.Config, st *state.Service, fg sync
return nil, fmt.Errorf("failed to parse sync log level: %w", err)
}

requestMaker := net.GetRequestResponseProtocol(network.SyncID,
blockRequestTimeout, network.MaxBlockResponseSize)
// Should be shared between all sync strategies
peersView := sync.NewPeerViewSet()

var warpSyncStrategy sync.Strategy

if config.Core.Sync == "warp" {
warpSyncProvider := warpsync.NewWarpSyncProofProvider(st.Block, st.Grandpa)

warpSyncCfg := &sync.WarpSyncConfig{
Telemetry: telemetryMailer,
BadBlocks: genesisData.BadBlocks,
WarpSyncProvider: warpSyncProvider,
WarpSyncRequestMaker: net.GetRequestResponseProtocol(network.WarpSyncID,
blockRequestTimeout, network.MaxBlockResponseSize),
SyncRequestMaker: net.GetRequestResponseProtocol(network.SyncID,
blockRequestTimeout, network.MaxBlockResponseSize),
BlockState: st.Block,
Peers: peersView,
}

warpSyncStrategy = sync.NewWarpSyncStrategy(warpSyncCfg)
}

syncCfg := &sync.FullSyncConfig{
BlockState: st.Block,
Expand All @@ -535,7 +556,9 @@ func (nodeBuilder) newSyncService(config *cfg.Config, st *state.Service, fg sync
BlockImportHandler: cs,
Telemetry: telemetryMailer,
BadBlocks: genesisData.BadBlocks,
RequestMaker: requestMaker,
RequestMaker: net.GetRequestResponseProtocol(network.SyncID,
blockRequestTimeout, network.MaxBlockResponseSize),
Peers: peersView,
}
fullSync := sync.NewFullSyncStrategy(syncCfg)

Expand All @@ -544,7 +567,8 @@ func (nodeBuilder) newSyncService(config *cfg.Config, st *state.Service, fg sync
sync.WithNetwork(net),
sync.WithBlockState(st.Block),
sync.WithSlotDuration(slotDuration),
sync.WithStrategies(fullSync, nil),
sync.WithWarpSyncStrategy(warpSyncStrategy),
sync.WithFullSyncStrategy(fullSync),
sync.WithMinPeers(config.Network.MinPeers),
), nil
}
Expand Down
11 changes: 8 additions & 3 deletions dot/sync/configuration.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,15 @@ import "time"

type ServiceConfig func(svc *SyncService)

func WithStrategies(currentStrategy, defaultStrategy Strategy) ServiceConfig {
func WithWarpSyncStrategy(warpSyncStrategy Strategy) ServiceConfig {
return func(svc *SyncService) {
svc.currentStrategy = currentStrategy
svc.defaultStrategy = defaultStrategy
svc.warpSyncStrategy = warpSyncStrategy
}
}

func WithFullSyncStrategy(fullSyncStrategy Strategy) ServiceConfig {
return func(svc *SyncService) {
svc.fullSyncStrategy = fullSyncStrategy
}
}

Expand Down
13 changes: 8 additions & 5 deletions dot/sync/fullsync.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ type FullSyncConfig struct {
BadBlocks []string
NumOfTasks int
RequestMaker network.RequestMaker
Peers *peerViewSet
}

type importer interface {
Expand Down Expand Up @@ -75,15 +76,12 @@ func NewFullSyncStrategy(cfg *FullSyncConfig) *FullSyncStrategy {
reqMaker: cfg.RequestMaker,
blockState: cfg.BlockState,
numOfTasks: cfg.NumOfTasks,
peers: cfg.Peers,
blockImporter: newBlockImporter(cfg),
unreadyBlocks: newUnreadyBlocks(),
requestQueue: &requestsQueue[*messages.BlockRequestMessage]{
queue: list.New(),
},
peers: &peerViewSet{
view: make(map[peer.ID]peerView),
target: 0,
},
}
}

Expand All @@ -109,7 +107,7 @@ func (f *FullSyncStrategy) NextActions() ([]*SyncTask, error) {
}

// our best block is equal or ahead of current target.
// in the node's pov we are not legging behind so there's nothing to do
// in the node's pov we are not lagging behind so there's nothing to do
// or we didn't receive block announces, so lets ask for more blocks
if uint32(bestBlockHeader.Number) >= currentTarget {
return f.createTasks(reqsFromQueue), nil
Expand Down Expand Up @@ -405,6 +403,11 @@ func (f *FullSyncStrategy) IsSynced() bool {
return uint32(highestBlock)+messages.MaxBlocksInResponse >= f.peers.getTarget()
}

func (f *FullSyncStrategy) Result() any {
logger.Debug("trying to get a result from full sync strategy which is supposed to run forever")
return nil
}

type RequestResponseData struct {
req *messages.BlockRequestMessage
responseData []*types.BlockData
Expand Down
Loading
Loading