Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions cmd/access/node_builder/access_node_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -659,6 +659,8 @@ func (builder *FlowAccessNodeBuilder) BuildExecutionSyncComponents() *FlowAccess
opts = append(opts, blob.WithReprovideInterval(-1))
}

opts = append(opts, blob.WithUseBloomCache(builder.BitswapBloomCacheEnabled))

var err error
bs, err = node.EngineRegistry.RegisterBlobService(channels.ExecutionDataService, builder.ExecutionDatastoreManager.Datastore(), opts...)
if err != nil {
Expand Down
2 changes: 2 additions & 0 deletions cmd/execution_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -411,6 +411,8 @@ func (exeNode *ExecutionNode) LoadBlobService(
opts = append(opts, blob.WithReprovideInterval(-1))
}

opts = append(opts, blob.WithUseBloomCache(node.BitswapBloomCacheEnabled))

if exeNode.exeConf.blobstoreRateLimit > 0 && exeNode.exeConf.blobstoreBurstLimit > 0 {
opts = append(opts, blob.WithRateLimit(float64(exeNode.exeConf.blobstoreRateLimit), exeNode.exeConf.blobstoreBurstLimit))
}
Expand Down
20 changes: 14 additions & 6 deletions cmd/node_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,13 @@ type BaseConfig struct {
// This is only meaningful to Access and Execution nodes.
BitswapReprovideEnabled bool

// BitswapBloomCacheEnabled configures whether the Bitswap bloom cache is enabled.
// When disabled, uses a plain blockstore instead of cached blockstore, avoiding
// the CPU cost of building the bloom filter on startup. Pebble's built-in bloom
// filters (persisted in SSTables) are still used for efficient lookups.
// This is only meaningful to Access and Execution nodes.
BitswapBloomCacheEnabled bool

TransactionFeesDisabled bool
}

Expand Down Expand Up @@ -297,12 +304,13 @@ func DefaultBaseConfig() *BaseConfig {
Duration: 10 * time.Second,
},

HeroCacheMetricsEnable: false,
SyncCoreConfig: chainsync.DefaultConfig(),
CodecFactory: codecFactory,
ComplianceConfig: compliance.DefaultConfig(),
DhtSystemEnabled: true,
BitswapReprovideEnabled: true,
HeroCacheMetricsEnable: false,
SyncCoreConfig: chainsync.DefaultConfig(),
CodecFactory: codecFactory,
ComplianceConfig: compliance.DefaultConfig(),
DhtSystemEnabled: true,
BitswapReprovideEnabled: true,
BitswapBloomCacheEnabled: true, // default: use cached blockstore TODO leo: change default to false
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The test cases also passed when I changed the default to false.

}
}

Expand Down
2 changes: 2 additions & 0 deletions cmd/observer/node_builder/observer_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -1200,6 +1200,8 @@ func (builder *ObserverServiceBuilder) BuildExecutionSyncComponents() *ObserverS
),
}

opts = append(opts, blob.WithUseBloomCache(builder.BitswapBloomCacheEnabled))

var err error
bs, err = node.EngineRegistry.RegisterBlobService(channels.PublicExecutionDataService, ds, opts...)
if err != nil {
Expand Down
4 changes: 4 additions & 0 deletions cmd/scaffold.go
Original file line number Diff line number Diff line change
Expand Up @@ -213,6 +213,10 @@ func (fnb *FlowNodeBuilder) BaseFlags() {
"bitswap-reprovide-enabled",
defaultConfig.BitswapReprovideEnabled,
"[experimental] whether to enable bitswap reproviding. This is an experimental feature. Use with caution.")
fnb.flags.BoolVar(&fnb.BaseConfig.BitswapBloomCacheEnabled,
"bitswap-bloom-cache-enabled",
defaultConfig.BitswapBloomCacheEnabled,
"[experimental] whether to enable bitswap bloom cache. When disabled, uses a plain blockstore instead of cached blockstore, avoiding the CPU cost of building the bloom filter on startup. Pebble's built-in bloom filters (persisted in SSTables) are still used. This is an experimental feature. Use with caution.")

// dynamic node startup flags
fnb.flags.StringVar(&fnb.BaseConfig.DynamicStartupANPubkey,
Expand Down
12 changes: 12 additions & 0 deletions integration/tests/access/cohort3/execution_state_sync_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -196,13 +196,25 @@ func (s *ExecutionStateSyncSuite) executionDataForHeight(ctx context.Context, no
BlockId: header.ID[:],
EventEncodingVersion: entities.EventEncodingVersion_CCF_V0,
})

if err != nil {
s.log.Info().
Uint64("height", height).
Hex("block_id", header.ID[:]).
Err(err).
Msg("failed to get execution data")
return err
}

blockED, err = convert.MessageToBlockExecutionData(ed.GetBlockExecutionData(), flow.Localnet.Chain())
s.Require().NoError(err, "could not convert execution data")

s.log.Info().
Uint64("height", height).
Hex("block_id", header.ID[:]).
Int("chunks", len(blockED.ChunkExecutionDatas)).
Msg("successfully retrieved execution data")

return err
}), "could not get execution data for block %d", height)

