Make restore progress available even when cluster is not (#4196)
* feat(schema): extend view with build status

We need to store the view build status because we want
to display it in sctool progress.

Ref #4191

* refactor(restore): move view build status to view

Since build status is a part of the view definition
in SM DB schema, we should also reflect it in the code.
This also allows us to get rid of the ViewProgress struct.

* feat(restore_test): test view progress after cluster is unavailable

This test checks for #4191.

* fix(restore): store view build status in SM DB instead of querying it in progress

This way sctool progress can be fetched even when
the cluster is no longer available.
This makes TestRestoreTablesProgressIntegration pass.

Fixes #4191
Michal-Leszczynski authored Jan 21, 2025
1 parent 605d9c9 commit f442616
Showing 9 changed files with 111 additions and 57 deletions.
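
Before the per-file diffs, here is a heavily simplified Go sketch of the flow the commit message describes: the worker records each view's build status on the persisted run while waiting for it, and a later progress query reads only Scylla Manager's own DB. The map-backed store and the persistRun/loadRun helpers are assumptions standing in for the real insertRun/GetRun plumbing; only the BuildStatus-on-View idea comes from the change itself.

package main

import (
	"fmt"
	"slices"
)

// Minimal stand-ins for the restore package types touched by this commit.
type ViewBuildStatus string

const StatusSuccess ViewBuildStatus = "SUCCESS"

type View struct {
	Keyspace    string
	View        string
	BuildStatus ViewBuildStatus // now persisted with the run (see model.go below)
}

type Run struct{ Views []View }

// smDB stands in for Scylla Manager's own database.
var smDB = map[string]Run{}

func persistRun(runID string, run Run) {
	run.Views = slices.Clone(run.Views)
	smDB[runID] = run
}

func loadRun(runID string) Run { return smDB[runID] }

func main() {
	// While waiting for a view to build, the worker records the observed
	// status on the run and persists it immediately.
	run := Run{Views: []View{{Keyspace: "ks", View: "mv"}}}
	run.Views[0].BuildStatus = StatusSuccess
	persistRun("run-1", run)

	// A later progress query reads only the persisted run, so it keeps
	// working when the restored cluster is unreachable.
	fmt.Println(loadRun("run-1").Views) // [{ks mv SUCCESS}]
}
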
4 changes: 4 additions & 0 deletions pkg/restapi/services.go
@@ -46,6 +46,7 @@ type HealthCheckService interface {
// RepairService service interface for the REST API handlers.
type RepairService interface {
GetRun(ctx context.Context, clusterID, taskID, runID uuid.UUID) (*repair.Run, error)
// GetProgress must work even when the cluster is no longer available.
GetProgress(ctx context.Context, clusterID, taskID, runID uuid.UUID) (repair.Progress, error)
GetTarget(ctx context.Context, clusterID uuid.UUID, properties json.RawMessage) (repair.Target, error)
SetIntensity(ctx context.Context, runID uuid.UUID, intensity float64) error
@@ -59,15 +60,18 @@ type BackupService interface {
ExtractLocations(ctx context.Context, properties []json.RawMessage) []backupspec.Location
List(ctx context.Context, clusterID uuid.UUID, locations []backupspec.Location, filter backup.ListFilter) ([]backup.ListItem, error)
ListFiles(ctx context.Context, clusterID uuid.UUID, locations []backupspec.Location, filter backup.ListFilter) ([]backupspec.FilesInfo, error)
// GetProgress must work even when the cluster is no longer available.
GetProgress(ctx context.Context, clusterID, taskID, runID uuid.UUID) (backup.Progress, error)
DeleteSnapshot(ctx context.Context, clusterID uuid.UUID, locations []backupspec.Location, snapshotTags []string) error
GetValidationTarget(_ context.Context, clusterID uuid.UUID, properties json.RawMessage) (backup.ValidationTarget, error)
// GetValidationProgress must work even when the cluster is no longer available.
GetValidationProgress(ctx context.Context, clusterID, taskID, runID uuid.UUID) ([]backup.ValidationHostProgress, error)
}

// RestoreService service interface for the REST API handlers.
type RestoreService interface {
GetTargetUnitsViews(ctx context.Context, clusterID uuid.UUID, properties json.RawMessage) (restore.Target, []restore.Unit, []restore.View, error)
// GetProgress must work even when the cluster is no longer available.
GetProgress(ctx context.Context, clusterID, taskID, runID uuid.UUID) (restore.Progress, error)
}

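To illustrate why the new interface comments matter, below is a hedged sketch of a handler written purely against a RestoreService-like interface. The handler type, the string IDs and the query-parameter wiring are simplifications rather than the actual restapi code; the point is that the handler performs no cluster round-trip, so the documented guarantee on GetProgress is all it relies on.

package restapisketch

import (
	"context"
	"encoding/json"
	"net/http"
)

// Progress and RestoreService mirror the interface above, trimmed to
// GetProgress and with uuid.UUID replaced by string to stay self-contained.
type Progress struct {
	SnapshotTag string `json:"snapshot_tag"`
}

type RestoreService interface {
	// GetProgress must work even when the cluster is no longer available.
	GetProgress(ctx context.Context, clusterID, taskID, runID string) (Progress, error)
}

// restoreHandler serves progress without ever contacting the cluster.
type restoreHandler struct{ svc RestoreService }

func (h restoreHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
	q := r.URL.Query()
	p, err := h.svc.GetProgress(r.Context(), q.Get("cluster"), q.Get("task"), q.Get("run"))
	if err != nil {
		http.Error(w, err.Error(), http.StatusInternalServerError)
		return
	}
	_ = json.NewEncoder(w).Encode(p)
}
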
20 changes: 7 additions & 13 deletions pkg/service/restore/model.go
@@ -160,11 +160,12 @@ const (

// View represents statement used for recreating restored (dropped) views.
type View struct {
Keyspace string `json:"keyspace" db:"keyspace_name"`
View string `json:"view" db:"view_name"`
Type ViewType `json:"type" db:"view_type"`
BaseTable string `json:"base_table"`
CreateStmt string `json:"create_stmt"`
Keyspace string `json:"keyspace" db:"keyspace_name"`
View string `json:"view" db:"view_name"`
Type ViewType `json:"type" db:"view_type"`
BaseTable string `json:"base_table"`
CreateStmt string `json:"create_stmt"`
BuildStatus scyllaclient.ViewBuildStatus `json:"status"`
}

func (t View) MarshalUDT(name string, info gocql.TypeInfo) ([]byte, error) {
@@ -235,7 +236,7 @@ type Progress struct {
SnapshotTag string `json:"snapshot_tag"`
Keyspaces []KeyspaceProgress `json:"keyspaces,omitempty"`
Hosts []HostProgress `json:"hosts,omitempty"`
Views []ViewProgress `json:"views,omitempty"`
Views []View `json:"views,omitempty"`
Stage Stage `json:"stage"`
}

@@ -266,13 +267,6 @@ type TableProgress struct {
Error string `json:"error,omitempty"`
}

// ViewProgress defines restore progress for the view.
type ViewProgress struct {
View

Status scyllaclient.ViewBuildStatus `json:"status"`
}

// TableName represents full table name.
type TableName struct {
Keyspace string
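Because BuildStatus carries the json:"status" tag and Progress.Views is now []View, the views section of the progress payload should keep the shape that the old ViewProgress embedding produced. A self-contained sketch of that JSON, using a local mirror of the struct and illustrative values (the exact ViewType and status strings are assumptions):

package main

import (
	"encoding/json"
	"fmt"
)

// Local mirror of restore.View, reduced to its JSON-relevant fields.
type View struct {
	Keyspace    string `json:"keyspace"`
	View        string `json:"view"`
	Type        string `json:"type"`
	BaseTable   string `json:"base_table"`
	CreateStmt  string `json:"create_stmt"`
	BuildStatus string `json:"status"`
}

func main() {
	views := []View{{
		Keyspace:    "ks",
		View:        "mv",
		Type:        "MaterializedView", // illustrative value
		BaseTable:   "tab",
		CreateStmt:  "CREATE MATERIALIZED VIEW ...",
		BuildStatus: "SUCCESS", // illustrative value
	}}
	out, _ := json.MarshalIndent(views, "", "  ")
	fmt.Println(string(out))
	// The "status" key matches what the embedded View plus Status field used
	// to emit, so existing sctool progress consumers should be unaffected.
}
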
27 changes: 3 additions & 24 deletions pkg/service/restore/progress.go
@@ -3,14 +3,13 @@
package restore

import (
"context"
"slices"
"time"

"github.com/pkg/errors"
"github.com/scylladb/gocqlx/v2"
"github.com/scylladb/gocqlx/v2/qb"
"github.com/scylladb/scylla-manager/v3/pkg/schema/table"
"github.com/scylladb/scylla-manager/v3/pkg/scyllaclient"
"github.com/scylladb/scylla-manager/v3/pkg/util/timeutc"
"github.com/scylladb/scylla-manager/v3/pkg/util/uuid"
)
@@ -26,7 +25,7 @@ type tableKey struct {
}

// aggregateProgress returns restore progress information classified by keyspace and tables.
func (w *worker) aggregateProgress(ctx context.Context) (Progress, error) {
func (w *worker) aggregateProgress() (Progress, error) {
var (
p = Progress{
SnapshotTag: w.run.SnapshotTag,
@@ -99,27 +98,7 @@ func (w *worker) aggregateProgress(ctx context.Context) (Progress, error) {

p.extremeToNil()

for _, v := range w.run.Views {
viewTableName := v.View
if v.Type == SecondaryIndex {
viewTableName += "_index"
}

status, err := w.client.ViewBuildStatus(ctx, v.Keyspace, viewTableName)
if err != nil {
w.logger.Error(ctx, "Couldn't get view build status",
"keyspace", v.Keyspace,
"view", v.View,
"error", err,
)
status = scyllaclient.StatusUnknown
}
p.Views = append(p.Views, ViewProgress{
View: v,
Status: status,
})
}

p.Views = slices.Clone(w.run.Views)
for _, hp := range hostProgress {
p.Hosts = append(p.Hosts, hp)
}
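A note on the slices.Clone call that replaces the per-view status polling: it hands callers a copy of the persisted view list rather than the run's own slice. A minimal sketch of why that matters (status strings are placeholders):

package main

import (
	"fmt"
	"slices"
)

type View struct{ BuildStatus string }

func main() {
	run := []View{{BuildStatus: "BUILDING"}}

	// slices.Clone copies the elements, so the snapshot returned by
	// aggregateProgress stays detached from later changes to the run.
	progress := slices.Clone(run)
	run[0].BuildStatus = "SUCCESS"

	fmt.Println(progress[0].BuildStatus) // BUILDING
	fmt.Println(run[0].BuildStatus)      // SUCCESS

	// This only holds because View's fields are values (strings); pointers
	// or nested slices inside the elements would still be shared.
}
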
82 changes: 82 additions & 0 deletions pkg/service/restore/restore_integration_test.go
@@ -989,3 +989,85 @@ func TestRestoreTablesMultiLocationIntegration(t *testing.T) {
t.Fatalf("tables have different contents\nsrc:\n%v\ndst:\n%v", srcM, dstM)
}
}

func TestRestoreTablesProgressIntegration(t *testing.T) {
// It verifies that:
// - view build status reported in progress is correct
// - progress is available even when the cluster is not

if IsIPV6Network() {
t.Skip("nodes don't have ip6tables and related modules to properly simulate unavailable cluster")
}

h := newTestHelper(t, ManagedSecondClusterHosts(), ManagedClusterHosts())

Print("Keyspace setup")
ks := randomizedName("progress_")
ksStmt := "CREATE KEYSPACE %q WITH replication = {'class': 'NetworkTopologyStrategy', 'dc1': %d}"
ExecStmt(t, h.srcCluster.rootSession, fmt.Sprintf(ksStmt, ks, 1))
ExecStmt(t, h.dstCluster.rootSession, fmt.Sprintf(ksStmt, ks, 1))

Print("Table setup")
tab := randomizedName("tab_")
tabStmt := "CREATE TABLE %q.%q (id int PRIMARY KEY, data int)"
ExecStmt(t, h.srcCluster.rootSession, fmt.Sprintf(tabStmt, ks, tab))
ExecStmt(t, h.dstCluster.rootSession, fmt.Sprintf(tabStmt, ks, tab))

Print("View setup")
mv := randomizedName("mv_")
CreateMaterializedView(t, h.srcCluster.rootSession, ks, tab, mv)
CreateMaterializedView(t, h.dstCluster.rootSession, ks, tab, mv)

Print("Fill setup")
fillTable(t, h.srcCluster.rootSession, 1, ks, tab)

Print("Run backup")
loc := []Location{testLocation("progress", "")}
S3InitBucket(t, loc[0].Path)
tag := h.runBackup(t, map[string]any{
"location": loc,
})

Print("Run restore")
grantRestoreTablesPermissions(t, h.dstCluster.rootSession, nil, h.dstUser)
h.runRestore(t, map[string]any{
"location": loc,
"snapshot_tag": tag,
"restore_tables": true,
})

Print("Validate success")
validateTableContent[int, int](t, h.srcCluster.rootSession, h.dstCluster.rootSession, ks, tab, "id", "data")
validateTableContent[int, int](t, h.srcCluster.rootSession, h.dstCluster.rootSession, ks, mv, "id", "data")

Print("Validate view progress")
pr, err := h.dstRestoreSvc.GetProgress(context.Background(), h.dstCluster.ClusterID, h.dstCluster.TaskID, h.dstCluster.RunID)
if err != nil {
t.Fatal(errors.Wrap(err, "get progress"))
}
for _, v := range pr.Views {
if v.BuildStatus != scyllaclient.StatusSuccess {
t.Fatalf("Expected status: %s, got: %s", scyllaclient.StatusSuccess, v.BuildStatus)
}
}

BlockREST(t, ManagedClusterHosts()...)
defer func() {
TryUnblockREST(t, ManagedClusterHosts())
if err := EnsureNodesAreUP(t, ManagedClusterHosts(), time.Minute); err != nil {
t.Fatal(err)
}
}()
time.Sleep(100 * time.Millisecond)

Print("Validate view progress when cluster is unavailable")
pr, err = h.dstRestoreSvc.GetProgress(context.Background(), h.dstCluster.ClusterID, h.dstCluster.TaskID, h.dstCluster.RunID)
if err != nil {
t.Fatal(errors.Wrap(err, "get progress"))
}
for _, v := range pr.Views {
if v.BuildStatus != scyllaclient.StatusSuccess {
t.Fatalf("Expected status: %s, got: %s", scyllaclient.StatusSuccess, v.BuildStatus)
}
}
}
18 changes: 4 additions & 14 deletions pkg/service/restore/service.go
@@ -158,12 +158,8 @@ func (s *Service) GetProgress(ctx context.Context, clusterID, taskID, runID uuid
return Progress{}, errors.Wrap(err, "get run")
}

w, err := s.newProgressWorker(ctx, run)
if err != nil {
return Progress{}, errors.Wrap(err, "create progress worker")
}

pr, err := w.aggregateProgress(ctx)
w := s.newProgressWorker(run)
pr, err := w.aggregateProgress()
if err != nil {
return Progress{}, err
}
@@ -221,20 +217,14 @@ func (w *worker) setRunInfo(taskID, runID uuid.UUID) {
w.run.ID = runID
}

func (s *Service) newProgressWorker(ctx context.Context, run *Run) (worker, error) {
client, err := s.scyllaClient(ctx, run.ClusterID)
if err != nil {
return worker{}, errors.Wrap(err, "get client")
}

func (s *Service) newProgressWorker(run *Run) worker {
return worker{
run: run,
config: s.config,
logger: s.logger,
metrics: s.metrics,
client: client,
session: s.session,
}, nil
}
}

// GetRun returns run with specified cluster, task and run ID.
4 changes: 2 additions & 2 deletions pkg/service/restore/tables_worker.go
@@ -146,11 +146,11 @@ func (w *tablesWorker) restore(ctx context.Context) error {
return nil
},
StageRecreateViews: func() error {
for _, v := range w.run.Views {
for i, v := range w.run.Views {
if err := w.CreateView(ctx, v); err != nil {
return errors.Wrapf(err, "recreate %s.%s with statement %s", v.Keyspace, v.View, v.CreateStmt)
}
if err := w.WaitForViewBuilding(ctx, v); err != nil {
if err := w.WaitForViewBuilding(ctx, &w.run.Views[i]); err != nil {
return errors.Wrapf(err, "wait for %s.%s", v.Keyspace, v.View)
}
}
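The switch from ranging over values to indexing is the easy-to-miss part of this hunk: WaitForViewBuilding now writes the observed status back into the view, so it must receive a pointer into w.run.Views rather than the loop copy. A standalone sketch of the difference, with the type reduced to the one relevant field:

package main

import "fmt"

type View struct{ BuildStatus string }

func setStatus(v *View) { v.BuildStatus = "SUCCESS" }

func main() {
	views := []View{{}, {}}

	// Writing through the range variable is lost: v is a copy of the element.
	for _, v := range views {
		setStatus(&v)
	}
	fmt.Println(views) // [{} {}]

	// Writing through a pointer into the slice sticks, which is why the
	// worker now passes &w.run.Views[i] instead of v.
	for i := range views {
		setStatus(&views[i])
	}
	fmt.Println(views) // [{SUCCESS} {SUCCESS}]
}
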
4 changes: 3 additions & 1 deletion pkg/service/restore/worker_views.go
@@ -71,7 +71,7 @@ func (w *worker) CreateView(ctx context.Context, view View) error {
return alterSchemaRetryWrapper(ctx, op, notify)
}

func (w *worker) WaitForViewBuilding(ctx context.Context, view View) error {
func (w *worker) WaitForViewBuilding(ctx context.Context, view *View) error {
labels := metrics.RestoreViewBuildStatusLabels{
ClusterID: w.run.ClusterID.String(),
Keyspace: view.Keyspace,
@@ -90,6 +90,8 @@ func (w *worker) WaitForViewBuilding(ctx context.Context, view View) error {
return retry.Permanent(err)
}

view.BuildStatus = status
w.insertRun(ctx)
switch status {
case scyllaclient.StatusUnknown:
w.metrics.SetViewBuildStatus(labels, metrics.BuildStatusUnknown)
8 changes: 5 additions & 3 deletions pkg/testutils/exec.go
@@ -159,10 +159,12 @@ func WaitForNodeUPOrTimeout(h string, timeout time.Duration) error {
}

// BlockREST blocks the Scylla API ports on h machine by dropping TCP packets.
func BlockREST(t *testing.T, h string) {
func BlockREST(t *testing.T, hosts ...string) {
t.Helper()
if err := RunIptablesCommand(t, h, CmdBlockScyllaREST); err != nil {
t.Error(err)
for _, host := range hosts {
if err := RunIptablesCommand(t, host, CmdBlockScyllaREST); err != nil {
t.Error(err)
}
}
}

1 change: 1 addition & 0 deletions schema/v3.5.0.cql
@@ -0,0 +1 @@
ALTER TYPE restore_view ADD build_status text;
