Skip to content

Commit

Permalink
blockchain: make subscriptions arguments read only
Browse files Browse the repository at this point in the history
There is no need to accept rw
channel. Strengthening the type to read-only will allow the caller to
ensure control of reading from the provided channel.

Closes #2885

Signed-off-by: Ekaterina Pavlova <[email protected]>
  • Loading branch information
AliceInHunterland committed Dec 28, 2023
1 parent 3176f72 commit df365e6
Show file tree
Hide file tree
Showing 7 changed files with 27 additions and 26 deletions.
4 changes: 2 additions & 2 deletions internal/fakechain/fakechain.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import (
type FakeChain struct {
config.Blockchain
*mempool.Pool
blocksCh []chan *block.Block
blocksCh []chan<- *block.Block
Blockheight atomic.Uint32
PoolTxF func(*transaction.Transaction) error
poolTxWithData func(*transaction.Transaction, any, *mempool.Pool) error
Expand Down Expand Up @@ -346,7 +346,7 @@ func (chain *FakeChain) PoolTx(tx *transaction.Transaction, _ ...*mempool.Pool)
}

// SubscribeForBlocks implements the Blockchainer interface.
func (chain *FakeChain) SubscribeForBlocks(ch chan *block.Block) {
func (chain *FakeChain) SubscribeForBlocks(ch chan<- *block.Block) {
chain.blocksCh = append(chain.blocksCh, ch)
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/consensus/consensus.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ type Ledger interface {
GetTransaction(util.Uint256) (*transaction.Transaction, uint32, error)
ComputeNextBlockValidators() []*keys.PublicKey
PoolTx(t *transaction.Transaction, pools ...*mempool.Pool) error
SubscribeForBlocks(ch chan *coreb.Block)
SubscribeForBlocks(ch chan<- *coreb.Block)
UnsubscribeFromBlocks(ch chan *coreb.Block)
GetBaseExecFee() int64
CalculateAttributesFee(tx *transaction.Transaction) int64
Expand Down
31 changes: 16 additions & 15 deletions pkg/core/blockchain.go
Original file line number Diff line number Diff line change
Expand Up @@ -1327,27 +1327,27 @@ func (bc *Blockchain) notificationDispatcher() {
// These are just sets of subscribers, though modelled as maps
// for ease of management (not a lot of subscriptions is really
// expected, but maps are convenient for adding/deleting elements).
blockFeed = make(map[chan *block.Block]bool)
headerFeed = make(map[chan *block.Header]bool)
txFeed = make(map[chan *transaction.Transaction]bool)
notificationFeed = make(map[chan *state.ContainedNotificationEvent]bool)
executionFeed = make(map[chan *state.AppExecResult]bool)
blockFeed = make(map[chan<- *block.Block]bool)
headerFeed = make(map[chan<- *block.Header]bool)
txFeed = make(map[chan<- *transaction.Transaction]bool)
notificationFeed = make(map[chan<- *state.ContainedNotificationEvent]bool)
executionFeed = make(map[chan<- *state.AppExecResult]bool)
)
for {
select {
case <-bc.stopCh:
return
case sub := <-bc.subCh:
switch ch := sub.(type) {
case chan *block.Header:
case chan<- *block.Header:
headerFeed[ch] = true
case chan *block.Block:
case chan<- *block.Block:
blockFeed[ch] = true
case chan *transaction.Transaction:
case chan<- *transaction.Transaction:
txFeed[ch] = true
case chan *state.ContainedNotificationEvent:
case chan<- *state.ContainedNotificationEvent:
notificationFeed[ch] = true
case chan *state.AppExecResult:
case chan<- *state.AppExecResult:
executionFeed[ch] = true
default:
panic(fmt.Sprintf("bad subscription: %T", sub))
Expand Down Expand Up @@ -1377,6 +1377,7 @@ func (bc *Blockchain) notificationDispatcher() {
}
for ch := range executionFeed {
ch <- aer
//aer := <- ch
}
for i := range aer.Events {
for ch := range notificationFeed {
Expand Down Expand Up @@ -2285,7 +2286,7 @@ func (bc *Blockchain) GetConfig() config.Blockchain {
// Make sure it's read from regularly as not reading these events might affect
// other Blockchain functions. Make sure you're not changing the received blocks,
// as it may affect the functionality of Blockchain and other subscribers.
func (bc *Blockchain) SubscribeForBlocks(ch chan *block.Block) {
func (bc *Blockchain) SubscribeForBlocks(ch chan<- *block.Block) {
bc.subCh <- ch
}

Expand All @@ -2295,7 +2296,7 @@ func (bc *Blockchain) SubscribeForBlocks(ch chan *block.Block) {
// affect other Blockchain functions. Make sure you're not changing the received
// headers, as it may affect the functionality of Blockchain and other
// subscribers.
func (bc *Blockchain) SubscribeForHeadersOfAddedBlocks(ch chan *block.Header) {
func (bc *Blockchain) SubscribeForHeadersOfAddedBlocks(ch chan<- *block.Header) {
bc.subCh <- ch
}

Expand All @@ -2305,7 +2306,7 @@ func (bc *Blockchain) SubscribeForHeadersOfAddedBlocks(ch chan *block.Header) {
// as not reading these events might affect other Blockchain functions. Make sure
// you're not changing the received transactions, as it may affect the
// functionality of Blockchain and other subscribers.
func (bc *Blockchain) SubscribeForTransactions(ch chan *transaction.Transaction) {
func (bc *Blockchain) SubscribeForTransactions(ch chan<- *transaction.Transaction) {
bc.subCh <- ch
}

Expand All @@ -2317,7 +2318,7 @@ func (bc *Blockchain) SubscribeForTransactions(ch chan *transaction.Transaction)
// read from regularly as not reading these events might affect other Blockchain
// functions. Make sure you're not changing the received notification events, as
// it may affect the functionality of Blockchain and other subscribers.
func (bc *Blockchain) SubscribeForNotifications(ch chan *state.ContainedNotificationEvent) {
func (bc *Blockchain) SubscribeForNotifications(ch chan<- *state.ContainedNotificationEvent) {
bc.subCh <- ch
}

Expand All @@ -2327,7 +2328,7 @@ func (bc *Blockchain) SubscribeForNotifications(ch chan *state.ContainedNotifica
// reading these events might affect other Blockchain functions. Make sure you're
// not changing the received execution results, as it may affect the
// functionality of Blockchain and other subscribers.
func (bc *Blockchain) SubscribeForExecutions(ch chan *state.AppExecResult) {
func (bc *Blockchain) SubscribeForExecutions(ch chan<- *state.AppExecResult) {
bc.subCh <- ch
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/network/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ type (
PoolTx(t *transaction.Transaction, pools ...*mempool.Pool) error
PoolTxWithData(t *transaction.Transaction, data any, mp *mempool.Pool, feer mempool.Feer, verificationFunction func(t *transaction.Transaction, data any) error) error
RegisterPostBlock(f func(func(*transaction.Transaction, *mempool.Pool, bool) bool, *mempool.Pool, *block.Block))
SubscribeForBlocks(ch chan *block.Block)
SubscribeForBlocks(ch chan<- *block.Block)
UnsubscribeFromBlocks(ch chan *block.Block)
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/services/notary/notary.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ type (
BlockHeight() uint32
GetMaxVerificationGAS() int64
GetNotaryContractScriptHash() util.Uint160
SubscribeForBlocks(ch chan *block.Block)
SubscribeForBlocks(ch chan<- *block.Block)
UnsubscribeFromBlocks(ch chan *block.Block)
VerifyWitness(util.Uint160, hash.Hashable, *transaction.Witness, int64) (int64, error)
}
Expand Down
10 changes: 5 additions & 5 deletions pkg/services/rpcsrv/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,11 +99,11 @@ type (
HeaderHeight() uint32
InitVerificationContext(ic *interop.Context, hash util.Uint160, witness *transaction.Witness) error
P2PSigExtensionsEnabled() bool
SubscribeForBlocks(ch chan *block.Block)
SubscribeForHeadersOfAddedBlocks(ch chan *block.Header)
SubscribeForExecutions(ch chan *state.AppExecResult)
SubscribeForNotifications(ch chan *state.ContainedNotificationEvent)
SubscribeForTransactions(ch chan *transaction.Transaction)
SubscribeForBlocks(ch chan<- *block.Block)
SubscribeForHeadersOfAddedBlocks(ch chan<- *block.Header)
SubscribeForExecutions(ch chan<- *state.AppExecResult)
SubscribeForNotifications(ch chan<- *state.ContainedNotificationEvent)
SubscribeForTransactions(ch chan<- *transaction.Transaction)
UnsubscribeFromBlocks(ch chan *block.Block)
UnsubscribeFromHeadersOfAddedBlocks(ch chan *block.Header)
UnsubscribeFromExecutions(ch chan *state.AppExecResult)
Expand Down
2 changes: 1 addition & 1 deletion pkg/services/stateroot/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ type (
GetConfig() config.Blockchain
GetDesignatedByRole(role noderoles.Role) (keys.PublicKeys, uint32, error)
HeaderHeight() uint32
SubscribeForBlocks(ch chan *block.Block)
SubscribeForBlocks(ch chan<- *block.Block)
UnsubscribeFromBlocks(ch chan *block.Block)
}

Expand Down

0 comments on commit df365e6

Please sign in to comment.