Skip to content

Commit 3418cda

Browse files
committed
Problem: parallel tx execution is not supported
add basic support in sdk: - add a TxExecutor baseapp option - add TxIndex/TxCount/MsgIndex in context Update CHANGELOG.md Signed-off-by: yihuang <[email protected]> fix misspell fix lint run gci fix lint gci seems not compatible with gofumpt
1 parent a6c7aa5 commit 3418cda

File tree

9 files changed

+151
-51
lines changed

9 files changed

+151
-51
lines changed

.golangci.yml

-9
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@ linters:
2020
- exportloopref
2121
- goconst
2222
- gocritic
23-
- gci
2423
- gofumpt
2524
- gosec
2625
- gosimple
@@ -63,14 +62,6 @@ issues:
6362
max-same-issues: 10000
6463

6564
linters-settings:
66-
gci:
67-
custom-order: true
68-
sections:
69-
- standard # Standard section: captures all standard packages.
70-
- default # Default section: contains all imports that could not be matched to another section type.
71-
- prefix(cosmossdk.io)
72-
- prefix(github.com/cosmos/cosmos-sdk)
73-
7465
gosec:
7566
# To select a subset of rules to run.
7667
# Available rules: https://github.com/securego/gosec#available-rules

CHANGELOG.md

+7-1
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,13 @@ Ref: https://keepachangelog.com/en/1.0.0/
3838

3939
## [Unreleased]
4040

