diff --git a/api/gen/proto/go/metastore/v1/raft_log/raft_log.pb.go b/api/gen/proto/go/metastore/v1/raft_log/raft_log.pb.go index d98d173413..9cb8f84af2 100644 --- a/api/gen/proto/go/metastore/v1/raft_log/raft_log.pb.go +++ b/api/gen/proto/go/metastore/v1/raft_log/raft_log.pb.go @@ -347,6 +347,7 @@ type CompactionPlanUpdate struct { AssignedJobs []*AssignedCompactionJob `protobuf:"bytes,2,rep,name=assigned_jobs,json=assignedJobs,proto3" json:"assigned_jobs,omitempty"` UpdatedJobs []*UpdatedCompactionJob `protobuf:"bytes,3,rep,name=updated_jobs,json=updatedJobs,proto3" json:"updated_jobs,omitempty"` CompletedJobs []*CompletedCompactionJob `protobuf:"bytes,4,rep,name=completed_jobs,json=completedJobs,proto3" json:"completed_jobs,omitempty"` + EvictedJobs []*EvictedCompactionJob `protobuf:"bytes,5,rep,name=evicted_jobs,json=evictedJobs,proto3" json:"evicted_jobs,omitempty"` } func (x *CompactionPlanUpdate) Reset() { @@ -409,6 +410,13 @@ func (x *CompactionPlanUpdate) GetCompletedJobs() []*CompletedCompactionJob { return nil } +func (x *CompactionPlanUpdate) GetEvictedJobs() []*EvictedCompactionJob { + if x != nil { + return x.EvictedJobs + } + return nil +} + type NewCompactionJob struct { state protoimpl.MessageState sizeCache protoimpl.SizeCache @@ -621,6 +629,53 @@ func (x *CompletedCompactionJob) GetCompactedBlocks() *v1.CompactedBlocks { return nil } +type EvictedCompactionJob struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + State *CompactionJobState `protobuf:"bytes,1,opt,name=state,proto3" json:"state,omitempty"` +} + +func (x *EvictedCompactionJob) Reset() { + *x = EvictedCompactionJob{} + if protoimpl.UnsafeEnabled { + mi := &file_metastore_v1_raft_log_raft_log_proto_msgTypes[10] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *EvictedCompactionJob) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*EvictedCompactionJob) ProtoMessage() {} + +func (x *EvictedCompactionJob) ProtoReflect() protoreflect.Message { + mi := &file_metastore_v1_raft_log_raft_log_proto_msgTypes[10] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use EvictedCompactionJob.ProtoReflect.Descriptor instead. +func (*EvictedCompactionJob) Descriptor() ([]byte, []int) { + return file_metastore_v1_raft_log_raft_log_proto_rawDescGZIP(), []int{10} +} + +func (x *EvictedCompactionJob) GetState() *CompactionJobState { + if x != nil { + return x.State + } + return nil +} + // CompactionJobState is produced in response to // the compaction worker status update request. // @@ -644,7 +699,7 @@ type CompactionJobState struct { func (x *CompactionJobState) Reset() { *x = CompactionJobState{} if protoimpl.UnsafeEnabled { - mi := &file_metastore_v1_raft_log_raft_log_proto_msgTypes[10] + mi := &file_metastore_v1_raft_log_raft_log_proto_msgTypes[11] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -657,7 +712,7 @@ func (x *CompactionJobState) String() string { func (*CompactionJobState) ProtoMessage() {} func (x *CompactionJobState) ProtoReflect() protoreflect.Message { - mi := &file_metastore_v1_raft_log_raft_log_proto_msgTypes[10] + mi := &file_metastore_v1_raft_log_raft_log_proto_msgTypes[11] if protoimpl.UnsafeEnabled && x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -670,7 +725,7 @@ func (x *CompactionJobState) ProtoReflect() protoreflect.Message { // Deprecated: Use CompactionJobState.ProtoReflect.Descriptor instead. func (*CompactionJobState) Descriptor() ([]byte, []int) { - return file_metastore_v1_raft_log_raft_log_proto_rawDescGZIP(), []int{10} + return file_metastore_v1_raft_log_raft_log_proto_rawDescGZIP(), []int{11} } func (x *CompactionJobState) GetName() string { @@ -740,7 +795,7 @@ type CompactionJobPlan struct { func (x *CompactionJobPlan) Reset() { *x = CompactionJobPlan{} if protoimpl.UnsafeEnabled { - mi := &file_metastore_v1_raft_log_raft_log_proto_msgTypes[11] + mi := &file_metastore_v1_raft_log_raft_log_proto_msgTypes[12] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -753,7 +808,7 @@ func (x *CompactionJobPlan) String() string { func (*CompactionJobPlan) ProtoMessage() {} func (x *CompactionJobPlan) ProtoReflect() protoreflect.Message { - mi := &file_metastore_v1_raft_log_raft_log_proto_msgTypes[11] + mi := &file_metastore_v1_raft_log_raft_log_proto_msgTypes[12] if protoimpl.UnsafeEnabled && x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -766,7 +821,7 @@ func (x *CompactionJobPlan) ProtoReflect() protoreflect.Message { // Deprecated: Use CompactionJobPlan.ProtoReflect.Descriptor instead. func (*CompactionJobPlan) Descriptor() ([]byte, []int) { - return file_metastore_v1_raft_log_raft_log_proto_rawDescGZIP(), []int{11} + return file_metastore_v1_raft_log_raft_log_proto_rawDescGZIP(), []int{12} } func (x *CompactionJobPlan) GetName() string { @@ -824,7 +879,7 @@ type UpdateCompactionPlanRequest struct { func (x *UpdateCompactionPlanRequest) Reset() { *x = UpdateCompactionPlanRequest{} if protoimpl.UnsafeEnabled { - mi := &file_metastore_v1_raft_log_raft_log_proto_msgTypes[12] + mi := &file_metastore_v1_raft_log_raft_log_proto_msgTypes[13] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -837,7 +892,7 @@ func (x *UpdateCompactionPlanRequest) String() string { func (*UpdateCompactionPlanRequest) ProtoMessage() {} func (x *UpdateCompactionPlanRequest) ProtoReflect() protoreflect.Message { - mi := &file_metastore_v1_raft_log_raft_log_proto_msgTypes[12] + mi := &file_metastore_v1_raft_log_raft_log_proto_msgTypes[13] if protoimpl.UnsafeEnabled && x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -850,7 +905,7 @@ func (x *UpdateCompactionPlanRequest) ProtoReflect() protoreflect.Message { // Deprecated: Use UpdateCompactionPlanRequest.ProtoReflect.Descriptor instead. func (*UpdateCompactionPlanRequest) Descriptor() ([]byte, []int) { - return file_metastore_v1_raft_log_raft_log_proto_rawDescGZIP(), []int{12} + return file_metastore_v1_raft_log_raft_log_proto_rawDescGZIP(), []int{13} } func (x *UpdateCompactionPlanRequest) GetTerm() uint64 { @@ -878,7 +933,7 @@ type UpdateCompactionPlanResponse struct { func (x *UpdateCompactionPlanResponse) Reset() { *x = UpdateCompactionPlanResponse{} if protoimpl.UnsafeEnabled { - mi := &file_metastore_v1_raft_log_raft_log_proto_msgTypes[13] + mi := &file_metastore_v1_raft_log_raft_log_proto_msgTypes[14] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -891,7 +946,7 @@ func (x *UpdateCompactionPlanResponse) String() string { func (*UpdateCompactionPlanResponse) ProtoMessage() {} func (x *UpdateCompactionPlanResponse) ProtoReflect() protoreflect.Message { - mi := &file_metastore_v1_raft_log_raft_log_proto_msgTypes[13] + mi := &file_metastore_v1_raft_log_raft_log_proto_msgTypes[14] if protoimpl.UnsafeEnabled && x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -904,7 +959,7 @@ func (x *UpdateCompactionPlanResponse) ProtoReflect() protoreflect.Message { // Deprecated: Use UpdateCompactionPlanResponse.ProtoReflect.Descriptor instead. func (*UpdateCompactionPlanResponse) Descriptor() ([]byte, []int) { - return file_metastore_v1_raft_log_raft_log_proto_rawDescGZIP(), []int{13} + return file_metastore_v1_raft_log_raft_log_proto_rawDescGZIP(), []int{14} } func (x *UpdateCompactionPlanResponse) GetPlanUpdate() *CompactionPlanUpdate { @@ -955,7 +1010,7 @@ var file_metastore_v1_raft_log_raft_log_proto_rawDesc = []byte{ 0x64, 0x61, 0x74, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1e, 0x2e, 0x72, 0x61, 0x66, 0x74, 0x5f, 0x6c, 0x6f, 0x67, 0x2e, 0x43, 0x6f, 0x6d, 0x70, 0x61, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x50, 0x6c, 0x61, 0x6e, 0x55, 0x70, 0x64, 0x61, 0x74, 0x65, 0x52, 0x0a, 0x70, 0x6c, 0x61, 0x6e, - 0x55, 0x70, 0x64, 0x61, 0x74, 0x65, 0x22, 0x9f, 0x02, 0x0a, 0x14, 0x43, 0x6f, 0x6d, 0x70, 0x61, + 0x55, 0x70, 0x64, 0x61, 0x74, 0x65, 0x22, 0xe2, 0x02, 0x0a, 0x14, 0x43, 0x6f, 0x6d, 0x70, 0x61, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x50, 0x6c, 0x61, 0x6e, 0x55, 0x70, 0x64, 0x61, 0x74, 0x65, 0x12, 0x35, 0x0a, 0x08, 0x6e, 0x65, 0x77, 0x5f, 0x6a, 0x6f, 0x62, 0x73, 0x18, 0x01, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x1a, 0x2e, 0x72, 0x61, 0x66, 0x74, 0x5f, 0x6c, 0x6f, 0x67, 0x2e, 0x4e, 0x65, 0x77, @@ -973,101 +1028,110 @@ var file_metastore_v1_raft_log_raft_log_proto_rawDesc = []byte{ 0x73, 0x18, 0x04, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x20, 0x2e, 0x72, 0x61, 0x66, 0x74, 0x5f, 0x6c, 0x6f, 0x67, 0x2e, 0x43, 0x6f, 0x6d, 0x70, 0x6c, 0x65, 0x74, 0x65, 0x64, 0x43, 0x6f, 0x6d, 0x70, 0x61, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x4a, 0x6f, 0x62, 0x52, 0x0d, 0x63, 0x6f, 0x6d, 0x70, 0x6c, - 0x65, 0x74, 0x65, 0x64, 0x4a, 0x6f, 0x62, 0x73, 0x22, 0x77, 0x0a, 0x10, 0x4e, 0x65, 0x77, 0x43, - 0x6f, 0x6d, 0x70, 0x61, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x4a, 0x6f, 0x62, 0x12, 0x32, 0x0a, 0x05, - 0x73, 0x74, 0x61, 0x74, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1c, 0x2e, 0x72, 0x61, - 0x66, 0x74, 0x5f, 0x6c, 0x6f, 0x67, 0x2e, 0x43, 0x6f, 0x6d, 0x70, 0x61, 0x63, 0x74, 0x69, 0x6f, - 0x6e, 0x4a, 0x6f, 0x62, 0x53, 0x74, 0x61, 0x74, 0x65, 0x52, 0x05, 0x73, 0x74, 0x61, 0x74, 0x65, - 0x12, 0x2f, 0x0a, 0x04, 0x70, 0x6c, 0x61, 0x6e, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1b, + 0x65, 0x74, 0x65, 0x64, 0x4a, 0x6f, 0x62, 0x73, 0x12, 0x41, 0x0a, 0x0c, 0x65, 0x76, 0x69, 0x63, + 0x74, 0x65, 0x64, 0x5f, 0x6a, 0x6f, 0x62, 0x73, 0x18, 0x05, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x1e, + 0x2e, 0x72, 0x61, 0x66, 0x74, 0x5f, 0x6c, 0x6f, 0x67, 0x2e, 0x45, 0x76, 0x69, 0x63, 0x74, 0x65, + 0x64, 0x43, 0x6f, 0x6d, 0x70, 0x61, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x4a, 0x6f, 0x62, 0x52, 0x0b, + 0x65, 0x76, 0x69, 0x63, 0x74, 0x65, 0x64, 0x4a, 0x6f, 0x62, 0x73, 0x22, 0x77, 0x0a, 0x10, 0x4e, + 0x65, 0x77, 0x43, 0x6f, 0x6d, 0x70, 0x61, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x4a, 0x6f, 0x62, 0x12, + 0x32, 0x0a, 0x05, 0x73, 0x74, 0x61, 0x74, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1c, 0x2e, 0x72, 0x61, 0x66, 0x74, 0x5f, 0x6c, 0x6f, 0x67, 0x2e, 0x43, 0x6f, 0x6d, 0x70, 0x61, 0x63, - 0x74, 0x69, 0x6f, 0x6e, 0x4a, 0x6f, 0x62, 0x50, 0x6c, 0x61, 0x6e, 0x52, 0x04, 0x70, 0x6c, 0x61, - 0x6e, 0x22, 0x7c, 0x0a, 0x15, 0x41, 0x73, 0x73, 0x69, 0x67, 0x6e, 0x65, 0x64, 0x43, 0x6f, 0x6d, + 0x74, 0x69, 0x6f, 0x6e, 0x4a, 0x6f, 0x62, 0x53, 0x74, 0x61, 0x74, 0x65, 0x52, 0x05, 0x73, 0x74, + 0x61, 0x74, 0x65, 0x12, 0x2f, 0x0a, 0x04, 0x70, 0x6c, 0x61, 0x6e, 0x18, 0x02, 0x20, 0x01, 0x28, + 0x0b, 0x32, 0x1b, 0x2e, 0x72, 0x61, 0x66, 0x74, 0x5f, 0x6c, 0x6f, 0x67, 0x2e, 0x43, 0x6f, 0x6d, + 0x70, 0x61, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x4a, 0x6f, 0x62, 0x50, 0x6c, 0x61, 0x6e, 0x52, 0x04, + 0x70, 0x6c, 0x61, 0x6e, 0x22, 0x7c, 0x0a, 0x15, 0x41, 0x73, 0x73, 0x69, 0x67, 0x6e, 0x65, 0x64, + 0x43, 0x6f, 0x6d, 0x70, 0x61, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x4a, 0x6f, 0x62, 0x12, 0x32, 0x0a, + 0x05, 0x73, 0x74, 0x61, 0x74, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1c, 0x2e, 0x72, + 0x61, 0x66, 0x74, 0x5f, 0x6c, 0x6f, 0x67, 0x2e, 0x43, 0x6f, 0x6d, 0x70, 0x61, 0x63, 0x74, 0x69, + 0x6f, 0x6e, 0x4a, 0x6f, 0x62, 0x53, 0x74, 0x61, 0x74, 0x65, 0x52, 0x05, 0x73, 0x74, 0x61, 0x74, + 0x65, 0x12, 0x2f, 0x0a, 0x04, 0x70, 0x6c, 0x61, 0x6e, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0b, 0x32, + 0x1b, 0x2e, 0x72, 0x61, 0x66, 0x74, 0x5f, 0x6c, 0x6f, 0x67, 0x2e, 0x43, 0x6f, 0x6d, 0x70, 0x61, + 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x4a, 0x6f, 0x62, 0x50, 0x6c, 0x61, 0x6e, 0x52, 0x04, 0x70, 0x6c, + 0x61, 0x6e, 0x22, 0x4a, 0x0a, 0x14, 0x55, 0x70, 0x64, 0x61, 0x74, 0x65, 0x64, 0x43, 0x6f, 0x6d, 0x70, 0x61, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x4a, 0x6f, 0x62, 0x12, 0x32, 0x0a, 0x05, 0x73, 0x74, 0x61, 0x74, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1c, 0x2e, 0x72, 0x61, 0x66, 0x74, 0x5f, 0x6c, 0x6f, 0x67, 0x2e, 0x43, 0x6f, 0x6d, 0x70, 0x61, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x4a, - 0x6f, 0x62, 0x53, 0x74, 0x61, 0x74, 0x65, 0x52, 0x05, 0x73, 0x74, 0x61, 0x74, 0x65, 0x12, 0x2f, - 0x0a, 0x04, 0x70, 0x6c, 0x61, 0x6e, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1b, 0x2e, 0x72, - 0x61, 0x66, 0x74, 0x5f, 0x6c, 0x6f, 0x67, 0x2e, 0x43, 0x6f, 0x6d, 0x70, 0x61, 0x63, 0x74, 0x69, - 0x6f, 0x6e, 0x4a, 0x6f, 0x62, 0x50, 0x6c, 0x61, 0x6e, 0x52, 0x04, 0x70, 0x6c, 0x61, 0x6e, 0x22, - 0x4a, 0x0a, 0x14, 0x55, 0x70, 0x64, 0x61, 0x74, 0x65, 0x64, 0x43, 0x6f, 0x6d, 0x70, 0x61, 0x63, - 0x74, 0x69, 0x6f, 0x6e, 0x4a, 0x6f, 0x62, 0x12, 0x32, 0x0a, 0x05, 0x73, 0x74, 0x61, 0x74, 0x65, - 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1c, 0x2e, 0x72, 0x61, 0x66, 0x74, 0x5f, 0x6c, 0x6f, - 0x67, 0x2e, 0x43, 0x6f, 0x6d, 0x70, 0x61, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x4a, 0x6f, 0x62, 0x53, - 0x74, 0x61, 0x74, 0x65, 0x52, 0x05, 0x73, 0x74, 0x61, 0x74, 0x65, 0x22, 0x96, 0x01, 0x0a, 0x16, - 0x43, 0x6f, 0x6d, 0x70, 0x6c, 0x65, 0x74, 0x65, 0x64, 0x43, 0x6f, 0x6d, 0x70, 0x61, 0x63, 0x74, - 0x69, 0x6f, 0x6e, 0x4a, 0x6f, 0x62, 0x12, 0x32, 0x0a, 0x05, 0x73, 0x74, 0x61, 0x74, 0x65, 0x18, - 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1c, 0x2e, 0x72, 0x61, 0x66, 0x74, 0x5f, 0x6c, 0x6f, 0x67, - 0x2e, 0x43, 0x6f, 0x6d, 0x70, 0x61, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x4a, 0x6f, 0x62, 0x53, 0x74, - 0x61, 0x74, 0x65, 0x52, 0x05, 0x73, 0x74, 0x61, 0x74, 0x65, 0x12, 0x48, 0x0a, 0x10, 0x63, 0x6f, - 0x6d, 0x70, 0x61, 0x63, 0x74, 0x65, 0x64, 0x5f, 0x62, 0x6c, 0x6f, 0x63, 0x6b, 0x73, 0x18, 0x02, - 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1d, 0x2e, 0x6d, 0x65, 0x74, 0x61, 0x73, 0x74, 0x6f, 0x72, 0x65, - 0x2e, 0x76, 0x31, 0x2e, 0x43, 0x6f, 0x6d, 0x70, 0x61, 0x63, 0x74, 0x65, 0x64, 0x42, 0x6c, 0x6f, - 0x63, 0x6b, 0x73, 0x52, 0x0f, 0x63, 0x6f, 0x6d, 0x70, 0x61, 0x63, 0x74, 0x65, 0x64, 0x42, 0x6c, - 0x6f, 0x63, 0x6b, 0x73, 0x22, 0x85, 0x02, 0x0a, 0x12, 0x43, 0x6f, 0x6d, 0x70, 0x61, 0x63, 0x74, - 0x69, 0x6f, 0x6e, 0x4a, 0x6f, 0x62, 0x53, 0x74, 0x61, 0x74, 0x65, 0x12, 0x12, 0x0a, 0x04, 0x6e, - 0x61, 0x6d, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x04, 0x6e, 0x61, 0x6d, 0x65, 0x12, - 0x29, 0x0a, 0x10, 0x63, 0x6f, 0x6d, 0x70, 0x61, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x5f, 0x6c, 0x65, - 0x76, 0x65, 0x6c, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0d, 0x52, 0x0f, 0x63, 0x6f, 0x6d, 0x70, 0x61, - 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x4c, 0x65, 0x76, 0x65, 0x6c, 0x12, 0x39, 0x0a, 0x06, 0x73, 0x74, - 0x61, 0x74, 0x75, 0x73, 0x18, 0x03, 0x20, 0x01, 0x28, 0x0e, 0x32, 0x21, 0x2e, 0x6d, 0x65, 0x74, - 0x61, 0x73, 0x74, 0x6f, 0x72, 0x65, 0x2e, 0x76, 0x31, 0x2e, 0x43, 0x6f, 0x6d, 0x70, 0x61, 0x63, - 0x74, 0x69, 0x6f, 0x6e, 0x4a, 0x6f, 0x62, 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, 0x52, 0x06, 0x73, - 0x74, 0x61, 0x74, 0x75, 0x73, 0x12, 0x14, 0x0a, 0x05, 0x74, 0x6f, 0x6b, 0x65, 0x6e, 0x18, 0x04, - 0x20, 0x01, 0x28, 0x04, 0x52, 0x05, 0x74, 0x6f, 0x6b, 0x65, 0x6e, 0x12, 0x28, 0x0a, 0x10, 0x6c, - 0x65, 0x61, 0x73, 0x65, 0x5f, 0x65, 0x78, 0x70, 0x69, 0x72, 0x65, 0x73, 0x5f, 0x61, 0x74, 0x18, - 0x05, 0x20, 0x01, 0x28, 0x03, 0x52, 0x0e, 0x6c, 0x65, 0x61, 0x73, 0x65, 0x45, 0x78, 0x70, 0x69, - 0x72, 0x65, 0x73, 0x41, 0x74, 0x12, 0x19, 0x0a, 0x08, 0x61, 0x64, 0x64, 0x65, 0x64, 0x5f, 0x61, - 0x74, 0x18, 0x06, 0x20, 0x01, 0x28, 0x03, 0x52, 0x07, 0x61, 0x64, 0x64, 0x65, 0x64, 0x41, 0x74, - 0x12, 0x1a, 0x0a, 0x08, 0x66, 0x61, 0x69, 0x6c, 0x75, 0x72, 0x65, 0x73, 0x18, 0x07, 0x20, 0x01, - 0x28, 0x0d, 0x52, 0x08, 0x66, 0x61, 0x69, 0x6c, 0x75, 0x72, 0x65, 0x73, 0x22, 0xdf, 0x01, 0x0a, - 0x11, 0x43, 0x6f, 0x6d, 0x70, 0x61, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x4a, 0x6f, 0x62, 0x50, 0x6c, - 0x61, 0x6e, 0x12, 0x12, 0x0a, 0x04, 0x6e, 0x61, 0x6d, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, - 0x52, 0x04, 0x6e, 0x61, 0x6d, 0x65, 0x12, 0x16, 0x0a, 0x06, 0x74, 0x65, 0x6e, 0x61, 0x6e, 0x74, - 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x06, 0x74, 0x65, 0x6e, 0x61, 0x6e, 0x74, 0x12, 0x14, - 0x0a, 0x05, 0x73, 0x68, 0x61, 0x72, 0x64, 0x18, 0x03, 0x20, 0x01, 0x28, 0x0d, 0x52, 0x05, 0x73, - 0x68, 0x61, 0x72, 0x64, 0x12, 0x29, 0x0a, 0x10, 0x63, 0x6f, 0x6d, 0x70, 0x61, 0x63, 0x74, 0x69, - 0x6f, 0x6e, 0x5f, 0x6c, 0x65, 0x76, 0x65, 0x6c, 0x18, 0x04, 0x20, 0x01, 0x28, 0x0d, 0x52, 0x0f, - 0x63, 0x6f, 0x6d, 0x70, 0x61, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x4c, 0x65, 0x76, 0x65, 0x6c, 0x12, - 0x23, 0x0a, 0x0d, 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x5f, 0x62, 0x6c, 0x6f, 0x63, 0x6b, 0x73, - 0x18, 0x05, 0x20, 0x03, 0x28, 0x09, 0x52, 0x0c, 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x42, 0x6c, - 0x6f, 0x63, 0x6b, 0x73, 0x12, 0x38, 0x0a, 0x0a, 0x74, 0x6f, 0x6d, 0x62, 0x73, 0x74, 0x6f, 0x6e, - 0x65, 0x73, 0x18, 0x06, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x18, 0x2e, 0x6d, 0x65, 0x74, 0x61, 0x73, - 0x74, 0x6f, 0x72, 0x65, 0x2e, 0x76, 0x31, 0x2e, 0x54, 0x6f, 0x6d, 0x62, 0x73, 0x74, 0x6f, 0x6e, - 0x65, 0x73, 0x52, 0x0a, 0x74, 0x6f, 0x6d, 0x62, 0x73, 0x74, 0x6f, 0x6e, 0x65, 0x73, 0x22, 0x72, - 0x0a, 0x1b, 0x55, 0x70, 0x64, 0x61, 0x74, 0x65, 0x43, 0x6f, 0x6d, 0x70, 0x61, 0x63, 0x74, 0x69, - 0x6f, 0x6e, 0x50, 0x6c, 0x61, 0x6e, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x12, 0x0a, - 0x04, 0x74, 0x65, 0x72, 0x6d, 0x18, 0x01, 0x20, 0x01, 0x28, 0x04, 0x52, 0x04, 0x74, 0x65, 0x72, - 0x6d, 0x12, 0x3f, 0x0a, 0x0b, 0x70, 0x6c, 0x61, 0x6e, 0x5f, 0x75, 0x70, 0x64, 0x61, 0x74, 0x65, - 0x18, 0x02, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1e, 0x2e, 0x72, 0x61, 0x66, 0x74, 0x5f, 0x6c, 0x6f, + 0x6f, 0x62, 0x53, 0x74, 0x61, 0x74, 0x65, 0x52, 0x05, 0x73, 0x74, 0x61, 0x74, 0x65, 0x22, 0x96, + 0x01, 0x0a, 0x16, 0x43, 0x6f, 0x6d, 0x70, 0x6c, 0x65, 0x74, 0x65, 0x64, 0x43, 0x6f, 0x6d, 0x70, + 0x61, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x4a, 0x6f, 0x62, 0x12, 0x32, 0x0a, 0x05, 0x73, 0x74, 0x61, + 0x74, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1c, 0x2e, 0x72, 0x61, 0x66, 0x74, 0x5f, + 0x6c, 0x6f, 0x67, 0x2e, 0x43, 0x6f, 0x6d, 0x70, 0x61, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x4a, 0x6f, + 0x62, 0x53, 0x74, 0x61, 0x74, 0x65, 0x52, 0x05, 0x73, 0x74, 0x61, 0x74, 0x65, 0x12, 0x48, 0x0a, + 0x10, 0x63, 0x6f, 0x6d, 0x70, 0x61, 0x63, 0x74, 0x65, 0x64, 0x5f, 0x62, 0x6c, 0x6f, 0x63, 0x6b, + 0x73, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1d, 0x2e, 0x6d, 0x65, 0x74, 0x61, 0x73, 0x74, + 0x6f, 0x72, 0x65, 0x2e, 0x76, 0x31, 0x2e, 0x43, 0x6f, 0x6d, 0x70, 0x61, 0x63, 0x74, 0x65, 0x64, + 0x42, 0x6c, 0x6f, 0x63, 0x6b, 0x73, 0x52, 0x0f, 0x63, 0x6f, 0x6d, 0x70, 0x61, 0x63, 0x74, 0x65, + 0x64, 0x42, 0x6c, 0x6f, 0x63, 0x6b, 0x73, 0x22, 0x4a, 0x0a, 0x14, 0x45, 0x76, 0x69, 0x63, 0x74, + 0x65, 0x64, 0x43, 0x6f, 0x6d, 0x70, 0x61, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x4a, 0x6f, 0x62, 0x12, + 0x32, 0x0a, 0x05, 0x73, 0x74, 0x61, 0x74, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1c, + 0x2e, 0x72, 0x61, 0x66, 0x74, 0x5f, 0x6c, 0x6f, 0x67, 0x2e, 0x43, 0x6f, 0x6d, 0x70, 0x61, 0x63, + 0x74, 0x69, 0x6f, 0x6e, 0x4a, 0x6f, 0x62, 0x53, 0x74, 0x61, 0x74, 0x65, 0x52, 0x05, 0x73, 0x74, + 0x61, 0x74, 0x65, 0x22, 0x85, 0x02, 0x0a, 0x12, 0x43, 0x6f, 0x6d, 0x70, 0x61, 0x63, 0x74, 0x69, + 0x6f, 0x6e, 0x4a, 0x6f, 0x62, 0x53, 0x74, 0x61, 0x74, 0x65, 0x12, 0x12, 0x0a, 0x04, 0x6e, 0x61, + 0x6d, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x04, 0x6e, 0x61, 0x6d, 0x65, 0x12, 0x29, + 0x0a, 0x10, 0x63, 0x6f, 0x6d, 0x70, 0x61, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x5f, 0x6c, 0x65, 0x76, + 0x65, 0x6c, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0d, 0x52, 0x0f, 0x63, 0x6f, 0x6d, 0x70, 0x61, 0x63, + 0x74, 0x69, 0x6f, 0x6e, 0x4c, 0x65, 0x76, 0x65, 0x6c, 0x12, 0x39, 0x0a, 0x06, 0x73, 0x74, 0x61, + 0x74, 0x75, 0x73, 0x18, 0x03, 0x20, 0x01, 0x28, 0x0e, 0x32, 0x21, 0x2e, 0x6d, 0x65, 0x74, 0x61, + 0x73, 0x74, 0x6f, 0x72, 0x65, 0x2e, 0x76, 0x31, 0x2e, 0x43, 0x6f, 0x6d, 0x70, 0x61, 0x63, 0x74, + 0x69, 0x6f, 0x6e, 0x4a, 0x6f, 0x62, 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, 0x52, 0x06, 0x73, 0x74, + 0x61, 0x74, 0x75, 0x73, 0x12, 0x14, 0x0a, 0x05, 0x74, 0x6f, 0x6b, 0x65, 0x6e, 0x18, 0x04, 0x20, + 0x01, 0x28, 0x04, 0x52, 0x05, 0x74, 0x6f, 0x6b, 0x65, 0x6e, 0x12, 0x28, 0x0a, 0x10, 0x6c, 0x65, + 0x61, 0x73, 0x65, 0x5f, 0x65, 0x78, 0x70, 0x69, 0x72, 0x65, 0x73, 0x5f, 0x61, 0x74, 0x18, 0x05, + 0x20, 0x01, 0x28, 0x03, 0x52, 0x0e, 0x6c, 0x65, 0x61, 0x73, 0x65, 0x45, 0x78, 0x70, 0x69, 0x72, + 0x65, 0x73, 0x41, 0x74, 0x12, 0x19, 0x0a, 0x08, 0x61, 0x64, 0x64, 0x65, 0x64, 0x5f, 0x61, 0x74, + 0x18, 0x06, 0x20, 0x01, 0x28, 0x03, 0x52, 0x07, 0x61, 0x64, 0x64, 0x65, 0x64, 0x41, 0x74, 0x12, + 0x1a, 0x0a, 0x08, 0x66, 0x61, 0x69, 0x6c, 0x75, 0x72, 0x65, 0x73, 0x18, 0x07, 0x20, 0x01, 0x28, + 0x0d, 0x52, 0x08, 0x66, 0x61, 0x69, 0x6c, 0x75, 0x72, 0x65, 0x73, 0x22, 0xdf, 0x01, 0x0a, 0x11, + 0x43, 0x6f, 0x6d, 0x70, 0x61, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x4a, 0x6f, 0x62, 0x50, 0x6c, 0x61, + 0x6e, 0x12, 0x12, 0x0a, 0x04, 0x6e, 0x61, 0x6d, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, + 0x04, 0x6e, 0x61, 0x6d, 0x65, 0x12, 0x16, 0x0a, 0x06, 0x74, 0x65, 0x6e, 0x61, 0x6e, 0x74, 0x18, + 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x06, 0x74, 0x65, 0x6e, 0x61, 0x6e, 0x74, 0x12, 0x14, 0x0a, + 0x05, 0x73, 0x68, 0x61, 0x72, 0x64, 0x18, 0x03, 0x20, 0x01, 0x28, 0x0d, 0x52, 0x05, 0x73, 0x68, + 0x61, 0x72, 0x64, 0x12, 0x29, 0x0a, 0x10, 0x63, 0x6f, 0x6d, 0x70, 0x61, 0x63, 0x74, 0x69, 0x6f, + 0x6e, 0x5f, 0x6c, 0x65, 0x76, 0x65, 0x6c, 0x18, 0x04, 0x20, 0x01, 0x28, 0x0d, 0x52, 0x0f, 0x63, + 0x6f, 0x6d, 0x70, 0x61, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x4c, 0x65, 0x76, 0x65, 0x6c, 0x12, 0x23, + 0x0a, 0x0d, 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x5f, 0x62, 0x6c, 0x6f, 0x63, 0x6b, 0x73, 0x18, + 0x05, 0x20, 0x03, 0x28, 0x09, 0x52, 0x0c, 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x42, 0x6c, 0x6f, + 0x63, 0x6b, 0x73, 0x12, 0x38, 0x0a, 0x0a, 0x74, 0x6f, 0x6d, 0x62, 0x73, 0x74, 0x6f, 0x6e, 0x65, + 0x73, 0x18, 0x06, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x18, 0x2e, 0x6d, 0x65, 0x74, 0x61, 0x73, 0x74, + 0x6f, 0x72, 0x65, 0x2e, 0x76, 0x31, 0x2e, 0x54, 0x6f, 0x6d, 0x62, 0x73, 0x74, 0x6f, 0x6e, 0x65, + 0x73, 0x52, 0x0a, 0x74, 0x6f, 0x6d, 0x62, 0x73, 0x74, 0x6f, 0x6e, 0x65, 0x73, 0x22, 0x72, 0x0a, + 0x1b, 0x55, 0x70, 0x64, 0x61, 0x74, 0x65, 0x43, 0x6f, 0x6d, 0x70, 0x61, 0x63, 0x74, 0x69, 0x6f, + 0x6e, 0x50, 0x6c, 0x61, 0x6e, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x12, 0x0a, 0x04, + 0x74, 0x65, 0x72, 0x6d, 0x18, 0x01, 0x20, 0x01, 0x28, 0x04, 0x52, 0x04, 0x74, 0x65, 0x72, 0x6d, + 0x12, 0x3f, 0x0a, 0x0b, 0x70, 0x6c, 0x61, 0x6e, 0x5f, 0x75, 0x70, 0x64, 0x61, 0x74, 0x65, 0x18, + 0x02, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1e, 0x2e, 0x72, 0x61, 0x66, 0x74, 0x5f, 0x6c, 0x6f, 0x67, + 0x2e, 0x43, 0x6f, 0x6d, 0x70, 0x61, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x50, 0x6c, 0x61, 0x6e, 0x55, + 0x70, 0x64, 0x61, 0x74, 0x65, 0x52, 0x0a, 0x70, 0x6c, 0x61, 0x6e, 0x55, 0x70, 0x64, 0x61, 0x74, + 0x65, 0x22, 0x5f, 0x0a, 0x1c, 0x55, 0x70, 0x64, 0x61, 0x74, 0x65, 0x43, 0x6f, 0x6d, 0x70, 0x61, + 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x50, 0x6c, 0x61, 0x6e, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, + 0x65, 0x12, 0x3f, 0x0a, 0x0b, 0x70, 0x6c, 0x61, 0x6e, 0x5f, 0x75, 0x70, 0x64, 0x61, 0x74, 0x65, + 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1e, 0x2e, 0x72, 0x61, 0x66, 0x74, 0x5f, 0x6c, 0x6f, 0x67, 0x2e, 0x43, 0x6f, 0x6d, 0x70, 0x61, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x50, 0x6c, 0x61, 0x6e, 0x55, 0x70, 0x64, 0x61, 0x74, 0x65, 0x52, 0x0a, 0x70, 0x6c, 0x61, 0x6e, 0x55, 0x70, 0x64, 0x61, - 0x74, 0x65, 0x22, 0x5f, 0x0a, 0x1c, 0x55, 0x70, 0x64, 0x61, 0x74, 0x65, 0x43, 0x6f, 0x6d, 0x70, - 0x61, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x50, 0x6c, 0x61, 0x6e, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, - 0x73, 0x65, 0x12, 0x3f, 0x0a, 0x0b, 0x70, 0x6c, 0x61, 0x6e, 0x5f, 0x75, 0x70, 0x64, 0x61, 0x74, - 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1e, 0x2e, 0x72, 0x61, 0x66, 0x74, 0x5f, 0x6c, - 0x6f, 0x67, 0x2e, 0x43, 0x6f, 0x6d, 0x70, 0x61, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x50, 0x6c, 0x61, - 0x6e, 0x55, 0x70, 0x64, 0x61, 0x74, 0x65, 0x52, 0x0a, 0x70, 0x6c, 0x61, 0x6e, 0x55, 0x70, 0x64, - 0x61, 0x74, 0x65, 0x2a, 0xa2, 0x01, 0x0a, 0x0b, 0x52, 0x61, 0x66, 0x74, 0x43, 0x6f, 0x6d, 0x6d, - 0x61, 0x6e, 0x64, 0x12, 0x18, 0x0a, 0x14, 0x52, 0x41, 0x46, 0x54, 0x5f, 0x43, 0x4f, 0x4d, 0x4d, - 0x41, 0x4e, 0x44, 0x5f, 0x55, 0x4e, 0x4b, 0x4e, 0x4f, 0x57, 0x4e, 0x10, 0x00, 0x12, 0x23, 0x0a, - 0x1f, 0x52, 0x41, 0x46, 0x54, 0x5f, 0x43, 0x4f, 0x4d, 0x4d, 0x41, 0x4e, 0x44, 0x5f, 0x41, 0x44, - 0x44, 0x5f, 0x42, 0x4c, 0x4f, 0x43, 0x4b, 0x5f, 0x4d, 0x45, 0x54, 0x41, 0x44, 0x41, 0x54, 0x41, - 0x10, 0x01, 0x12, 0x2b, 0x0a, 0x27, 0x52, 0x41, 0x46, 0x54, 0x5f, 0x43, 0x4f, 0x4d, 0x4d, 0x41, - 0x4e, 0x44, 0x5f, 0x47, 0x45, 0x54, 0x5f, 0x43, 0x4f, 0x4d, 0x50, 0x41, 0x43, 0x54, 0x49, 0x4f, - 0x4e, 0x5f, 0x50, 0x4c, 0x41, 0x4e, 0x5f, 0x55, 0x50, 0x44, 0x41, 0x54, 0x45, 0x10, 0x02, 0x12, - 0x27, 0x0a, 0x23, 0x52, 0x41, 0x46, 0x54, 0x5f, 0x43, 0x4f, 0x4d, 0x4d, 0x41, 0x4e, 0x44, 0x5f, - 0x55, 0x50, 0x44, 0x41, 0x54, 0x45, 0x5f, 0x43, 0x4f, 0x4d, 0x50, 0x41, 0x43, 0x54, 0x49, 0x4f, - 0x4e, 0x5f, 0x50, 0x4c, 0x41, 0x4e, 0x10, 0x03, 0x42, 0x9d, 0x01, 0x0a, 0x0c, 0x63, 0x6f, 0x6d, - 0x2e, 0x72, 0x61, 0x66, 0x74, 0x5f, 0x6c, 0x6f, 0x67, 0x42, 0x0c, 0x52, 0x61, 0x66, 0x74, 0x4c, - 0x6f, 0x67, 0x50, 0x72, 0x6f, 0x74, 0x6f, 0x50, 0x01, 0x5a, 0x43, 0x67, 0x69, 0x74, 0x68, 0x75, - 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x67, 0x72, 0x61, 0x66, 0x61, 0x6e, 0x61, 0x2f, 0x70, 0x79, - 0x72, 0x6f, 0x73, 0x63, 0x6f, 0x70, 0x65, 0x2f, 0x61, 0x70, 0x69, 0x2f, 0x67, 0x65, 0x6e, 0x2f, - 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2f, 0x67, 0x6f, 0x2f, 0x6d, 0x65, 0x74, 0x61, 0x73, 0x74, 0x6f, - 0x72, 0x65, 0x2f, 0x76, 0x31, 0x2f, 0x72, 0x61, 0x66, 0x74, 0x5f, 0x6c, 0x6f, 0x67, 0xa2, 0x02, - 0x03, 0x52, 0x58, 0x58, 0xaa, 0x02, 0x07, 0x52, 0x61, 0x66, 0x74, 0x4c, 0x6f, 0x67, 0xca, 0x02, - 0x07, 0x52, 0x61, 0x66, 0x74, 0x4c, 0x6f, 0x67, 0xe2, 0x02, 0x13, 0x52, 0x61, 0x66, 0x74, 0x4c, - 0x6f, 0x67, 0x5c, 0x47, 0x50, 0x42, 0x4d, 0x65, 0x74, 0x61, 0x64, 0x61, 0x74, 0x61, 0xea, 0x02, - 0x07, 0x52, 0x61, 0x66, 0x74, 0x4c, 0x6f, 0x67, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, + 0x74, 0x65, 0x2a, 0xa2, 0x01, 0x0a, 0x0b, 0x52, 0x61, 0x66, 0x74, 0x43, 0x6f, 0x6d, 0x6d, 0x61, + 0x6e, 0x64, 0x12, 0x18, 0x0a, 0x14, 0x52, 0x41, 0x46, 0x54, 0x5f, 0x43, 0x4f, 0x4d, 0x4d, 0x41, + 0x4e, 0x44, 0x5f, 0x55, 0x4e, 0x4b, 0x4e, 0x4f, 0x57, 0x4e, 0x10, 0x00, 0x12, 0x23, 0x0a, 0x1f, + 0x52, 0x41, 0x46, 0x54, 0x5f, 0x43, 0x4f, 0x4d, 0x4d, 0x41, 0x4e, 0x44, 0x5f, 0x41, 0x44, 0x44, + 0x5f, 0x42, 0x4c, 0x4f, 0x43, 0x4b, 0x5f, 0x4d, 0x45, 0x54, 0x41, 0x44, 0x41, 0x54, 0x41, 0x10, + 0x01, 0x12, 0x2b, 0x0a, 0x27, 0x52, 0x41, 0x46, 0x54, 0x5f, 0x43, 0x4f, 0x4d, 0x4d, 0x41, 0x4e, + 0x44, 0x5f, 0x47, 0x45, 0x54, 0x5f, 0x43, 0x4f, 0x4d, 0x50, 0x41, 0x43, 0x54, 0x49, 0x4f, 0x4e, + 0x5f, 0x50, 0x4c, 0x41, 0x4e, 0x5f, 0x55, 0x50, 0x44, 0x41, 0x54, 0x45, 0x10, 0x02, 0x12, 0x27, + 0x0a, 0x23, 0x52, 0x41, 0x46, 0x54, 0x5f, 0x43, 0x4f, 0x4d, 0x4d, 0x41, 0x4e, 0x44, 0x5f, 0x55, + 0x50, 0x44, 0x41, 0x54, 0x45, 0x5f, 0x43, 0x4f, 0x4d, 0x50, 0x41, 0x43, 0x54, 0x49, 0x4f, 0x4e, + 0x5f, 0x50, 0x4c, 0x41, 0x4e, 0x10, 0x03, 0x42, 0x9d, 0x01, 0x0a, 0x0c, 0x63, 0x6f, 0x6d, 0x2e, + 0x72, 0x61, 0x66, 0x74, 0x5f, 0x6c, 0x6f, 0x67, 0x42, 0x0c, 0x52, 0x61, 0x66, 0x74, 0x4c, 0x6f, + 0x67, 0x50, 0x72, 0x6f, 0x74, 0x6f, 0x50, 0x01, 0x5a, 0x43, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, + 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x67, 0x72, 0x61, 0x66, 0x61, 0x6e, 0x61, 0x2f, 0x70, 0x79, 0x72, + 0x6f, 0x73, 0x63, 0x6f, 0x70, 0x65, 0x2f, 0x61, 0x70, 0x69, 0x2f, 0x67, 0x65, 0x6e, 0x2f, 0x70, + 0x72, 0x6f, 0x74, 0x6f, 0x2f, 0x67, 0x6f, 0x2f, 0x6d, 0x65, 0x74, 0x61, 0x73, 0x74, 0x6f, 0x72, + 0x65, 0x2f, 0x76, 0x31, 0x2f, 0x72, 0x61, 0x66, 0x74, 0x5f, 0x6c, 0x6f, 0x67, 0xa2, 0x02, 0x03, + 0x52, 0x58, 0x58, 0xaa, 0x02, 0x07, 0x52, 0x61, 0x66, 0x74, 0x4c, 0x6f, 0x67, 0xca, 0x02, 0x07, + 0x52, 0x61, 0x66, 0x74, 0x4c, 0x6f, 0x67, 0xe2, 0x02, 0x13, 0x52, 0x61, 0x66, 0x74, 0x4c, 0x6f, + 0x67, 0x5c, 0x47, 0x50, 0x42, 0x4d, 0x65, 0x74, 0x61, 0x64, 0x61, 0x74, 0x61, 0xea, 0x02, 0x07, + 0x52, 0x61, 0x66, 0x74, 0x4c, 0x6f, 0x67, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, } var ( @@ -1083,7 +1147,7 @@ func file_metastore_v1_raft_log_raft_log_proto_rawDescGZIP() []byte { } var file_metastore_v1_raft_log_raft_log_proto_enumTypes = make([]protoimpl.EnumInfo, 1) -var file_metastore_v1_raft_log_raft_log_proto_msgTypes = make([]protoimpl.MessageInfo, 14) +var file_metastore_v1_raft_log_raft_log_proto_msgTypes = make([]protoimpl.MessageInfo, 15) var file_metastore_v1_raft_log_raft_log_proto_goTypes = []any{ (RaftCommand)(0), // 0: raft_log.RaftCommand (*AddBlockMetadataRequest)(nil), // 1: raft_log.AddBlockMetadataRequest @@ -1096,40 +1160,43 @@ var file_metastore_v1_raft_log_raft_log_proto_goTypes = []any{ (*AssignedCompactionJob)(nil), // 8: raft_log.AssignedCompactionJob (*UpdatedCompactionJob)(nil), // 9: raft_log.UpdatedCompactionJob (*CompletedCompactionJob)(nil), // 10: raft_log.CompletedCompactionJob - (*CompactionJobState)(nil), // 11: raft_log.CompactionJobState - (*CompactionJobPlan)(nil), // 12: raft_log.CompactionJobPlan - (*UpdateCompactionPlanRequest)(nil), // 13: raft_log.UpdateCompactionPlanRequest - (*UpdateCompactionPlanResponse)(nil), // 14: raft_log.UpdateCompactionPlanResponse - (*v1.BlockMeta)(nil), // 15: metastore.v1.BlockMeta - (v1.CompactionJobStatus)(0), // 16: metastore.v1.CompactionJobStatus - (*v1.CompactedBlocks)(nil), // 17: metastore.v1.CompactedBlocks - (*v1.Tombstones)(nil), // 18: metastore.v1.Tombstones + (*EvictedCompactionJob)(nil), // 11: raft_log.EvictedCompactionJob + (*CompactionJobState)(nil), // 12: raft_log.CompactionJobState + (*CompactionJobPlan)(nil), // 13: raft_log.CompactionJobPlan + (*UpdateCompactionPlanRequest)(nil), // 14: raft_log.UpdateCompactionPlanRequest + (*UpdateCompactionPlanResponse)(nil), // 15: raft_log.UpdateCompactionPlanResponse + (*v1.BlockMeta)(nil), // 16: metastore.v1.BlockMeta + (v1.CompactionJobStatus)(0), // 17: metastore.v1.CompactionJobStatus + (*v1.CompactedBlocks)(nil), // 18: metastore.v1.CompactedBlocks + (*v1.Tombstones)(nil), // 19: metastore.v1.Tombstones } var file_metastore_v1_raft_log_raft_log_proto_depIdxs = []int32{ - 15, // 0: raft_log.AddBlockMetadataRequest.metadata:type_name -> metastore.v1.BlockMeta + 16, // 0: raft_log.AddBlockMetadataRequest.metadata:type_name -> metastore.v1.BlockMeta 4, // 1: raft_log.GetCompactionPlanUpdateRequest.status_updates:type_name -> raft_log.CompactionJobStatusUpdate - 16, // 2: raft_log.CompactionJobStatusUpdate.status:type_name -> metastore.v1.CompactionJobStatus + 17, // 2: raft_log.CompactionJobStatusUpdate.status:type_name -> metastore.v1.CompactionJobStatus 6, // 3: raft_log.GetCompactionPlanUpdateResponse.plan_update:type_name -> raft_log.CompactionPlanUpdate 7, // 4: raft_log.CompactionPlanUpdate.new_jobs:type_name -> raft_log.NewCompactionJob 8, // 5: raft_log.CompactionPlanUpdate.assigned_jobs:type_name -> raft_log.AssignedCompactionJob 9, // 6: raft_log.CompactionPlanUpdate.updated_jobs:type_name -> raft_log.UpdatedCompactionJob 10, // 7: raft_log.CompactionPlanUpdate.completed_jobs:type_name -> raft_log.CompletedCompactionJob - 11, // 8: raft_log.NewCompactionJob.state:type_name -> raft_log.CompactionJobState - 12, // 9: raft_log.NewCompactionJob.plan:type_name -> raft_log.CompactionJobPlan - 11, // 10: raft_log.AssignedCompactionJob.state:type_name -> raft_log.CompactionJobState - 12, // 11: raft_log.AssignedCompactionJob.plan:type_name -> raft_log.CompactionJobPlan - 11, // 12: raft_log.UpdatedCompactionJob.state:type_name -> raft_log.CompactionJobState - 11, // 13: raft_log.CompletedCompactionJob.state:type_name -> raft_log.CompactionJobState - 17, // 14: raft_log.CompletedCompactionJob.compacted_blocks:type_name -> metastore.v1.CompactedBlocks - 16, // 15: raft_log.CompactionJobState.status:type_name -> metastore.v1.CompactionJobStatus - 18, // 16: raft_log.CompactionJobPlan.tombstones:type_name -> metastore.v1.Tombstones - 6, // 17: raft_log.UpdateCompactionPlanRequest.plan_update:type_name -> raft_log.CompactionPlanUpdate - 6, // 18: raft_log.UpdateCompactionPlanResponse.plan_update:type_name -> raft_log.CompactionPlanUpdate - 19, // [19:19] is the sub-list for method output_type - 19, // [19:19] is the sub-list for method input_type - 19, // [19:19] is the sub-list for extension type_name - 19, // [19:19] is the sub-list for extension extendee - 0, // [0:19] is the sub-list for field type_name + 11, // 8: raft_log.CompactionPlanUpdate.evicted_jobs:type_name -> raft_log.EvictedCompactionJob + 12, // 9: raft_log.NewCompactionJob.state:type_name -> raft_log.CompactionJobState + 13, // 10: raft_log.NewCompactionJob.plan:type_name -> raft_log.CompactionJobPlan + 12, // 11: raft_log.AssignedCompactionJob.state:type_name -> raft_log.CompactionJobState + 13, // 12: raft_log.AssignedCompactionJob.plan:type_name -> raft_log.CompactionJobPlan + 12, // 13: raft_log.UpdatedCompactionJob.state:type_name -> raft_log.CompactionJobState + 12, // 14: raft_log.CompletedCompactionJob.state:type_name -> raft_log.CompactionJobState + 18, // 15: raft_log.CompletedCompactionJob.compacted_blocks:type_name -> metastore.v1.CompactedBlocks + 12, // 16: raft_log.EvictedCompactionJob.state:type_name -> raft_log.CompactionJobState + 17, // 17: raft_log.CompactionJobState.status:type_name -> metastore.v1.CompactionJobStatus + 19, // 18: raft_log.CompactionJobPlan.tombstones:type_name -> metastore.v1.Tombstones + 6, // 19: raft_log.UpdateCompactionPlanRequest.plan_update:type_name -> raft_log.CompactionPlanUpdate + 6, // 20: raft_log.UpdateCompactionPlanResponse.plan_update:type_name -> raft_log.CompactionPlanUpdate + 21, // [21:21] is the sub-list for method output_type + 21, // [21:21] is the sub-list for method input_type + 21, // [21:21] is the sub-list for extension type_name + 21, // [21:21] is the sub-list for extension extendee + 0, // [0:21] is the sub-list for field type_name } func init() { file_metastore_v1_raft_log_raft_log_proto_init() } @@ -1259,7 +1326,7 @@ func file_metastore_v1_raft_log_raft_log_proto_init() { } } file_metastore_v1_raft_log_raft_log_proto_msgTypes[10].Exporter = func(v any, i int) any { - switch v := v.(*CompactionJobState); i { + switch v := v.(*EvictedCompactionJob); i { case 0: return &v.state case 1: @@ -1271,7 +1338,7 @@ func file_metastore_v1_raft_log_raft_log_proto_init() { } } file_metastore_v1_raft_log_raft_log_proto_msgTypes[11].Exporter = func(v any, i int) any { - switch v := v.(*CompactionJobPlan); i { + switch v := v.(*CompactionJobState); i { case 0: return &v.state case 1: @@ -1283,7 +1350,7 @@ func file_metastore_v1_raft_log_raft_log_proto_init() { } } file_metastore_v1_raft_log_raft_log_proto_msgTypes[12].Exporter = func(v any, i int) any { - switch v := v.(*UpdateCompactionPlanRequest); i { + switch v := v.(*CompactionJobPlan); i { case 0: return &v.state case 1: @@ -1295,6 +1362,18 @@ func file_metastore_v1_raft_log_raft_log_proto_init() { } } file_metastore_v1_raft_log_raft_log_proto_msgTypes[13].Exporter = func(v any, i int) any { + switch v := v.(*UpdateCompactionPlanRequest); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_metastore_v1_raft_log_raft_log_proto_msgTypes[14].Exporter = func(v any, i int) any { switch v := v.(*UpdateCompactionPlanResponse); i { case 0: return &v.state @@ -1313,7 +1392,7 @@ func file_metastore_v1_raft_log_raft_log_proto_init() { GoPackagePath: reflect.TypeOf(x{}).PkgPath(), RawDescriptor: file_metastore_v1_raft_log_raft_log_proto_rawDesc, NumEnums: 1, - NumMessages: 14, + NumMessages: 15, NumExtensions: 0, NumServices: 0, }, diff --git a/api/gen/proto/go/metastore/v1/raft_log/raft_log_vtproto.pb.go b/api/gen/proto/go/metastore/v1/raft_log/raft_log_vtproto.pb.go index 75da6e84ba..544877e0c1 100644 --- a/api/gen/proto/go/metastore/v1/raft_log/raft_log_vtproto.pb.go +++ b/api/gen/proto/go/metastore/v1/raft_log/raft_log_vtproto.pb.go @@ -153,6 +153,13 @@ func (m *CompactionPlanUpdate) CloneVT() *CompactionPlanUpdate { } r.CompletedJobs = tmpContainer } + if rhs := m.EvictedJobs; rhs != nil { + tmpContainer := make([]*EvictedCompactionJob, len(rhs)) + for k, v := range rhs { + tmpContainer[k] = v.CloneVT() + } + r.EvictedJobs = tmpContainer + } if len(m.unknownFields) > 0 { r.unknownFields = make([]byte, len(m.unknownFields)) copy(r.unknownFields, m.unknownFields) @@ -241,6 +248,23 @@ func (m *CompletedCompactionJob) CloneMessageVT() proto.Message { return m.CloneVT() } +func (m *EvictedCompactionJob) CloneVT() *EvictedCompactionJob { + if m == nil { + return (*EvictedCompactionJob)(nil) + } + r := new(EvictedCompactionJob) + r.State = m.State.CloneVT() + if len(m.unknownFields) > 0 { + r.unknownFields = make([]byte, len(m.unknownFields)) + copy(r.unknownFields, m.unknownFields) + } + return r +} + +func (m *EvictedCompactionJob) CloneMessageVT() proto.Message { + return m.CloneVT() +} + func (m *CompactionJobState) CloneVT() *CompactionJobState { if m == nil { return (*CompactionJobState)(nil) @@ -531,6 +555,23 @@ func (this *CompactionPlanUpdate) EqualVT(that *CompactionPlanUpdate) bool { } } } + if len(this.EvictedJobs) != len(that.EvictedJobs) { + return false + } + for i, vx := range this.EvictedJobs { + vy := that.EvictedJobs[i] + if p, q := vx, vy; p != q { + if p == nil { + p = &EvictedCompactionJob{} + } + if q == nil { + q = &EvictedCompactionJob{} + } + if !p.EqualVT(q) { + return false + } + } + } return string(this.unknownFields) == string(that.unknownFields) } @@ -632,6 +673,25 @@ func (this *CompletedCompactionJob) EqualMessageVT(thatMsg proto.Message) bool { } return this.EqualVT(that) } +func (this *EvictedCompactionJob) EqualVT(that *EvictedCompactionJob) bool { + if this == that { + return true + } else if this == nil || that == nil { + return false + } + if !this.State.EqualVT(that.State) { + return false + } + return string(this.unknownFields) == string(that.unknownFields) +} + +func (this *EvictedCompactionJob) EqualMessageVT(thatMsg proto.Message) bool { + that, ok := thatMsg.(*EvictedCompactionJob) + if !ok { + return false + } + return this.EqualVT(that) +} func (this *CompactionJobState) EqualVT(that *CompactionJobState) bool { if this == that { return true @@ -1034,6 +1094,18 @@ func (m *CompactionPlanUpdate) MarshalToSizedBufferVT(dAtA []byte) (int, error) i -= len(m.unknownFields) copy(dAtA[i:], m.unknownFields) } + if len(m.EvictedJobs) > 0 { + for iNdEx := len(m.EvictedJobs) - 1; iNdEx >= 0; iNdEx-- { + size, err := m.EvictedJobs[iNdEx].MarshalToSizedBufferVT(dAtA[:i]) + if err != nil { + return 0, err + } + i -= size + i = protohelpers.EncodeVarint(dAtA, i, uint64(size)) + i-- + dAtA[i] = 0x2a + } + } if len(m.CompletedJobs) > 0 { for iNdEx := len(m.CompletedJobs) - 1; iNdEx >= 0; iNdEx-- { size, err := m.CompletedJobs[iNdEx].MarshalToSizedBufferVT(dAtA[:i]) @@ -1299,6 +1371,49 @@ func (m *CompletedCompactionJob) MarshalToSizedBufferVT(dAtA []byte) (int, error return len(dAtA) - i, nil } +func (m *EvictedCompactionJob) MarshalVT() (dAtA []byte, err error) { + if m == nil { + return nil, nil + } + size := m.SizeVT() + dAtA = make([]byte, size) + n, err := m.MarshalToSizedBufferVT(dAtA[:size]) + if err != nil { + return nil, err + } + return dAtA[:n], nil +} + +func (m *EvictedCompactionJob) MarshalToVT(dAtA []byte) (int, error) { + size := m.SizeVT() + return m.MarshalToSizedBufferVT(dAtA[:size]) +} + +func (m *EvictedCompactionJob) MarshalToSizedBufferVT(dAtA []byte) (int, error) { + if m == nil { + return 0, nil + } + i := len(dAtA) + _ = i + var l int + _ = l + if m.unknownFields != nil { + i -= len(m.unknownFields) + copy(dAtA[i:], m.unknownFields) + } + if m.State != nil { + size, err := m.State.MarshalToSizedBufferVT(dAtA[:i]) + if err != nil { + return 0, err + } + i -= size + i = protohelpers.EncodeVarint(dAtA, i, uint64(size)) + i-- + dAtA[i] = 0xa + } + return len(dAtA) - i, nil +} + func (m *CompactionJobState) MarshalVT() (dAtA []byte, err error) { if m == nil { return nil, nil @@ -1666,6 +1781,12 @@ func (m *CompactionPlanUpdate) SizeVT() (n int) { n += 1 + l + protohelpers.SizeOfVarint(uint64(l)) } } + if len(m.EvictedJobs) > 0 { + for _, e := range m.EvictedJobs { + l = e.SizeVT() + n += 1 + l + protohelpers.SizeOfVarint(uint64(l)) + } + } n += len(m.unknownFields) return n } @@ -1744,6 +1865,20 @@ func (m *CompletedCompactionJob) SizeVT() (n int) { return n } +func (m *EvictedCompactionJob) SizeVT() (n int) { + if m == nil { + return 0 + } + var l int + _ = l + if m.State != nil { + l = m.State.SizeVT() + n += 1 + l + protohelpers.SizeOfVarint(uint64(l)) + } + n += len(m.unknownFields) + return n +} + func (m *CompactionJobState) SizeVT() (n int) { if m == nil { return 0 @@ -2491,6 +2626,40 @@ func (m *CompactionPlanUpdate) UnmarshalVT(dAtA []byte) error { return err } iNdEx = postIndex + case 5: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field EvictedJobs", wireType) + } + var msglen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return protohelpers.ErrIntOverflow + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + msglen |= int(b&0x7F) << shift + if b < 0x80 { + break + } + } + if msglen < 0 { + return protohelpers.ErrInvalidLength + } + postIndex := iNdEx + msglen + if postIndex < 0 { + return protohelpers.ErrInvalidLength + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.EvictedJobs = append(m.EvictedJobs, &EvictedCompactionJob{}) + if err := m.EvictedJobs[len(m.EvictedJobs)-1].UnmarshalVT(dAtA[iNdEx:postIndex]); err != nil { + return err + } + iNdEx = postIndex default: iNdEx = preIndex skippy, err := protohelpers.Skip(dAtA[iNdEx:]) @@ -2977,6 +3146,93 @@ func (m *CompletedCompactionJob) UnmarshalVT(dAtA []byte) error { } return nil } +func (m *EvictedCompactionJob) UnmarshalVT(dAtA []byte) error { + l := len(dAtA) + iNdEx := 0 + for iNdEx < l { + preIndex := iNdEx + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return protohelpers.ErrIntOverflow + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + wireType := int(wire & 0x7) + if wireType == 4 { + return fmt.Errorf("proto: EvictedCompactionJob: wiretype end group for non-group") + } + if fieldNum <= 0 { + return fmt.Errorf("proto: EvictedCompactionJob: illegal tag %d (wire type %d)", fieldNum, wire) + } + switch fieldNum { + case 1: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field State", wireType) + } + var msglen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return protohelpers.ErrIntOverflow + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + msglen |= int(b&0x7F) << shift + if b < 0x80 { + break + } + } + if msglen < 0 { + return protohelpers.ErrInvalidLength + } + postIndex := iNdEx + msglen + if postIndex < 0 { + return protohelpers.ErrInvalidLength + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + if m.State == nil { + m.State = &CompactionJobState{} + } + if err := m.State.UnmarshalVT(dAtA[iNdEx:postIndex]); err != nil { + return err + } + iNdEx = postIndex + default: + iNdEx = preIndex + skippy, err := protohelpers.Skip(dAtA[iNdEx:]) + if err != nil { + return err + } + if (skippy < 0) || (iNdEx+skippy) < 0 { + return protohelpers.ErrInvalidLength + } + if (iNdEx + skippy) > l { + return io.ErrUnexpectedEOF + } + m.unknownFields = append(m.unknownFields, dAtA[iNdEx:iNdEx+skippy]...) + iNdEx += skippy + } + } + + if iNdEx > l { + return io.ErrUnexpectedEOF + } + return nil +} func (m *CompactionJobState) UnmarshalVT(dAtA []byte) error { l := len(dAtA) iNdEx := 0 diff --git a/api/metastore/v1/raft_log/raft_log.proto b/api/metastore/v1/raft_log/raft_log.proto index 13b4830d06..67ac279b93 100644 --- a/api/metastore/v1/raft_log/raft_log.proto +++ b/api/metastore/v1/raft_log/raft_log.proto @@ -46,6 +46,7 @@ message CompactionPlanUpdate { repeated AssignedCompactionJob assigned_jobs = 2; repeated UpdatedCompactionJob updated_jobs = 3; repeated CompletedCompactionJob completed_jobs = 4; + repeated EvictedCompactionJob evicted_jobs = 5; } message NewCompactionJob { @@ -67,6 +68,10 @@ message CompletedCompactionJob { metastore.v1.CompactedBlocks compacted_blocks = 2; } +message EvictedCompactionJob { + CompactionJobState state = 1; +} + // CompactionJobState is produced in response to // the compaction worker status update request. // diff --git a/pkg/experiment/metastore/compaction/README.md b/pkg/experiment/metastore/compaction/README.md index cb5413d396..c76eecd2a2 100644 --- a/pkg/experiment/metastore/compaction/README.md +++ b/pkg/experiment/metastore/compaction/README.md @@ -259,7 +259,14 @@ reports from workers, as jobs that cause workers to crash would yield no reports To avoid infinite reassignment loops, the scheduler keeps track of reassignments (failures) for each job. If the number of failures exceeds a set threshold, the job is not reassigned and remains at the bottom of the queue. Once the cause of -failure is resolved, the limit can be temporarily increased to reprocess these jobs. +failure is resolved, the error limit can be temporarily increased to reprocess these jobs. + +The scheduler queue has a size limit. Typically, the only scenario in which this limit is reached is when the compaction +process is not functioning correctly (e.g., due to a bug in the compaction procedure), preventing blocks from being +compacted and resulting in many jobs remaining in a failed state. Once the queue size limit is reached, failed jobs are +evicted, meaning the corresponding blocks will never be compacted. This may cause read amplification of the data queries +and bloat the metadata index. Therefore, the limit should be large enough. The recommended course of action is to roll +back or fix the bug and restart the compaction process, temporarily increasing the error limit if necessary. ### Job Completion diff --git a/pkg/experiment/metastore/compaction/compaction.go b/pkg/experiment/metastore/compaction/compaction.go index e79eb93118..b8a576f69a 100644 --- a/pkg/experiment/metastore/compaction/compaction.go +++ b/pkg/experiment/metastore/compaction/compaction.go @@ -55,6 +55,9 @@ type Schedule interface { UpdateJob(*raft_log.CompactionJobStatusUpdate) *raft_log.CompactionJobState // AssignJob is called on behalf of the worker to request a new job. AssignJob() (*raft_log.AssignedCompactionJob, error) + // EvictJob is called on behalf of the planner to evict jobs that cannot + // be assigned to workers, and free up resources for new jobs. + EvictJob() *raft_log.CompactionJobState // AddJob is called on behalf of the planner to add a new job to the schedule. // The scheduler may decline the job by returning a nil state. AddJob(*raft_log.CompactionJobPlan) *raft_log.CompactionJobState diff --git a/pkg/experiment/metastore/compaction/scheduler/metrics.go b/pkg/experiment/metastore/compaction/scheduler/metrics.go index a8de0458bb..529dd4d67a 100644 --- a/pkg/experiment/metastore/compaction/scheduler/metrics.go +++ b/pkg/experiment/metastore/compaction/scheduler/metrics.go @@ -13,6 +13,7 @@ type statsCollector struct { completedTotal *prometheus.Desc assignedTotal *prometheus.Desc reassignedTotal *prometheus.Desc + evictedTotal *prometheus.Desc // Gauge showing the job queue status breakdown. jobs *prometheus.Desc @@ -52,6 +53,11 @@ func newStatsCollector(s *Scheduler) *statsCollector { "The total number of jobs reassigned.", variableLabels, nil, ), + evictedTotal: prometheus.NewDesc( + schedulerQueueMetricsPrefix+"evicted_jobs_total", + "The total number of jobs evicted.", + variableLabels, nil, + ), } } @@ -61,6 +67,7 @@ func (c *statsCollector) Describe(ch chan<- *prometheus.Desc) { ch <- c.completedTotal ch <- c.assignedTotal ch <- c.reassignedTotal + ch <- c.evictedTotal } func (c *statsCollector) Collect(ch chan<- prometheus.Metric) { @@ -101,6 +108,7 @@ func (c *statsCollector) collectMetrics() []prometheus.Metric { stats.completedTotal = q.stats.completedTotal stats.assignedTotal = q.stats.assignedTotal stats.reassignedTotal = q.stats.reassignedTotal + stats.evictedTotal = q.stats.evictedTotal level := strconv.Itoa(i) metrics = append(metrics, @@ -112,6 +120,7 @@ func (c *statsCollector) collectMetrics() []prometheus.Metric { prometheus.MustNewConstMetric(c.completedTotal, prometheus.CounterValue, float64(stats.completedTotal), level), prometheus.MustNewConstMetric(c.assignedTotal, prometheus.CounterValue, float64(stats.assignedTotal), level), prometheus.MustNewConstMetric(c.reassignedTotal, prometheus.CounterValue, float64(stats.reassignedTotal), level), + prometheus.MustNewConstMetric(c.evictedTotal, prometheus.CounterValue, float64(stats.evictedTotal), level), ) } diff --git a/pkg/experiment/metastore/compaction/scheduler/schedule.go b/pkg/experiment/metastore/compaction/scheduler/schedule.go index e96e21c53a..bd6d88c15c 100644 --- a/pkg/experiment/metastore/compaction/scheduler/schedule.go +++ b/pkg/experiment/metastore/compaction/scheduler/schedule.go @@ -21,11 +21,11 @@ type schedule struct { // Read-only. scheduler *Scheduler // Uncommitted schedule updates. - updates map[string]*raft_log.CompactionJobState - addedJobs int + updates map[string]*raft_log.CompactionJobState + added int + evicted int // Modified copy of the job queue. copied []priorityJobQueue - level int } func (p *schedule) AssignJob() (*raft_log.AssignedCompactionJob, error) { @@ -42,6 +42,7 @@ func (p *schedule) AssignJob() (*raft_log.AssignedCompactionJob, error) { State: state, Plan: plan, } + p.evicted++ return assigned, nil } @@ -91,26 +92,35 @@ func (p *schedule) newStateForStatusReport(status *raft_log.CompactionJobStatusU return nil } +func (p *schedule) EvictJob() *raft_log.CompactionJobState { + limit := p.scheduler.config.MaxQueueSize + size := uint64(p.scheduler.queue.size() - p.evicted) + if limit == 0 || size < limit { + return nil + } + for level := 0; level < len(p.scheduler.queue.levels); level++ { + // We evict the job from our copy of the queue: each job is only + // accessible once. + pq := p.queueLevelCopy(level) + if pq.Len() != 0 { + job := heap.Pop(pq).(*jobEntry) + if p.isFailed(job) { + p.evicted++ + return job.CompactionJobState + } + heap.Push(pq, job) + } + } + return nil +} + // AddJob creates a state for the newly planned job. // // The method must be called after the last AssignJob and UpdateJob calls. // It returns an empty state if the queue size limit is reached. -// -// TODO(kolesnikovae): Implement displacement policy. -// When the scheduler queue is full, no new jobs can be added. Currently, -// it's possible that all jobs fail and can't be retried, and consequently, -// can't leave the queue, blocking the entire compaction process until the -// failure or queue limit is increased. Additionally, it's possible for a -// job to never be completed and thus remain in the queue indefinitely. -// -// One way to implement this is to evict the job with the highest number of -// failures (exceeding a configurable threshold, in addition to MaxFailures). -// This way, we can easily remove the job least likely to succeed. -// However, this needs to be handled explicitly in UpdateSchedule; at this -// point, we can only identify candidates for eviction. func (p *schedule) AddJob(plan *raft_log.CompactionJobPlan) *raft_log.CompactionJobState { if limit := p.scheduler.config.MaxQueueSize; limit > 0 { - if size := uint64(p.addedJobs + p.scheduler.queue.size()); size >= limit { + if size := uint64(p.added + p.scheduler.queue.size()); size >= limit { return nil } } @@ -122,22 +132,22 @@ func (p *schedule) AddJob(plan *raft_log.CompactionJobPlan) *raft_log.Compaction Token: p.token, } p.updates[state.Name] = state - p.addedJobs++ + p.added++ return state } func (p *schedule) nextAssignment() *raft_log.CompactionJobState { // We don't need to check the job ownership here: the worker asks // for a job assigment (new ownership). - for p.level < len(p.scheduler.queue.levels) { + for level := 0; level < len(p.scheduler.queue.levels); { // We evict the job from our copy of the queue: each job is only // accessible once. When we reach the bottom of the queue (the first // failed job, or the last job in the queue), we move to the next // level. Note that we check all in-progress jobs if there are not // enough unassigned jobs in the queue. - pq := p.queueLevelCopy(p.level) + pq := p.queueLevelCopy(level) if pq.Len() == 0 { - p.level++ + level++ continue } @@ -158,7 +168,8 @@ func (p *schedule) nextAssignment() *raft_log.CompactionJobState { case metastorev1.CompactionJobStatus_COMPACTION_STATUS_IN_PROGRESS: if p.isFailed(job) { // We reached the bottom of the queue: only failed jobs left. - p.level++ + heap.Push(pq, job) + level++ continue } if p.isAbandoned(job) { @@ -185,7 +196,7 @@ func (p *schedule) assignJob(e *jobEntry) *raft_log.CompactionJobState { } func (p *schedule) isAbandoned(job *jobEntry) bool { - return p.now.UnixNano() > job.LeaseExpiresAt + return !p.isFailed(job) && p.now.UnixNano() > job.LeaseExpiresAt } func (p *schedule) isFailed(job *jobEntry) bool { diff --git a/pkg/experiment/metastore/compaction/scheduler/schedule_test.go b/pkg/experiment/metastore/compaction/scheduler/schedule_test.go index 235e1e87d7..c2aade383e 100644 --- a/pkg/experiment/metastore/compaction/scheduler/schedule_test.go +++ b/pkg/experiment/metastore/compaction/scheduler/schedule_test.go @@ -343,9 +343,9 @@ func TestSchedule_Add(t *testing.T) { func TestSchedule_QueueSizeLimit(t *testing.T) { store := new(mockscheduler.MockJobStore) config := Config{ + MaxQueueSize: 2, MaxFailures: 3, LeaseDuration: 10 * time.Second, - MaxQueueSize: 2, } scheduler := NewScheduler(config, store, nil) @@ -367,3 +367,137 @@ func TestSchedule_QueueSizeLimit(t *testing.T) { assert.Nil(t, s.AddJob(plans[2])) }) } + +func TestSchedule_AssignEvict(t *testing.T) { + store := new(mockscheduler.MockJobStore) + config := Config{ + MaxQueueSize: 2, + MaxFailures: 3, + LeaseDuration: 10 * time.Second, + } + + scheduler := NewScheduler(config, store, nil) + plans := []*raft_log.CompactionJobPlan{ + {Name: "3"}, + } + for _, p := range plans { + store.On("GetJobPlan", mock.Anything, p.Name).Return(p, nil) + } + + states := []*raft_log.CompactionJobState{ + {Name: "1", Status: metastorev1.CompactionJobStatus_COMPACTION_STATUS_IN_PROGRESS, Token: 1, LeaseExpiresAt: 0, Failures: 3}, + {Name: "2", Status: metastorev1.CompactionJobStatus_COMPACTION_STATUS_IN_PROGRESS, Token: 1, LeaseExpiresAt: 0, Failures: 3}, + {Name: "3", Status: metastorev1.CompactionJobStatus_COMPACTION_STATUS_IN_PROGRESS, Token: 1, LeaseExpiresAt: 0, Failures: 0}, + } + for _, s := range states { + scheduler.queue.put(s) + } + + test.AssertIdempotent(t, func(t *testing.T) { + updatedAt := time.Second * 20 + s := scheduler.NewSchedule(nil, &raft.Log{Index: 2, AppendedAt: time.Unix(0, int64(updatedAt))}) + // Eviction is only possible when no jobs are available for assignment. + assert.Nil(t, s.EvictJob()) + // Assign all the available jobs. + update, err := s.AssignJob() + require.NoError(t, err) + assert.Equal(t, "3", update.State.Name) + update, err = s.AssignJob() + require.NoError(t, err) + assert.Nil(t, update) + // Now that no jobs can be assigned, we can try eviction. + assert.NotNil(t, s.EvictJob()) + assert.Nil(t, s.EvictJob()) + }) +} + +func TestSchedule_Evict(t *testing.T) { + store := new(mockscheduler.MockJobStore) + config := Config{ + MaxQueueSize: 2, + MaxFailures: 3, + LeaseDuration: 10 * time.Second, + } + + scheduler := NewScheduler(config, store, nil) + states := []*raft_log.CompactionJobState{ + {Name: "1", Status: metastorev1.CompactionJobStatus_COMPACTION_STATUS_IN_PROGRESS, Token: 1, LeaseExpiresAt: 0, Failures: 3}, + {Name: "2", Status: metastorev1.CompactionJobStatus_COMPACTION_STATUS_IN_PROGRESS, Token: 1, LeaseExpiresAt: 0, Failures: 3}, + {Name: "3", Status: metastorev1.CompactionJobStatus_COMPACTION_STATUS_IN_PROGRESS, Token: 1, LeaseExpiresAt: 0, Failures: 3}, + } + for _, s := range states { + scheduler.queue.put(s) + } + + test.AssertIdempotent(t, func(t *testing.T) { + updatedAt := time.Second * 20 + s := scheduler.NewSchedule(nil, &raft.Log{Index: 2, AppendedAt: time.Unix(0, int64(updatedAt))}) + // Eviction is only possible when no jobs are available for assignment. + update, err := s.AssignJob() + require.NoError(t, err) + assert.Nil(t, update) + assert.NotNil(t, s.EvictJob()) + assert.NotNil(t, s.EvictJob()) + assert.Nil(t, s.EvictJob()) + }) +} + +func TestSchedule_NoEvict(t *testing.T) { + store := new(mockscheduler.MockJobStore) + config := Config{ + MaxQueueSize: 5, + MaxFailures: 3, + LeaseDuration: 10 * time.Second, + } + + scheduler := NewScheduler(config, store, nil) + states := []*raft_log.CompactionJobState{ + {Name: "1", Status: metastorev1.CompactionJobStatus_COMPACTION_STATUS_IN_PROGRESS, Token: 1, LeaseExpiresAt: 0, Failures: 3}, + {Name: "2", Status: metastorev1.CompactionJobStatus_COMPACTION_STATUS_IN_PROGRESS, Token: 1, LeaseExpiresAt: 0, Failures: 3}, + {Name: "3", Status: metastorev1.CompactionJobStatus_COMPACTION_STATUS_IN_PROGRESS, Token: 1, LeaseExpiresAt: 0, Failures: 3}, + } + for _, s := range states { + scheduler.queue.put(s) + } + + test.AssertIdempotent(t, func(t *testing.T) { + updatedAt := time.Second * 20 + s := scheduler.NewSchedule(nil, &raft.Log{Index: 2, AppendedAt: time.Unix(0, int64(updatedAt))}) + // Eviction is only possible when no jobs are available for assignment. + update, err := s.AssignJob() + require.NoError(t, err) + assert.Nil(t, update) + // Eviction is only possible when the queue size limit is reached. + assert.Nil(t, s.EvictJob()) + }) +} + +func TestSchedule_NoEvictNoQueueSizeLimit(t *testing.T) { + store := new(mockscheduler.MockJobStore) + config := Config{ + MaxQueueSize: 0, + MaxFailures: 3, + LeaseDuration: 10 * time.Second, + } + + scheduler := NewScheduler(config, store, nil) + states := []*raft_log.CompactionJobState{ + {Name: "1", Status: metastorev1.CompactionJobStatus_COMPACTION_STATUS_IN_PROGRESS, Token: 1, LeaseExpiresAt: 0, Failures: 3}, + {Name: "2", Status: metastorev1.CompactionJobStatus_COMPACTION_STATUS_IN_PROGRESS, Token: 1, LeaseExpiresAt: 0, Failures: 3}, + {Name: "3", Status: metastorev1.CompactionJobStatus_COMPACTION_STATUS_IN_PROGRESS, Token: 1, LeaseExpiresAt: 0, Failures: 3}, + } + for _, s := range states { + scheduler.queue.put(s) + } + + test.AssertIdempotent(t, func(t *testing.T) { + updatedAt := time.Second * 20 + s := scheduler.NewSchedule(nil, &raft.Log{Index: 2, AppendedAt: time.Unix(0, int64(updatedAt))}) + // Eviction is only possible when no jobs are available for assignment. + update, err := s.AssignJob() + require.NoError(t, err) + assert.Nil(t, update) + // Eviction is not possible if the queue size limit is not set. + assert.Nil(t, s.EvictJob()) + }) +} diff --git a/pkg/experiment/metastore/compaction/scheduler/scheduler.go b/pkg/experiment/metastore/compaction/scheduler/scheduler.go index 5de49cb587..bf39286730 100644 --- a/pkg/experiment/metastore/compaction/scheduler/scheduler.go +++ b/pkg/experiment/metastore/compaction/scheduler/scheduler.go @@ -51,7 +51,7 @@ type Config struct { func (c *Config) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) { f.Uint64Var(&c.MaxFailures, prefix+"compaction-max-failures", 3, "") f.DurationVar(&c.LeaseDuration, prefix+"compaction-job-lease-duration", 15*time.Second, "") - f.Uint64Var(&c.MaxQueueSize, prefix+"compaction-max-job-queue-size", 2000, "") + f.Uint64Var(&c.MaxQueueSize, prefix+"compaction-max-job-queue-size", 10000, "") } type Scheduler struct { @@ -95,6 +95,17 @@ func (sc *Scheduler) UpdateSchedule(tx *bbolt.Tx, update *raft_log.CompactionPla sc.mu.Lock() defer sc.mu.Unlock() + for _, job := range update.EvictedJobs { + name := job.State.Name + if err := sc.store.DeleteJobPlan(tx, name); err != nil { + return err + } + if err := sc.store.DeleteJobState(tx, name); err != nil { + return err + } + sc.queue.evict(name) + } + for _, job := range update.NewJobs { if err := sc.store.StoreJobPlan(tx, job.Plan); err != nil { return err diff --git a/pkg/experiment/metastore/compaction/scheduler/scheduler_queue.go b/pkg/experiment/metastore/compaction/scheduler/scheduler_queue.go index 21981a4542..ce399ef4d5 100644 --- a/pkg/experiment/metastore/compaction/scheduler/scheduler_queue.go +++ b/pkg/experiment/metastore/compaction/scheduler/scheduler_queue.go @@ -40,11 +40,24 @@ func (q *schedulerQueue) put(state *raft_log.CompactionJobState) { func (q *schedulerQueue) delete(name string) *raft_log.CompactionJobState { if e, exists := q.jobs[name]; exists { delete(q.jobs, name) - return q.level(e.CompactionLevel).delete(e) + level := q.level(e.CompactionLevel) + level.delete(e) + level.stats.completedTotal++ + return e.CompactionJobState } return nil } +// evict is identical to delete, but it updates the eviction stats. +func (q *schedulerQueue) evict(name string) { + if e, exists := q.jobs[name]; exists { + delete(q.jobs, name) + level := q.level(e.CompactionLevel) + level.delete(e) + level.stats.evictedTotal++ + } +} + func (q *schedulerQueue) size() int { var size int for _, level := range q.levels { @@ -90,6 +103,7 @@ type queueStats struct { completedTotal uint32 assignedTotal uint32 reassignedTotal uint32 + evictedTotal uint32 // Gauges. Updated periodically. assigned uint32 unassigned uint32 diff --git a/pkg/experiment/metastore/compaction/scheduler/scheduler_test.go b/pkg/experiment/metastore/compaction/scheduler/scheduler_test.go index 6bf5f9ed2a..91bec3239b 100644 --- a/pkg/experiment/metastore/compaction/scheduler/scheduler_test.go +++ b/pkg/experiment/metastore/compaction/scheduler/scheduler_test.go @@ -19,7 +19,9 @@ func TestScheduler_UpdateSchedule(t *testing.T) { store.On("StoreJobState", mock.Anything, &raft_log.CompactionJobState{Name: "1"}).Return(nil).Once() store.On("StoreJobState", mock.Anything, &raft_log.CompactionJobState{Name: "2"}).Return(nil).Once() store.On("DeleteJobPlan", mock.Anything, "3").Return(nil).Once() + store.On("DeleteJobPlan", mock.Anything, "4").Return(nil).Once() store.On("DeleteJobState", mock.Anything, "3").Return(nil).Once() + store.On("DeleteJobState", mock.Anything, "4").Return(nil).Once() scheduler := NewScheduler(Config{}, store, nil) scheduler.queue.put(&raft_log.CompactionJobState{Name: "1", Token: 1}) @@ -37,6 +39,9 @@ func TestScheduler_UpdateSchedule(t *testing.T) { CompletedJobs: []*raft_log.CompletedCompactionJob{{ State: &raft_log.CompactionJobState{Name: "3"}, }}, + EvictedJobs: []*raft_log.EvictedCompactionJob{{ + State: &raft_log.CompactionJobState{Name: "4"}, + }}, }) require.NoError(t, err) diff --git a/pkg/experiment/metastore/compaction_raft_handler.go b/pkg/experiment/metastore/compaction_raft_handler.go index 51822b2398..487d275fd7 100644 --- a/pkg/experiment/metastore/compaction_raft_handler.go +++ b/pkg/experiment/metastore/compaction_raft_handler.go @@ -54,7 +54,7 @@ func (h *CompactionCommandHandler) GetCompactionPlanUpdate( // report from the worker. The plan will be used to update the schedule // after the Raft consensus is reached. planner := h.planner.NewPlan(cmd) - scheduler := h.scheduler.NewSchedule(tx, cmd) + schedule := h.scheduler.NewSchedule(tx, cmd) p := new(raft_log.CompactionPlanUpdate) // Any status update may translate to either a job lease refresh, or a @@ -63,7 +63,7 @@ func (h *CompactionCommandHandler) GetCompactionPlanUpdate( // assignments, therefore we try to update jobs' status first. var revoked int for _, status := range req.StatusUpdates { - switch state := scheduler.UpdateJob(status); { + switch state := schedule.UpdateJob(status); { case state == nil: // Nil state indicates that the job has been abandoned and // reassigned, or the request is not valid. This may happen @@ -95,19 +95,25 @@ func (h *CompactionCommandHandler) GetCompactionPlanUpdate( // ones. If we change it, we need to make sure that the Schedule // implementation allows doing this. for assigned := 0; assigned < capacity; assigned++ { - job, err := scheduler.AssignJob() + job, err := schedule.AssignJob() if err != nil { level.Error(h.logger).Log("msg", "failed to assign compaction job", "err", err) return nil, err } - if job == nil { - // No more jobs to assign. - break + if job != nil { + p.AssignedJobs = append(p.AssignedJobs, job) + continue } - p.AssignedJobs = append(p.AssignedJobs, job) } for created := 0; created < capacity; created++ { + // Evict jobs that cannot be assigned to workers. + if evicted := schedule.EvictJob(); evicted != nil { + level.Debug(h.logger).Log("msg", "planning to evict failed job", "job", evicted.Name) + p.EvictedJobs = append(p.EvictedJobs, &raft_log.EvictedCompactionJob{ + State: evicted, + }) + } plan, err := planner.CreateJob() if err != nil { level.Error(h.logger).Log("msg", "failed to create compaction job", "err", err) @@ -117,10 +123,13 @@ func (h *CompactionCommandHandler) GetCompactionPlanUpdate( // No more jobs to create. break } - state := scheduler.AddJob(plan) + state := schedule.AddJob(plan) if state == nil { // Scheduler declined the job. The only case when this may happen - // is when the scheduler queue is full. + // is when the scheduler queue is full; theoretically, this should + // not happen, because we evicted jobs before creating new ones. + // However, if all the jobs are healthy, we may end up here. + level.Warn(h.logger).Log("msg", "failed to create compaction job", "err", err) break } p.NewJobs = append(p.NewJobs, &raft_log.NewCompactionJob{