From bf6f9c5fc301844c56879f95cae6a7fd031c615d Mon Sep 17 00:00:00 2001
From: Aleksandar Petrov <8142643+aleks-p@users.noreply.github.com>
Date: Fri, 30 Aug 2024 13:42:16 -0300
Subject: [PATCH] feat: do not recover from panics with fatal errors (#3534)

* feat: do not recover from panics with fatal errors

* Do not panic on logical compaction issues (bugs)
---
 pkg/experiment/metastore/metastore_fsm.go     | 21 +++++
 .../metastore/metastore_fsm_test.go           | 80 +++++++++++++++++++
 .../metastore_state_poll_compaction_jobs.go   | 31 +++++--
 ...tastore_state_poll_compaction_jobs_test.go | 14 ++++
 4 files changed, 140 insertions(+), 6 deletions(-)
 create mode 100644 pkg/experiment/metastore/metastore_fsm_test.go

diff --git a/pkg/experiment/metastore/metastore_fsm.go b/pkg/experiment/metastore/metastore_fsm.go
index 5d7c2caba5..a015b56825 100644
--- a/pkg/experiment/metastore/metastore_fsm.go
+++ b/pkg/experiment/metastore/metastore_fsm.go
@@ -1,6 +1,7 @@
 package metastore
 
 import (
+	"errors"
 	"fmt"
 	"io"
 	"reflect"
@@ -57,6 +58,20 @@ type fsmError struct {
 	err error
 }
 
+// fatalCommandError wraps an error that handleCommand must not recover from:
+// a panic carrying it is re-raised instead of being turned into a response.
+type fatalCommandError struct {
+	err error
+}
+
+func (e fatalCommandError) Error() string {
+	return fmt.Sprintf("fatal FSM command error: %v", e.err)
+}
+
+// Unwrap returns the wrapped cause. Returning e itself would make the error
+// chain cyclic and hang errors.Is/errors.As on any non-matching target.
+func (e fatalCommandError) Unwrap() error { return e.err }
+
 func errResponse(l *raft.Log, err error) fsmResponse {
 	return fsmResponse{err: &fsmError{log: l, err: err}}
 }
@@ -129,6 +144,12 @@ func handleCommand[Req, Resp proto.Message](raw []byte, cmd *raft.Log, call comm
 	var resp fsmResponse
 	defer func() {
 		if r := recover(); r != nil {
+			// Only error values can wrap a fatalCommandError; asserting r.(error)
+			// unchecked would itself panic on values such as panic("msg").
+			var fCommandError fatalCommandError
+			if err, ok := r.(error); ok && errors.As(err, &fCommandError) {
+				panic(fCommandError)
+			}
 			resp.err = util.PanicError(r)
 		}
 	}()
diff --git a/pkg/experiment/metastore/metastore_fsm_test.go b/pkg/experiment/metastore/metastore_fsm_test.go
new file mode 100644
index 0000000000..c2fcd0a66f
--- /dev/null
+++ b/pkg/experiment/metastore/metastore_fsm_test.go
@@ -0,0 +1,80 @@
+package metastore
+
+import (
+	"errors"
+	"testing"
+
+	"github.com/hashicorp/raft"
+	"github.com/stretchr/testify/assert"
+	"google.golang.org/protobuf/proto"
+
+	metastorev1 "github.com/grafana/pyroscope/api/gen/proto/go/metastore/v1"
+)
+
+var errTest = errors.New("test error") // sentinel returned or panicked with by the test calls below
+var typedNil *metastorev1.AddBlockResponse // typed nil pointer: the expected msg when the call returns a nil response
+
+func Test_handleCommandErrorHandling(t *testing.T) {
+	type args[Req proto.Message, Resp proto.Message] struct {
+		raw  []byte
+		cmd  *raft.Log
+		call commandCall[Req, Resp]
+	}
+	type testCase[Req proto.Message, Resp proto.Message] struct {
+		name      string
+		args      args[Req, Resp]
+		want      fsmResponse
+		wantPanic bool
+	}
+	tests := []testCase[*metastorev1.AddBlockRequest, *metastorev1.AddBlockResponse]{
+		{
+			name: "no error",
+			args: args[*metastorev1.AddBlockRequest, *metastorev1.AddBlockResponse]{
+				raw: make([]byte, 0),
+				cmd: &raft.Log{},
+				call: func(log *raft.Log, request *metastorev1.AddBlockRequest) (*metastorev1.AddBlockResponse, error) {
+					return &metastorev1.AddBlockResponse{}, nil
+				},
+			},
+			want: fsmResponse{
+				msg: &metastorev1.AddBlockResponse{},
+				err: nil,
+			},
+		},
+		{
+			name: "a simple error is returned",
+			args: args[*metastorev1.AddBlockRequest, *metastorev1.AddBlockResponse]{
+				raw: make([]byte, 0),
+				cmd: &raft.Log{},
+				call: func(log *raft.Log, request *metastorev1.AddBlockRequest) (*metastorev1.AddBlockResponse, error) {
+					return nil, errTest
+				},
+			},
+			want: fsmResponse{
+				msg: typedNil,
+				err: errTest,
+			},
+		},
+		{
+			name: "a panic with a fatal error results in a real panic",
+			args: args[*metastorev1.AddBlockRequest, *metastorev1.AddBlockResponse]{
+				raw: make([]byte, 0),
+				cmd: &raft.Log{},
+				call: func(log *raft.Log, request *metastorev1.AddBlockRequest) (*metastorev1.AddBlockResponse, error) {
+					panic(fatalCommandError{errTest})
+				},
+			},
+			wantPanic: true,
+		},
+	}
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			defer func() {
+				if r := recover(); r != nil {
+					assert.True(t, tt.wantPanic) // a panic is only acceptable when the case expects one
+				}
+			}()
+			assert.Equal(t, tt.want, handleCommand(tt.args.raw, tt.args.cmd, tt.args.call))
+		})
+	}
+}
diff --git a/pkg/experiment/metastore/metastore_state_poll_compaction_jobs.go b/pkg/experiment/metastore/metastore_state_poll_compaction_jobs.go
index a735d800df..ab605425b1 100644
--- a/pkg/experiment/metastore/metastore_state_poll_compaction_jobs.go
+++ b/pkg/experiment/metastore/metastore_state_poll_compaction_jobs.go
@@ -65,12 +65,12 @@ func (m *metastoreState) pollCompactionJobs(request *compactorv1.PollCompactionJ
 			level.Warn(m.logger).Log("msg", "job is not assigned to the worker", "job", jobUpdate.JobName, "raft_log_index", jobUpdate.RaftLogIndex)
 			continue
 		}
