Merge branch 'feature/atree-inlining-cadence-v0.42' into bastian/port-5755-atree-inlining-cadence-v0.42
fxamacker committed Apr 23, 2024
2 parents e5c6ec2 + c7e0f38 commit 1f414a9
Showing 7 changed files with 553 additions and 55 deletions.
4 changes: 4 additions & 0 deletions cmd/util/cmd/execution-state-extract/cmd.go
@@ -40,6 +40,7 @@ var (
	flagOutputPayloadFileName        string
	flagOutputPayloadByAddresses     string
	flagFixSlabsWithBrokenReferences bool
	flagFilterUnreferencedSlabs      bool
)

var Cmd = &cobra.Command{
@@ -126,6 +127,9 @@ func init() {

	Cmd.Flags().BoolVar(&flagFixSlabsWithBrokenReferences, "fix-testnet-slabs-with-broken-references", false,
		"fix slabs with broken references in testnet")

	Cmd.Flags().BoolVar(&flagFilterUnreferencedSlabs, "filter-unreferenced-slabs", false,
		"filter unreferenced slabs")
}

func run(*cobra.Command, []string) {
15 changes: 12 additions & 3 deletions cmd/util/cmd/execution-state-extract/execution_state_extract.go
@@ -362,14 +362,13 @@ func createTrieFromPayloads(logger zerolog.Logger, payloads []*ledger.Payload) (

func newMigrations(
	log zerolog.Logger,
	dir string,
	outputDir string,
	nWorker int, // number of concurrent workers to migrate payloads
	runMigrations bool,
	fixSlabsWithBrokenReferences bool,
) []ledger.Migration {
	if runMigrations {

		rwf := reporters.NewReportFileWriterFactory(dir, log)
		rwf := reporters.NewReportFileWriterFactory(outputDir, log)

		var accountBasedMigrations []migrators.AccountBasedMigration

@@ -383,6 +382,16 @@
			)
		}

		if flagFilterUnreferencedSlabs {
			accountBasedMigrations = append(
				accountBasedMigrations,
				migrators.NewFilterUnreferencedSlabsMigration(
					outputDir,
					rwf,
				),
			)
		}

		accountBasedMigrations = append(
			accountBasedMigrations,
			migrators.NewAtreeRegisterMigrator(
215 changes: 215 additions & 0 deletions cmd/util/ledger/migrations/filter_unreferenced_slabs_migration.go
@@ -0,0 +1,215 @@
package migrations

import (
"context"
"errors"
"fmt"
"path"
"sync"
"time"

"github.com/onflow/atree"
"github.com/onflow/cadence/runtime"
"github.com/onflow/cadence/runtime/common"
"github.com/rs/zerolog"

"github.com/onflow/flow-go/cmd/util/ledger/reporters"
"github.com/onflow/flow-go/cmd/util/ledger/util"
"github.com/onflow/flow-go/ledger"
"github.com/onflow/flow-go/ledger/common/convert"
"github.com/onflow/flow-go/model/flow"
)

func StorageIDFromRegisterID(registerID flow.RegisterID) atree.SlabID {
storageID := atree.NewSlabID(
atree.Address([]byte(registerID.Owner)),
atree.SlabIndex([]byte(registerID.Key[1:])),
)
return storageID
}
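// Illustration (not part of this commit): a slab register's key is a one-byte
// prefix (assumed to be '$') followed by the 8-byte slab index, which is why
// Key[1:] above yields the raw index. For a hypothetical register:
//
//	registerID := flow.RegisterID{
//		Owner: string([]byte{0, 0, 0, 0, 0, 0, 0, 1}),        // 8-byte account address
//		Key:   "$" + string([]byte{0, 0, 0, 0, 0, 0, 0, 42}), // prefix + 8-byte slab index
//	}
//	slabID := StorageIDFromRegisterID(registerID)
//	// slabID is the slab with address 0x...01 and index 42.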

type FilterUnreferencedSlabsMigration struct {
	log              zerolog.Logger
	rw               reporters.ReportWriter
	outputDir        string
	mutex            sync.Mutex
	filteredPayloads []*ledger.Payload
	payloadsFile     string
}

var _ AccountBasedMigration = &FilterUnreferencedSlabsMigration{}

const filterUnreferencedSlabsName = "filter-unreferenced-slabs"

func NewFilterUnreferencedSlabsMigration(
	outputDir string,
	rwf reporters.ReportWriterFactory,
) *FilterUnreferencedSlabsMigration {
	return &FilterUnreferencedSlabsMigration{
		outputDir:        outputDir,
		rw:               rwf.ReportWriter(filterUnreferencedSlabsName),
		filteredPayloads: make([]*ledger.Payload, 0, 50_000),
	}
}

func (m *FilterUnreferencedSlabsMigration) InitMigration(
	log zerolog.Logger,
	_ []*ledger.Payload,
	_ int,
) error {
	m.log = log.
		With().
		Str("migration", filterUnreferencedSlabsName).
		Logger()

	return nil
}

func (m *FilterUnreferencedSlabsMigration) MigrateAccount(
	_ context.Context,
	address common.Address,
	oldPayloads []*ledger.Payload,
) (
	newPayloads []*ledger.Payload,
	err error,
) {
	migrationRuntime, err := NewAtreeRegisterMigratorRuntime(address, oldPayloads)
	if err != nil {
		return nil, fmt.Errorf("failed to create migrator runtime: %w", err)
	}

	storage := migrationRuntime.Storage

	newPayloads = oldPayloads

	err = checkStorageHealth(address, storage, oldPayloads)
	if err == nil {
		return
	}

	// The storage health check failed.
	// This can happen if there are unreferenced root slabs.
	// In this case, we filter out the unreferenced root slabs and all slabs they reference from the payloads.

	var unreferencedRootSlabsErr runtime.UnreferencedRootSlabsError
	if !errors.As(err, &unreferencedRootSlabsErr) {
		return nil, fmt.Errorf("storage health check failed: %w", err)
	}

	m.log.Warn().
		Err(err).
		Str("account", address.Hex()).
		Msg("filtering unreferenced root slabs")

	// Create a set of unreferenced slabs: root slabs, and all slabs they reference.

	unreferencedSlabIDs := map[atree.SlabID]struct{}{}
	for _, rootSlabID := range unreferencedRootSlabsErr.UnreferencedRootSlabIDs {
		unreferencedSlabIDs[rootSlabID] = struct{}{}

		childReferences, _, err := storage.GetAllChildReferences(rootSlabID)
		if err != nil {
			return nil, fmt.Errorf(
				"failed to get all child references for root slab %s: %w",
				rootSlabID,
				err,
			)
		}

		for _, childSlabID := range childReferences {
			unreferencedSlabIDs[childSlabID] = struct{}{}
		}
	}

	// Filter out unreferenced slabs.

	newCount := len(oldPayloads) - len(unreferencedSlabIDs)
	newPayloads = make([]*ledger.Payload, 0, newCount)

	filteredPayloads := make([]*ledger.Payload, 0, len(unreferencedSlabIDs))

	for _, payload := range oldPayloads {
		registerID, _, err := convert.PayloadToRegister(payload)
		if err != nil {
			return nil, fmt.Errorf("failed to convert payload to register: %w", err)
		}

		// Filter unreferenced slabs.
		if registerID.IsSlabIndex() {
			storageID := StorageIDFromRegisterID(registerID)
			if _, ok := unreferencedSlabIDs[storageID]; ok {
				filteredPayloads = append(filteredPayloads, payload)
				continue
			}
		}

		newPayloads = append(newPayloads, payload)
	}

	m.rw.Write(unreferencedSlabs{
		Account:      address,
		PayloadCount: len(filteredPayloads),
	})

	m.mergeFilteredPayloads(filteredPayloads)

	// Do NOT report the health check error here.
	// The health check error is only reported if it is not due to unreferenced slabs.
	// If it is due to unreferenced slabs, we filter them out and continue.

	return newPayloads, nil
}

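// mergeFilteredPayloads appends to the shared filteredPayloads slice under the
// mutex; account migrations may run concurrently, so the append must be serialized.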
func (m *FilterUnreferencedSlabsMigration) mergeFilteredPayloads(payloads []*ledger.Payload) {
	m.mutex.Lock()
	defer m.mutex.Unlock()

	m.filteredPayloads = append(m.filteredPayloads, payloads...)
}

func (m *FilterUnreferencedSlabsMigration) Close() error {
	// close the report writer so it flushes to file
	m.rw.Close()

	err := m.writeFilteredPayloads()
	if err != nil {
		return fmt.Errorf("failed to write filtered payloads to file: %w", err)
	}

	return nil
}

func (m *FilterUnreferencedSlabsMigration) writeFilteredPayloads() error {

	m.payloadsFile = path.Join(
		m.outputDir,
		fmt.Sprintf("filtered_%d.payloads", int32(time.Now().Unix())),
	)

	writtenPayloadCount, err := util.CreatePayloadFile(
		m.log,
		m.payloadsFile,
		m.filteredPayloads,
		nil,
		true,
	)

	if err != nil {
		return fmt.Errorf("failed to write all filtered payloads to file: %w", err)
	}

	if writtenPayloadCount != len(m.filteredPayloads) {
		return fmt.Errorf(
			"failed to write all filtered payloads to file: expected %d, got %d",
			len(m.filteredPayloads),
			writtenPayloadCount,
		)
	}

	return nil
}

type unreferencedSlabs struct {
	Account      common.Address `json:"account"`
	PayloadCount int            `json:"payload_count"`
}
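Not part of this commit: a minimal sketch of how the migration's AccountBasedMigration lifecycle fits together (InitMigration once, MigrateAccount per account, Close at the end to flush the report and write the filtered-payloads file). runFilterMigration and its arguments are hypothetical; in practice the migration is registered via newMigrations above rather than called directly.

// runFilterMigration is a hypothetical helper, in the same package, shown only
// to illustrate the call order; it is not part of the commit.
func runFilterMigration(
	log zerolog.Logger,
	rwf reporters.ReportWriterFactory,
	outputDir string,
	address common.Address,
	oldPayloads []*ledger.Payload,
) ([]*ledger.Payload, error) {
	migration := NewFilterUnreferencedSlabsMigration(outputDir, rwf)

	// This migration ignores the payload and worker arguments.
	if err := migration.InitMigration(log, nil, 1); err != nil {
		return nil, err
	}

	newPayloads, err := migration.MigrateAccount(context.Background(), address, oldPayloads)
	if err != nil {
		return nil, err
	}

	// Close flushes the report and writes the filtered payloads to a
	// filtered_<unix timestamp>.payloads file in the output directory.
	if err := migration.Close(); err != nil {
		return nil, err
	}

	return newPayloads, nil
}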