From 7bcbac9d615a6cf820e2ecc0530f9e7f578d8ddd Mon Sep 17 00:00:00 2001 From: Gaius Date: Fri, 20 Sep 2024 15:58:04 +0800 Subject: [PATCH] refactor: get task job and delete task job (#3522) Signed-off-by: Gaius --- internal/job/types.go | 48 ++++---- manager/job/job.go | 3 +- manager/job/mocks/task_mock.go | 39 +++---- manager/job/task.go | 34 +++--- manager/job/task_test.go | 202 --------------------------------- manager/service/job.go | 96 +++++++++------- pkg/slices/slices.go | 17 ++- pkg/slices/slices_test.go | 24 +++- scheduler/job/job.go | 106 +++++++---------- test/e2e/v2/manager/job.go | 59 ++++++++++ test/e2e/v2/manager/preheat.go | 32 ------ 11 files changed, 240 insertions(+), 420 deletions(-) delete mode 100644 manager/job/task_test.go create mode 100644 test/e2e/v2/manager/job.go diff --git a/internal/job/types.go b/internal/job/types.go index 179ecc02608..4f4f431470f 100644 --- a/internal/job/types.go +++ b/internal/job/types.go @@ -18,12 +18,6 @@ package job import ( "time" - - "github.com/bits-and-blooms/bitset" - - nethttp "d7y.io/dragonfly/v2/pkg/net/http" - "d7y.io/dragonfly/v2/scheduler/config" - "d7y.io/dragonfly/v2/scheduler/resource" ) // PreheatRequest defines the request parameters for preheating. @@ -64,7 +58,8 @@ type PreheatFailureTask struct { // GetTaskRequest defines the request parameters for getting task. type GetTaskRequest struct { - TaskID string `json:"task_id" validate:"required"` + TaskID string `json:"task_id" validate:"required"` + Timeout time.Duration `json:"timeout" validate:"omitempty"` } // GetTaskResponse defines the response parameters for getting task. @@ -75,19 +70,12 @@ type GetTaskResponse struct { // Peer represents the peer information. type Peer struct { - ID string `json:"id"` - Config *config.ResourceConfig `json:"config,omitempty"` - Range *nethttp.Range `json:"range,omitempty"` - Priority int32 `json:"priority"` - Pieces map[int32]*resource.Piece `json:"pieces,omitempty"` - FinishedPieces *bitset.BitSet `json:"finished_pieces,omitempty"` - PieceCosts []time.Duration `json:"piece_costs"` - Cost time.Duration `json:"cost,omitempty"` - BlockParents []string `json:"block_parents"` - NeedBackToSource bool `json:"need_back_to_source"` - PieceUpdatedAt time.Time `json:"piece_updated_at"` - CreatedAt time.Time `json:"created_at"` - UpdatedAt time.Time `json:"updated_at"` + ID string `json:"id"` + Hostname string `json:"hostname"` + IP string `json:"ip"` + HostType string `json:"host_type"` + CreatedAt time.Time `json:"created_at"` + UpdatedAt time.Time `json:"updated_at"` } // DeleteTaskRequest defines the request parameters for deleting task. @@ -98,18 +86,22 @@ type DeleteTaskRequest struct { // DeleteTaskResponse defines the response parameters for deleting task. type DeleteTaskResponse struct { - SuccessPeers []*DeleteSuccessPeer `json:"success_peers"` - FailurePeers []*DeleteFailurePeer `json:"failure_peers"` + SuccessTasks []*DeleteSuccessTask `json:"success_tasks"` + FailureTasks []*DeleteFailureTask `json:"failure_tasks"` SchedulerClusterID uint `json:"scheduler_cluster_id"` } -// DeleteSuccessPeer defines the response parameters for deleting peer successfully. -type DeleteSuccessPeer struct { - Peer +// DeleteSuccessTask defines the response parameters for deleting peer successfully. +type DeleteSuccessTask struct { + Hostname string `json:"hostname"` + IP string `json:"ip"` + HostType string `json:"host_type"` } -// DeleteFailurePeer defines the response parameters for deleting peer failed. 
-type DeleteFailurePeer struct { - Peer +// DeleteFailureTask defines the response parameters for deleting peer failed. +type DeleteFailureTask struct { + Hostname string `json:"hostname"` + IP string `json:"ip"` + HostType string `json:"host_type"` Description string `json:"description"` } diff --git a/manager/job/job.go b/manager/job/job.go index d68878cab11..409e56a265a 100644 --- a/manager/job/job.go +++ b/manager/job/job.go @@ -83,12 +83,11 @@ func New(cfg *config.Config, gdb *gorm.DB) (*Job, error) { return nil, err } - task := newTask(j) return &Job{ Job: j, Preheat: preheat, SyncPeers: syncPeers, - Task: task, + Task: newTask(j), GC: gc, }, nil } diff --git a/manager/job/mocks/task_mock.go b/manager/job/mocks/task_mock.go index 6bd44961a62..82b59c2670e 100644 --- a/manager/job/mocks/task_mock.go +++ b/manager/job/mocks/task_mock.go @@ -16,7 +16,6 @@ import ( job "d7y.io/dragonfly/v2/internal/job" models "d7y.io/dragonfly/v2/manager/models" types "d7y.io/dragonfly/v2/manager/types" - tasks "github.com/RichardKnop/machinery/v1/tasks" gomock "go.uber.org/mock/gomock" ) @@ -43,34 +42,32 @@ func (m *MockTask) EXPECT() *MockTaskMockRecorder { return m.recorder } -// DeleteTask mocks base method. -func (m *MockTask) DeleteTask(arg0 context.Context, arg1 []models.Scheduler, arg2 types.DeleteTaskArgs) (*tasks.Group, map[string]*job.DeleteTaskResponse, error) { +// CreateDeleteTask mocks base method. +func (m *MockTask) CreateDeleteTask(arg0 context.Context, arg1 []models.Scheduler, arg2 types.DeleteTaskArgs) (*job.GroupJobState, error) { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "DeleteTask", arg0, arg1, arg2) - ret0, _ := ret[0].(*tasks.Group) - ret1, _ := ret[1].(map[string]*job.DeleteTaskResponse) - ret2, _ := ret[2].(error) - return ret0, ret1, ret2 + ret := m.ctrl.Call(m, "CreateDeleteTask", arg0, arg1, arg2) + ret0, _ := ret[0].(*job.GroupJobState) + ret1, _ := ret[1].(error) + return ret0, ret1 } -// DeleteTask indicates an expected call of DeleteTask. -func (mr *MockTaskMockRecorder) DeleteTask(arg0, arg1, arg2 any) *gomock.Call { +// CreateDeleteTask indicates an expected call of CreateDeleteTask. +func (mr *MockTaskMockRecorder) CreateDeleteTask(arg0, arg1, arg2 any) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "DeleteTask", reflect.TypeOf((*MockTask)(nil).DeleteTask), arg0, arg1, arg2) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "CreateDeleteTask", reflect.TypeOf((*MockTask)(nil).CreateDeleteTask), arg0, arg1, arg2) } -// GetTask mocks base method. -func (m *MockTask) GetTask(arg0 context.Context, arg1 []models.Scheduler, arg2 types.GetTaskArgs) (*tasks.Group, map[string]*job.GetTaskResponse, error) { +// CreateGetTask mocks base method. +func (m *MockTask) CreateGetTask(arg0 context.Context, arg1 []models.Scheduler, arg2 types.GetTaskArgs) (*job.GroupJobState, error) { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "GetTask", arg0, arg1, arg2) - ret0, _ := ret[0].(*tasks.Group) - ret1, _ := ret[1].(map[string]*job.GetTaskResponse) - ret2, _ := ret[2].(error) - return ret0, ret1, ret2 + ret := m.ctrl.Call(m, "CreateGetTask", arg0, arg1, arg2) + ret0, _ := ret[0].(*job.GroupJobState) + ret1, _ := ret[1].(error) + return ret0, ret1 } -// GetTask indicates an expected call of GetTask. -func (mr *MockTaskMockRecorder) GetTask(arg0, arg1, arg2 any) *gomock.Call { +// CreateGetTask indicates an expected call of CreateGetTask. 
+func (mr *MockTaskMockRecorder) CreateGetTask(arg0, arg1, arg2 any) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetTask", reflect.TypeOf((*MockTask)(nil).GetTask), arg0, arg1, arg2) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "CreateGetTask", reflect.TypeOf((*MockTask)(nil).CreateGetTask), arg0, arg1, arg2) } diff --git a/manager/job/task.go b/manager/job/task.go index 47959b24f9f..9ac70036163 100644 --- a/manager/job/task.go +++ b/manager/job/task.go @@ -36,11 +36,11 @@ import ( // Task is an interface for manager tasks. type Task interface { - // CreateDeleteTask create a delete task job. - CreateDeleteTask(context.Context, []models.Scheduler, types.DeleteTaskArgs) (*internaljob.GroupJobState, error) - // CreateGetTask create a get task job. CreateGetTask(context.Context, []models.Scheduler, types.GetTaskArgs) (*internaljob.GroupJobState, error) + + // CreateDeleteTask create a delete task job. + CreateDeleteTask(context.Context, []models.Scheduler, types.DeleteTaskArgs) (*internaljob.GroupJobState, error) } // task is an implementation of Task. @@ -53,16 +53,16 @@ func newTask(job *internaljob.Job) Task { return &task{job} } -// CreateDeleteTask create a delete task job. -func (t *task) CreateDeleteTask(ctx context.Context, schedulers []models.Scheduler, json types.DeleteTaskArgs) (*internaljob.GroupJobState, error) { +// CreateGetTask create a get task job. +func (t *task) CreateGetTask(ctx context.Context, schedulers []models.Scheduler, json types.GetTaskArgs) (*internaljob.GroupJobState, error) { var span trace.Span - ctx, span = tracer.Start(ctx, config.SpanDeleteTask, trace.WithSpanKind(trace.SpanKindProducer)) - span.SetAttributes(config.AttributeDeleteTaskID.String(json.TaskID)) + ctx, span = tracer.Start(ctx, config.SpanGetTask, trace.WithSpanKind(trace.SpanKindProducer)) + span.SetAttributes(config.AttributeGetTaskID.String(json.TaskID)) defer span.End() args, err := internaljob.MarshalRequest(json) if err != nil { - logger.Errorf("delete task marshal request: %v, error: %v", args, err) + logger.Errorf("get tasks marshal request: %v, error: %v", args, err) return nil, err } @@ -75,7 +75,7 @@ func (t *task) CreateDeleteTask(ctx context.Context, schedulers []models.Schedul for _, queue := range queues { signatures = append(signatures, &machineryv1tasks.Signature{ UUID: fmt.Sprintf("task_%s", uuid.New().String()), - Name: internaljob.DeleteTaskJob, + Name: internaljob.GetTaskJob, RoutingKey: queue.String(), Args: args, }) @@ -93,7 +93,7 @@ func (t *task) CreateDeleteTask(ctx context.Context, schedulers []models.Schedul logger.Infof("create task group %s in queues %v, tasks: %#v", group.GroupUUID, queues, tasks) if _, err := t.job.Server.SendGroupWithContext(ctx, group, 0); err != nil { - logger.Errorf("create preheat group %s failed", group.GroupUUID, err) + logger.Errorf("create task group %s failed", group.GroupUUID, err) return nil, err } @@ -104,16 +104,16 @@ func (t *task) CreateDeleteTask(ctx context.Context, schedulers []models.Schedul }, nil } -// CreateGetTask create a get task job. -func (t *task) CreateGetTask(ctx context.Context, schedulers []models.Scheduler, json types.GetTaskArgs) (*internaljob.GroupJobState, error) { +// CreateDeleteTask create a delete task job. 
+func (t *task) CreateDeleteTask(ctx context.Context, schedulers []models.Scheduler, json types.DeleteTaskArgs) (*internaljob.GroupJobState, error) { var span trace.Span - ctx, span = tracer.Start(ctx, config.SpanGetTask, trace.WithSpanKind(trace.SpanKindProducer)) - span.SetAttributes(config.AttributeGetTaskID.String(json.TaskID)) + ctx, span = tracer.Start(ctx, config.SpanDeleteTask, trace.WithSpanKind(trace.SpanKindProducer)) + span.SetAttributes(config.AttributeDeleteTaskID.String(json.TaskID)) defer span.End() args, err := internaljob.MarshalRequest(json) if err != nil { - logger.Errorf("list tasks marshal request: %v, error: %v", args, err) + logger.Errorf("delete task marshal request: %v, error: %v", args, err) return nil, err } @@ -126,7 +126,7 @@ func (t *task) CreateGetTask(ctx context.Context, schedulers []models.Scheduler, for _, queue := range queues { signatures = append(signatures, &machineryv1tasks.Signature{ UUID: fmt.Sprintf("task_%s", uuid.New().String()), - Name: internaljob.GetTaskJob, + Name: internaljob.DeleteTaskJob, RoutingKey: queue.String(), Args: args, }) @@ -144,7 +144,7 @@ func (t *task) CreateGetTask(ctx context.Context, schedulers []models.Scheduler, logger.Infof("create task group %s in queues %v, tasks: %#v", group.GroupUUID, queues, tasks) if _, err := t.job.Server.SendGroupWithContext(ctx, group, 0); err != nil { - logger.Errorf("create task group %s failed", group.GroupUUID, err) + logger.Errorf("create preheat group %s failed", group.GroupUUID, err) return nil, err } diff --git a/manager/job/task_test.go b/manager/job/task_test.go deleted file mode 100644 index 70d070a6d9b..00000000000 --- a/manager/job/task_test.go +++ /dev/null @@ -1,202 +0,0 @@ -/* - * Copyright 2024 The Dragonfly Authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package job - -import ( - "context" - "errors" - "testing" - - machineryv1tasks "github.com/RichardKnop/machinery/v1/tasks" - testifyassert "github.com/stretchr/testify/assert" - "go.uber.org/mock/gomock" - - "d7y.io/dragonfly/v2/internal/job" - internaljob "d7y.io/dragonfly/v2/internal/job" - "d7y.io/dragonfly/v2/manager/job/mocks" - "d7y.io/dragonfly/v2/manager/models" - "d7y.io/dragonfly/v2/manager/types" -) - -func TestDeleteTask(t *testing.T) { - tests := []struct { - name string - setupMocks func(mockTask *mocks.MockTask) - ctx context.Context - schedulers []models.Scheduler - args types.DeleteTaskArgs - expect func(t *testing.T, group *machineryv1tasks.Group, responses map[string]*job.DeleteTaskResponse, err error) - }{ - { - name: "DeleteTask succeeds", - setupMocks: func(mockTask *mocks.MockTask) { - expectedGroup := &machineryv1tasks.Group{ - GroupUUID: "test-group-uuid", - } - expectedResponses := map[string]*job.DeleteTaskResponse{ - "scheduler1": { - SuccessPeers: []*job.DeleteSuccessPeer{ - { - Peer: internaljob.Peer{ID: "peer1"}, - }, - }, - FailurePeers: []*job.DeleteFailurePeer{}, - }, - "scheduler2": { - SuccessPeers: []*job.DeleteSuccessPeer{}, - FailurePeers: []*job.DeleteFailurePeer{ - { - Peer: internaljob.Peer{ID: "peer2"}, - Description: "Failed to delete", - }, - }, - }, - } - mockTask.EXPECT().DeleteTask(gomock.Any(), gomock.Any(), gomock.Any()).Return(expectedGroup, expectedResponses, nil) - }, - ctx: context.TODO(), - schedulers: []models.Scheduler{ - {Hostname: "scheduler1"}, - {Hostname: "scheduler2"}, - }, - args: types.DeleteTaskArgs{ - TaskID: "test-task-id", - }, - expect: func(t *testing.T, group *machineryv1tasks.Group, responses map[string]*job.DeleteTaskResponse, err error) { - assert := testifyassert.New(t) - assert.NoError(err) - assert.Equal("test-group-uuid", group.GroupUUID) - assert.Equal("peer1", responses["scheduler1"].SuccessPeers[0].Peer.ID) - assert.Equal("peer2", responses["scheduler2"].FailurePeers[0].Peer.ID) - }, - }, - { - name: "DeleteTask fails", - setupMocks: func(mockTask *mocks.MockTask) { - mockTask.EXPECT().DeleteTask(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil, nil, errors.New("delete task error")) - }, - ctx: context.TODO(), - schedulers: []models.Scheduler{ - {Hostname: "scheduler1"}, - }, - args: types.DeleteTaskArgs{ - TaskID: "test-task-id", - }, - expect: func(t *testing.T, group *machineryv1tasks.Group, responses map[string]*job.DeleteTaskResponse, err error) { - assert := testifyassert.New(t) - assert.Error(err) - assert.Nil(group) - assert.Nil(responses) - }, - }, - } - - for _, tc := range tests { - t.Run(tc.name, func(t *testing.T) { - ctrl := gomock.NewController(t) - defer ctrl.Finish() - - mockTask := mocks.NewMockTask(ctrl) - tc.setupMocks(mockTask) - - group, responses, err := mockTask.DeleteTask(tc.ctx, tc.schedulers, tc.args) - tc.expect(t, group, responses, err) - }) - } -} - -func TestGetTask(t *testing.T) { - tests := []struct { - name string - setupMocks func(mockTask *mocks.MockTask) - ctx context.Context - schedulers []models.Scheduler - args types.GetTaskArgs - expect func(t *testing.T, group *machineryv1tasks.Group, responses map[string]*job.GetTaskResponse, err error) - }{ - { - name: "GetTask succeeds", - setupMocks: func(mockTask *mocks.MockTask) { - expectedGroup := &machineryv1tasks.Group{ - GroupUUID: "test-group-uuid", - } - expectedResponses := map[string]*job.GetTaskResponse{ - "scheduler1": { - Peers: []*internaljob.Peer{ - {ID: "peer1"}, - {ID: "peer2"}, - }, - }, - 
"scheduler2": { - Peers: []*internaljob.Peer{ - {ID: "peer3"}, - {ID: "peer4"}, - }, - }, - } - mockTask.EXPECT().GetTask(gomock.Any(), gomock.Any(), gomock.Any()).Return(expectedGroup, expectedResponses, nil) - }, - ctx: context.TODO(), - schedulers: []models.Scheduler{ - {Hostname: "scheduler1"}, - {Hostname: "scheduler2"}, - }, - args: types.GetTaskArgs{ - TaskID: "test-task-id", - }, - expect: func(t *testing.T, group *machineryv1tasks.Group, responses map[string]*job.GetTaskResponse, err error) { - assert := testifyassert.New(t) - assert.NoError(err) - assert.Equal("test-group-uuid", group.GroupUUID) - assert.Equal("peer1", responses["scheduler1"].Peers[0].ID) - assert.Equal("peer3", responses["scheduler2"].Peers[0].ID) - }, - }, - { - name: "GetTask fails", - setupMocks: func(mockTask *mocks.MockTask) { - mockTask.EXPECT().GetTask(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil, nil, errors.New("get task error")) - }, - ctx: context.TODO(), - schedulers: []models.Scheduler{ - {Hostname: "scheduler1"}, - }, - args: types.GetTaskArgs{ - TaskID: "test-task-id", - }, - expect: func(t *testing.T, group *machineryv1tasks.Group, responses map[string]*job.GetTaskResponse, err error) { - assert := testifyassert.New(t) - assert.Error(err) - assert.Nil(group) - assert.Nil(responses) - }, - }, - } - - for _, tc := range tests { - t.Run(tc.name, func(t *testing.T) { - ctrl := gomock.NewController(t) - defer ctrl.Finish() - - mockTask := mocks.NewMockTask(ctrl) - tc.setupMocks(mockTask) - - group, responses, err := mockTask.GetTask(tc.ctx, tc.schedulers, tc.args) - tc.expect(t, group, responses, err) - }) - } -} diff --git a/manager/service/job.go b/manager/service/job.go index 40c78eca19b..3f4eaa2de9a 100644 --- a/manager/service/job.go +++ b/manager/service/job.go @@ -50,7 +50,7 @@ func (s *service) CreatePreheatJob(ctx context.Context, json types.CreatePreheat return nil, err } - candidateSchedulers, err := s.findCandidateSchedulers(ctx, json.SchedulerClusterIDs) + candidateSchedulers, err := s.findCandidateSchedulers(ctx, json.SchedulerClusterIDs, []string{types.SchedulerFeaturePreheat}) if err != nil { return nil, err } @@ -83,31 +83,27 @@ func (s *service) CreatePreheatJob(ctx context.Context, json types.CreatePreheat return &job, nil } -func (s *service) CreateDeleteTaskJob(ctx context.Context, json types.CreateDeleteTaskJobRequest) (*models.Job, error) { - if json.Args.Timeout == 0 { - json.Args.Timeout = types.DefaultJobTimeout - } - +func (s *service) CreateGetTaskJob(ctx context.Context, json types.CreateGetTaskJobRequest) (*models.Job, error) { args, err := structure.StructToMap(json.Args) if err != nil { return nil, err } - allSchedulers, err := s.findAllSchedulers(ctx, json.SchedulerClusterIDs) + schedulers, err := s.findSchedulers(ctx, json.SchedulerClusterIDs) if err != nil { return nil, err } - var allSchedulerClusters []models.SchedulerCluster - for _, allScheduler := range allSchedulers { - allSchedulerClusters = append(allSchedulerClusters, allScheduler.SchedulerCluster) - } - - groupJobState, err := s.job.CreateDeleteTask(ctx, allSchedulers, json.Args) + groupJobState, err := s.job.CreateGetTask(ctx, schedulers, json.Args) if err != nil { return nil, err } + var schedulerClusters []models.SchedulerCluster + for _, scheduler := range schedulers { + schedulerClusters = append(schedulerClusters, scheduler.SchedulerCluster) + } + job := models.Job{ TaskID: groupJobState.GroupUUID, BIO: json.BIO, @@ -115,38 +111,42 @@ func (s *service) CreateDeleteTaskJob(ctx context.Context, 
json types.CreateDele State: groupJobState.State, Args: args, UserID: json.UserID, - SchedulerClusters: allSchedulerClusters, + SchedulerClusters: schedulerClusters, } if err := s.db.WithContext(ctx).Create(&job).Error; err != nil { return nil, err } - go s.pollingJob(context.Background(), internaljob.DeleteTaskJob, job.ID, job.TaskID) + go s.pollingJob(context.Background(), internaljob.GetTaskJob, job.ID, job.TaskID) return &job, nil } -func (s *service) CreateGetTaskJob(ctx context.Context, json types.CreateGetTaskJobRequest) (*models.Job, error) { - allSchedulers, err := s.findAllSchedulers(ctx, json.SchedulerClusterIDs) - if err != nil { - return nil, err +func (s *service) CreateDeleteTaskJob(ctx context.Context, json types.CreateDeleteTaskJobRequest) (*models.Job, error) { + if json.Args.Timeout == 0 { + json.Args.Timeout = types.DefaultJobTimeout } - var allSchedulerClusters []models.SchedulerCluster - for _, allScheduler := range allSchedulers { - allSchedulerClusters = append(allSchedulerClusters, allScheduler.SchedulerCluster) + args, err := structure.StructToMap(json.Args) + if err != nil { + return nil, err } - args, err := structure.StructToMap(json.Args) + schedulers, err := s.findSchedulers(ctx, json.SchedulerClusterIDs) if err != nil { return nil, err } - groupJobState, err := s.job.CreateGetTask(ctx, allSchedulers, json.Args) + groupJobState, err := s.job.CreateDeleteTask(ctx, schedulers, json.Args) if err != nil { return nil, err } + var schedulerClusters []models.SchedulerCluster + for _, scheduler := range schedulers { + schedulerClusters = append(schedulerClusters, scheduler.SchedulerCluster) + } + job := models.Job{ TaskID: groupJobState.GroupUUID, BIO: json.BIO, @@ -154,19 +154,19 @@ func (s *service) CreateGetTaskJob(ctx context.Context, json types.CreateGetTask State: groupJobState.State, Args: args, UserID: json.UserID, - SchedulerClusters: allSchedulerClusters, + SchedulerClusters: schedulerClusters, } if err := s.db.WithContext(ctx).Create(&job).Error; err != nil { return nil, err } - go s.pollingJob(context.Background(), internaljob.GetTaskJob, job.ID, job.TaskID) + go s.pollingJob(context.Background(), internaljob.DeleteTaskJob, job.ID, job.TaskID) return &job, nil } -func (s *service) findAllSchedulers(ctx context.Context, schedulerClusterIDs []uint) ([]models.Scheduler, error) { - var availableSchedulers []models.Scheduler +func (s *service) findSchedulers(ctx context.Context, schedulerClusterIDs []uint) ([]models.Scheduler, error) { + var activeSchedulers []models.Scheduler if len(schedulerClusterIDs) != 0 { // Find the scheduler clusters by request. for _, schedulerClusterID := range schedulerClusterIDs { @@ -183,36 +183,36 @@ func (s *service) findAllSchedulers(ctx context.Context, schedulerClusterIDs []u return nil, err } - availableSchedulers = append(availableSchedulers, schedulers...) + activeSchedulers = append(activeSchedulers, schedulers...) } } else { // Find all of the scheduler clusters that has active schedulers. 
- var availableSchedulersClusters []models.SchedulerCluster - if err := s.db.WithContext(ctx).Find(&availableSchedulersClusters).Error; err != nil { + var schedulerClusters []models.SchedulerCluster + if err := s.db.WithContext(ctx).Find(&schedulerClusters).Error; err != nil { return nil, err } - for _, availableSchedulersCluster := range availableSchedulersClusters { + for _, schedulerCluster := range schedulerClusters { var schedulers []models.Scheduler if err := s.db.WithContext(ctx).Preload("SchedulerCluster").Find(&schedulers, models.Scheduler{ - SchedulerClusterID: availableSchedulersCluster.ID, + SchedulerClusterID: schedulerCluster.ID, State: models.SchedulerStateActive, }).Error; err != nil { continue } - availableSchedulers = append(availableSchedulers, schedulers...) + activeSchedulers = append(activeSchedulers, schedulers...) } } - if len(availableSchedulers) == 0 { - return nil, errors.New("available schedulers not found") + if len(activeSchedulers) == 0 { + return nil, errors.New("active schedulers not found") } - return availableSchedulers, nil + return activeSchedulers, nil } -func (s *service) findCandidateSchedulers(ctx context.Context, schedulerClusterIDs []uint) ([]models.Scheduler, error) { +func (s *service) findCandidateSchedulers(ctx context.Context, schedulerClusterIDs []uint, features []string) ([]models.Scheduler, error) { var candidateSchedulers []models.Scheduler if len(schedulerClusterIDs) != 0 { // Find the scheduler clusters by request. @@ -230,9 +230,15 @@ func (s *service) findCandidateSchedulers(ctx context.Context, schedulerClusterI return nil, err } - // Scan the schedulers to find the first scheduler that supports preheat. for _, scheduler := range schedulers { - if slices.Contains(scheduler.Features, types.SchedulerFeaturePreheat) { + // If the features is empty, return the first scheduler directly. + if len(features) == 0 { + candidateSchedulers = append(candidateSchedulers, scheduler) + break + } + + // Scan the schedulers to find the first scheduler that supports feature. + if slices.Contains(scheduler.Features, features...) { candidateSchedulers = append(candidateSchedulers, scheduler) break } @@ -254,9 +260,15 @@ func (s *service) findCandidateSchedulers(ctx context.Context, schedulerClusterI continue } - // Scan the schedulers to find the first scheduler that supports preheat. for _, scheduler := range schedulers { - if slices.Contains(scheduler.Features, types.SchedulerFeaturePreheat) { + // If the features is empty, return the first scheduler directly. + if len(features) == 0 { + candidateSchedulers = append(candidateSchedulers, scheduler) + break + } + + // Scan the schedulers to find the first scheduler that supports feature. + if slices.Contains(scheduler.Features, features...) { candidateSchedulers = append(candidateSchedulers, scheduler) break } diff --git a/pkg/slices/slices.go b/pkg/slices/slices.go index 4b8b42761dc..3d9965ec7f5 100644 --- a/pkg/slices/slices.go +++ b/pkg/slices/slices.go @@ -16,15 +16,20 @@ package slices -// Contains returns true if an element is present in a collection. -func Contains[T comparable](s []T, e T) bool { - for _, v := range s { - if v == e { - return true +// Contains returns true if elements is present in a collection. 
+func Contains[T comparable](s []T, els ...T) bool { + ss := make(map[T]struct{}, len(s)) + for _, el := range s { + ss[el] = struct{}{} + } + + for _, el := range els { + if _, found := ss[el]; !found { + return false } } - return false + return true } // FindDuplicate returns duplicate element in a collection. diff --git a/pkg/slices/slices_test.go b/pkg/slices/slices_test.go index cfb12cb31ef..0ed97fc6453 100644 --- a/pkg/slices/slices_test.go +++ b/pkg/slices/slices_test.go @@ -25,26 +25,38 @@ func TestContains(t *testing.T) { tests := []struct { name string input []int - element int + elements []int expected bool }{ { - name: "element present", + name: "single element present", input: []int{1, 2, 3}, - element: 2, + elements: []int{2}, expected: true, }, { - name: "element not present", + name: "single element not present", input: []int{1, 2, 3}, - element: 4, + elements: []int{4}, + expected: false, + }, + { + name: "multi elements present", + input: []int{1, 2, 3}, + elements: []int{2, 3}, + expected: true, + }, + { + name: "multi elements not present", + input: []int{1, 2, 3}, + elements: []int{2, 4}, expected: false, }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - result := Contains(tt.input, tt.element) + result := Contains(tt.input, tt.elements...) if result != tt.expected { t.Errorf("expected %v, but got %v", tt.expected, result) } diff --git a/scheduler/job/job.go b/scheduler/job/job.go index 2b2ae40a403..8a9d71a7d66 100644 --- a/scheduler/job/job.go +++ b/scheduler/job/job.go @@ -489,22 +489,32 @@ func (j *job) getTask(ctx context.Context, data string) (string, error) { } if err := validator.New().Struct(req); err != nil { - logger.Errorf("getTask %s validate failed: %s", req.TaskID, err.Error()) + logger.Errorf("get task %s validate failed: %s", req.TaskID, err.Error()) return "", err } + log := logger.WithTaskID(req.TaskID) + log.Infof("get task request: %#v", req) + task, ok := j.resource.TaskManager().Load(req.TaskID) if !ok { // Do not return error if task not found, just retunr empty response. - logger.Warnf("task %s not found", req.TaskID) - return internaljob.MarshalResponse(&internaljob.GetTaskResponse{}) + log.Warn("task not found") + return internaljob.MarshalResponse(&internaljob.GetTaskResponse{ + SchedulerClusterID: j.config.Manager.SchedulerClusterID, + }) } - // Convert peer struct to peer response. var peers []*internaljob.Peer for _, peer := range task.LoadPeers() { - peers = append(peers, convertPeer(peer)) - + peers = append(peers, &internaljob.Peer{ + ID: peer.ID, + Hostname: peer.Host.Hostname, + IP: peer.Host.IP, + HostType: peer.Host.Type.Name(), + CreatedAt: peer.CreatedAt.Load(), + UpdatedAt: peer.UpdatedAt.Load(), + }) } return internaljob.MarshalResponse(&internaljob.GetTaskResponse{ @@ -522,22 +532,27 @@ func (j *job) deleteTask(ctx context.Context, data string) (string, error) { } if err := validator.New().Struct(req); err != nil { - logger.Errorf("deleteTask %s validate failed: %s", req.TaskID, err.Error()) + logger.Errorf("delete task %s validate failed: %s", req.TaskID, err.Error()) return "", err } + log := logger.WithTaskID(req.TaskID) + log.Infof("delete task request: %#v", req) + ctx, cancel := context.WithTimeout(ctx, req.Timeout) defer cancel() task, ok := j.resource.TaskManager().Load(req.TaskID) if !ok { // Do not return error if task not found, just retunr empty response. 
- logger.Warnf("task %s not found", req.TaskID) - return internaljob.MarshalResponse(&internaljob.DeleteTaskResponse{}) + log.Warn("task not found") + return internaljob.MarshalResponse(&internaljob.DeleteTaskResponse{ + SchedulerClusterID: j.config.Manager.SchedulerClusterID, + }) } - successPeers := []*internaljob.DeleteSuccessPeer{} - failurePeers := []*internaljob.DeleteFailurePeer{} + successTasks := []*internaljob.DeleteSuccessTask{} + failureTasks := []*internaljob.DeleteFailureTask{} finishedPeers := task.LoadFinishedPeers() for _, finishedPeer := range finishedPeers { @@ -547,8 +562,10 @@ func (j *job) deleteTask(ctx context.Context, data string) (string, error) { dfdaemonClient, err := dfdaemonclient.GetV2ByAddr(ctx, addr) if err != nil { log.Errorf("get client from %s failed: %s", addr, err.Error()) - failurePeers = append(failurePeers, &internaljob.DeleteFailurePeer{ - Peer: *convertPeer(finishedPeer), + failureTasks = append(failureTasks, &internaljob.DeleteFailureTask{ + Hostname: finishedPeer.Host.Hostname, + IP: finishedPeer.Host.IP, + HostType: finishedPeer.Host.Type.Name(), Description: fmt.Sprintf("task %s failed: %s", req.TaskID, err.Error()), }) @@ -558,67 +575,28 @@ func (j *job) deleteTask(ctx context.Context, data string) (string, error) { if err = dfdaemonClient.DeleteTask(ctx, &dfdaemonv2.DeleteTaskRequest{ TaskId: req.TaskID, }); err != nil { - logger.Errorf("delete task failed: %s", err.Error()) - failurePeers = append(failurePeers, &internaljob.DeleteFailurePeer{ - Peer: *convertPeer(finishedPeer), + log.Errorf("delete task failed: %s", err.Error()) + failureTasks = append(failureTasks, &internaljob.DeleteFailureTask{ + Hostname: finishedPeer.Host.Hostname, + IP: finishedPeer.Host.IP, + HostType: finishedPeer.Host.Type.Name(), Description: fmt.Sprintf("task %s failed: %s", req.TaskID, err.Error()), }) continue } - successPeers = append(successPeers, &internaljob.DeleteSuccessPeer{ - Peer: *convertPeer(finishedPeer), + task.DeletePeer(finishedPeer.ID) + successTasks = append(successTasks, &internaljob.DeleteSuccessTask{ + Hostname: finishedPeer.Host.Hostname, + IP: finishedPeer.Host.IP, + HostType: finishedPeer.Host.Type.Name(), }) } return internaljob.MarshalResponse(&internaljob.DeleteTaskResponse{ - FailurePeers: failurePeers, - SuccessPeers: successPeers, + SuccessTasks: successTasks, + FailureTasks: failureTasks, SchedulerClusterID: j.config.Manager.SchedulerClusterID, }) } - -func convertPeer(p *resource.Peer) *internaljob.Peer { - peer := &internaljob.Peer{ - ID: p.ID, - Config: p.Config, - Range: p.Range, - Priority: int32(p.Priority), - FinishedPieces: p.FinishedPieces, - PieceCosts: p.PieceCosts(), - NeedBackToSource: p.NeedBackToSource != nil && p.NeedBackToSource.Load(), - } - - if p.BlockParents != nil && p.BlockParents.Len() > 0 { - peer.BlockParents = p.BlockParents.Values() - } - - if p.Cost != nil { - peer.Cost = p.Cost.Load() - } - - if p.PieceUpdatedAt != nil { - peer.PieceUpdatedAt = p.PieceUpdatedAt.Load() - } - - if p.CreatedAt != nil { - peer.CreatedAt = p.CreatedAt.Load() - } - - if p.UpdatedAt != nil { - peer.UpdatedAt = p.UpdatedAt.Load() - } - - peer.Pieces = make(map[int32]*resource.Piece) - p.Pieces.Range(func(key, value interface{}) bool { - k, ok1 := key.(int32) - v, ok2 := value.(*resource.Piece) - if ok1 && ok2 { - peer.Pieces[k] = v - } - return true - }) - - return peer -} diff --git a/test/e2e/v2/manager/job.go b/test/e2e/v2/manager/job.go new file mode 100644 index 00000000000..39520e92adf --- /dev/null +++ 
b/test/e2e/v2/manager/job.go @@ -0,0 +1,59 @@ +/* + * Copyright 2024 The Dragonfly Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package manager + +import ( + "context" + "encoding/json" + "fmt" + "time" + + machineryv1tasks "github.com/RichardKnop/machinery/v1/tasks" + . "github.com/onsi/gomega" //nolint + + "d7y.io/dragonfly/v2/manager/models" + "d7y.io/dragonfly/v2/test/e2e/v2/util" +) + +func waitForDone(job *models.Job, pod *util.PodExec) bool { + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute) + defer cancel() + + ticker := time.NewTicker(5 * time.Second) + defer ticker.Stop() + + for { + select { + case <-ctx.Done(): + return false + case <-ticker.C: + out, err := pod.CurlCommand("", nil, nil, + fmt.Sprintf("http://dragonfly-manager.dragonfly-system.svc:8080/api/v1/jobs/%d", job.ID)).CombinedOutput() + fmt.Println(string(out)) + Expect(err).NotTo(HaveOccurred()) + err = json.Unmarshal(out, job) + Expect(err).NotTo(HaveOccurred()) + switch job.State { + case machineryv1tasks.StateSuccess: + return true + case machineryv1tasks.StateFailure: + return false + default: + } + } + } +} diff --git a/test/e2e/v2/manager/preheat.go b/test/e2e/v2/manager/preheat.go index c25c3566e9c..855c33d080d 100644 --- a/test/e2e/v2/manager/preheat.go +++ b/test/e2e/v2/manager/preheat.go @@ -17,12 +17,9 @@ package manager import ( - "context" "encoding/json" "fmt" - "time" - machineryv1tasks "github.com/RichardKnop/machinery/v1/tasks" . "github.com/onsi/ginkgo/v2" //nolint . "github.com/onsi/gomega" //nolint @@ -405,32 +402,3 @@ var _ = Describe("Preheat with Manager", func() { }) }) }) - -func waitForDone(job *models.Job, pod *util.PodExec) bool { - ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute) - defer cancel() - - ticker := time.NewTicker(5 * time.Second) - defer ticker.Stop() - - for { - select { - case <-ctx.Done(): - return false - case <-ticker.C: - out, err := pod.CurlCommand("", nil, nil, - fmt.Sprintf("http://dragonfly-manager.dragonfly-system.svc:8080/api/v1/jobs/%d", job.ID)).CombinedOutput() - fmt.Println(string(out)) - Expect(err).NotTo(HaveOccurred()) - err = json.Unmarshal(out, job) - Expect(err).NotTo(HaveOccurred()) - switch job.State { - case machineryv1tasks.StateSuccess: - return true - case machineryv1tasks.StateFailure: - return false - default: - } - } - } -}
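
A usage sketch of the reworked pkg/slices.Contains: it now accepts a variadic list of elements and returns true only when every one of them is present in the collection, which is what lets findCandidateSchedulers check a whole feature list via slices.Contains(scheduler.Features, features...). The snippet below is illustrative only; the feature-name strings are placeholders rather than values taken from this patch.

package main

import (
	"fmt"

	"d7y.io/dragonfly/v2/pkg/slices"
)

func main() {
	features := []string{"preheat", "get_task", "delete_task"}

	// Single-element lookup behaves as before.
	fmt.Println(slices.Contains(features, "preheat")) // true

	// With multiple elements, Contains is an "all present" check:
	// it returns false if any requested element is missing.
	fmt.Println(slices.Contains(features, "preheat", "get_task"))  // true
	fmt.Println(slices.Contains(features, "preheat", "sync_peers")) // false
}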
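
The job payloads are also slimmed down: Peer now carries only identity and host metadata, GetTaskResponse and DeleteTaskResponse report the scheduler cluster ID, and the delete response lists per-host DeleteSuccessTask/DeleteFailureTask entries instead of full peer snapshots. A minimal sketch of what one scheduler's GetTaskResponse serializes to, assuming the internaljob import alias used elsewhere in the patch; the field values are placeholders.

package main

import (
	"encoding/json"
	"fmt"
	"time"

	internaljob "d7y.io/dragonfly/v2/internal/job"
)

func main() {
	resp := internaljob.GetTaskResponse{
		Peers: []*internaljob.Peer{
			{
				ID:        "example-peer-id",
				Hostname:  "node-1",
				IP:        "10.0.0.1",
				HostType:  "normal", // Filled from peer.Host.Type.Name() in the scheduler; value here is illustrative.
				CreatedAt: time.Now(),
				UpdatedAt: time.Now(),
			},
		},
		SchedulerClusterID: 1,
	}

	// The manager polls the group job and aggregates one such response per scheduler.
	b, _ := json.MarshalIndent(resp, "", "  ")
	fmt.Println(string(b))
}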