41-
## Bug Fixes
41+
### Features
42+
43+
* (baseapp) [#205](https://github.com/crypto-org-chain/cosmos-sdk/pull/205) Add `TxExecutor` baseapp option, add `TxIndex`/`TxCount`/`MsgIndex`/`BlockGasUsed` fields to `Context, to support tx parallel execution.
44+
45+
## [Unreleased-Upstream]
46+
47+
### Bug Fixes
4248

4349
* (crypto) [#19691](https://github.com/cosmos/cosmos-sdk/pull/19745) Fix tx sign doesn't throw an error when incorrect Ledger is used.
4450

baseapp/abci.go

+58-33
Original file line numberDiff line numberDiff line change
@@ -336,11 +336,11 @@ func (app *BaseApp) ApplySnapshotChunk(req *abci.RequestApplySnapshotChunk) (*ab
336336
func (app *BaseApp) CheckTx(req *abci.RequestCheckTx) (*abci.ResponseCheckTx, error) {
337337
var mode execMode
338338

339-
switch {
340-
case req.Type == abci.CheckTxType_New:
339+
switch req.Type {
340+
case abci.CheckTxType_New:
341341
mode = execModeCheck
342342

343-
case req.Type == abci.CheckTxType_Recheck:
343+
case abci.CheckTxType_Recheck:
344344
mode = execModeReCheck
345345

346346
default:
@@ -775,48 +775,34 @@ func (app *BaseApp) internalFinalizeBlock(ctx context.Context, req *abci.Request
775775

776776
// Reset the gas meter so that the AnteHandlers aren't required to
777777
gasMeter = app.getBlockGasMeter(app.finalizeBlockState.Context())
778-
app.finalizeBlockState.SetContext(app.finalizeBlockState.Context().WithBlockGasMeter(gasMeter))
778+
app.finalizeBlockState.SetContext(
779+
app.finalizeBlockState.Context().
780+
WithBlockGasMeter(gasMeter).
781+
WithTxCount(len(req.Txs)),
782+
)
779783

780784
// Iterate over all raw transactions in the proposal and attempt to execute
781785
// them, gathering the execution results.
782786
//
783787
// NOTE: Not all raw transactions may adhere to the sdk.Tx interface, e.g.
784788
// vote extensions, so skip those.
785-
txResults := make([]*abci.ExecTxResult, 0, len(req.Txs))
786-
for _, rawTx := range req.Txs {
787-
var response *abci.ExecTxResult
788-
789-
if _, err := app.txDecoder(rawTx); err == nil {
790-
response = app.deliverTx(rawTx)
791-
} else {
792-
// In the case where a transaction included in a block proposal is malformed,
793-
// we still want to return a default response to comet. This is because comet
794-
// expects a response for each transaction included in a block proposal.
795-
response = sdkerrors.ResponseExecTxResultWithEvents(
796-
sdkerrors.ErrTxDecode,
797-
0,
798-
0,
799-
nil,
800-
false,
801-
)
802-
}
803-
804-
// check after every tx if we should abort
805-
select {
806-
case <-ctx.Done():
807-
return nil, ctx.Err()
808-
default:
809-
// continue
810-
}
811-
812-
txResults = append(txResults, response)
789+
txResults, err := app.executeTxs(ctx, req.Txs)
790+
if err != nil {
791+
// usually due to canceled
792+
return nil, err
813793
}
814794

815795
if app.finalizeBlockState.ms.TracingEnabled() {
816796
app.finalizeBlockState.ms = app.finalizeBlockState.ms.SetTracingContext(nil).(storetypes.CacheMultiStore)
817797
}
818798

819-
endBlock, err := app.endBlock(app.finalizeBlockState.Context())
799+
var blockGasUsed uint64
800+
for _, res := range txResults {
801+
blockGasUsed += uint64(res.GasUsed)
802+
}
803+
sdkCtx := app.finalizeBlockState.Context().WithBlockGasUsed(blockGasUsed)
804+
805+
endBlock, err := app.endBlock(sdkCtx)
820806
if err != nil {
821807
return nil, err
822808
}
@@ -840,6 +826,45 @@ func (app *BaseApp) internalFinalizeBlock(ctx context.Context, req *abci.Request
840826
}, nil
841827
}
842828

829+
func (app *BaseApp) executeTxs(ctx context.Context, txs [][]byte) ([]*abci.ExecTxResult, error) {
830+
if app.txExecutor != nil {
831+
return app.txExecutor(ctx, len(txs), app.finalizeBlockState.ms, func(i int, ms storetypes.MultiStore) *abci.ExecTxResult {
832+
return app.deliverTxWithMultiStore(txs[i], i, ms)
833+
})
834+
}
835+
836+
txResults := make([]*abci.ExecTxResult, 0, len(txs))
837+
for i, rawTx := range txs {
838+
var response *abci.ExecTxResult
839+
840+
if _, err := app.txDecoder(rawTx); err == nil {
841+
response = app.deliverTx(rawTx, i)
842+
} else {
843+
// In the case where a transaction included in a block proposal is malformed,
844+
// we still want to return a default response to comet. This is because comet
845+
// expects a response for each transaction included in a block proposal.
846+
response = sdkerrors.ResponseExecTxResultWithEvents(
847+
sdkerrors.ErrTxDecode,
848+
0,
849+
0,
850+
nil,
851+
false,
852+
)
853+
}
854+
855+
// check after every tx if we should abort
856+
select {
857+
case <-ctx.Done():
858+
return nil, ctx.Err()
859+
default:
860+
// continue
861+
}
862+
863+
txResults = append(txResults, response)
864+
}
865+
return txResults, nil
866+
}
867+
843868
// FinalizeBlock will execute the block proposal provided by RequestFinalizeBlock.
844869
// Specifically, it will execute an application's BeginBlock (if defined), followed
845870
// by the transactions in the proposal, finally followed by the application's

baseapp/baseapp.go

+22-5
Original file line numberDiff line numberDiff line change
@@ -193,6 +193,9 @@ type BaseApp struct {
193193
//
194194
// SAFETY: it's safe to do if validators validate the total gas wanted in the `ProcessProposal`, which is the case in the default handler.
195195
disableBlockGasMeter bool
196+
197+
// Optional alternative tx executor, used for block-stm parallel transaction execution.
198+
txExecutor TxExecutor
196199
}
197200

198201
// NewBaseApp returns a reference to an initialized BaseApp. It accepts a
@@ -659,13 +662,14 @@ func (app *BaseApp) getBlockGasMeter(ctx sdk.Context) storetypes.GasMeter {
659662
}
660663

661664
// retrieve the context for the tx w/ txBytes and other memoized values.
662-
func (app *BaseApp) getContextForTx(mode execMode, txBytes []byte) sdk.Context {
665+
func (app *BaseApp) getContextForTx(mode execMode, txBytes []byte, txIndex int) sdk.Context {
663666
modeState := app.getState(mode)
664667
if modeState == nil {
665668
panic(fmt.Sprintf("state is nil for mode %v", mode))
666669
}
667670
ctx := modeState.Context().
668-
WithTxBytes(txBytes)
671+
WithTxBytes(txBytes).
672+
WithTxIndex(txIndex)
669673
// WithVoteInfos(app.voteInfos) // TODO: identify if this is needed
670674

671675
ctx = ctx.WithConsensusParams(app.GetConsensusParams(ctx))
@@ -746,7 +750,11 @@ func (app *BaseApp) beginBlock(req *abci.RequestFinalizeBlock) (sdk.BeginBlock,
746750
return resp, nil
747751
}
748752

749-
func (app *BaseApp) deliverTx(tx []byte) *abci.ExecTxResult {
753+
func (app *BaseApp) deliverTx(tx []byte, txIndex int) *abci.ExecTxResult {
754+
return app.deliverTxWithMultiStore(tx, txIndex, nil)
755+
}
756+
757+
func (app *BaseApp) deliverTxWithMultiStore(tx []byte, txIndex int, txMultiStore storetypes.MultiStore) *abci.ExecTxResult {
750758
gInfo := sdk.GasInfo{}
751759
resultStr := "successful"
752760

@@ -759,7 +767,7 @@ func (app *BaseApp) deliverTx(tx []byte) *abci.ExecTxResult {
759767
telemetry.SetGauge(float32(gInfo.GasWanted), "tx", "gas", "wanted")
760768
}()
761769

762-
gInfo, result, anteEvents, err := app.runTx(execModeFinalize, tx)
770+
gInfo, result, anteEvents, err := app.runTxWithMultiStore(execModeFinalize, tx, txIndex, txMultiStore)
763771
if err != nil {
764772
resultStr = "failed"
765773
resp = sdkerrors.ResponseExecTxResultWithEvents(
@@ -817,12 +825,19 @@ func (app *BaseApp) endBlock(ctx context.Context) (sdk.EndBlock, error) {
817825
// returned if the tx does not run out of gas and if all the messages are valid
818826
// and execute successfully. An error is returned otherwise.
819827
func (app *BaseApp) runTx(mode execMode, txBytes []byte) (gInfo sdk.GasInfo, result *sdk.Result, anteEvents []abci.Event, err error) {
828+
return app.runTxWithMultiStore(mode, txBytes, -1, nil)
829+
}
830+
831+
func (app *BaseApp) runTxWithMultiStore(mode execMode, txBytes []byte, txIndex int, txMultiStore storetypes.MultiStore) (gInfo sdk.GasInfo, result *sdk.Result, anteEvents []abci.Event, err error) {
820832
// NOTE: GasWanted should be returned by the AnteHandler. GasUsed is
821833
// determined by the GasMeter. We need access to the context to get the gas
822834
// meter, so we initialize upfront.
823835
var gasWanted uint64
824836

825-
ctx := app.getContextForTx(mode, txBytes)
837+
ctx := app.getContextForTx(mode, txBytes, txIndex)
838+
if txMultiStore != nil {
839+
ctx = ctx.WithMultiStore(txMultiStore)
840+
}
826841
ms := ctx.MultiStore()
827842

828843
// only run the tx if there is block gas remaining
@@ -1000,6 +1015,8 @@ func (app *BaseApp) runMsgs(ctx sdk.Context, msgs []sdk.Msg, msgsV2 []protov2.Me
10001015
break
10011016
}
10021017

1018+
ctx = ctx.WithMsgIndex(i)
1019+
10031020
handler := app.msgServiceRouter.Handler(msg)
10041021
if handler == nil {
10051022
return nil, errorsmod.Wrapf(sdkerrors.ErrUnknownRequest, "no message handler found for %T", msg)

baseapp/genesis.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ var _ genesis.TxHandler = (*BaseApp)(nil)
1313
// ExecuteGenesisTx implements genesis.GenesisState from
1414
// cosmossdk.io/core/genesis to set initial state in genesis
1515
func (ba BaseApp) ExecuteGenesisTx(tx []byte) error {
16-
res := ba.deliverTx(tx)
16+
res := ba.deliverTx(tx, -1)
1717

1818
if res.Code != types.CodeTypeOK {
1919
return errors.New(res.Log)

baseapp/options.go

+10
Original file line numberDiff line numberDiff line change
@@ -122,6 +122,11 @@ func DisableBlockGasMeter() func(*BaseApp) {
122122
return func(app *BaseApp) { app.SetDisableBlockGasMeter(true) }
123123
}
124124

125+
// SetTxExecutor sets a custom tx executor for the BaseApp, usually for parallel execution.
126+
func SetTxExecutor(executor TxExecutor) func(*BaseApp) {
127+
return func(app *BaseApp) { app.txExecutor = executor }
128+
}
129+
125130
func (app *BaseApp) SetName(name string) {
126131
if app.sealed {
127132
panic("SetName() on sealed BaseApp")
@@ -372,3 +377,8 @@ func (app *BaseApp) SetStreamingManager(manager storetypes.StreamingManager) {
372377
func (app *BaseApp) SetDisableBlockGasMeter(disableBlockGasMeter bool) {
373378
app.disableBlockGasMeter = disableBlockGasMeter
374379
}
380+
381+
// SetTxExecutor sets a custom tx executor for the BaseApp, usually for parallel execution.
382+
func (app *BaseApp) SetTxExecutor(executor TxExecutor) {
383+
app.txExecutor = executor
384+
}

baseapp/test_helpers.go

+2-2
Original file line numberDiff line numberDiff line change
@@ -70,9 +70,9 @@ func (app *BaseApp) NewUncachedContext(isCheckTx bool, header cmtproto.Header) s
7070
}
7171

7272
func (app *BaseApp) GetContextForFinalizeBlock(txBytes []byte) sdk.Context {
73-
return app.getContextForTx(execModeFinalize, txBytes)
73+
return app.getContextForTx(execModeFinalize, txBytes, -1)
7474
}
7575

7676
func (app *BaseApp) GetContextForCheckTx(txBytes []byte) sdk.Context {
77-
return app.getContextForTx(execModeCheck, txBytes)
77+
return app.getContextForTx(execModeCheck, txBytes, -1)
7878
}

baseapp/txexecutor.go

+16
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
package baseapp
2+
3+
import (
4+
"context"
5+
6+
abci "github.com/cometbft/cometbft/abci/types"
7+
8+
"cosmossdk.io/store/types"
9+
)
10+
11+
type TxExecutor func(
12+
ctx context.Context,
13+
blockSize int,
14+
cms types.MultiStore,
15+
deliverTxWithMultiStore func(int, types.MultiStore) *abci.ExecTxResult,
16+
) ([]*abci.ExecTxResult, error)

types/context.go

+35
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,15 @@ type Context struct {
6464
streamingManager storetypes.StreamingManager
6565
cometInfo comet.BlockInfo
6666
headerInfo header.Info
67+
68+
// the index of the current tx in the block, -1 means not in finalize block context
69+
txIndex int
70+
// the index of the current msg in the tx, -1 means not in finalize block context
71+
msgIndex int
72+
// the total number of transactions in current block
73+
txCount int
74+
// sum the gas used by all the transactions in the current block, only accessible by end blocker
75+
blockGasUsed uint64
6776
}
6877

6978
// Proposed rename, not done to avoid API breakage
@@ -91,6 +100,10 @@ func (c Context) TransientKVGasConfig() storetypes.GasConfig { return c.trans
91100
func (c Context) StreamingManager() storetypes.StreamingManager { return c.streamingManager }
92101
func (c Context) CometInfo() comet.BlockInfo { return c.cometInfo }
93102
func (c Context) HeaderInfo() header.Info { return c.headerInfo }
103+
func (c Context) TxIndex() int { return c.txIndex }
104+
func (c Context) MsgIndex() int { return c.msgIndex }
105+
func (c Context) TxCount() int { return c.txCount }
106+
func (c Context) BlockGasUsed() uint64 { return c.blockGasUsed }
94107

95108
// clone the header before returning
96109
func (c Context) BlockHeader() cmtproto.Header {
@@ -137,6 +150,8 @@ func NewContext(ms storetypes.MultiStore, header cmtproto.Header, isCheckTx bool
137150
eventManager: NewEventManager(),
138151
kvGasConfig: storetypes.KVGasConfig(),
139152
transientKVGasConfig: storetypes.TransientGasConfig(),
153+
txIndex: -1,
154+
msgIndex: -1,
140155
}
141156
}
142157

@@ -310,6 +325,26 @@ func (c Context) WithHeaderInfo(headerInfo header.Info) Context {
310325
return c
311326
}
312327

328+
func (c Context) WithTxIndex(txIndex int) Context {
329+
c.txIndex = txIndex
330+
return c
331+
}
332+
333+
func (c Context) WithTxCount(txCount int) Context {
334+
c.txCount = txCount
335+
return c
336+
}
337+
338+
func (c Context) WithMsgIndex(msgIndex int) Context {
339+
c.msgIndex = msgIndex
340+
return c
341+
}
342+
343+
func (c Context) WithBlockGasUsed(gasUsed uint64) Context {
344+
c.blockGasUsed = gasUsed
345+
return c
346+
}
347+
313348
// TODO: remove???
314349
func (c Context) IsZero() bool {
315350
return c.ms == nil

0 commit comments

Comments
 (0)