Skip to content

Commit 43eae7d

Browse files
Merge branch 'master' into alex/result-approvals-godoc
2 parents c80c420 + e4b005e commit 43eae7d

File tree

207 files changed

+4837
-2170
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

207 files changed

+4837
-2170
lines changed

access/handler.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1427,7 +1427,7 @@ func (h *Handler) SendAndSubscribeTransactionStatuses(
14271427

14281428
sub := h.api.SendAndSubscribeTransactionStatuses(ctx, &tx, request.GetEventEncodingVersion())
14291429

1430-
messageIndex := counters.NewMonotonousCounter(0)
1430+
messageIndex := counters.NewMonotonicCounter(0)
14311431
return subscription.HandleRPCSubscription(sub, func(txResults []*TransactionResult) error {
14321432
for i := range txResults {
14331433
index := messageIndex.Value()

cmd/access/node_builder/access_node_builder.go

Lines changed: 45 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -117,7 +117,10 @@ import (
117117
"github.com/onflow/flow-go/state/protocol/blocktimer"
118118
"github.com/onflow/flow-go/storage"
119119
bstorage "github.com/onflow/flow-go/storage/badger"
120+
"github.com/onflow/flow-go/storage/operation/badgerimpl"
121+
"github.com/onflow/flow-go/storage/operation/pebbleimpl"
120122
pstorage "github.com/onflow/flow-go/storage/pebble"
123+
"github.com/onflow/flow-go/storage/store"
121124
"github.com/onflow/flow-go/utils/grpcutils"
122125
)
123126

@@ -229,9 +232,10 @@ func DefaultAccessNodeConfig() *AccessNodeConfig {
229232
IdleTimeout: rest.DefaultIdleTimeout,
230233
MaxRequestSize: commonrest.DefaultMaxRequestSize,
231234
},
232-
MaxMsgSize: grpcutils.DefaultMaxMsgSize,
233-
CompressorName: grpcutils.NoCompressor,
234-
WebSocketConfig: websockets.NewDefaultWebsocketConfig(),
235+
MaxMsgSize: grpcutils.DefaultMaxMsgSize,
236+
CompressorName: grpcutils.NoCompressor,
237+
WebSocketConfig: websockets.NewDefaultWebsocketConfig(),
238+
EnableWebSocketsStreamAPI: false,
235239
},
236240
stateStreamConf: statestreambackend.Config{
237241
MaxExecutionDataMsgSize: grpcutils.DefaultMaxMsgSize,
@@ -290,7 +294,7 @@ func DefaultAccessNodeConfig() *AccessNodeConfig {
290294
versionControlEnabled: true,
291295
storeTxResultErrorMessages: false,
292296
stopControlEnabled: false,
293-
registerDBPruneThreshold: pruner.DefaultThreshold,
297+
registerDBPruneThreshold: 0,
294298
}
295299
}
296300

@@ -552,8 +556,8 @@ func (builder *FlowAccessNodeBuilder) BuildConsensusFollower() *FlowAccessNodeBu
552556
func (builder *FlowAccessNodeBuilder) BuildExecutionSyncComponents() *FlowAccessNodeBuilder {
553557
var ds datastore.Batching
554558
var bs network.BlobService
555-
var processedBlockHeight storage.ConsumerProgress
556-
var processedNotifications storage.ConsumerProgress
559+
var processedBlockHeight storage.ConsumerProgressInitializer
560+
var processedNotifications storage.ConsumerProgressInitializer
557561
var bsDependable *module.ProxiedReadyDoneAware
558562
var execDataDistributor *edrequester.ExecutionDataDistributor
559563
var execDataCacheBackend *herocache.BlockExecutionData
@@ -607,21 +611,30 @@ func (builder *FlowAccessNodeBuilder) BuildExecutionSyncComponents() *FlowAccess
607611
Module("processed block height consumer progress", func(node *cmd.NodeConfig) error {
608612
// Note: progress is stored in the datastore's DB since that is where the jobqueue
609613
// writes execution data to.
610-
if executionDataDBMode == execution_data.ExecutionDataDBModeBadger {
611-
processedBlockHeight = bstorage.NewConsumerProgress(builder.ExecutionDatastoreManager.DB().(*badger.DB), module.ConsumeProgressExecutionDataRequesterBlockHeight)
614+
var db storage.DB
615+
edmdb := builder.ExecutionDatastoreManager.DB()
616+
617+
if bdb, ok := edmdb.(*badger.DB); ok {
618+
db = badgerimpl.ToDB(bdb)
619+
} else if pdb, ok := edmdb.(*pebble.DB); ok {
620+
db = pebbleimpl.ToDB(pdb)
612621
} else {
613-
processedBlockHeight = pstorage.NewConsumerProgress(builder.ExecutionDatastoreManager.DB().(*pebble.DB), module.ConsumeProgressExecutionDataRequesterBlockHeight)
622+
return fmt.Errorf("unsupported execution data DB type: %T", edmdb)
614623
}
624+
625+
processedBlockHeight = store.NewConsumerProgress(db, module.ConsumeProgressExecutionDataRequesterBlockHeight)
615626
return nil
616627
}).
617628
Module("processed notifications consumer progress", func(node *cmd.NodeConfig) error {
618629
// Note: progress is stored in the datastore's DB since that is where the jobqueue
619630
// writes execution data to.
631+
var db storage.DB
620632
if executionDataDBMode == execution_data.ExecutionDataDBModeBadger {
621-
processedNotifications = bstorage.NewConsumerProgress(builder.ExecutionDatastoreManager.DB().(*badger.DB), module.ConsumeProgressExecutionDataRequesterNotification)
633+
db = badgerimpl.ToDB(builder.ExecutionDatastoreManager.DB().(*badger.DB))
622634
} else {
623-
processedNotifications = pstorage.NewConsumerProgress(builder.ExecutionDatastoreManager.DB().(*pebble.DB), module.ConsumeProgressExecutionDataRequesterNotification)
635+
db = pebbleimpl.ToDB(builder.ExecutionDatastoreManager.DB().(*pebble.DB))
624636
}
637+
processedNotifications = store.NewConsumerProgress(db, module.ConsumeProgressExecutionDataRequesterNotification)
625638
return nil
626639
}).
627640
Module("blobservice peer manager dependencies", func(node *cmd.NodeConfig) error {
@@ -848,15 +861,15 @@ func (builder *FlowAccessNodeBuilder) BuildExecutionSyncComponents() *FlowAccess
848861
}
849862

850863
if builder.executionDataIndexingEnabled {
851-
var indexedBlockHeight storage.ConsumerProgress
864+
var indexedBlockHeight storage.ConsumerProgressInitializer
852865

853866
builder.
854867
AdminCommand("execute-script", func(config *cmd.NodeConfig) commands.AdminCommand {
855868
return stateSyncCommands.NewExecuteScriptCommand(builder.ScriptExecutor)
856869
}).
857870
Module("indexed block height consumer progress", func(node *cmd.NodeConfig) error {
858871
// Note: progress is stored in the MAIN db since that is where indexed execution data is stored.
859-
indexedBlockHeight = bstorage.NewConsumerProgress(builder.DB, module.ConsumeProgressExecutionDataIndexerBlockHeight)
872+
indexedBlockHeight = store.NewConsumerProgress(badgerimpl.ToDB(builder.DB), module.ConsumeProgressExecutionDataIndexerBlockHeight)
860873
return nil
861874
}).
862875
Module("transaction results storage", func(node *cmd.NodeConfig) error {
@@ -1458,6 +1471,13 @@ func (builder *FlowAccessNodeBuilder) extraFlags() {
14581471
"websocket-inactivity-timeout",
14591472
defaultConfig.rpcConf.WebSocketConfig.InactivityTimeout,
14601473
"specifies the duration a WebSocket connection can remain open without any active subscriptions before being automatically closed")
1474+
1475+
flags.BoolVar(
1476+
&builder.rpcConf.EnableWebSocketsStreamAPI,
1477+
"experimental-enable-websockets-stream-api",
1478+
defaultConfig.rpcConf.EnableWebSocketsStreamAPI,
1479+
"[experimental] enables WebSockets Stream API that operates under /ws endpoint. this flag may change in a future release.",
1480+
)
14611481
}).ValidateFlags(func() error {
14621482
if builder.supportsObserver && (builder.PublicNetworkConfig.BindAddress == cmd.NotSet || builder.PublicNetworkConfig.BindAddress == "") {
14631483
return errors.New("public-network-address must be set if supports-observer is true")
@@ -1633,8 +1653,8 @@ func (builder *FlowAccessNodeBuilder) enqueueRelayNetwork() {
16331653
}
16341654

16351655
func (builder *FlowAccessNodeBuilder) Build() (cmd.Node, error) {
1636-
var processedFinalizedBlockHeight storage.ConsumerProgress
1637-
var processedTxErrorMessagesBlockHeight storage.ConsumerProgress
1656+
var processedFinalizedBlockHeight storage.ConsumerProgressInitializer
1657+
var processedTxErrorMessagesBlockHeight storage.ConsumerProgressInitializer
16381658

16391659
if builder.executionDataSyncEnabled {
16401660
builder.BuildExecutionSyncComponents()
@@ -1838,17 +1858,18 @@ func (builder *FlowAccessNodeBuilder) Build() (cmd.Node, error) {
18381858
return nil
18391859
}).
18401860
Module("processed finalized block height consumer progress", func(node *cmd.NodeConfig) error {
1841-
processedFinalizedBlockHeight = bstorage.NewConsumerProgress(builder.DB, module.ConsumeProgressIngestionEngineBlockHeight)
1861+
processedFinalizedBlockHeight = store.NewConsumerProgress(badgerimpl.ToDB(builder.DB), module.ConsumeProgressIngestionEngineBlockHeight)
18421862
return nil
18431863
}).
18441864
Module("processed last full block height monotonic consumer progress", func(node *cmd.NodeConfig) error {
18451865
rootBlockHeight := node.State.Params().FinalizedRoot().Height
18461866

1847-
var err error
1848-
lastFullBlockHeight, err = counters.NewPersistentStrictMonotonicCounter(
1849-
bstorage.NewConsumerProgress(builder.DB, module.ConsumeProgressLastFullBlockHeight),
1850-
rootBlockHeight,
1851-
)
1867+
progress, err := store.NewConsumerProgress(badgerimpl.ToDB(builder.DB), module.ConsumeProgressLastFullBlockHeight).Initialize(rootBlockHeight)
1868+
if err != nil {
1869+
return err
1870+
}
1871+
1872+
lastFullBlockHeight, err = counters.NewPersistentStrictMonotonicCounter(progress)
18521873
if err != nil {
18531874
return fmt.Errorf("failed to initialize monotonic consumer progress: %w", err)
18541875
}
@@ -2149,8 +2170,8 @@ func (builder *FlowAccessNodeBuilder) Build() (cmd.Node, error) {
21492170

21502171
if builder.storeTxResultErrorMessages {
21512172
builder.Module("processed error messages block height consumer progress", func(node *cmd.NodeConfig) error {
2152-
processedTxErrorMessagesBlockHeight = bstorage.NewConsumerProgress(
2153-
builder.DB,
2173+
processedTxErrorMessagesBlockHeight = store.NewConsumerProgress(
2174+
badgerimpl.ToDB(builder.DB),
21542175
module.ConsumeProgressEngineTxErrorMessagesBlockHeight,
21552176
)
21562177
return nil
@@ -2349,6 +2370,7 @@ func (builder *FlowAccessNodeBuilder) initPublicLibp2pNode(networkKey crypto.Pri
23492370
&p2pbuilderconfig.UnicastConfig{
23502371
Unicast: builder.FlowConfig.NetworkConfig.Unicast,
23512372
}).
2373+
SetProtocolPeerCacheList(protocols.FlowProtocolID(builder.SporkID)).
23522374
SetBasicResolver(builder.Resolver).
23532375
SetSubscriptionFilter(networkingsubscription.NewRoleBasedFilter(flow.RoleAccess, builder.IdentityProvider)).
23542376
SetConnectionManager(connManager).

cmd/bootstrap/transit/cmd/pull.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -97,7 +97,7 @@ func pull(cmd *cobra.Command, args []string) {
9797
fullOutpath := filepath.Join(flagBootDir, "public-root-information", filepath.Base(file.Name))
9898
fmd5 := utils.CalcMd5(fullOutpath)
9999
// only skip files that have an MD5 hash
100-
if file.MD5 != nil && bytes.Equal(fmd5, file.MD5) {
100+
if len(file.MD5) > 0 && bytes.Equal(fmd5, file.MD5) {
101101
log.Info().Str("source", file.Name).Str("dest", fullOutpath).Msgf("skipping existing file from transit servers")
102102
return
103103
}

cmd/consensus/main.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -463,6 +463,9 @@ func main() {
463463
seals,
464464
getSealingConfigs,
465465
)
466+
if err != nil {
467+
return nil, fmt.Errorf("could not initialize sealing engine: %w", err)
468+
}
466469

467470
// subscribe for finalization events from hotstuff
468471
followerDistributor.AddOnBlockFinalizedConsumer(e.OnFinalizedBlock)

cmd/execution_builder.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -528,6 +528,10 @@ func (exeNode *ExecutionNode) LoadProviderEngine(
528528
node.FvmOptions...,
529529
)
530530

531+
opts = append(opts, computation.DefaultFVMOptions(
532+
node.RootChainID,
533+
exeNode.exeConf.computationConfig.CadenceTracing,
534+
exeNode.exeConf.computationConfig.ExtensiveTracing)...)
531535
vmCtx := fvm.NewContext(opts...)
532536

533537
var collector module.ExecutionMetrics

cmd/node_builder.go

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import (
44
"context"
55
"time"
66

7+
"github.com/cockroachdb/pebble"
78
"github.com/dgraph-io/badger/v2"
89
madns "github.com/multiformats/go-multiaddr-dns"
910
"github.com/onflow/crypto"
@@ -151,6 +152,9 @@ type BaseConfig struct {
151152
DynamicStartupEpoch string
152153
DynamicStartupSleepInterval time.Duration
153154
datadir string
155+
pebbleDir string
156+
badgerDB *badger.DB
157+
pebbleDB *pebble.DB
154158
secretsdir string
155159
secretsDBEnabled bool
156160
InsecureSecretsDB bool
@@ -164,7 +168,6 @@ type BaseConfig struct {
164168
MetricsEnabled bool
165169
guaranteesCacheSize uint
166170
receiptsCacheSize uint
167-
db *badger.DB
168171
HeroCacheMetricsEnable bool
169172
SyncCoreConfig chainsync.Config
170173
CodecFactory func() network.Codec
@@ -198,6 +201,7 @@ type NodeConfig struct {
198201
MetricsRegisterer prometheus.Registerer
199202
Metrics Metrics
200203
DB *badger.DB
204+
PebbleDB *pebble.DB
201205
SecretsDB *badger.DB
202206
Storage Storage
203207
ProtocolEvents *events.Distributor
@@ -253,6 +257,7 @@ type StateExcerptAtBoot struct {
253257

254258
func DefaultBaseConfig() *BaseConfig {
255259
datadir := "/data/protocol"
260+
pebbleDir := "/data/protocol-pebble"
256261

257262
// NOTE: if the codec used in the network component is ever changed any code relying on
258263
// the message format specific to the codec must be updated. i.e: the AuthorizedSenderValidator.
@@ -269,6 +274,9 @@ func DefaultBaseConfig() *BaseConfig {
269274
ObserverMode: false,
270275
BootstrapDir: "bootstrap",
271276
datadir: datadir,
277+
pebbleDir: pebbleDir,
278+
badgerDB: nil,
279+
pebbleDB: nil,
272280
secretsdir: NotSet,
273281
secretsDBEnabled: true,
274282
level: "info",

cmd/observer/node_builder/observer_builder.go

Lines changed: 27 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -108,7 +108,10 @@ import (
108108
"github.com/onflow/flow-go/state/protocol/events/gadgets"
109109
"github.com/onflow/flow-go/storage"
110110
bstorage "github.com/onflow/flow-go/storage/badger"
111+
"github.com/onflow/flow-go/storage/operation/badgerimpl"
112+
"github.com/onflow/flow-go/storage/operation/pebbleimpl"
111113
pstorage "github.com/onflow/flow-go/storage/pebble"
114+
"github.com/onflow/flow-go/storage/store"
112115
"github.com/onflow/flow-go/utils/grpcutils"
113116
"github.com/onflow/flow-go/utils/io"
114117
)
@@ -200,9 +203,10 @@ func DefaultObserverServiceConfig() *ObserverServiceConfig {
200203
IdleTimeout: rest.DefaultIdleTimeout,
201204
MaxRequestSize: commonrest.DefaultMaxRequestSize,
202205
},
203-
MaxMsgSize: grpcutils.DefaultMaxMsgSize,
204-
CompressorName: grpcutils.NoCompressor,
205-
WebSocketConfig: websockets.NewDefaultWebsocketConfig(),
206+
MaxMsgSize: grpcutils.DefaultMaxMsgSize,
207+
CompressorName: grpcutils.NoCompressor,
208+
WebSocketConfig: websockets.NewDefaultWebsocketConfig(),
209+
EnableWebSocketsStreamAPI: false,
206210
},
207211
stateStreamConf: statestreambackend.Config{
208212
MaxExecutionDataMsgSize: grpcutils.DefaultMaxMsgSize,
@@ -819,6 +823,13 @@ func (builder *ObserverServiceBuilder) extraFlags() {
819823
"websocket-inactivity-timeout",
820824
defaultConfig.rpcConf.WebSocketConfig.InactivityTimeout,
821825
"specifies the duration a WebSocket connection can remain open without any active subscriptions before being automatically closed")
826+
827+
flags.BoolVar(
828+
&builder.rpcConf.EnableWebSocketsStreamAPI,
829+
"experimental-enable-websockets-stream-api",
830+
defaultConfig.rpcConf.EnableWebSocketsStreamAPI,
831+
"[experimental] enables WebSockets Stream API that operates under /ws endpoint. this flag may change in a future release.",
832+
)
822833
}).ValidateFlags(func() error {
823834
if builder.executionDataSyncEnabled {
824835
if builder.executionDataConfig.FetchTimeout <= 0 {
@@ -1057,8 +1068,8 @@ func (builder *ObserverServiceBuilder) Build() (cmd.Node, error) {
10571068
func (builder *ObserverServiceBuilder) BuildExecutionSyncComponents() *ObserverServiceBuilder {
10581069
var ds datastore.Batching
10591070
var bs network.BlobService
1060-
var processedBlockHeight storage.ConsumerProgress
1061-
var processedNotifications storage.ConsumerProgress
1071+
var processedBlockHeight storage.ConsumerProgressInitializer
1072+
var processedNotifications storage.ConsumerProgressInitializer
10621073
var publicBsDependable *module.ProxiedReadyDoneAware
10631074
var execDataDistributor *edrequester.ExecutionDataDistributor
10641075
var execDataCacheBackend *herocache.BlockExecutionData
@@ -1112,21 +1123,26 @@ func (builder *ObserverServiceBuilder) BuildExecutionSyncComponents() *ObserverS
11121123
Module("processed block height consumer progress", func(node *cmd.NodeConfig) error {
11131124
// Note: progress is stored in the datastore's DB since that is where the jobqueue
11141125
// writes execution data to.
1126+
var db storage.DB
11151127
if executionDataDBMode == execution_data.ExecutionDataDBModeBadger {
1116-
processedBlockHeight = bstorage.NewConsumerProgress(builder.ExecutionDatastoreManager.DB().(*badger.DB), module.ConsumeProgressExecutionDataRequesterBlockHeight)
1128+
db = badgerimpl.ToDB(builder.ExecutionDatastoreManager.DB().(*badger.DB))
11171129
} else {
1118-
processedBlockHeight = pstorage.NewConsumerProgress(builder.ExecutionDatastoreManager.DB().(*pebble.DB), module.ConsumeProgressExecutionDataRequesterBlockHeight)
1130+
db = pebbleimpl.ToDB(builder.ExecutionDatastoreManager.DB().(*pebble.DB))
11191131
}
1132+
1133+
processedBlockHeight = store.NewConsumerProgress(db, module.ConsumeProgressExecutionDataRequesterBlockHeight)
11201134
return nil
11211135
}).
11221136
Module("processed notifications consumer progress", func(node *cmd.NodeConfig) error {
11231137
// Note: progress is stored in the datastore's DB since that is where the jobqueue
11241138
// writes execution data to.
1139+
var db storage.DB
11251140
if executionDataDBMode == execution_data.ExecutionDataDBModeBadger {
1126-
processedNotifications = bstorage.NewConsumerProgress(builder.ExecutionDatastoreManager.DB().(*badger.DB), module.ConsumeProgressExecutionDataRequesterNotification)
1141+
db = badgerimpl.ToDB(builder.ExecutionDatastoreManager.DB().(*badger.DB))
11271142
} else {
1128-
processedNotifications = pstorage.NewConsumerProgress(builder.ExecutionDatastoreManager.DB().(*pebble.DB), module.ConsumeProgressExecutionDataRequesterNotification)
1143+
db = pebbleimpl.ToDB(builder.ExecutionDatastoreManager.DB().(*pebble.DB))
11291144
}
1145+
processedNotifications = store.NewConsumerProgress(db, module.ConsumeProgressExecutionDataRequesterNotification)
11301146
return nil
11311147
}).
11321148
Module("blobservice peer manager dependencies", func(node *cmd.NodeConfig) error {
@@ -1311,11 +1327,11 @@ func (builder *ObserverServiceBuilder) BuildExecutionSyncComponents() *ObserverS
13111327
return builder.ExecutionDataPruner, nil
13121328
})
13131329
if builder.executionDataIndexingEnabled {
1314-
var indexedBlockHeight storage.ConsumerProgress
1330+
var indexedBlockHeight storage.ConsumerProgressInitializer
13151331

13161332
builder.Module("indexed block height consumer progress", func(node *cmd.NodeConfig) error {
13171333
// Note: progress is stored in the MAIN db since that is where indexed execution data is stored.
1318-
indexedBlockHeight = bstorage.NewConsumerProgress(builder.DB, module.ConsumeProgressExecutionDataIndexerBlockHeight)
1334+
indexedBlockHeight = store.NewConsumerProgress(badgerimpl.ToDB(builder.DB), module.ConsumeProgressExecutionDataIndexerBlockHeight)
13191335
return nil
13201336
}).Module("transaction results storage", func(node *cmd.NodeConfig) error {
13211337
builder.Storage.LightTransactionResults = bstorage.NewLightTransactionResults(node.Metrics.Cache, node.DB, bstorage.DefaultCacheSize)

0 commit comments

Comments
 (0)