Skip to content

Commit de0faae

Browse files
envestccCoderZhi
andauthored
[admin] pause/unpause blockchain (#4656)
--------- Co-authored-by: CoderZhi <[email protected]>
1 parent 85196bc commit de0faae

File tree

8 files changed

+88
-2
lines changed

8 files changed

+88
-2
lines changed

blockchain/blockchain.go

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,8 @@ var (
5757
ErrInsufficientGas = errors.New("insufficient intrinsic gas value")
5858
// ErrBalance indicates the error of balance
5959
ErrBalance = errors.New("invalid balance")
60+
// ErrPaused indicates the error of blockchain is paused
61+
ErrPaused = errors.New("blockchain is paused")
6062
)
6163

6264
func init() {
@@ -112,6 +114,8 @@ type (
112114

113115
// RemoveSubscriber make you listen to every single produced block
114116
RemoveSubscriber(BlockCreationSubscriber) error
117+
// Pause pauses the blockchain
118+
Pause(bool)
115119
}
116120

117121
// BlockMinter is the block minter interface
@@ -133,7 +137,8 @@ type (
133137
timerFactory *prometheustimer.TimerFactory
134138

135139
// used by account-based model
136-
bbf BlockMinter
140+
bbf BlockMinter
141+
pause bool
137142
}
138143
)
139144

@@ -264,6 +269,12 @@ func (bc *blockchain) Stop(ctx context.Context) error {
264269
return bc.lifecycle.OnStop(ctx)
265270
}
266271

272+
func (bc *blockchain) Pause(pause bool) {
273+
bc.mu.Lock()
274+
defer bc.mu.Unlock()
275+
bc.pause = pause
276+
}
277+
267278
func (bc *blockchain) BlockHeaderByHeight(height uint64) (*block.Header, error) {
268279
return bc.dao.HeaderByHeight(height)
269280
}
@@ -473,6 +484,9 @@ func (bc *blockchain) MintNewBlock(timestamp time.Time, opts ...MintOption) (*bl
473484
func (bc *blockchain) CommitBlock(blk *block.Block) error {
474485
bc.mu.Lock()
475486
defer bc.mu.Unlock()
487+
if bc.pause {
488+
return errors.Wrapf(ErrPaused, "blockchain is paused, cannot commit block %d, %x", blk.Height(), blk.HashBlock())
489+
}
476490
timer := bc.timerFactory.NewTimer("CommitBlock")
477491
defer timer.End()
478492
return bc.commitBlock(blk)

blocksync/blocksync.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ import (
1818
"go.uber.org/zap"
1919
"google.golang.org/protobuf/proto"
2020

21+
"github.com/iotexproject/iotex-core/v2/blockchain"
2122
"github.com/iotexproject/iotex-core/v2/blockchain/block"
2223
"github.com/iotexproject/iotex-core/v2/blockchain/blockdao"
2324
"github.com/iotexproject/iotex-core/v2/pkg/fastrand"
@@ -169,6 +170,8 @@ func (bs *blockSyncer) commitBlocks(blks []*peerBlock) bool {
169170
return true
170171
case blockdao.ErrRemoteHeightTooLow:
171172
log.L().Info("remote height too low", zap.Uint64("height", blk.block.Height()))
173+
case blockchain.ErrPaused:
174+
log.L().Info("blockchain is paused, skip committing block", zap.Uint64("height", blk.block.Height()))
172175
default:
173176
bs.blockP2pPeer(blk.pid)
174177
log.L().Error("failed to commit block", zap.Error(err), zap.Uint64("height", blk.block.Height()), zap.String("peer", blk.pid))

chainservice/chainservice.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,7 @@ type ChainService struct {
8383
minter *factory.Minter
8484

8585
lastReceivedBlockHeight uint64
86+
paused atomic.Bool
8687
}
8788

8889
// Start starts the server
@@ -101,6 +102,9 @@ func (cs *ChainService) Start(ctx context.Context) error {
101102
case <-ctx.Done():
102103
return
103104
case <-ticker.C:
105+
if cs.paused.Load() {
106+
continue
107+
}
104108
currentHeight := cs.chain.TipHeight()
105109
lrbh := atomic.LoadUint64(&cs.lastReceivedBlockHeight)
106110
if currentHeight == lastHeight && lastReceivedBlockHeight != lrbh {
@@ -119,6 +123,10 @@ func (cs *ChainService) Stop(ctx context.Context) error {
119123
return cs.lifecycle.OnStopSequentially(ctx)
120124
}
121125

126+
func (cs *ChainService) Pause(pause bool) {
127+
cs.paused.Store(pause)
128+
}
129+
122130
// ReportFullness switch on or off block sync
123131
func (cs *ChainService) ReportFullness(_ context.Context, messageType iotexrpc.MessageType, fullness float32) {
124132
_blockchainFullnessMtc.WithLabelValues(iotexrpc.MessageType_name[int32(messageType)]).Set(float64(fullness))

consensus/scheme/rolldpos/rolldposctx.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -593,6 +593,9 @@ func (ctx *rollDPoSCtx) Commit(msg interface{}) (bool, error) {
593593
switch err := ctx.chain.CommitBlock(pendingBlock); errors.Cause(err) {
594594
case blockchain.ErrInvalidTipHeight:
595595
return true, nil
596+
case blockchain.ErrPaused:
597+
ctx.logger().Info("chain is paused, block will not be committed")
598+
return false, nil
596599
case nil:
597600
break
598601
default:

consensus/scheme/standalone.go

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,11 +12,12 @@ import (
1212
"github.com/pkg/errors"
1313
"go.uber.org/zap"
1414

15+
"github.com/iotexproject/iotex-proto/golang/iotextypes"
16+
1517
"github.com/iotexproject/iotex-core/v2/blockchain"
1618
"github.com/iotexproject/iotex-core/v2/blockchain/block"
1719
"github.com/iotexproject/iotex-core/v2/pkg/log"
1820
"github.com/iotexproject/iotex-core/v2/pkg/routine"
19-
"github.com/iotexproject/iotex-proto/golang/iotextypes"
2021
)
2122

2223
// Standalone is the consensus scheme that periodically create blocks
@@ -39,6 +40,10 @@ func (s *standaloneHandler) Run() {
3940
}
4041

4142
if err := s.commitCb(blk); err != nil {
43+
if errors.Is(err, blockchain.ErrPaused) {
44+
log.L().Info("Consensus is paused, skip committing block", zap.Uint64("height", blk.Height()))
45+
return
46+
}
4247
log.L().Error("Failed to commit.", zap.Error(err))
4348
return
4449
}

server/itx/pauseable.go

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
package itx
2+
3+
import (
4+
"net/http"
5+
)
6+
7+
type (
8+
Pauseable interface {
9+
Pause(bool)
10+
}
11+
PauseMgr struct {
12+
pauses []Pauseable
13+
}
14+
)
15+
16+
func NewPauseMgr(pauses ...Pauseable) *PauseMgr {
17+
return &PauseMgr{
18+
pauses: pauses,
19+
}
20+
}
21+
22+
func (pm *PauseMgr) Pause(pause bool) {
23+
for _, p := range pm.pauses {
24+
p.Pause(pause)
25+
}
26+
}
27+
28+
func (pm *PauseMgr) HandlePause(w http.ResponseWriter, r *http.Request) {
29+
pm.Pause(true)
30+
w.WriteHeader(http.StatusOK)
31+
}
32+
33+
func (pm *PauseMgr) HandleUnPause(w http.ResponseWriter, r *http.Request) {
34+
pm.Pause(false)
35+
w.WriteHeader(http.StatusOK)
36+
}

server/itx/server.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@ type Server struct {
4343
p2pAgent p2p.Agent
4444
dispatcher dispatcher.Dispatcher
4545
nodeStats *nodestats.NodeStats
46+
pauseMgr *PauseMgr
4647
initializedSubChains map[uint32]bool
4748
mutex sync.RWMutex
4849
subModuleCancel context.CancelFunc
@@ -120,6 +121,7 @@ func newServer(cfg config.Config, testing bool) (*Server, error) {
120121
return nil, errors.Wrap(err, "fail to create chain service")
121122
}
122123
nodeStats := nodestats.NewNodeStats(rpcStats, cs.BlockSync(), p2pAgent)
124+
pauseMgr := NewPauseMgr(cs.Blockchain(), cs)
123125
apiServer, err := cs.NewAPIServer(cfg.API, cfg.Chain.EnableArchiveMode)
124126
if err != nil {
125127
return nil, errors.Wrap(err, "failed to create api server")
@@ -141,6 +143,7 @@ func newServer(cfg config.Config, testing bool) (*Server, error) {
141143
chainservices: chains,
142144
apiServers: apiServers,
143145
nodeStats: nodeStats,
146+
pauseMgr: pauseMgr,
144147
initializedSubChains: map[uint32]bool{},
145148
}
146149
// Setup sub-chain starter
@@ -302,6 +305,8 @@ func StartServer(ctx context.Context, svr *Server, probeSvr *probe.Server, cfg c
302305
mux.Handle("/debug/pprof/profile", http.HandlerFunc(pprof.Profile))
303306
mux.Handle("/debug/pprof/symbol", http.HandlerFunc(pprof.Symbol))
304307
mux.Handle("/debug/pprof/trace", http.HandlerFunc(pprof.Trace))
308+
mux.Handle("/pause", http.HandlerFunc(svr.pauseMgr.HandlePause))
309+
mux.Handle("/unpause", http.HandlerFunc(svr.pauseMgr.HandleUnPause))
305310

306311
port := fmt.Sprintf(":%d", cfg.System.HTTPAdminPort)
307312
adminserv = httputil.NewServer(port, mux)

test/mock/mock_blockchain/mock_blockchain.go

Lines changed: 12 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

0 commit comments

Comments
 (0)