Skip to content

Commit dad817d

Browse files
author
Cairry
committed
🚀 Optimized alarm event grouping and processing logic
1 parent 0136efd commit dad817d

File tree

4 files changed

+52
-85
lines changed

4 files changed

+52
-85
lines changed

alert/consumer/consumer.go

Lines changed: 43 additions & 71 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,6 @@ import (
55
"fmt"
66
"regexp"
77
"runtime/debug"
8-
"sort"
98
"sync"
109
"time"
1110
"watchAlert/alert/mute"
@@ -23,6 +22,10 @@ const (
2322

2423
// 默认处理时间间隔
2524
DefaultProcessTime = 1
25+
26+
// 状态前缀
27+
RecoverStatePrefix = "Recover_"
28+
FiringStatePrefix = "Firing_"
2629
)
2730

2831
type (
@@ -46,12 +49,12 @@ type (
4649

4750
RulesGroup struct {
4851
RuleID string // 规则组 ID
49-
Groups []EventsGroup
52+
Groups map[string]EventsGroup
5053
}
5154

5255
AlertGroups struct {
53-
Rules []RulesGroup // 告警事件列表, 根据规则划分组
54-
lock sync.Mutex
56+
Rules map[string]RulesGroup
57+
lock sync.RWMutex
5558
}
5659
)
5760

@@ -62,41 +65,37 @@ func (ag *AlertGroups) AddAlert(stateId string, alert *models.AlertCurEvent, fau
6265

6366
// 获取通知对象 ID 列表 用于事件分组
6467
noticeObjIds := ag.getNoticeId(alert, faultCenter)
68+
if len(noticeObjIds) == 0 {
69+
return // 如果没有通知对象ID,则跳过
70+
}
6571

6672
for _, noticeObjId := range noticeObjIds {
67-
// 查找 Rule 位置
68-
rulePos := ag.getRuleNodePos(stateId)
69-
70-
// Rule 存在时的处理,找到对应的规则组
71-
if rulePos < len(ag.Rules) && ag.Rules[rulePos].RuleID == stateId {
72-
rule := &ag.Rules[rulePos]
73-
74-
// 查找 Group 位置
75-
groupPos := ag.getGroupNodePos(rule, noticeObjId)
76-
77-
if groupPos < len(rule.Groups) && (rule.Groups)[groupPos].NoticeID == noticeObjId {
78-
// 追加事件
79-
(rule.Groups)[groupPos].Events = append((rule.Groups)[groupPos].Events, alert)
80-
} else {
81-
// 实例化新的 EventGroup
82-
rule.Groups = append(rule.Groups, EventsGroup{
83-
NoticeID: noticeObjId,
84-
Events: []*models.AlertCurEvent{alert},
85-
})
86-
}
87-
continue
88-
} else {
89-
// 实例化新的 RuleGroup
90-
ag.Rules = append(ag.Rules, RulesGroup{
73+
// 检查 Rule 是否存在
74+
rule, exists := ag.Rules[stateId]
75+
if !exists {
76+
// 创建新的 RuleGroup
77+
ag.Rules[stateId] = RulesGroup{
9178
RuleID: stateId,
92-
Groups: []EventsGroup{
93-
{
94-
NoticeID: noticeObjId,
95-
Events: []*models.AlertCurEvent{alert},
96-
},
97-
},
98-
})
79+
Groups: make(map[string]EventsGroup),
80+
}
81+
rule = ag.Rules[stateId]
9982
}
83+
84+
// 检查 Group 是否存在
85+
group, groupExists := rule.Groups[noticeObjId]
86+
if !groupExists {
87+
// 创建新的 EventsGroup
88+
rule.Groups[noticeObjId] = EventsGroup{
89+
NoticeID: noticeObjId,
90+
Events: []*models.AlertCurEvent{},
91+
}
92+
group = rule.Groups[noticeObjId]
93+
}
94+
95+
// 添加事件到对应组
96+
group.Events = append(group.Events, alert)
97+
// 更新 group 映射
98+
ag.Rules[stateId].Groups[noticeObjId] = group
10099
}
101100
}
102101

@@ -120,32 +119,6 @@ func (ag *AlertGroups) getNoticeId(alert *models.AlertCurEvent, faultCenter mode
120119
return faultCenter.NoticeIds
121120
}
122121

123-
// getRuleNodePos 获取 Rule 点位
124-
func (ag *AlertGroups) getRuleNodePos(ruleId string) int {
125-
// Rules 切片排序
126-
sort.Slice(ag.Rules, func(i, j int) bool {
127-
return ag.Rules[i].RuleID < ag.Rules[j].RuleID
128-
})
129-
130-
// 查找Rule位置
131-
return sort.Search(len(ag.Rules), func(i int) bool {
132-
return ag.Rules[i].RuleID >= ruleId
133-
})
134-
}
135-
136-
// getGroupNodePos 获取 Event 点位
137-
func (ag *AlertGroups) getGroupNodePos(rule *RulesGroup, groupId string) int {
138-
// Groups 切片排序
139-
sort.Slice(rule.Groups, func(i, j int) bool {
140-
return rule.Groups[i].NoticeID < rule.Groups[j].NoticeID
141-
})
142-
143-
// 查找Group位置
144-
return sort.Search(len(rule.Groups), func(i int) bool {
145-
return (rule.Groups)[i].NoticeID >= groupId
146-
})
147-
}
148-
149122
func NewConsumerWork(ctx *ctx.Context) ConsumeInterface {
150123
return &Consume{
151124
ctx: ctx,
@@ -213,14 +186,16 @@ func (c *Consume) executeTask(faultCenter models.FaultCenter, taskChan chan stru
213186
// 获取故障中心的所有告警事件
214187
data, err := c.ctx.Redis.Alert().GetAllEvents(models.BuildAlertEventCacheKey(faultCenter.TenantId, faultCenter.ID))
215188
if err != nil {
216-
logc.Error(c.ctx.Ctx, fmt.Sprintf("从 Redis 中获取事件信息错误, faultCenterKey: %s, err: %s", models.BuildAlertEventCacheKey(faultCenter.TenantId, faultCenter.ID), err.Error()))
189+
logc.Errorf(c.ctx.Ctx, "从 Redis 中获取事件信息错误, faultCenterKey: %s, err: %s", models.BuildAlertEventCacheKey(faultCenter.TenantId, faultCenter.ID), err.Error())
217190
return
218191
}
219192

220193
// 事件过滤
221194
filterEvents := c.filterAlertEvents(faultCenter, data)
222195
// 事件分组
223-
var alertGroups AlertGroups
196+
alertGroups := AlertGroups{
197+
Rules: make(map[string]RulesGroup),
198+
}
224199
c.alarmGrouping(faultCenter, &alertGroups, filterEvents)
225200
// 发送事件
226201
c.sendAlerts(faultCenter, &alertGroups)
@@ -297,13 +272,10 @@ func (c *Consume) alarmGrouping(faultCenter models.FaultCenter, alertGroups *Ale
297272
for _, alert := range alerts {
298273
// 状态+规则 = 状态 ID
299274
var stateId string
300-
switch alert.IsRecovered {
301-
case true:
302-
stateId = "Recover_" + alert.RuleId
303-
case false:
304-
stateId = "Firing_" + alert.RuleId
305-
default:
306-
stateId = "Unknown_" + alert.RuleId
275+
if alert.IsRecovered {
276+
stateId = RecoverStatePrefix + alert.RuleId
277+
} else {
278+
stateId = FiringStatePrefix + alert.RuleId
307279
}
308280

309281
alertGroups.AddAlert(stateId, alert, faultCenter)
@@ -335,7 +307,7 @@ func (c *Consume) processAlertGroup(faultCenter models.FaultCenter, noticeId str
335307
g.Go(func() error { return process.HandleAlert(c.ctx, "alarm", faultCenter, noticeId, alerts) })
336308

337309
if err := g.Wait(); err != nil {
338-
logc.Error(c.ctx.Ctx, fmt.Sprintf("Alert group processing failed: %v", err))
310+
logc.Errorf(c.ctx.Ctx, "Alert group processing failed: %v", err)
339311
}
340312
}
341313

alert/eval/query.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -145,7 +145,7 @@ func metrics(ctx *ctx.Context, datasourceId, datasourceType string, rule models.
145145
// 更新恢复时最新值
146146
cache, err := ctx.Redis.Alert().GetEventFromCache(event.TenantId, event.FaultCenterId, event.Fingerprint)
147147
if err == nil {
148-
if !cache.IsRecovered || cache.Status != models.StateRecovered {
148+
if !cache.IsRecovered && cache.Status != models.StateRecovered {
149149
event.Labels["value"] = v.GetValue()
150150
process.PushEventToFaultCenter(ctx, &event)
151151
}

alert/process/handle.go

Lines changed: 3 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -128,20 +128,17 @@ func withRuleGroupByAlerts(ctx *ctx.Context, timeInt int64, alerts []*models.Ale
128128
return alerts
129129
}
130130

131-
var aggregatedAlert *models.AlertCurEvent
132131
for i := range alerts {
133132
alert := alerts[i]
134-
if !strings.Contains(alert.Annotations, "聚合") {
135-
alert.Annotations += fmt.Sprintf("\n聚合 %d 条告警\n", len(alerts))
136-
}
137-
aggregatedAlert = alert
138-
139133
if !alert.IsRecovered {
140134
alert.LastSendTime = timeInt
141135
ctx.Redis.Alert().PushAlertEvent(alert)
142136
}
143137
}
144138

139+
aggregatedAlert := alerts[0]
140+
aggregatedAlert.Annotations += fmt.Sprintf("\n聚合 %d 条消息,详情请前往 WatchAlert 查看\n", len(alerts))
141+
145142
return []*models.AlertCurEvent{aggregatedAlert}
146143
}
147144

internal/models/alert_current_event.go

Lines changed: 5 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -77,9 +77,6 @@ func (alert *AlertCurEvent) TransitionStatus(newStatus AlertStatus) error {
7777
return err
7878
}
7979

80-
// 更新状态
81-
alert.Status = newStatus
82-
8380
return nil
8481
}
8582

@@ -120,19 +117,20 @@ func (alert *AlertCurEvent) validateTransition(newState AlertStatus) error {
120117
func (alert *AlertCurEvent) handleStateTransition(newState AlertStatus) error {
121118
now := time.Now().Unix()
122119

120+
// 更新状态
121+
alert.Status = newState
122+
123123
switch newState {
124124
case StatePreAlert:
125125
alert.FirstTriggerTime = now
126126
alert.LastEvalTime = now
127+
127128
case StateAlerting:
128129
case StateRecovered:
129-
if alert.IsRecovered == true && alert.Status == StateRecovered {
130-
return nil
131-
}
132-
133130
alert.LastSendTime = 0
134131
alert.RecoverTime = now
135132
alert.IsRecovered = true
133+
136134
case StateSilenced:
137135
}
138136

0 commit comments

Comments
 (0)