• 也就是说如果缓存池满 了,消息追加调用将会被阻塞,直到有空闲的内存块,这样的话只要 Kafka 集群负载很高或者网络稍有波动,Sender 线程从缓冲池捞取消息的速度赶不上客户端发送的速度,就会造成客户端(也就是 Cobar 执行线程)发送被阻塞,这样的话可能导致线上几乎所有接口调用雪崩,系统不可用,导致严重的灾难!

    而原来的设计看似复杂,但实际上符合软件设计中的分层原则,这样的设计有两个好处,如下图示:

    高性能队列的实现思路

    1.队列的表示:数组 or 链表?

    队列(Queue)是一种线性表,是一种先进先出的数据结构,主要由数组和链表组成,队列只允许在后端(称为 rear)进行插入操作,在前端(称为 front)进行删除操作

    这两者优缺点都很明显,但总的来说数组的执行效率更高,为啥,这里简单介绍下 CPU 的运行原理,由于内存太慢,所以在 CPU 和 内存间设置了多道缓存(L1,L2,L3三级缓存,其中 L1,L2 缓存为每个 CPU 核独有的, L3是共享的)以提升 CPU 的执行效率,CPU 执行取数时,先从 L1查找,没有再从 L2查找,L2 没有则从 L3,查找,L3还是没有的话就会从内存加载。

    但需要注意的是,CPU 从内存加载数据时并不是以字节为单位加载,它是以 cacheline 的形式来加载的,cacheline 是从内存加载或写入的最小单位,在 X86 架构中,一个cacheline 由内存里连续的 64 个 byte组成的,而数组是在内存里连续分配的,所以它一次性能被加载多个数据到 cache 中,而链表中 node 的空间是分散在非连续的内存地址空间中,所以总的来说数组由于利用了 cache line 的连续加载特性对缓存更友好,性能会更好。

    这应该是不少人支持使用链表的一个重要原因了,如果空间不够大,需要扩容怎么办,对于链表来说,很简单,在 rear 结点后新增一个节点,将 rear 结点的 next 指针指向它即可,非常方便,但对于数组来说就没那么容易了,它需要先生成一个原来 n(一般是 2) 倍大小的新数组,再把老数组里的数据给移过去,如下图所示

    如果光从扩容这一角度来看,确实链表更优秀,但我们不要忘了消费者消费完后是要把链表对应的节点给释放掉的,在高并发下,就会造成频繁的 GC,造成严重的性能影响

    估计有人就会反驳了,如果数组中的元素被消费完了,难道不要被移除?这样的话岂不是也会存在高并发下的频繁 GC?总不能一开始给这个数组分配一个无限大的空间吧,这样的话就成了无界队列,这样的话还没等你数组填满就 OOM 了。

    这是个好问题,实际上对于数组来说,我们可以使用一个小 trick,既可以让它变成有界(即固定大小,无需扩容)数组,也可以避免频繁 GC,更可以避免数组扩容带来的性能问题,怎么做,将线性数组改造成循环数组(RingBuffer)

    如图示,假设数组的初始化大小为 7,当生产者把数组的七个元素都填满时(此时 0,1,2三个元素已经被消费者消费完了),如果生产者还想再填充数据,由于 0,1,2对应的三位元素已经被消费了,属于过期无效的元素了,所以生产者可以从头开始往里填充元素,只要不超过消费者的进度即可,同理,如果消费者对应的指针达到数组的末端,下一次消费也就回到数组下标 0 开始消费,只要不超过生产者进度即可。

    我们将将数组的首尾拼接就形成了一个 ringbuffer

    有人会说绕圈了怎么定位数组的具体下标?对数组大小取模即可,生产者/消费者对应的数组下标都是累加的,以以上情况为例,当前生产者的下标为 6,下一个下标就是 7,而当前数组大小为 7,于是 7 对应的数组下标即为 7%7 = 0,与实际相符。但需要注意的是取模操作是个很昂贵的操作,所以我们可以用位运算来代替,但位运算要求数组的大小为 2^n(想想为什么),于是取模操作可以用 index & (2^n -1 ) 来代替(index 为 生产者/消费者对应的下标)。

    综上,设计一个这样大小为 2^n(这里有两层含义,一是大小为 2^n,二是数组有界) 且由数组表示的 ringbuffer 有「对缓存友好」,「对 GC 友好」两个重要特性,在高并发下对性能的提升是非常有帮助的。

    2. 无锁

    只选好 ringbuffer 是不够的,在高并发下,多个生产者/消费者极有可能争用 ringbuffer 的 同一个 index,如下图示:

    为了避免这种情况,最容易想到的是加锁,但显然加锁会存在严重的性能问题:

    综上所述使用锁是不行的,一种有效的方式是使用 CAS 自旋不断尝试获取对应的 index,这种方式也就是我们所说的无锁的方式

    也许有人担心 CAS 性能比较低,我们可以用 JDK 8 之后对 atomic 类引入的 unsafe 的 getAndAddInt 方法来替换,如下:

    public final int incrementAndGet() {

     return unsafe.getAndAddInt(this, valueOffset, 1) + 1;

    }

    它使用的是 CPU 的 fetch-and-add 指令,性能上比 CAS 要强很多(实测是 6 倍于 CAS 的性能,见文末参考链接),skywalking 的队列就是用的 getAndInt 来代替 CAS,达到了很高的性能。

    可以看到在 RingBufferFields 前后分别填充了 7 个 long 类型的变量,这 14 个变量没有任何实际用wt途,只是用来做缓存行填充之用。

    总结

    disruptor 是 LMAX 公司开源的一款高性能队列,每秒支持 600w 的吞吐量,相当于用 1 微秒的延迟就获取了 100K+ 的吞吐量,性能极高,简单总结一下它主要用到了以下三个手段:

    另外 disruptor 提供了阻塞(publishEvent) 和非阻塞(tryPublishEvent)两种方法,针对我们文章开头 cobar 的使用场景,建议使用 tryPublishEvent 这种非阻塞的方式来向 ringbuffer 投递事件,不然一旦阻塞会导致线上 cobar 执行线程停顿的重大故障!

    disruptor 到底有多强悍,可能看听数据大家没有感觉,那我们来看一张针对 log4j2 的压测图

    log4j2 总共有三种工作机制,全局异步(Loggers all async),Appender 异步( Async Appender)以及同步(Sync),可以看到全局异步最快,几乎是 Appender 异步的 10 倍以上,为什么同样提异步,性能相差这么大,因为全局异步用的是 disruptor ,而 Appender 异步用的是 ArrayBlockingQueue,可以看到 disruptor 被称为高性能队列之王的名头可不是盖的。

    另外除了 disruptor,大家也可以看看 skywaking 的队列,它的性能虽然没有 disruptor 这么强,但对一般的业务场景也是足够的(在我司 cobar 压测 5.5w/s,使用了 skywalking 后仅损失了 1.8%,也是很强悍的),它也是阻塞的,另外它在队列满的时候可以选择「阻塞」,「覆盖」,「忽略」三种策略,我们选择了覆盖。

    更多精品文章,欢迎大家扫码关注「码海」

    06-29 03:08