Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
52 changes: 44 additions & 8 deletions cmd/migrate-curio/migrate.go
Original file line number Diff line number Diff line change
Expand Up @@ -325,6 +325,11 @@ func migrateBoostDeals(ctx context.Context, activeSectors bitfield.BitField, mad
return fmt.Errorf("deal: %s: failed to get raw size from SQL or LID", deal.DealUuid.String())
}

stx, err := mdb.BeginTx(ctx, &sql.TxOptions{Isolation: sql.LevelSerializable})
if err != nil {
return fmt.Errorf("failed to begin transaction: %w", err)
}

_, err = hdb.BeginTransaction(ctx, func(tx *harmonydb.Tx) (bool, error) {
// Add deal to HarmonyDB
if !a {
Expand All @@ -343,7 +348,7 @@ func migrateBoostDeals(ctx context.Context, activeSectors bitfield.BitField, mad
}

// Mark deal added to harmonyDB
_, err = mdb.Exec(`INSERT INTO Deals (ID, DB, LID) VALUES (?, TRUE, FALSE) ON CONFLICT(ID) DO NOTHING`, deal.DealUuid.String())
_, err = stx.Exec(`INSERT INTO Deals (ID, DB, LID) VALUES (?, TRUE, FALSE) ON CONFLICT(ID) DO NOTHING`, deal.DealUuid.String())
if err != nil {
return false, fmt.Errorf("deal: %s: failed to mark deal migrated: %w", deal.DealUuid.String(), err)
}
Expand All @@ -359,7 +364,7 @@ func migrateBoostDeals(ctx context.Context, activeSectors bitfield.BitField, mad
}

// Mark deal added to pieceDeal in HarmonyDB
_, err = mdb.Exec(`UPDATE Deals SET LID = TRUE WHERE ID = ?`, deal.DealUuid.String())
_, err = stx.Exec(`UPDATE Deals SET LID = TRUE WHERE ID = ?`, deal.DealUuid.String())
if err != nil {
return false, fmt.Errorf("deal: %s: failed to mark deal LID migrated: %w", deal.DealUuid.String(), err)
}
Expand Down Expand Up @@ -395,13 +400,26 @@ func migrateBoostDeals(ctx context.Context, activeSectors bitfield.BitField, mad
} else {
llog.Infof("Skipping indexing as sector %d is not unsealed", deal.SectorID)
}

// Mark deal added to pipeline in HarmonyDB
_, err = stx.Exec(`UPDATE Deals SET Pipeline = TRUE WHERE ID = ?`, deal.DealUuid.String())
if err != nil {
return false, fmt.Errorf("deal: %s: failed to mark deal Pipeline migrated: %w", deal.DealUuid.String(), err)
}
}
return true, nil
}, harmonydb.OptionRetry())
if err != nil {
return err
serr := stx.Rollback()
if serr != nil {
return fmt.Errorf("failed to commit Haromy changes: %w and failed to rollback transaction: %w", err, serr)
}
return fmt.Errorf("failed to commit Haromy changes: %w", err)
}
err = stx.Commit()
if err != nil {
return fmt.Errorf("failed to commit Haromy changes: %w", err)
}

}

return nil
Expand Down Expand Up @@ -592,6 +610,11 @@ func migrateDDODeals(ctx context.Context, full v1api.FullNode, activeSectors bit
return fmt.Errorf("deal: %s: inbound file size is 0", deal.ID.String())
}

stx, err := mdb.BeginTx(ctx, &sql.TxOptions{Isolation: sql.LevelSerializable})
if err != nil {
return fmt.Errorf("failed to begin transaction: %w", err)
}

_, err = hdb.BeginTransaction(ctx, func(tx *harmonydb.Tx) (bool, error) {
if !a {
// Add DDO deal to harmonyDB
Expand All @@ -607,7 +630,7 @@ func migrateDDODeals(ctx context.Context, full v1api.FullNode, activeSectors bit
}

// Mark deal added to harmonyDB
_, err = mdb.Exec(`INSERT INTO Deals (ID, DB, LID) VALUES (?, TRUE, FALSE) ON CONFLICT(ID) DO NOTHING`, deal.ID.String())
_, err = stx.Exec(`INSERT INTO Deals (ID, DB, LID) VALUES (?, TRUE, FALSE) ON CONFLICT(ID) DO NOTHING`, deal.ID.String())
if err != nil {
return false, fmt.Errorf("deal: %s: failed to mark DDO deal migrated: %w", deal.ID.String(), err)
}
Expand All @@ -622,13 +645,12 @@ func migrateDDODeals(ctx context.Context, full v1api.FullNode, activeSectors bit
}

// Mark deal added to pieceDeal in HarmonyDB
_, err = mdb.Exec(`UPDATE Deals SET LID = TRUE WHERE ID = ?`, deal.ID.String())
_, err = stx.Exec(`UPDATE Deals SET LID = TRUE WHERE ID = ?`, deal.ID.String())
if err != nil {
return false, fmt.Errorf("deal: %s: failed to mark deal LID migrated: %w", deal.ID.String(), err)
}
}

// TODO: Confirm if using the mk12 pipeline will have any impact for DDO deals
if !c {
// Check if we can index and announce i.e. we have unsealed copy
var exists bool
Expand Down Expand Up @@ -658,11 +680,25 @@ func migrateDDODeals(ctx context.Context, full v1api.FullNode, activeSectors bit
} else {
llog.Infof("Skipping indexing as sector %d is not unsealed", deal.SectorID)
}

// Mark deal added to pipeline in HarmonyDB
_, err = stx.Exec(`UPDATE Deals SET Pipeline = TRUE WHERE ID = ?`, deal.ID.String())
if err != nil {
return false, fmt.Errorf("deal: %s: failed to mark deal Pipeline migrated: %w", deal.ID.String(), err)
}
}
return true, nil
}, harmonydb.OptionRetry())
if err != nil {
return err
serr := stx.Rollback()
if serr != nil {
return fmt.Errorf("failed to commit Haromy changes: %w and failed to rollback transaction: %w", err, serr)
}
return fmt.Errorf("failed to commit Haromy changes: %w", err)
}
err = stx.Commit()
if err != nil {
return fmt.Errorf("failed to commit Haromy changes: %w", err)
}
}

Expand Down