Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
149 changes: 102 additions & 47 deletions module/builder/collection/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,13 @@ import (
"github.com/onflow/flow-go/utils/logging"
)

var (
// ErrNotEnoughHistory represents a state in which the node cannot propose a regular collection
// because it does not have enough blocks of history (to deduplicate transactions that may have
// been included in previous finalized collections).
ErrNotEnoughHistory = errors.New("not enough history")
)

// Builder is the builder for collection block payloads. Upon providing a
// payload hash, it also memorizes the payload contents.
//
Expand Down Expand Up @@ -118,9 +125,9 @@ func (b *Builder) BuildOn(parentID flow.Identifier, setter func(*flow.HeaderBody
defer parentSpan.End()

// STEP 1: build a lookup for excluding duplicated transactions.
// This is briefly how it works:
// Overview:
//
// Let E be the global transaction expiry.
// Let E denote global transaction expiry.
// When incorporating a new collection C, with reference height R, we enforce
// that it contains only transactions with reference heights in [R,R+E).
// * if we are building C:
Expand All @@ -130,13 +137,13 @@ func (b *Builder) BuildOn(parentID flow.Identifier, setter func(*flow.HeaderBody
// * if we are validating C:
// * honest validators only consider C valid if all its transactions have reference heights in [R,R+E)
//
// Therefore, to check for duplicates, we only need a lookup for transactions in collection
// Therefore, to check for duplicates, we only need a lookup for transactions in collections
// with expiry windows that overlap with our collection under construction.
//
// A collection with overlapping expiry window can be finalized or un-finalized.
// * to find all non-expired and finalized collections, we make use of an index
// (main_chain_finalized_height -> cluster_block_ids with respective reference height), to search for a range of main chain heights
// which could be only referenced by collections with overlapping expiry windows.
// (main_chain_finalized_height -> cluster_block_ids with respective reference height), to search for a range
// of main chain heights, which could be only referenced by collections with overlapping expiry windows.
// * to find all overlapping and un-finalized collections, we can't use the above index, because it's
// only for finalized collections. Instead, we simply traverse along the chain up to the last
// finalized block. This could possibly include some collections with expiry windows that DON'T
Expand Down Expand Up @@ -184,23 +191,40 @@ func (b *Builder) BuildOn(parentID flow.Identifier, setter func(*flow.HeaderBody
return nil, err
}

// STEP 1b: create a lookup of all transactions previously included in
// the finalized collections. Any transactions already included in finalized
// collections can be removed from the mempool.
// STEP 1b: create a lookup of all transactions previously included in the finalized collections.
// Any transactions already included in finalized collections can be removed from the mempool.
//
// CAUTION: In this block (specifically method `populateFinalizedAncestryLookup`), we need to pay
// ensure that we have sufficient history such that we can scan all non-expired collections for
// duplicates. Otherwise, we might be accidentally including a non-expired transaction again that
// was already included in a collection further back in history - this would be a slashable offense.
sufficientHistory := true
span, _ = b.tracer.StartSpanFromContext(ctx, trace.COLBuildOnFinalizedLookup)
err = b.populateFinalizedAncestryLookup(lctx, buildCtx)
span.End()
if err != nil {
return nil, fmt.Errorf("could not populate finalized ancestry lookup: %w", err)
if !errors.Is(err, ErrNotEnoughHistory) {
return nil, fmt.Errorf("could not populate finalized ancestry lookup: %w", err)
}
sufficientHistory = false
}

// STEP 2: build a payload of valid transactions, while at the same
// time figuring out the correct reference block ID for the collection.
span, _ = b.tracer.StartSpanFromContext(ctx, trace.COLBuildOnCreatePayload)
payload, priorityTransactionsCount, err := b.buildPayload(buildCtx)
span.End()
if err != nil {
return nil, fmt.Errorf("could not build payload: %w", err)
// STEP 2: construct payload
var payload *cluster.Payload
var priorityTransactionsCount uint
if sufficientHistory {
// Step 2 (common case): build a payload of valid transactions, while at the same
// time figuring out the correct reference block ID for the collection.
span, _ = b.tracer.StartSpanFromContext(ctx, trace.COLBuildOnCreatePayload)
payload, priorityTransactionsCount, err = b.buildPayload(buildCtx)
span.End()
if err != nil {
return nil, fmt.Errorf("could not build payload: %w", err)
}
} else {
// Step 2 (edge case of very recently bootstrapped node with truncated history):
// we can't guarantee that no transactions are duplicated because we have too little history, hence build an empty payload
payload = cluster.NewEmptyPayload(buildCtx.highestPossibleReferenceBlockID())
}

// STEP 3: we have a set of transactions that are valid to include on this fork.
Expand Down Expand Up @@ -372,46 +396,57 @@ func (b *Builder) populateUnfinalizedAncestryLookup(ctx *blockBuildContext) erro
return err
}

// populateFinalizedAncestryLookup traverses the reference block height index to
// populate the transaction lookup (used for deduplication) and the rate limiter
// (used to limit transaction submission by payer).
// populateFinalizedAncestryLookup scans all finalized collections that could contain transactions still
// eligible for inclusion in our collection under construction. With the transactions from these collections,
// we populate the transaction lookup (used for deduplication) and the rate limiter (used to limit transaction
// submission by payer).
//
// Overview:
// Cluster blocks reference blocks on the main consensus chain. For the resulting collection to not
// be immediately expired, the reference block must be somewhat recent. On finalization, the collector's
// consensus `MutableState` updates the index:
//
// The traversal is structured so that we check every collection whose reference
// block height translates to a possible constituent transaction which could also
// appear in the collection we are building.
// main consensus height -> _finalized_ cluster block IDs (set) referencing that height
//
// First, we determine the height range of main consensus blocks that are recent enough, such that collections
// using them as a reference blocks can still contain non-expired transactions.
// Seconds, we look up all finalized collections that have reference blocks in that height range, using the
// index described above. Only those finalized collections can possibly contain transactions which are still
// eligible for inclusion in our collection under construction.
//
// Error returns:
// - [ErrNotEnoughHistory] if the node does not have enough history to definitively avoid duplicate transactions
func (b *Builder) populateFinalizedAncestryLookup(lctx lockctx.Proof, ctx *blockBuildContext) error {
minRefHeight := ctx.lowestPossibleReferenceBlockHeight()
maxRefHeight := ctx.highestPossibleReferenceBlockHeight()
lookup := ctx.lookup
limiter := ctx.limiter

// Let E be the global transaction expiry constant, measured in blocks. For each
// T ∈ `includedTransactions`, we have to decide whether the transaction
// already appeared in _any_ finalized cluster block.
// Notation:
// - consider a valid cluster block C and let c be its reference block height
// - consider a transaction T ∈ `includedTransactions` and let t denote its
// reference block height
//
// Boundary conditions:
// 1. C's reference block height is equal to the lowest reference block height of
// all its constituent transactions. Hence, for collection C to potentially contain T, it must satisfy c <= t.
// 2. For T to be eligible for inclusion in collection C, _none_ of the transactions within C are allowed
// to be expired w.r.t. C's reference block. Hence, for collection C to potentially contain T, it must satisfy t < c + E.
// First, we determine the height range of main consensus blocks that have to be scanned (for details
// see method doc). Then Lookup finalized cluster blocks, which reference consensus blocks in that
// height range. Only those finalized collections can possibly contain transactions still eligible
// for inclusion in the collection we are building.
//
// Therefore, for collection C to potentially contain transaction T, it must satisfy t - E < c <= t.
// In other words, we only need to inspect collections with reference block height c ∈ (t-E, t].
// Consequently, for a set of transactions, with `minRefHeight` (`maxRefHeight`) being the smallest (largest)
// reference block height, we only need to inspect collections with c ∈ (minRefHeight-E, maxRefHeight].

// the finalized cluster blocks which could possibly contain any conflicting transactions
// CAUTION: the following logic assumes that the collector node has synced all cluster blocks starting from the
// cluster root block of the current epoch. While we assume (i) that the node has all relevant cluster blocks,
// the node also needs to have sufficient history of the main chain (assumption (ii)) to index those cluster
// blocks by their reference heights. We check requirement (ii) below and emit an [ErrNotEnoughHistory] otherwise.
var clusterBlockIDs []flow.Identifier
start, end := findRefHeightSearchRangeForConflictingClusterBlocks(minRefHeight, maxRefHeight, ctx)
refHeightAvailable := b.protoState.Params().FinalizedRoot().Height
if start < refHeightAvailable {
return fmt.Errorf("cannot deduplicate transactions: need heights from %d (bootstrapped from %d): %w", start, refHeightAvailable, ErrNotEnoughHistory)
}
err := operation.LookupClusterBlocksByReferenceHeightRange(lctx, b.db.Reader(), start, end, &clusterBlockIDs)
if err != nil {
return fmt.Errorf("could not lookup finalized cluster blocks by reference height range [%d,%d]: %w", start, end, err)
}

// populate the [blockBuildContext.lookup] and [blockBuildContext.limiter]:
clusterBlockIDsSet := make(map[flow.Identifier]struct{})
for _, id := range clusterBlockIDs {
clusterBlockIDsSet[id] = struct{}{}
}
for _, blockID := range clusterBlockIDs {
header, err := b.clusterHeaders.ByBlockID(blockID)
if err != nil {
Expand Down Expand Up @@ -636,13 +671,33 @@ func (b *Builder) buildHeader(
}, nil
}

// findRefHeightSearchRangeForConflictingClusterBlocks computes the range of reference block heights of ancestor blocks
// findRefHeightSearchRangeForConflictingClusterBlocks computes the range of reference block heights
// which could possibly contain transactions duplicating those in our collection under construction, based on the range
// of reference heights of transactions in the collection under construction.
// Input range is the (inclusive) range of reference heights of transactions eligible for inclusion in the collection
// under construction. Output range is the (inclusive) range of reference heights which need to be searched in order to
// under construction. We output the (inclusive) range of reference heights which need to be searched in order to
// avoid transaction repeats.
//
// APPROACH (part 1, common case)
// Notation:
// - Consider a valid cluster block C and let c be its reference block height.
// - Let T be a transaction included in collection C (we write T ∈ C). Furthermore,
// let t denote T's reference block height
// - Let E denote the global transaction expiry constant, measured in blocks.
//
// Boundary conditions (protocol mandates):
// 1. C's reference block height is equal to the lowest reference block height of all its constituent.
// transactions. Hence, for collection C to potentially contain T, it must satisfy c ≤ t.
// 2. For T to be eligible for inclusion in collection C, _none_ of the transactions within C are allowed
// to be expired w.r.t. C's reference block. Hence, for collection C to potentially contain T, it must
// satisfy t < c + E.
//
// Therefore, for collection C to potentially contain transaction T, it must satisfy t - E < c ≤ t.
// In other words, we only need to inspect collections with reference block height c ∈ (t-E, t].
// Consequently, for a set of transactions, with `minRefHeight` (`maxRefHeight`) being the smallest (largest)
// reference block height, we only need to inspect collections with c ∈ (minRefHeight-E, maxRefHeight].
//
// APPROACH (part 2, epoch boundaries)
// Within a single epoch, we have argued that for a set of transactions, with `minRefHeight` (`maxRefHeight`) being
// the smallest (largest) reference block height, we only need to inspect collections with reference block heights
// c ∈ (minRefHeight-E, maxRefHeight]. Note that the lower bound is exclusive, while the upper bound is inclusive,
Expand All @@ -653,11 +708,11 @@ func (b *Builder) buildHeader(
//
// In order to take epoch boundaries into account, we note: A collector cluster is only responsible for transactions whose
// reference blocks are within the cluster's operating epoch. Thus, we can bound the lower end of the search range by the
// height of the first block in the epoch. Formally, we only need to inspect collections with reference block height
//
// c ∈ [max{minRefHeight-E+1, epochFirstHeight}, maxRefHeight]
// height of the first block in the epoch. Formally, we only need to inspect collections with reference block height:
// c ∈ [max{minRefHeight-E+1, epochFirstHeight}, maxRefHeight]
func findRefHeightSearchRangeForConflictingClusterBlocks(minRefHeight, maxRefHeight uint64, ctx *blockBuildContext) (start, end uint64) {
// in order to avoid underflow, we rewrite the lower-bound equation entirely without subtraction:
// In order to avoid underflow, we want to work entirely without subtraction.
// We rewrite the boolean condition for checking the lower epoch boundary:
// max{minRefHeight-E+1, epochFirstHeight} == epochFirstHeight
// ⇔ minRefHeight - E + 1 ≤ epochFirstHeight
// ⇔ minRefHeight - E < epochFirstHeight
Expand Down