Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
62 changes: 62 additions & 0 deletions storage/store/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,9 @@ func (e *Events) BatchStore(lctx lockctx.Proof, blockID flow.Identifier, blockEv
eventIndex++
}
}
if err := validateEventOrder(combinedEvents); err != nil {
return fmt.Errorf("invalid event ordering for block %s: %w", blockID, err)
}

storage.OnCommitSucceed(batch, func() {
e.cache.Insert(blockID, combinedEvents)
Expand Down Expand Up @@ -253,6 +256,65 @@ func (e *ServiceEvents) BatchRemoveByBlockID(blockID flow.Identifier, rw storage
return e.cache.RemoveTx(rw, blockID)
}

// validateEventOrder verifies that a flattened slice of block events is correctly
// ordered and internally consistent before being written to storage.
//
// The following invariants are enforced:
// - TransactionIndex must be monotonically non-decreasing.
// - TransactionIndex is permitted to skip values; transactions that emit no
// events are simply absent from the slice.
// - Within a single transaction, EventIndex must form a contiguous sequence
// starting at 0 (i.e. 0, 1, 2, …).
// - The first event of every new transaction must have EventIndex == 0.
//
// The function runs in O(n) time and performs no allocations.
//
// No error returns are expected during normal operation.
// Returns an error if any invariant is violated, including the offending indices.
func validateEventOrder(events []flow.Event) error {
if len(events) == 0 {
return nil
}

if events[0].EventIndex != 0 {
return fmt.Errorf("first event must have EventIndex 0, got EventIndex %d at TransactionIndex %d",
events[0].EventIndex, events[0].TransactionIndex)
}

prevTxIndex := events[0].TransactionIndex
nextEventIndex := uint32(1)

for i := 1; i < len(events); i++ {
e := events[i]

switch {
case e.TransactionIndex == prevTxIndex:
// Same transaction: EventIndex must increment by exactly 1.
if e.EventIndex != nextEventIndex {
return fmt.Errorf("event %d: TransactionIndex %d expected EventIndex %d, got %d",
i, e.TransactionIndex, nextEventIndex, e.EventIndex)
}
nextEventIndex++

case e.TransactionIndex > prevTxIndex:
// New transaction (skips are allowed): must start at EventIndex 0.
if e.EventIndex != 0 {
return fmt.Errorf("event %d: first event of TransactionIndex %d must have EventIndex 0, got %d",
i, e.TransactionIndex, e.EventIndex)
}
prevTxIndex = e.TransactionIndex
nextEventIndex = 1

default:
// TransactionIndex went backwards — never valid.
return fmt.Errorf("event %d: TransactionIndex must be non-decreasing, got %d after %d",
i, e.TransactionIndex, prevTxIndex)
}
}

return nil
}

// sortEventsExecutionOrder sorts events by [txIndex, eventIndex] (execution order).
func sortEventsExecutionOrder(events []flow.Event) {
sort.Slice(events, func(i, j int) bool {
Expand Down
132 changes: 128 additions & 4 deletions storage/store/events_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,14 +34,14 @@ func TestEventStoreRetrieve(t *testing.T) {
evt1_2 := unittest.EventFixture(
unittest.Event.WithEventType(flow.EventAccountCreated),
unittest.Event.WithTransactionIndex(1),
unittest.Event.WithEventIndex(1),
unittest.Event.WithEventIndex(0), // first event of tx1 must be 0
unittest.Event.WithTransactionID(tx2ID),
)

evt2_1 := unittest.EventFixture(
unittest.Event.WithEventType(flow.EventAccountUpdated),
unittest.Event.WithTransactionIndex(2),
unittest.Event.WithEventIndex(2),
unittest.Event.WithEventIndex(0), // first event of tx2 must be 0
unittest.Event.WithTransactionID(tx2ID),
)

Expand Down Expand Up @@ -322,14 +322,14 @@ func TestEventStoreAndRemove(t *testing.T) {
evt1_2 := unittest.EventFixture(
unittest.Event.WithEventType(flow.EventAccountCreated),
unittest.Event.WithTransactionIndex(1),
unittest.Event.WithEventIndex(1),
unittest.Event.WithEventIndex(0), // fixed
unittest.Event.WithTransactionID(tx2ID),
)

evt2_1 := unittest.EventFixture(
unittest.Event.WithEventType(flow.EventAccountUpdated),
unittest.Event.WithTransactionIndex(2),
unittest.Event.WithEventIndex(2),
unittest.Event.WithEventIndex(0), // fixed
unittest.Event.WithTransactionID(tx2ID),
)

Expand Down Expand Up @@ -363,3 +363,127 @@ func TestEventStoreAndRemove(t *testing.T) {
require.Len(t, event, 0)
})
}

// TestValidateEventOrder verifies that validateEventOrder correctly accepts valid
// event sequences and rejects all invalid orderings.
func TestValidateEventOrder(t *testing.T) {
lockManager := storage.NewTestingLockManager()

// helper to build a minimal event with just the index fields set
makeEvent := func(txIndex, eventIndex uint32) flow.Event {
return flow.Event{
TransactionIndex: txIndex,
EventIndex: eventIndex,
}
}

t.Run("empty slice is valid", func(t *testing.T) {
dbtest.RunWithDB(t, func(t *testing.T, db storage.DB) {
s := store.NewEvents(metrics.NewNoopCollector(), db)
blockID := unittest.IdentifierFixture()
err := unittest.WithLock(t, lockManager, storage.LockInsertEvent, func(lctx lockctx.Context) error {
return db.WithReaderBatchWriter(func(rw storage.ReaderBatchWriter) error {
return s.BatchStore(lctx, blockID, []flow.EventsList{}, rw)
})
})
require.NoError(t, err)
})
})

t.Run("valid single transaction", func(t *testing.T) {
dbtest.RunWithDB(t, func(t *testing.T, db storage.DB) {
s := store.NewEvents(metrics.NewNoopCollector(), db)
blockID := unittest.IdentifierFixture()
events := []flow.EventsList{{makeEvent(0, 0), makeEvent(0, 1), makeEvent(0, 2)}}
err := unittest.WithLock(t, lockManager, storage.LockInsertEvent, func(lctx lockctx.Context) error {
return db.WithReaderBatchWriter(func(rw storage.ReaderBatchWriter) error {
return s.BatchStore(lctx, blockID, events, rw)
})
})
require.NoError(t, err)
})
})

t.Run("valid with skipped transaction indices", func(t *testing.T) {
dbtest.RunWithDB(t, func(t *testing.T, db storage.DB) {
s := store.NewEvents(metrics.NewNoopCollector(), db)
blockID := unittest.IdentifierFixture()
// tx 0 and tx 5 have events; tx 1,2,3,4 emitted nothing — valid skip
events := []flow.EventsList{
{makeEvent(0, 0), makeEvent(0, 1)},
{makeEvent(5, 0), makeEvent(5, 1)},
}
err := unittest.WithLock(t, lockManager, storage.LockInsertEvent, func(lctx lockctx.Context) error {
return db.WithReaderBatchWriter(func(rw storage.ReaderBatchWriter) error {
return s.BatchStore(lctx, blockID, events, rw)
})
})
require.NoError(t, err)
})
})

t.Run("invalid: first event has EventIndex != 0", func(t *testing.T) {
dbtest.RunWithDB(t, func(t *testing.T, db storage.DB) {
s := store.NewEvents(metrics.NewNoopCollector(), db)
blockID := unittest.IdentifierFixture()
events := []flow.EventsList{{makeEvent(0, 1)}} // starts at 1, not 0
err := unittest.WithLock(t, lockManager, storage.LockInsertEvent, func(lctx lockctx.Context) error {
return db.WithReaderBatchWriter(func(rw storage.ReaderBatchWriter) error {
return s.BatchStore(lctx, blockID, events, rw)
})
})
require.Error(t, err)
})
})

t.Run("invalid: non-contiguous EventIndex within same transaction", func(t *testing.T) {
dbtest.RunWithDB(t, func(t *testing.T, db storage.DB) {
s := store.NewEvents(metrics.NewNoopCollector(), db)
blockID := unittest.IdentifierFixture()
// jumps from EventIndex 0 to 2, skipping 1
events := []flow.EventsList{{makeEvent(0, 0), makeEvent(0, 2)}}
err := unittest.WithLock(t, lockManager, storage.LockInsertEvent, func(lctx lockctx.Context) error {
return db.WithReaderBatchWriter(func(rw storage.ReaderBatchWriter) error {
return s.BatchStore(lctx, blockID, events, rw)
})
})
require.Error(t, err)
})
})

t.Run("invalid: new transaction starts with EventIndex != 0", func(t *testing.T) {
dbtest.RunWithDB(t, func(t *testing.T, db storage.DB) {
s := store.NewEvents(metrics.NewNoopCollector(), db)
blockID := unittest.IdentifierFixture()
// tx 1 starts at EventIndex 1 instead of 0
events := []flow.EventsList{
{makeEvent(0, 0)},
{makeEvent(1, 1)},
}
err := unittest.WithLock(t, lockManager, storage.LockInsertEvent, func(lctx lockctx.Context) error {
return db.WithReaderBatchWriter(func(rw storage.ReaderBatchWriter) error {
return s.BatchStore(lctx, blockID, events, rw)
})
})
require.Error(t, err)
})
})

t.Run("invalid: decreasing TransactionIndex", func(t *testing.T) {
dbtest.RunWithDB(t, func(t *testing.T, db storage.DB) {
s := store.NewEvents(metrics.NewNoopCollector(), db)
blockID := unittest.IdentifierFixture()
// tx 3 appears after tx 5 — goes backwards
events := []flow.EventsList{
{makeEvent(5, 0)},
{makeEvent(3, 0)},
}
err := unittest.WithLock(t, lockManager, storage.LockInsertEvent, func(lctx lockctx.Context) error {
return db.WithReaderBatchWriter(func(rw storage.ReaderBatchWriter) error {
return s.BatchStore(lctx, blockID, events, rw)
})
})
require.Error(t, err)
})
})
}