Skip to content

Commit 20c91bb

Browse files
authored
[p2p] separate p2p topic for consensus and action msg (#4642)
1 parent 3ce82fd commit 20c91bb

File tree

3 files changed

+174
-96
lines changed

3 files changed

+174
-96
lines changed

chainservice/builder.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -490,6 +490,14 @@ func (builder *Builder) buildBlockchain(forSubChain, forTest bool) error {
490490
if err := builder.cs.chain.AddSubscriber(builder.cs.actpool); err != nil {
491491
return errors.Wrap(err, "failed to add actpool as subscriber")
492492
}
493+
if sub, ok := builder.cs.p2pAgent.(blockchain.BlockCreationSubscriber); ok {
494+
if err := builder.cs.chain.AddSubscriber(sub); err != nil {
495+
return errors.Wrap(err, "failed to add p2p agent as subscriber")
496+
}
497+
} else {
498+
log.L().Info("p2p agent does not implement BlockCreationSubscriber, skip adding it as subscriber")
499+
}
500+
493501
if builder.cs.indexer != nil && builder.cfg.Chain.EnableAsyncIndexWrite {
494502
// config asks for a standalone indexer
495503
indexBuilder, err := blockindex.NewIndexBuilder(builder.cs.chain.ChainID(), builder.cfg.Genesis, builder.cs.blockdao, builder.cs.indexer)

p2p/agent.go

Lines changed: 161 additions & 95 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ import (
1111
"fmt"
1212
"strconv"
1313
"strings"
14+
"sync/atomic"
1415
"time"
1516

1617
pubsub "github.com/libp2p/go-libp2p-pubsub"
@@ -28,6 +29,7 @@ import (
2829
goproto "github.com/iotexproject/iotex-proto/golang"
2930
"github.com/iotexproject/iotex-proto/golang/iotexrpc"
3031

32+
"github.com/iotexproject/iotex-core/v2/blockchain/block"
3133
"github.com/iotexproject/iotex-core/v2/pkg/lifecycle"
3234
"github.com/iotexproject/iotex-core/v2/pkg/log"
3335
"github.com/iotexproject/iotex-core/v2/pkg/routine"
@@ -67,10 +69,12 @@ func init() {
6769

6870
const (
6971
// TODO: the topic could be fine tuned
70-
_broadcastTopic = "broadcast"
71-
_unicastTopic = "unicast"
72-
_numDialRetries = 8
73-
_dialRetryInterval = 2 * time.Second
72+
_broadcastTopic = "broadcast"
73+
_broadcastSubTopicConsensus = "_consensus"
74+
_broadcastSubTopicAction = "_action"
75+
_unicastTopic = "unicast"
76+
_numDialRetries = 8
77+
_dialRetryInterval = 2 * time.Second
7478
)
7579

7680
type (
@@ -138,6 +142,8 @@ type (
138142
reconnectTimeout time.Duration
139143
reconnectTask *routine.RecurringTask
140144
qosMetrics *Qos
145+
unifiedTopic atomic.Bool
146+
isUnifiedTopic func(height uint64) bool
141147
}
142148

143149
cacheValue struct {
@@ -146,8 +152,16 @@ type (
146152
peerID string
147153
timestamp time.Time
148154
}
155+
156+
Option func(*agent)
149157
)
150158

159+
var WithUnifiedTopicHelper = func(isUnifiedTopic func(height uint64) bool) Option {
160+
return func(a *agent) {
161+
a.isUnifiedTopic = isUnifiedTopic
162+
}
163+
}
164+
151165
// DefaultConfig is the default config of p2p
152166
var DefaultConfig = Config{
153167
Host: "0.0.0.0",
@@ -215,9 +229,10 @@ func NewAgent(
215229
validateBroadcastInbound ValidateBroadcastInbound,
216230
broadcastHandler HandleBroadcastInbound,
217231
unicastHandler HandleUnicastInboundAsync,
232+
opts ...Option,
218233
) Agent {
219234
log.L().Info("p2p agent", log.Hex("topicSuffix", genesisHash[22:]))
220-
return &agent{
235+
a := &agent{
221236
cfg: cfg,
222237
chainID: chainID,
223238
// Make sure the honest node only care the messages related the chain from the same genesis
@@ -231,6 +246,11 @@ func NewAgent(
231246
reconnectTimeout: cfg.ReconnectInterval,
232247
qosMetrics: NewQoS(time.Now(), 2*cfg.ReconnectInterval),
233248
}
249+
a.unifiedTopic.Store(true)
250+
for _, opt := range opts {
251+
opt(a)
252+
}
253+
return a
234254
}
235255

236256
func (p *agent) duplicateActions(msg *iotexrpc.BroadcastMsg) bool {
@@ -278,101 +298,118 @@ func (p *agent) Start(ctx context.Context) error {
278298
return errors.Wrap(err, "error when instantiating Agent host")
279299
}
280300

281-
if err := host.AddBroadcastPubSub(
282-
ctx,
283-
_broadcastTopic+p.topicSuffix,
284-
func(ctx context.Context, pid peer.ID, msg *pubsub.Message) pubsub.ValidationResult {
285-
if pid.String() == host.HostIdentity() {
286-
return pubsub.ValidationAccept
287-
}
288-
var broadcast iotexrpc.BroadcastMsg
289-
if err := proto.Unmarshal(msg.Data, &broadcast); err != nil {
290-
log.L().Debug("error when unmarshaling broadcast message", zap.Error(err))
291-
return pubsub.ValidationReject
292-
}
293-
if broadcast.ChainId != p.chainID {
294-
log.L().Debug("chain ID mismatch", zap.Uint32("received", broadcast.ChainId), zap.Uint32("expecting", p.chainID))
295-
return pubsub.ValidationReject
296-
}
297-
pMsg, err := goproto.TypifyRPCMsg(broadcast.MsgType, broadcast.MsgBody)
301+
broadcastValidator := func(ctx context.Context, pid peer.ID, msg *pubsub.Message) pubsub.ValidationResult {
302+
if pid.String() == host.HostIdentity() {
303+
return pubsub.ValidationAccept
304+
}
305+
var broadcast iotexrpc.BroadcastMsg
306+
if err := proto.Unmarshal(msg.Data, &broadcast); err != nil {
307+
log.L().Debug("error when unmarshaling broadcast message", zap.Error(err))
308+
return pubsub.ValidationReject
309+
}
310+
if broadcast.ChainId != p.chainID {
311+
log.L().Debug("chain ID mismatch", zap.Uint32("received", broadcast.ChainId), zap.Uint32("expecting", p.chainID))
312+
return pubsub.ValidationReject
313+
}
314+
pMsg, err := goproto.TypifyRPCMsg(broadcast.MsgType, broadcast.MsgBody)
315+
if err != nil {
316+
log.L().Debug("error when typifying broadcast message", zap.Error(err))
317+
return pubsub.ValidationReject
318+
}
319+
// dedup message
320+
if p.duplicateActions(&broadcast) {
321+
log.L().Debug("duplicate msg", zap.Int("type", int(broadcast.MsgType)))
322+
return pubsub.ValidationIgnore
323+
}
324+
ignore, err := p.validatorBroadcastInbound(pMsg)
325+
if err != nil {
326+
log.L().Debug("error when validating broadcast message", zap.Error(err))
327+
return pubsub.ValidationReject
328+
}
329+
if ignore {
330+
log.L().Debug("invalid broadcast message")
331+
return pubsub.ValidationIgnore
332+
}
333+
p.caches.Add(hash.Hash256b(msg.Data), &cacheValue{
334+
msgType: broadcast.MsgType,
335+
message: pMsg,
336+
peerID: pid.String(),
337+
timestamp: time.Now(),
338+
})
339+
return pubsub.ValidationAccept
340+
}
341+
broadcastHandler := func(ctx context.Context, pid peer.ID, data []byte) (err error) {
342+
// Blocking handling the broadcast message until the agent is started
343+
<-ready
344+
if pid.String() == host.HostIdentity() {
345+
return nil
346+
}
347+
var (
348+
peerID string
349+
pMsg proto.Message
350+
msgType iotexrpc.MessageType
351+
latency int64
352+
)
353+
defer func() {
354+
status := _successStr
298355
if err != nil {
299-
log.L().Debug("error when typifying broadcast message", zap.Error(err))
300-
return pubsub.ValidationReject
356+
status = _failureStr
301357
}
302-
// dedup message
303-
if p.duplicateActions(&broadcast) {
304-
log.L().Debug("duplicate msg", zap.Int("type", int(broadcast.MsgType)))
305-
return pubsub.ValidationIgnore
358+
_p2pMsgCounter.WithLabelValues("broadcast", strconv.Itoa(int(msgType)), "in", peerID, status).Inc()
359+
_p2pMsgLatency.WithLabelValues("broadcast", strconv.Itoa(int(msgType)), status).Observe(float64(latency))
360+
}()
361+
// Check the cache first
362+
cache, ok := p.caches.Get(hash.Hash256b(data))
363+
if ok {
364+
value := cache.(*cacheValue)
365+
pMsg = value.message
366+
peerID = value.peerID
367+
msgType = value.msgType
368+
latency = time.Since(value.timestamp).Nanoseconds() / time.Millisecond.Nanoseconds()
369+
} else {
370+
var broadcast iotexrpc.BroadcastMsg
371+
if err = proto.Unmarshal(data, &broadcast); err != nil {
372+
// TODO: unexpected error
373+
err = errors.Wrap(err, "error when marshaling broadcast message")
374+
return
306375
}
307-
ignore, err := p.validatorBroadcastInbound(pMsg)
376+
t := broadcast.GetTimestamp().AsTime()
377+
latency = time.Since(t).Nanoseconds() / time.Millisecond.Nanoseconds()
378+
pMsg, err = goproto.TypifyRPCMsg(broadcast.MsgType, broadcast.MsgBody)
308379
if err != nil {
309-
log.L().Debug("error when validating broadcast message", zap.Error(err))
310-
return pubsub.ValidationReject
311-
}
312-
if ignore {
313-
log.L().Debug("invalid broadcast message")
314-
return pubsub.ValidationIgnore
315-
}
316-
p.caches.Add(hash.Hash256b(msg.Data), &cacheValue{
317-
msgType: broadcast.MsgType,
318-
message: pMsg,
319-
peerID: pid.String(),
320-
timestamp: time.Now(),
321-
})
322-
return pubsub.ValidationAccept
323-
}, func(ctx context.Context, pid peer.ID, data []byte) (err error) {
324-
// Blocking handling the broadcast message until the agent is started
325-
<-ready
326-
if pid.String() == host.HostIdentity() {
327-
return nil
328-
}
329-
var (
330-
peerID string
331-
pMsg proto.Message
332-
msgType iotexrpc.MessageType
333-
latency int64
334-
)
335-
defer func() {
336-
status := _successStr
337-
if err != nil {
338-
status = _failureStr
339-
}
340-
_p2pMsgCounter.WithLabelValues("broadcast", strconv.Itoa(int(msgType)), "in", peerID, status).Inc()
341-
_p2pMsgLatency.WithLabelValues("broadcast", strconv.Itoa(int(msgType)), status).Observe(float64(latency))
342-
}()
343-
// Check the cache first
344-
cache, ok := p.caches.Get(hash.Hash256b(data))
345-
if ok {
346-
value := cache.(*cacheValue)
347-
pMsg = value.message
348-
peerID = value.peerID
349-
msgType = value.msgType
350-
latency = time.Since(value.timestamp).Nanoseconds() / time.Millisecond.Nanoseconds()
351-
} else {
352-
var broadcast iotexrpc.BroadcastMsg
353-
if err = proto.Unmarshal(data, &broadcast); err != nil {
354-
// TODO: unexpected error
355-
err = errors.Wrap(err, "error when marshaling broadcast message")
356-
return
357-
}
358-
t := broadcast.GetTimestamp().AsTime()
359-
latency = time.Since(t).Nanoseconds() / time.Millisecond.Nanoseconds()
360-
pMsg, err = goproto.TypifyRPCMsg(broadcast.MsgType, broadcast.MsgBody)
361-
if err != nil {
362-
err = errors.Wrap(err, "error when typifying broadcast message")
363-
return
364-
}
365-
peerID = broadcast.PeerId
366-
msgType = broadcast.MsgType
380+
err = errors.Wrap(err, "error when typifying broadcast message")
381+
return
367382
}
368-
// TODO: skip signature verification for actions
369-
p.broadcastInboundHandler(ctx, p.chainID, peerID, pMsg)
370-
p.qosMetrics.updateRecvBroadcast(time.Now())
371-
return
372-
}); err != nil {
373-
return errors.Wrap(err, "error when adding broadcast pubsub")
383+
peerID = broadcast.PeerId
384+
msgType = broadcast.MsgType
385+
}
386+
// TODO: skip signature verification for actions
387+
p.broadcastInboundHandler(ctx, p.chainID, peerID, pMsg)
388+
p.qosMetrics.updateRecvBroadcast(time.Now())
389+
return
374390
}
375391

392+
// default topics
393+
if err := host.AddBroadcastPubSub(
394+
ctx,
395+
_broadcastTopic+p.topicSuffix,
396+
broadcastValidator, broadcastHandler); err != nil {
397+
return errors.Wrap(err, "error when adding broadcast pubsub")
398+
}
399+
// add consensus topic
400+
if err := host.AddBroadcastPubSub(
401+
ctx,
402+
_broadcastTopic+_broadcastSubTopicConsensus+p.topicSuffix,
403+
nil, broadcastHandler); err != nil {
404+
return errors.Wrap(err, "error when adding broadcast pubsub")
405+
}
406+
// add action topic
407+
if err := host.AddBroadcastPubSub(
408+
ctx,
409+
_broadcastTopic+_broadcastSubTopicAction+p.topicSuffix,
410+
broadcastValidator, broadcastHandler); err != nil {
411+
return errors.Wrap(err, "error when adding broadcast pubsub")
412+
}
376413
if err := host.AddUnicastPubSub(_unicastTopic+p.topicSuffix, func(ctx context.Context, peerInfo peer.AddrInfo, data []byte) (err error) {
377414
// Blocking handling the unicast message until the agent is started
378415
<-ready
@@ -500,15 +537,36 @@ func (p *agent) BroadcastOutbound(ctx context.Context, msg proto.Message) (err e
500537
return
501538
}
502539
t := time.Now()
503-
if err = host.Broadcast(ctx, _broadcastTopic+p.topicSuffix, data); err != nil {
504-
err = errors.Wrap(err, "error when sending broadcast message")
540+
topic := p.messageTopic(msgType)
541+
if err = host.Broadcast(ctx, topic, data); err != nil {
542+
err = errors.Wrapf(err, "error when sending broadcast message to topic %s", topic)
505543
p.qosMetrics.updateSendBroadcast(t, false)
506544
return
507545
}
508546
p.qosMetrics.updateSendBroadcast(t, true)
509547
return
510548
}
511549

550+
func (p *agent) messageTopic(msgType iotexrpc.MessageType) string {
551+
defaultTopic := _broadcastTopic + p.topicSuffix
552+
switch msgType {
553+
case iotexrpc.MessageType_ACTION, iotexrpc.MessageType_ACTIONS:
554+
// TODO: can be removed after wake activated
555+
if p.unifiedTopic.Load() {
556+
return defaultTopic
557+
}
558+
return _broadcastTopic + _broadcastSubTopicAction + p.topicSuffix
559+
case iotexrpc.MessageType_CONSENSUS:
560+
// TODO: can be removed after wake activated
561+
if p.unifiedTopic.Load() {
562+
return defaultTopic
563+
}
564+
return _broadcastTopic + _broadcastSubTopicConsensus + p.topicSuffix
565+
default:
566+
return defaultTopic
567+
}
568+
}
569+
512570
func (p *agent) UnicastOutbound(ctx context.Context, peer peer.AddrInfo, msg proto.Message) (err error) {
513571
host := p.host
514572
if host == nil {
@@ -592,6 +650,14 @@ func (p *agent) BuildReport() string {
592650
return ""
593651
}
594652

653+
func (p *agent) ReceiveBlock(blk *block.Block) error {
654+
if p.isUnifiedTopic == nil {
655+
return nil
656+
}
657+
p.unifiedTopic.Store(p.isUnifiedTopic(blk.Height()))
658+
return nil
659+
}
660+
595661
func (p *agent) connectBootNode(ctx context.Context) error {
596662
if len(p.cfg.BootstrapNodes) == 0 {
597663
return nil

server/itx/server.go

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,8 @@ import (
1818
"go.uber.org/zap"
1919
"google.golang.org/protobuf/proto"
2020

21+
"github.com/iotexproject/iotex-proto/golang/iotextypes"
22+
2123
"github.com/iotexproject/iotex-core/v2/action"
2224
"github.com/iotexproject/iotex-core/v2/api"
2325
"github.com/iotexproject/iotex-core/v2/chainservice"
@@ -30,7 +32,6 @@ import (
3032
"github.com/iotexproject/iotex-core/v2/pkg/routine"
3133
"github.com/iotexproject/iotex-core/v2/pkg/util/httputil"
3234
"github.com/iotexproject/iotex-core/v2/server/itx/nodestats"
33-
"github.com/iotexproject/iotex-proto/golang/iotextypes"
3435
)
3536

3637
// Server is the iotex server instance containing all components.
@@ -98,6 +99,9 @@ func newServer(cfg config.Config, testing bool) (*Server, error) {
9899
dispatcher.ValidateMessage,
99100
dispatcher.HandleBroadcast,
100101
dispatcher.HandleTell,
102+
p2p.WithUnifiedTopicHelper(func(height uint64) bool {
103+
return height <= cfg.Genesis.WakeBlockHeight
104+
}),
101105
)
102106
}
103107
chains := make(map[uint32]*chainservice.ChainService)

0 commit comments

Comments
 (0)