Expand Down
37 changes: 29 additions & 8 deletions network/p2p/blob/blob_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ var _ component.Component = (*blobService)(nil)
type BlobServiceConfig struct {
ReprovideInterval time.Duration // the interval at which the DHT provider entries are refreshed
BitswapOptions []bitswap.Option // options to pass to the Bitswap service
UseBloomCache bool // if true, use the bloom cache (cached blockstore), otherwise use plain blockstore
}
Comment on lines 55 to 59
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🔴 Critical

🧩 Analysis chain

🏁 Script executed:

cd network/p2p/blob && wc -l blob_service.go

Repository: onflow/flow-go

Length of output: 77


🏁 Script executed:

cat -n network/p2p/blob/blob_service.go | head -160

Repository: onflow/flow-go

Length of output: 6605


🏁 Script executed:

# Verify the exact line where blockStore is used in CachedBlockstore call
sed -n '141,151p' network/p2p/blob/blob_service.go

Repository: onflow/flow-go

Length of output: 354


🏁 Script executed:

# Double-check WithRateLimit implementation to confirm it modifies bs.blockStore
sed -n '95,100p' network/p2p/blob/blob_service.go

Repository: onflow/flow-go

Length of output: 303


🏁 Script executed:

# Verify WithParentBlobService also modifies bs.blockStore
sed -n '75,80p' network/p2p/blob/blob_service.go

Repository: onflow/flow-go

Length of output: 334


Fix CachedBlockstore to wrap the final blockstore after applying options

When UseBloomCache is true, CachedBlockstore is called with the local blockStore variable created on line 125, instead of bs.blockStore. Since options like WithRateLimit and WithParentBlobService mutate only bs.blockStore (not the local variable), their effects are silently discarded. For example:

  • If WithRateLimit is applied, it wraps bs.blockStore, but CachedBlockstore then wraps the unwrapped local blockStore, and the rate limiter is lost.
  • If WithParentBlobService is applied, it replaces bs.blockStore with the parent's store, but CachedBlockstore wraps the new local blockStore instead, breaking the parent relationship.

Change line 144 to use bs.blockStore instead of blockStore:

Proposed fix
 	if bs.config.UseBloomCache {
 		cachedBlockStore, err := blockstore.CachedBlockstore(
 			context.Background(),
-			blockStore,
+			bs.blockStore,
 			blockstore.DefaultCacheOpts(),
 		)
 		if err != nil {
 			return nil, fmt.Errorf("failed to create cached blockstore: %w", err)
 		}
 		bs.blockStore = cachedBlockStore
 	}

Committable suggestion skipped: line range outside the PR's diff.

🤖 Prompt for AI Agents
In network/p2p/blob/blob_service.go around lines ~125-145, the CachedBlockstore
call wraps the local variable blockStore instead of bs.blockStore, causing any
option-wrapping (e.g. WithRateLimit, WithParentBlobService) applied to
bs.blockStore to be ignored; change the CachedBlockstore invocation to use
bs.blockStore (the final, possibly-wrapped store) so that all option mutations
are preserved and the parent/RateLimit wrappers remain effective.


// WithReprovideInterval sets the interval at which DHT provider entries are refreshed
Expand Down Expand Up @@ -98,6 +99,17 @@ func WithRateLimit(r float64, b int) network.BlobServiceOption {
}
}

// WithUseBloomCache enables or disables the bloom cache.
// When enabled (true), uses a cached blockstore with bloom filter (default behavior).
// When disabled (false), uses a plain blockstore instead, avoiding the CPU cost of building
// the bloom filter on startup by scanning all keys. Pebble's built-in bloom filters
// (persisted in SSTables) are still used for efficient lookups.
func WithUseBloomCache(use bool) network.BlobServiceOption {
return func(bs network.BlobService) {
bs.(*blobService).config.UseBloomCache = use
}
}

// NewBlobService creates a new BlobService.
func NewBlobService(
host host.Host,
Expand All @@ -109,26 +121,35 @@ func NewBlobService(
opts ...network.BlobServiceOption,
) (*blobService, error) {
bsNetwork := bsnet.NewFromIpfsHost(host, r, bsnet.Prefix(protocol.ID(prefix)))
blockStore, err := blockstore.CachedBlockstore(
context.Background(),
blockstore.NewBlockstore(ds),
blockstore.DefaultCacheOpts(),
)
if err != nil {
return nil, fmt.Errorf("failed to create cached blockstore: %w", err)
}

blockStore := blockstore.NewBlockstore(ds)

bs := &blobService{
prefix: prefix,
config: &BlobServiceConfig{
ReprovideInterval: DefaultReprovideInterval,
UseBloomCache: true, // default: use bloom cache
},
blockStore: blockStore,
}

// Apply options before creating blockstore, as UseBloomCache affects blockstore creation
for _, opt := range opts {
opt(bs)
}

if bs.config.UseBloomCache {
cachedBlockStore, err := blockstore.CachedBlockstore(
context.Background(),
blockStore,
blockstore.DefaultCacheOpts(),
)
if err != nil {
return nil, fmt.Errorf("failed to create cached blockstore: %w", err)
}
bs.blockStore = cachedBlockStore
}

cm := component.NewComponentManagerBuilder().
AddWorker(func(ctx irrecoverable.SignalerContext, ready component.ReadyFunc) {
btswp := bitswap.New(ctx, bsNetwork, bs.blockStore, bs.config.BitswapOptions...)
Expand Down
Loading