本文原创地址:https://www.cnblogs.com/xijiu/p/14235551.html 转载请注明
赛题
实现一个分布式统计和过滤的链路追踪
赛题背景
本题目是另外一种采样方式(tail-based Sampling),只要请求的链路追踪数据中任何节点出现重要数据特征(错慢请求),这个请求的所有链路数据都采集。目前开源的链路追踪产品都没有实现tail-based Sampling,主要的挑战是:任何节点出现符合采样条件的链路数据,那就需要把这个请求的所有链路数据采集。即使其他链路数据在这条链路节点数据之前还是之后产生,即使其他链路数据在分布式系统的成百上千台机器上产生。
整体流程
用户需要实现两个程序,一个是数量流(橙色标记)的处理程序,该机器可以获取数据源的http地址,拉取数据后进行处理,一个是后端程序(蓝色标记),和客户端数据流处理程序通信,将最终数据结果在后端引擎机器上计算。具体描述可直接打开赛题地址 https://tianchi.aliyun.com/competition/entrance/231790/information。此处不再赘述
解题
题目分析
可将整体流程粗略分为三大部分
- 一、front 读取 http stream 至 front 节点
- 网络io
- 二、front 节点处理数据
- cpu处理
- 三、将 bad traces 同步至 backend,backend 经过汇总计算完成上报
- 网络传输 + cpu
遵循原则:各部分协调好,可抽象为生成、消费模型,切勿产生数据饥饿;理想效果是stream流完,计算也跟着马上结束
方案一 (trace粒度)
因题目中明确表明某个 trace 的数据出现的位置前后不超过2万上,故每2万行是非常重要的特征
- ① 按行读取字符流
BufferedReader.readLine()
- ② 分析计算
- 在某个 trace 首次出现时(p),记录其结束位置 p + 2w,并将其放入待处理队列(queue)中
- 如果当前 trace 为 badTrace,将其放入 BadTraceSet 中
- 每处理一行,均从 queue 队列中拿出 firstElement,判断是否与之相等,如果相等,且为 badTrace,那么进入第3步
- ③ 向 backend 发送数据 注:后续所有涉及网络交互的部门均为 netty 异步交互
- 将当前 trace 对应的所有数据按照 startTime 排序
- 将排好序的数据与该 trace 最后结束位置 endPosition 一并发送至 backend 节点
- ④ backend 通知 front2 发送数据
- backend 接收到从 front1 发送过来的 trace 数据,向 front2 发送通知,要求其发送该 trace 的全部数据
- 注:此处交互有多种情况,在步骤5时,会具体说明
- ⑤ front2 将数据发送至 backend 此处列举所有可能发生的情况
- 场景 1:front1 主动上报 traceA 后,发现 front2 已经上报该 traceA 数据,结束
- 场景 2:front1 主动上报 traceA 后,front2 未上报,front2 发现该trace在已就绪集合中,排序、上报,结束
- 场景 3:front1 主动上报 traceA 后,front2 未上报,且 front2 的已就绪集合没有该 trace,在错误集合中发现该 trace,结束 注:因该 trace 存在于 badTraceSet 中,故将来某个时刻 front2 一定会主动上报该 trace
- 场景 4:front1 主动上报 traceA 后,front2 未上报,且 front2 的已就绪集合没有该 trace,那么等待行数超限后,检查该 trace 是否存在于 badTraceSet 中,如果已存在,结束
- 场景 5:front1 主动上报 traceA 后,front2 未上报,且 front2 的已就绪集合没有该 trace,那么等待行数超限后,检查该 trace 是否存在于 badTraceSet 中,如果不存在,排序、上报,结束 注:即便是 front2 中不存在该trace的信息,也需要上报
- ⑥ 结果计算
- 在收集到 front1 跟 front2 数据后,对2个有序集合进行 merge sort 后,计算其 MD5
方案分析
此方案的跑分大致在 25s 左右,成绩不甚理想,总结原因大致可分为以下几种
- 交互场景较为复杂
- 需要维护一块缓存区域
- 如果该缓存区域通过行数来失效过期数据的话,那么需要额外的分支计算来判断过期数据,拖慢整体响应时间
- 如果通过缓存大小来自动失效过期数据的话,那么大小设置很难平衡,如果太小,则可能会失效一些正常数据,导致最终结果不正确,如果太大,又会导致程序反应变慢,带来一系列不可控的问题
基于上述原因,为了充分利用 2万行的数据特征,引入方案二
方案二 (batch粒度)
因题目中明确表明某个trace的数据出现的位置前后不超过2万上,故每2万行数据可作为一个批次存储,过期数据自动删除
① 按行读取字符流
BufferedReader.readLine()
② 每2万行数据作为一个batch,并分配唯一的batchId(自增即可),此处涉及大量cpu计算,分为2部分
- 在每行的 tag 部分寻找
error=1
或http.status_code!=200
的数据并将其暂存 - 将 traceId 相同的 span 数据放入预先开辟好空间的数据结构
List<Map<String, List<String>>>
中,方便后续 backend 节点拿取数据 - 注:此处下载数据与处理数据并行执行,交由2个线程处理,一切为了提速
- 在每行的 tag 部分寻找
③ 上报 badTraceId
- 每切割 2万行,统计所有的 badTraceId,与 batchId 一并上报至 backend
- 因同一个 span 数据可能分布在2个 front 节点中,所以有可能 front1 存在错误数据,front2 却全部正确,2个 front 又不能直接通信,所以此时需要同步至 backend,由 backend 统计全量的 badTraceIds
- front 收到 backend 的通知后,进行当前批次错误 trace 数据的统计,因当前批次的数据有可能出现在上一个批次跟下一个批次,故一定要等到处理每行数据的线程已经处理完 currentBatchNum+1 个线程后,方能执行操作
④ 通知2个 front 节点发送指定 traceIds 的全量数据
- backend 主动向2个 front 发送获取指定 traceIds 的全量数据通知
- front 将 span 数据排好序后上报至 backend
- backend 执行二路归并排序后,计算整体 span 的 md5 值,反复循环,直至数据流读取完毕 注:因2个 front 节点为2核4g,backend 节点为单核2g,为减少 backend 压力,将部分排序工作下放至 front 节点
⑤ 计算结果
- 归并排序,计算最终结果
方案总结
当前方案耗时在20s左右,统计发现字符流的读取耗时15s,其他耗时5s,且监控发现各个缓冲区没有发现饥饿、过剩的情况,所以当前方案的瓶颈还是卡在字符流的读取、以及cpu判断上,所以一套面向字节流处理的方案呼之欲出
- 跟读
BufferedReader
源码,发现其将字节流转换字符流做了大量的工作,也是耗时的源头,故需要将当前方案改造为面向字节的操作
方案三 (面向字节)
大层面的设计思想与方案二一致,不过面向字节处理的话,从读取流、截断行、判断是否为bad trace、数据组装等均需为字节操作
- 好处:预分配内存,面向字节,程序性能提高
- 弊端:编码复杂,需自定义数据协议
① 读取字节流
- 程序在初始化时,预先分配10个字节数据
byte[]
,每个数组存放10M数据 - io 与 cpu 分离,将读取数据任务交个某个独立线程,核心线程处理cpu
- 程序在初始化时,预先分配10个字节数据
② 数据处理
- 用固定内存结构
int[20000]
替换之前动态分配内存的数据结构体List<Map<String, List<String>>>
,只记录每行开始的 position - 同时将 bad trace id 存入预先分配好的数组中
- 用固定内存结构
③ 上报 badTraceId
- 同方案二
④ 通知2个 front 节点发送指定 traceIds 的全量数据
- backend 主动向2个 front 发送获取指定 traceIds 的全量数据通知
- 因在步骤二时,并没有针对 trace 进行数据聚合,所以在搜集数据时,需要遍历
int[20000]
,将符合要求的 trace 数据放入自定义规范的byte[]
注:刚开始设计的(快排+归并排序)的方案效果不明显,且线上的评测环境的2个 front 节点压力很大,再考虑到某个 trace 对应的 span 数据只有几十条,故此处将所有的排序操作都下放给 backend 节点,从而减轻 front 压力 - 因 span 为变长数据,故自定义规范
byte[]
存储数据的设计如下- 预先分配10M
byte[]
,来存储一个批次中的所有 bad trace 对应 span 数据 - 用2个 byte 存放下一个 span 数据的长度
- 存储 span 数据
- 最后返回
byte[]
及有效长度
- 预先分配10M
⑤ 计算结果
- 排序,计算最终结果
线程并发
- A-Thread: slot 粒度,读取 http stream 线程
- B-Thread: block 粒度,处理 slot 中的 block,将 block 数据按行切割、抓取 bad trace id 等
- C-Thread: block 粒度,响应 backend 拉取数据的请求
阻塞场景
- A-Thread 读取 http stream 中数据后,将其放入下一个缓存区,如果下一个缓冲区还没有被线程C消费,那么A-Thread 将被阻塞
- B-Thread 处理数据,如果B-Thread下一个要处理的
byte[]
数据A线程还未下载完毕,那么B-Thread将被阻塞(io阻塞) - C-Thread 为拼接 bad trace 的线程,需要 previous、current、next 3个 batch 都 ready后,才能组织数据,当B-Thread还未处理完next batch 数据时,C-Thread将被阻塞
解决思路
- A-B 同步:10个 slot 分配10个
Semaphore
,为 A-Thread 与 B-Thread 同步服务,A-Thread 产生数据后,对应 slot 的信号量+1,B-Thread 消费数据之前,需要semaphore.acquire()
- B-C 同步:通过
volatile
及纳秒级睡眠Thread.sleep(0, 2)
实现高效响应。实际测试,某些场景中,该组合性能超过Semaphore
;C-Thread 发现 B-Thread 还未产出 next batch 的数据,那么进入等待状态 - A-C 同步:同样利用
volatile
及纳秒级睡眠Thread.sleep(0, 2)
JVM调参
打印gc输出日志时发现,程序会发生3-5次 full gc,导致性能欠佳,分析内存使用场景发现,流式输出的数据模型,在内存中只会存在很短的一段时间便会失效,真正流入老年代的内存是很小的,故应调大新生代占比
java -Dserver.port=$SERVER_PORT -Xms3900m -Xmx3900m -Xmn3500m -jar tailbaseSampling-1.0-SNAPSHOT.jar &
直接分配约 4g 的空间,其中新生代占 3.5g,通过观测 full gc 消失;此举可使评测快2-3s
方案总结
此方案最优成绩跑到了5.7s,性能有了较大提升,总结快的原因如下:
- 面向字节
- 内存预分配;避免临时开辟内存
- 使用轻量级锁
- 避免程序阻塞或饥饿
奇技淫巧
快速读取字节数组
因java语言设计缘故,凡事读取比 int 小的数据类型,统一转为 int 后操作,试想以下代码
while ((byteNum = input.read(data)) != -1) {
for (int i = 0; i < byteNum; i++) {
if (data[i] == 10) {
count++;
}
}
}
大量的字节对比操作,每次对比,均把一个 byte 转换为 4个 byte,效率可想而知
一个典型的提高字节数组对比效率的例子,采用万能的Unsafe
,一次性获取8个byte long val = unsafe.getLong(lineByteArr, pos + Unsafe.ARRAY_BYTE_BASE_OFFSET);
然后比较2个 long 值是否相等,提速是成倍增长的,那么怎么用到本次赛题上呢?
span数据是类似这样格式的
193081e285d91b5a|1593760002553450|1e86d0a94dab70d|28b74c9f5e05b2af|508|PromotionCenter|DoGetCommercialStatus|192.168.102.13|http.status_code=200&component=java-web-servlet&span.kind=server&bizErr=4-failGetOrder&http.method=GET
用"|"切割后,倒数第二位是ip,且格式固定为192.168.***.***
,如果采用Unsafe
,每次读取一个 int 时,势必会落在192.168.
中间,有4种可能192.
、92.1
、2.16
、.168
,故可利用此特性,直接进行 int 判断
int val = unsafe.getInt(data, beginPos + Unsafe.ARRAY_BYTE_BASE_OFFSET);
if (val == 775043377 || val == 825111097 || val == 909192754 || val == 943075630) {
}
此“技巧”提速1-2秒
大循环遍历
提供2种遍历字节数组方式,哪种效率更高
方式1
byte[] data = new byte[1024 * 1024 * 2]; int byteNum; while ((byteNum = input.read(data)) != -1) { for (int i = 0; i < byteNum; i++) { if (data[i] == 10) { count++; } } }
方式2
byte[] data = new byte[1024 * 1024 * 2]; int byteNum; int beginIndex; int endIndex; int beginPos; while ((byteNum = input.read(data)) != -1) { beginIndex = 0; endIndex = byteNum; beginPos = 0; while (beginIndex < endIndex) { int i; for (i = beginPos; i < endIndex; i++) { if (data[i] == 124) { beginPos = i + 1; times++; break; } else { if (data[i] == 10) { count++; beginIndex = i + 1; beginPos = i + 1; break; } } } if (i >= byteNum) { break; } } }
两种方式达到的效果一样,都是寻找换行符。方式2不同的是,每次找到换行符都 break 掉当前循环,然后从之前位置继续循环。其实这个小点卡了我1个星期,就是将字符流转换为字节流时,性能几乎没有得到提高,换成方式2后,性能至少提高一倍。为什么会呈现这样一种现象,我还没找到相关资料,有知道的同学,还望不吝赐教哈
结束
最终比赛成绩贴上哈