一、实时计算平台背景
首先了解一下实时计算平台的背景。电信行业的业务系统非常复杂,所以它的数据源也是非常多的,目前实时计算平台接入了 30 多种数据源,这 30 多种数据源相对于总的数据种类来说是比较小的。即使这样,我们的数据量也达到了万亿级别,每天有 600TB 的数据增量,而且我们接入的数据源种类和大小还在持续增长。平台的用户来自于全国 31 个省份公司以及联通集团的各个子公司,尤其是在节假日会有大量用户去做规则的订阅。用户想要获取数据,需要在平台上进行订阅,我们会将数据源封装成标准化的场景,目前我们有 26 种标准化场景,支撑了 5000 多个规则的订阅。
订阅场景有三大类。
- 用户行为类的场景有 14 个,比如位置类的场景,是关于用户进入某个区域后停留了多长时间,还有终端登网,是关于用户连接了哪个网络,4G 还是 5G 网,以及漫游、语音、产品订购等各种场景;
- 用户使用类的场景有 6 个,比如用户使用了多少流量、账户余额是多少、是否有欠费停机等;
- 用户触网类的场景大概有 10 个,比如业务的办理、充值缴费,还有新入户入网等。
对于实时计算平台来说,实时性的要求是很高的。数据从产生到进入我们的系统,大概有 5~20 秒的延迟,经过系统正常处理之后大概有 3~10 秒的延迟,我们允许的最大延迟是 5 分钟,所以必须做好实时计算平台端到端的延迟的监控。
用户定义的场景符合要求的数据最少要下发一次,这个是有严格要求的,不能漏发,而 Flink 很好地满足了这个需求;数据的准确性要求需要达到 95%,平台实时下发的数据会保存一份到 HDFS 上,每天我们会抽取部分订阅和离线数据,按照相同的规则进行数据生成以及数据质量比对,如果差异过大就需要找到原因并保证后续下发数据的质量。
本次分享更多的是 Flink 在电信行业的一次企业的深度实践:如何使用 Flink 更好地支撑我们的需求。
通用的平台无法支撑我们的特殊场景,比如以位置类的场景为例,我们会在地图上画多个电子围栏,当用户进入围栏并在围栏里面停留一段时间,并且满足用户设定性别、年龄、职业、收入等用户特征,这样的数据才进行下发。
我们的平台可以认为是一个实时场景中台,将数据进行实时的清洗处理,封装成支持多个条件组合的复杂场景,集约化地提供标准实时能力的同时,更进一步靠近业务,提供给业务方简单易用、门槛很低的接入方式。业务方通过调用标准化的接口,经过网关认证鉴权,才可以订阅我们的场景。订阅成功之后会将 Kafka 的一些连接信息和数据的 Schema 返回给订阅用户,用户订阅的筛选条件跟数据流进行匹配,匹配成功之后会以 Kafka 的形式再进行数据下发。以上就是我们的实时计算平台与下游系统交互的流程。
二、实时计算平台演进与实践
2020 年以前,我们的平台是使用 Kafka + Spark Streaming 来实现的,而且是采购厂商的第三方的平台,遇到了很多问题和瓶颈,难以满足我们日常的需求。现在很多企业包括我们都正在进行数字化改革,系统的自研比例也越来越高,再加上需求的驱动,自研、可灵活定制、可控的系统是迫在眉睫了。所以 2020 年我们开始接触了 Flink,并实现了基于 Flink 的实时计算平台,这个过程中我们也体会到了开源的魅力,今后也会更多地参与到社区中来。
我们的既往平台存在很多问题。首先是三方黑盒平台,我们使用厂商的第三方平台,过多依赖了外部系统;并且在大并发下外部系统的负载会非常高,不能灵活定制个性化的需求;Kafka 的负载也特别高,因为每一个规则的订阅都会对应多个 topic,所以随着规则的增加,topic 和分区的数量也会呈线性增长,导致延时比较高。每个订阅场景会对应多个实时流,而每个实时流都会占用内存和 CPU,场景过多会导致资源消耗增长以及资源负载过高;再就是支撑的体量小,支撑场景的订阅数有限的,比如逢年过节用户订阅的数量剧增,经常需要紧急救火,无法满足日益增长的需求;此外,监控粒度也不够,无法灵活定制监控,无法进行端到端的监控,采用人肉的排查比较多。
基于上述问题,我们全面自研了基于 Flink 的实时计算平台,根据每个场景的特点进行最优的定制,最大化资源的使用效率。同时我们利用 Flink 的状态减少外部依赖,降低了程序的复杂度,提升程序的性能。通过灵活定制实现了资源的优化,相同体量的需求下大大节约了资源。同时为了保证系统的低延迟率,我们进行了端到端的监控,比如增加了数据的积压、延迟、数据断传监控。
整个平台的架构比较简单,采用了 Flink on Yarn 的运行方式,外部只依赖 HBase,数据是以 Kafka 接入并由 Kafka 下发。
Flink 的集群是独立搭建的,它独享了 550 台服务器,没有和离线计算混用,因为它对稳定性要求比较高,需要日均处理 1.5 万亿数据,近 600TB 的数据增量。
我们对场景深度定制的主要原因是数据量大,同一个场景的订阅又非常多,而且每个订阅的条件又是不一样的。从 Kafka 读取一条数据的时候,这条数据要匹配多个规则,匹配中后才会下发到规则对应的 topic 里面。所以不管有多少订阅,只从 Kafka 中读取数据一次,这样能够降低对 Kafka 的消耗。
手机打电话或者上网都会连接到基站,相同基站的数据会按一定的时长窗口和固定消息进行压缩,比如三秒钟一个窗口,或者消息达到了 1000 再进行触发,这样下游接收到的消息就会有量级的降低。然后是围栏匹配,外部系统的压力是基于基站规模的,而不是基于消息数目。再就是充分利用了 Flink 的状态,当人员进入和滞留的时候会存入状态,用 RocksDB 状态后端减少了外部依赖,简化了系统的复杂度。此外,我们还实现了亿级标签的关联不依赖外部系统。通过数据压缩、围栏匹配、进入驻留、标签关联后,我们才开始正式匹配规则。
用户订阅场景后,订阅的规则会以 Flink CDC 的方式同步到实时计算平台,这样可以保证延迟比较低。由于人群的进入滞留会存入到状态,基于 RocksDB 的状态后端数据量比较大,我们会通过解析状态的数据进行问题排查,比如用户到底有没有在围栏之中。
我们自定义了 HASH 算法,在不依赖于外部系统的情况下,实现了亿级标签的关联。
在大并发下,如果每个用户都要关联外部系统获取标签的信息,那么外部系统的压力会非常大,尤其是在我们联通这么大数据量的情况下,依赖于外部系统建设的成本也很高,这些标签都是离线标签,数据相对比较稳定,比如有日更的有月更的。所以我们对用户使用自定义的哈希算法,比如有个手机号,按照哈希算法它被分配到 index 为 0 的 task_0 实例中,再通过离线计算将标签文件中的手机号也按照相同的哈希算法分配到编号为 0 的 0_tag 中。
task_0 实例在 open 方法中获取自己的 index 编号,即 index=0,然后拼接出标签文件名 0.tag,并将文件加载到自己的内存中。Task_0 实例接收到手机号后就可以从本地内存获取到此手机号的标签数据,不会造成内存的冗余浪费,提升了系统性能,减少了外部依赖。
有标签更新的时候, open 方法也会自动加载新的标签,并刷新到自己的内存中。
上图是我们做的端到端的时延监控。因为我们的业务对延迟要求比较高,所以我们进行了事件时间的打标,比如进出 Kafka 时间的打标,这里的事件就是消息。对于算子的延迟监控,我们根据打标的时间和当前的时间计算出延迟,这里并不是每条消息来了之后都去计算,而是采用抽样的方式。
对积压断传也做了监控,是通过采集 Kafka offset 进行前后对比来判断的,另外还有对数据延迟的监控,利用事件的时间和当前的时间来计算延迟,可以监控上游系统的数据延迟。
上图是端到端延迟监控和反压监控的图表。可以看到端到端的延迟正常是在 2~6 秒之间,也符合我们的预期,因为定位的条件是比较复杂的。我们还对反压进行了监控,通过监控算子 input channel 的使用率来定位每个算子产生的反压,比如第二个图出现了严重的反压且持续了一段时间,这时候我们需要定位到具体的算子,然后去排查原因,来保证系统的低延迟。
上图是我们对 Kafka 集群中的每个 topic 分区的 offset,以及对每个消费者消费到的位置进行采集来定位它的断传和积压。
首先制定一个 source 来获取 topic 列表和消费者组列表,再这些列表下发到下游,下游的算子可以采用分布式的方式去采集 offset 值,也是利用了 Flink 的特性。最后写入 Clickhouse 中进行分析。
Flink 日常监控主要包括以下几类:
- Flink 作业的监控、告警接入联通统一告警天眼平台;
- 作业的运行状态、checkpoint 的异常耗时;
- 算子的时延、反压、流量、条数;
- taskmanager CPU、内存的使用率,JVM GC 等指标的监控。
三、基于 Flink 的集群治理
我们还基于 Flink 搭建了我们的集群治理平台。搭建这个平台的背景是我们的总集群节点达到了 1 万多台,单集群最大有 900 个节点,总共 40 多个集群,总数据量单副本达到了 100 个 PB,每天有 60 万个作业运行,单个集群的 NameNode 的文件数最大达到了 1.5 亿。
随着公司业务的高速发展,数据的需求越来越复杂,所需要的算力也越来越大,集群的规模也越来越大,承载的数据产品也越来越多,导致 Hadoop 集群面临很大的挑战:
- 文件数比较多对 nameNode 造成很大的压力,影响存储系统的稳定。
- 小文件特别多,导致读取同样数据量的时候需要扫描更多文件,导致更多 NameNode RPC。
- 空文件多,需要扫描更多的文件,导致更多的 RPC。
- 平均文件比较小,从宏观上也体现出了小文件数比较多的。
- 生产上会持续产生文件,作业输出的文件要进行调优。
- 冷数据多,缺少清理的机制,浪费存储资源。
- 资源负载高,而且扩容成本又太大,扩容了也无法支撑太长时间。
- 作业耗时长影响产品的交付。
- 作业消耗资源大,占用太多的 CPU、核数和内存。
- 作业存在数据倾斜,导致执行时间非常长。
针对这些挑战,我们搭建了基于 Flink 的集群治理架构,通过采集资源队列的信息,解析 NameNode 的元数据文件 Fsimage,采集计算引擎的作业等信息等,然后对集群做 HDFS 画像、作业画像,数据血缘、冗余计算画像、RPC 画像以及资源画像。
资源画像:我们会同时对多个集群的多个资源队列的情况比如它的 IO、 metric 等进行分钟级的采集,可以实时查看整个集群和细分队列的资源使用趋势。
存储画像:我们以无侵入的方式对多集群的分布式存储进行全局性的多维度的分析。比如文件数到底分布在哪里,小文件分布在哪里,空文件分布在哪里。对于冷数据的分布,我们对每个数据库每张表的分区目录也做了精细化的画像。
作业画像:对多集群全产品线不同计算引擎的作业,我们进行实时采集,从时间维度、队列维度以及作业提交来源等多个维度,从耗时耗资源,数据倾斜、大吞吐量、高 RPC 的作业等多方面进行洞察,找出有问题的作业,筛选出那些待优化的作业。
数据血缘:通过分析生产环境 10 万级别的 SQL 语句,绘制出无侵入的、全局的、高精准的数据血缘关系。并在任意周期内提供了数据表级/账户级的调用频次、数据表的依赖关系、产线加工的流程变更、加工故障的影响范围和垃圾表的洞察等功能。
此外我们还做了用户操作审计和元数据方面的一些画像。
上图是集群治理存储大屏。除了一些宏观指标比如总文件数、空文件数、空文件夹,还有冷目录数、冷数据量和小文件的占比等。我们还对冷数据进行分析,比如哪些数据最后访问在某个月的数据量有多大,由此可以看到冷数据的时间分布;还有比如 10 兆以下、50 兆以下、100 兆以下的文件分布在哪些租户上。除了这些指标,还可以精确定位到哪个库、哪张表、哪个分区存在小文件。
上图是集群治理的效果展示,可以看到资源负载达到 100% 的时长也明显缩短,文件数降低了 60% 以上,RPC 负载也大幅降低。每年会有千万级别的成本节约,解决了长时间的资源紧张问题,降低了扩容的机器数。
四、未来规划
目前我们还没有一个完善的实时流管理平台,且监控比较分散,研发通用的管理和监控平台势在必行。
面对日益增长的需求,深度定制化虽然节约了资源,提升了支撑的规模,但是它的开发效率并不理想。针对数据量不大的场景,我们也考虑了使用 Flink SQL 来搭建通用的平台,以此来提升研发效率。
最后,我们还会继续探索 Flink 在数据湖中的应用。
更多 Flink 相关技术问题,可扫码加入社区钉钉交流群
第一时间获取最新技术文章和社区动态,请关注公众号~