-		jobKey := tenantShard{tenant: job.TenantId, shard: job.Shard}
 		level.Debug(m.logger).Log("msg", "processing status update for compaction job", "job", jobUpdate.JobName, "status", jobUpdate.Status)
 		switch jobUpdate.Status {
 		case compactorv1.CompactionStatus_COMPACTION_STATUS_SUCCESS:
 			// clean up the job, we don't keep completed jobs around
 			m.compactionJobQueue.evict(job.Name, job.RaftLogIndex)
+			jobKey := tenantShard{tenant: job.TenantId, shard: job.Shard}
 			stateUpdate.deletedJobs[jobKey] = append(stateUpdate.deletedJobs[jobKey], job.Name)
 			m.compactionMetrics.completedJobs.WithLabelValues(
 				fmt.Sprint(job.Shard), job.TenantId, fmt.Sprint(job.CompactionLevel)).Inc()
@@ -179,7 +179,7 @@ func (m *metastoreState) pollCompactionJobs(request *compactorv1.PollCompactionJ
 
 	err = m.writeToDb(stateUpdate)
 	if err != nil {
-		panic(fmt.Errorf("error writing metastore compaction state to db: %w", err))
+		panic(fatalCommandError{fmt.Errorf("error persisting metadata state to db, %w", err)})
 	}
 
 	return resp, nil
@@ -268,7 +268,12 @@ func (m *metastoreState) writeToDb(sTable *pollStateUpdate) error {
 			for _, b := range blocks {
 				block := m.findBlock(shard, b)
 				if block == nil {
-					return fmt.Errorf("block %s not found in shard %d", b, shard)
+					level.Error(m.logger).Log(
+						"msg", "a newly compacted block could not be found",
+						"block", b,
+						"shard", shard,
+					)
+					continue // log and skip: returning an error here would make pollCompactionJobs panic
 				}
 				name, key := keyForBlockMeta(shard, "", b)
 				err := updateBlockMetadataBucket(tx, name, func(bucket *bbolt.Bucket) error {
@@ -294,7 +299,11 @@ func (m *metastoreState) writeToDb(sTable *pollStateUpdate) error {
 		for _, jobName := range sTable.newJobs {
 			job := m.findJob(jobName)
 			if job == nil {
-				return fmt.Errorf("job %s not found", jobName)
+				level.Error(m.logger).Log(
+					"msg", "a newly added job could not be found",
+					"job", jobName,
+				)
+				continue // log and skip: returning an error here would make pollCompactionJobs panic
 			}
 			err := m.persistCompactionJob(job.Shard, job.TenantId, job, tx)
 			if err != nil {
@@ -305,7 +314,13 @@ func (m *metastoreState) writeToDb(sTable *pollStateUpdate) error {
 			for _, l := range levels {
 				queue := m.getOrCreateCompactionBlockQueue(key).blocksByLevel[l]
 				if queue == nil {
-					return fmt.Errorf("block queue for %v and level %d not found", key, l)
+					level.Error(m.logger).Log(
+						"msg", "block queue not found",
+						"shard", key.shard,
+						"tenant", key.tenant,
+						"level", l,
+					)
+					continue // log and skip: returning an error here would make pollCompactionJobs panic
 				}
 				err := m.persistCompactionJobBlockQueue(key.shard, key.tenant, l, queue, tx)
 				if err != nil {
@@ -327,7 +342,11 @@ func (m *metastoreState) writeToDb(sTable *pollStateUpdate) error {
 		for _, jobName := range sTable.updatedJobs {
 			job := m.findJob(jobName)
 			if job == nil {
-				return fmt.Errorf("job %s not found", jobName)
+				level.Error(m.logger).Log(
+					"msg", "an updated job could not be found",
+					"job", jobName,
+				)
+				continue // log and skip: returning an error here would make pollCompactionJobs panic
 			}
 			err := m.persistCompactionJob(job.Shard, job.TenantId, job, tx)
 			if err != nil {
diff --git a/pkg/experiment/metastore/metastore_state_poll_compaction_jobs_test.go b/pkg/experiment/metastore/metastore_state_poll_compaction_jobs_test.go
index 3ba17e43ff..1a35e2270e 100644
--- a/pkg/experiment/metastore/metastore_state_poll_compaction_jobs_test.go
+++ b/pkg/experiment/metastore/metastore_state_poll_compaction_jobs_test.go
@@ -271,6 +271,20 @@ func Test_FailedCompaction(t *testing.T) {
 	verifyCompactionState(t, m)
 }
 
+func Test_PanicWithDbErrors(t *testing.T) {
+	m := initState(t)
+	addLevel0Blocks(m, 20)
+
+	// the deferred check fails the test unless pollCompactionJobs panics
+	defer func() {
+		r := recover()
+		require.NotNilf(t, r, "we should panic when a DB error is returned")
+	}()
+	// close the db, this should cause errors when persisting the state
+	_ = m.db.boltdb.Close()
+	_, _ = m.pollCompactionJobs(&compactorv1.PollCompactionJobsRequest{JobCapacity: 2}, 20, 20)
+}
+
 func addLevel0Blocks(m *metastoreState, count int) {
 	for i := 0; i < count; i++ {
 		b := createBlock(i, 0, "", 0)