功能

judge 模块主要从transfer中接收数据,并从HBS中获取报警策略,然后进行阈值报警判断

  • 从HBS获取报警策略

  • 接收transfer 上报的数据,并存储最新几个点

  • 判断阈值,产生报警事件

  • 判断报警事件是否写入redis

  • 老旧报警数据的清理

配置文件


{ "debug": true, "debugHost": "nil", #用于调适,在log中打印指定host的具体策略 "remain": 11, #指定内存中存放每个metric数据点的个数。11个表示内存中存放10个点 "http": { "enabled": true, "listen": "0.0.0.0:6081" }, "rpc": { "enabled": true, "listen": "0.0.0.0:6080" }, "hbs": { "servers": ["127.0.0.1:6030"], # hbs最好放到lvs vip后面,所以此处最好配置为vip:port "timeout": 300, "interval": 60 }, "alarm": { "enabled": true, "minInterval": 300, # 连续两个报警之间至少相隔的秒数,维持默认即可 "queuePattern": "event:p%v", "redis": { "dsn": "127.0.0.1:6379", # 与alarm、sender使用一个redis "maxIdle": 5, "connTimeout": 5000, "readTimeout": 5000, "writeTimeout": 5000 } } }

处理逻辑

策略同步

1.judge启动之后会实例化RPC连接池,初始化RPCclient

  1. 向HBS请求监控策略,此时HBS返回的数据为
  • strategy

{ "id": 228, "result": { "hostStrategies": [ { "hostname": "open-falcon-1", "strategies": [ { "id": 5, "metric": "load.1min", "tags": { }, "func": "all(#3)", "operator": "\u003e", "rightValue": 40, "maxStep": 3, "priorit y": 0, "note": "", "tpl": { "id": 3, "name": "test", "parentId": 1, "actionId": 0, "creator": "root" } } ] }, { "hostname": "open-falcon-2", "strategies": [ { "id": 4, "metric": "proc.num", "tags": { "name": "mysql" }, "func": "all(#3)", "operator": "==", "rightValue": 0, "maxStep": 3, "priority": 0, "note": "", "tpl": { "id": 2, "name": "com", "parentId": 0, "actionId": 3, "creator": "root" } }, { "id": 2, "metric": "net.port.listen", "tags": { "port": "80" }, "func ": "all(#3)", "operator": "==", "rightValue": 0, "maxStep": 3, "priority": 0, "note": "", "tpl": { "id": 2, "name": "com", "parentId": 0, "actionId": 3, "creator": "root" } } ] } ] }, "error": null }
  • expression

{ "id": 229, "result": { "expressions": [ { "id": 1, "metric": "cpu.idle", "tags": { "module": "123" }, "func": "all(#3)", "operator": "==", "rightValue": 0, "maxStep": 3, "priority": 0, "note": "", "actionId": 2 } ] }, "error ": null }

3.规整缓存策略。judge实现可以更加快速的进行阈值判断,则需要在收到transfer数据之后尽快找到该数据对应的策略,所以对数据进行规整。分别用两个大Map存放规整后的数据,strategy和expression

  • strategy

key:endpoint/metric,value:[strategy1,strategy2,...]

  • expression

key: metric/tag_k=tag_v.value: [strategy1,strategy2,...],如果多条tag,则会生成多个key

如 each(metric=qps,project=adv,module=nginx),规整后的数据为:

    qps/project=adv

    qps/module=nginx

历史数据存储

  1. judge在启动之后,首先会初始化一个historyBigmap。用于存放所有metric remain数量的数据

  2. judge 接收transfer上报的数据。数据格式为

  3. judge 根据endpoint、metric、sorted(tags) 计算md5值,然后组成一个map1 ,格式为:key:md5值,value:具体采集数据

  4. 最后将map1的数据存到historyBigmap中,此时 key 为 mao1 的key的前两位即md5_v[0:2],value为[map1,map1map1..]

可见,最终在内存中存的数据为

 bigmap: {md5[0:2]:[map1,map1]}

报警逻辑

报警条件

  • all(#3)/min(#3)/max(#3)/sum(#3)/avg(#3) 最新3个数据点所有/最小/最大/和/平均

  • diff(#3) 当前数据点减去最新的3个历史点,拿到的差值,如果有任何一个超过阈值则告警

  • pdiff(#3) 当前数据点减去最新的3个历史点,再除以历史点,求得变动的波动率,如果任何一个波动率超过阈值则告警

判断告警

1.judge收到数据并规整完之后,会试着在bigmap中找对应的key

2.判断bigmap中的key对应的value是否有数据,有数据则追加,无数据则新建。此时还会做很多判断,如数据合法性校验、数据条数判断等

  1. 根据endpoint+metric 找到内存中对用的strategy和expression,此时结果可能是一个列表,接下来会对tags进行匹配

  2. 循环strategy或expression的tags列表,判断是否在数据的tags中,即如果策略里tags 是数据tags的子集,则该策略为该数据的告警策略

  3. 解析报警策略中的判断函数

  4. 数据和阈值进行比较,如果触发阈值,则产生一个告警事件

7.判断告警事件是否放入alarm队列,此时判断规则为:

* 检查的数据个数是否判断条件(例如检查最近5份数据,但是目前只有3份)

* 该策略是否被屏蔽(maxStep设置为0)

* 当前报警次数是否已经达到最大告警次数

* 距离上次报警是否满足一定的时间间隔

注意:judge 对阈值判断和告警 过程中,数据点不会重复用于和新数据点判断。如


10:01 10:02 10:04 10:05 10:06 10:07 10:08 10:09 10:10 10:11 10:12 10:13 4 5 3 9 2 1 9 7 6 3 5 4 阈值:cpu.idle < 10,minInterval:120 发现 10次上报的数据都触发了阈值,但是产生的告警时间点为:10:04,10:10 10:07由于和上次产生告警事件的间隔小于120s,所以不会产生告警事件
  1. 将告警事件根据告警的级别发送到alarm对应的level队列

历史脏数据清理

脏数据清理主要是为了清理那些很久没上报的enpoint和metric,如果我们修改endpoint,那么会产生新的endpint,并上报,judge会重新存储,这样导致老的数据在内存中越来越多,最终造成内存溢出

05-11 19:45