From ab353befdeea34ea96d67e8bdfc8925e3bd8187f Mon Sep 17 00:00:00 2001 From: jimboj Date: Tue, 24 Sep 2024 18:49:04 +0900 Subject: [PATCH] wip/neighborMsg handling --- lib/grandpa/grandpa.go | 24 +++++++++++++++- lib/grandpa/message_handler.go | 25 ++++++++++++++++- lib/grandpa/message_tracker.go | 50 ++++++++++++++++++++++++++++++---- 3 files changed, 91 insertions(+), 8 deletions(-) diff --git a/lib/grandpa/grandpa.go b/lib/grandpa/grandpa.go index a4ed7159b01..7ee453e886e 100644 --- a/lib/grandpa/grandpa.go +++ b/lib/grandpa/grandpa.go @@ -1139,8 +1139,30 @@ func (s *Service) handleVoteMessage(from peer.ID, vote *VoteMessage) (err error) return nil } +func (s *Service) handleNeighborMessage(round uint64, setID uint64) error { + // TODO sender side of neighbor msg + highestHeader, err := s.blockState.GetHighestFinalisedHeader() + if err != nil { + return err + } + neighbourMessage := &NeighbourPacketV1{ + Round: round, + SetID: setID, + Number: uint32(highestHeader.Number), + } + + cm, err := neighbourMessage.ToConsensusMessage() + if err != nil { + return fmt.Errorf("converting neighbour message to network message: %w", err) + } + + logger.Errorf("sending neighbour message: %v", neighbourMessage) + s.network.GossipMessage(cm) + return nil +} + func (s *Service) handleCommitMessage(commitMessage *CommitMessage) error { - logger.Debugf("received commit message: %+v", commitMessage) + logger.Warnf("received commit message: %+v", commitMessage) err := verifyBlockHashAgainstBlockNumber(s.blockState, commitMessage.Vote.Hash, uint(commitMessage.Vote.Number)) diff --git a/lib/grandpa/message_handler.go b/lib/grandpa/message_handler.go index c9ce3892349..16f6d078969 100644 --- a/lib/grandpa/message_handler.go +++ b/lib/grandpa/message_handler.go @@ -31,6 +31,8 @@ type MessageHandler struct { grandpa *Service blockState BlockState telemetry Telemetry + + isStart bool // This is a temp hacky way } // NewMessageHandler returns a new MessageHandler @@ -39,6 +41,7 @@ func NewMessageHandler(grandpa *Service, blockState BlockState, telemetryMailer grandpa: grandpa, blockState: blockState, telemetry: telemetryMailer, + isStart: true, } } @@ -82,8 +85,28 @@ func (h *MessageHandler) handleMessage(from peer.ID, m GrandpaMessage) (network. } } -func (h *MessageHandler) handleNeighbourMessage(_ *NeighbourPacketV1) error { +func (h *MessageHandler) handleNeighbourMessage(packet *NeighbourPacketV1) error { // TODO(#2931) + // This should be the receiver side of the handling messages, NOT GOSSIP + if h.isStart { + logger.Errorf("Received initial neighbor msg") + neighbourMessage := &NeighbourPacketV1{ + Round: packet.Round, + SetID: packet.SetID, + Number: packet.Number, + } + + cm, err := neighbourMessage.ToConsensusMessage() + if err != nil { + return fmt.Errorf("converting neighbour message to network message: %w", err) + } + + logger.Errorf("sending neighbour message: %v", neighbourMessage) + h.grandpa.network.GossipMessage(cm) + h.isStart = false + } + + // TODO handle in normal case? return nil } diff --git a/lib/grandpa/message_tracker.go b/lib/grandpa/message_tracker.go index 381760acc62..35bedf71621 100644 --- a/lib/grandpa/message_tracker.go +++ b/lib/grandpa/message_tracker.go @@ -22,6 +22,9 @@ type tracker struct { in chan *types.Block // receive imported block from BlockState stopped chan struct{} + neighborIn chan NeighbourPacketV1 // trigger the sending of a neighbor message + stoppedNeighbor chan struct{} + catchUpResponseMessageMutex sync.Mutex // round(uint64) is used as key and *CatchUpResponse as value catchUpResponseMessages map[uint64]*CatchUpResponse @@ -33,22 +36,28 @@ func newTracker(bs BlockState, handler *MessageHandler) *tracker { commitsCapacity = 1000 ) return &tracker{ - blockState: bs, - handler: handler, - votes: newVotesTracker(votesCapacity), - commits: newCommitsTracker(commitsCapacity), - in: bs.GetImportedBlockNotifierChannel(), - stopped: make(chan struct{}), + blockState: bs, + handler: handler, + votes: newVotesTracker(votesCapacity), + commits: newCommitsTracker(commitsCapacity), + in: bs.GetImportedBlockNotifierChannel(), + stopped: make(chan struct{}), + + neighborIn: make(chan NeighbourPacketV1), + stoppedNeighbor: make(chan struct{}), + catchUpResponseMessages: make(map[uint64]*CatchUpResponse), } } func (t *tracker) start() { go t.handleBlocks() + go t.handleNeighborMessage() } func (t *tracker) stop() { close(t.stopped) + close(t.stoppedNeighbor) t.blockState.FreeImportedBlockNotifierChannel(t.in) } @@ -62,6 +71,11 @@ func (t *tracker) addVote(peerID peer.ID, message *VoteMessage) { func (t *tracker) addCommit(cm *CommitMessage) { t.commits.add(cm) + t.neighborIn <- NeighbourPacketV1{ + Round: cm.Round + 1, + SetID: cm.SetID, // need to hceck for set changes + Number: 0, // This gets modified later + } } func (t *tracker) addCatchUpResponse(_ *CatchUpResponse) { @@ -92,6 +106,30 @@ func (t *tracker) handleBlocks() { } } +func (t *tracker) handleNeighborMessage() { + // https://github.com/paritytech/polkadot-sdk/blob/08498f5473351c3d2f8eacbe1bfd7bc6d3a2ef8d/substrate/client/consensus/grandpa/src/communication/mod.rs#L73 + const duration = time.Minute * 2 + ticker := time.NewTicker(duration) + defer ticker.Stop() + + for { + select { + case msg := <-t.neighborIn: + logger.Warnf("Event Channel handleNeighborMessage Triggered") + err := t.handler.grandpa.handleNeighborMessage(msg.Round, msg.SetID) + if err != nil { + logger.Errorf("handling neighbor message: %v", err) + } + + ticker.Reset(duration) + case <-ticker.C: + logger.Warnf("Tick handleNeighborMessage") + case <-t.stoppedNeighbor: + return + } + } +} + func (t *tracker) handleBlock(b *types.Block) { h := b.Header.Hash() vms := t.votes.messages(h)