​在之前调研Sentinel的过程中,为了准备分享内容,自己就简单的写了一些测试代码,不过在测试中遇到了一些问题,其中有一个问题就是Sentinel流控在并发情况下限流并不精确,当时我还在想,这个我在做分享的时候该怎么来自圆其说呢,所以觉得比较有意思,在这里做一个记录。同时在排查这个问题的过程中,为了说清楚问题原因,我觉得有必要理一下它的责任链,所以副标题就是Sentinel的责任链。


一、问题起源

​在这里我直接上我的测试代码,我的本意是要起10个线程同时去请求这个资源,每个线程轮询10次。同时,对于这个资源我设置了限流规则为QPS=1。这也就意味着我这10个线程共100个请求,正确的结果应该是成功1个,阻塞99个。

import com.alibaba.csp.sentinel.Entry;
import com.alibaba.csp.sentinel.SphU;
import com.alibaba.csp.sentinel.slots.block.BlockException;
import com.alibaba.csp.sentinel.slots.block.RuleConstant;
import com.alibaba.csp.sentinel.slots.block.flow.FlowRule;
import com.alibaba.csp.sentinel.slots.block.flow.FlowRuleManager;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * 流量控制演示
 */
public class FlowQpsDemo_Bug {

    private static final String SOURCE_KEY = "CESHI_KEY";

    private static AtomicInteger pass = new AtomicInteger();
    private static AtomicInteger block = new AtomicInteger();
    private static AtomicInteger total = new AtomicInteger();

    public static void main(String[] args) throws InterruptedException {
        initFlowQpsRule();
        CountDownLatch start = new CountDownLatch(1);
        CountDownLatch end = new CountDownLatch(10);
        for (int i = 0;i < 10;i++) {
            new Thread(new Runnable() {
                @Override
                public void run() {
                    try {
                        start.await();
                    } catch (InterruptedException e) {
                    }
                    for (int i = 0;i < 10;i++) {
                        fun();
                    }
                    end.countDown();
                }
            }).start();
        }
        start.countDown();
        end.await();
        System.out.println("total=" + total.get() + " pass=" + pass.get() + " block=" + block.get());
    }

    public static void fun() {
        Entry entry = null;
        try {
            entry = SphU.entry(SOURCE_KEY);
            // todo 业务逻辑
            pass.incrementAndGet();
        } catch (BlockException e1) {
            // todo 流控处理
            block.incrementAndGet();
        } finally {
            total.incrementAndGet();
            if (entry != null) {
                entry.exit();
            }
        }
    }

    private static void initFlowQpsRule() {
        List<FlowRule> rules = new ArrayList<FlowRule>();
        FlowRule rule1 = new FlowRule();
        rule1.setResource(SOURCE_KEY);
        // 采用qps策略,每秒允许通过1个请求
        rule1.setCount(1);
        rule1.setGrade(RuleConstant.FLOW_GRADE_QPS);
        rule1.setLimitApp("default");
        rules.add(rule1);
        FlowRuleManager.loadRules(rules);
    }
}

​但是,实际运行结果却并非如此,运行好多次都是成功10个,阻塞90个。也有其他情况比如成功5、6个,阻塞90多个。这显然不是我想要的结果。并且考虑到我们在生产环境设置的限流往往是压测的极限值了,如果这时限流还不准确,那岂不是就失去了系统保护的作用了。

​为了解决这个问题,最直接的方式就是去走一遍它的源码,这一走就看到了问题。为了说明问题所在,有必要先介绍一下Sentinel内部责任链的使用。

二、Sentinel内部责任链的使用

​责任链由一系列节点组成,每个节点拥有一个next属性来指向下一个处理节点。当一个请求进来时,当前节点或者执行完后将请求交个next节点处理,或者先交个next节点处理完后自己再处理。这样就形成一条链,将请求一个节点一个节点的往下传递处理执行。

​在Sentinel中,对应的就是Slot,官方文档中翻译为功能插槽。最顶层的插槽定义就是com.alibaba.csp.sentinel.slotchain.ProcessorSlot,这是一个接口,下面有一个抽象类com.alibaba.csp.sentinel.slotchain.AbstractLinkedProcessorSlot实现了它,其中最主要的逻辑就是entry和exit两个方法,分别在请求进来和退出时调用。

​而AbstractLinkedProcessorSlot有一系列的子类,这一系列的子类分别完成不同的功能,列举几个重要的如下:

  • StatisticSlot:处理统计逻辑,它是其它限流、降级等插槽执行的基础;
  • ClusterBuilderSlot :用于存储资源的统计信息以及调用者信息,例如该资源的 RT, QPS, thread count 等等,这些信息将用作为多维度限流,降级的依据;
  • FlowSlot:用于根据预设的限流规则以及前面 slot 统计的状态,来进行流量控制;
  • DegradeSlot:通过统计信息以及预设的规则,来做熔断降级;

​整个责任链的构建入口为DefaultSlotChainBuilder.build。其中,每个功能插槽都有对应的顺序,Sentinel在构建链的时候按照优先级从小到大的顺序进行串联构建链路。

三、不精确原因

​在这次采坑中,两个主角就是StatisticSlot和FlowSlot,根据源码中的@SpiOrder标注,StatisticSlot为-7000,FlowSlot为-2000,所以FlowSlot的优先级低于StatisticSlot。那么当请求进来的时候先执行的是StatisticSlot。

​首先看StatisticSlot的entry方法的执行,这里我删去了BlockException异常处理的逻辑,它大体上与上面异常的处理逻辑一样。

@Override
public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count,
                  boolean prioritized, Object... args) throws Throwable {
    try {
        // Do some checking.
        fireEntry(context, resourceWrapper, node, count, prioritized, args);

        // Request passed, add thread count and pass count.
        node.increaseThreadNum();
        // todo issue_1620 这个时候才会增加QPS,但是刚才前面的DefaultController.pass方法已经返回了true
        node.addPassRequest(count);

        if (context.getCurEntry().getOriginNode() != null) {
            // Add count for origin node.
            context.getCurEntry().getOriginNode().increaseThreadNum();
            context.getCurEntry().getOriginNode().addPassRequest(count);
        }

        if (resourceWrapper.getEntryType() == EntryType.IN) {
            // Add count for global inbound entry node for global statistics.
            Constants.ENTRY_NODE.increaseThreadNum();
            Constants.ENTRY_NODE.addPassRequest(count);
        }

        // Handle pass event with registered entry callback handlers.
        for (ProcessorSlotEntryCallback<DefaultNode> handler : StatisticSlotCallbackRegistry.getEntryCallbacks()) {
            handler.onPass(context, resourceWrapper, node, count, args);
        }
    } catch (PriorityWaitException ex) {
        node.increaseThreadNum();
        if (context.getCurEntry().getOriginNode() != null) {
            // Add count for origin node.
            context.getCurEntry().getOriginNode().increaseThreadNum();
        }

        if (resourceWrapper.getEntryType() == EntryType.IN) {
            // Add count for global inbound entry node for global statistics.
            Constants.ENTRY_NODE.increaseThreadNum();
        }
        // Handle pass event with registered entry callback handlers.
        for (ProcessorSlotEntryCallback<DefaultNode> handler : StatisticSlotCallbackRegistry.getEntryCallbacks()) {
            handler.onPass(context, resourceWrapper, node, count, args);
        }
    } catch (BlockException e) {
    } catch (Throwable e) {
        // Unexpected internal error, set error to current entry.
        context.getCurEntry().setError(e);
        throw e;
    }
}

​在上面这段代码中,StatisticSlot在entry方法中并不是先执行的自己的逻辑,首先调用的是fireEntry方法,进入到这个方法,在AbstractLinkedProcessorSlot类中,代码如下:

@Override
public void fireEntry(Context context, ResourceWrapper resourceWrapper, Object obj, int count, boolean prioritized, Object... args)
    throws Throwable {
    if (next != null) {
        next.transformEntry(context, resourceWrapper, obj, count, prioritized, args);
    }
}

​可以看出,在StatisticSlot的entry方法中,它是先调用父类的fireEntry交由后面的节点执行,等后面如限流、降级节点执行完后返回到StatisticSlot的entry方法中后,它才执行对应的统计逻辑。

​那么这时可直接进到FlowSlot的entry方法中,代码如下:

@Override
public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count,
                  boolean prioritized, Object... args) throws Throwable {
    checkFlow(resourceWrapper, context, node, count, prioritized);

    fireEntry(context, resourceWrapper, node, count, prioritized, args);
}

​可以看出,checkFlow就是限流规则的执行了。进到这个方法中,其逻辑就是调用FlowRuleChecker检查各项限流规则是否已经达到阈值,如果没有达到阈值则放行,如果达到阈值了则阻塞。在规则的检查中,一路跟到了DefaultController类的canPass方法中,其逻辑如下:

@Override
public boolean canPass(Node node, int acquireCount, boolean prioritized) {
    int curCount = avgUsedTokens(node);
    // 当前阈值 + acquireCount 大于规则设定的count,则返回false,否则返回true
    // 如果并发执行到这里,并没有加锁,所以多个线程都会返回true,限流失效
    if (curCount + acquireCount > count) {
        if (prioritized && grade == RuleConstant.FLOW_GRADE_QPS) {
            long currentTime;
            long waitInMs;
            currentTime = TimeUtil.currentTimeMillis();
            waitInMs = node.tryOccupyNext(currentTime, acquireCount, count);
            if (waitInMs < OccupyTimeoutProperty.getOccupyTimeout()) {
                node.addWaitingRequest(currentTime + waitInMs, acquireCount);
                node.addOccupiedPass(acquireCount);
                sleep(waitInMs);

                // PriorityWaitException indicates that the request will pass after waiting for {@link @waitInMs}.
                throw new PriorityWaitException(waitInMs);
            }
        }
        return false;
    }
    return true;
}

​问题就出在这个判断中了,要知道前面的统计插槽一定是要等后面的功能插槽执行完后才有统计的依据,而在一个统计时间段开始的时候,10个请求同时进来,这时前面还没有统计数据,那么curCount + acquireCount > count这个条件就可以同时满足,这也就意味着这10个请求可以同时通过,所以就出现了上面最开始我遇到的情况,明明设置了QPS=1,但是还是有10个请求成功通过了,问题点就在这里。

​对于这个问题,我后来看了一下git上面有问题说明,对应两个链接如下:

https://github.com/alibaba/Sentinel/issues/1861
https://github.com/alibaba/Sentinel/issues/1620

​这个问题在1.6版本就提出了,但是目前1.8版本还是没有修复。

​针对这个问题,我的理解就是:Sentinel可能不需要做的那么完美,它可以不完美,但是不可以影响正常的业务系统执行,不能拉低正常业务系统的性能。

​同时这也是一个逻辑缺陷,采用了责任链模式,那么在前面的统计插槽中进行统计时,必然要以后面的其它功能插槽的执行结果为依据。而后面的其它功能插槽对规则的检查又依赖前面的统计数据,所以这个问题还真不好【完美】的去解决它。总不能在后面加锁让请求一个一个通过吧,当然这只是一个玩笑。

四、责任链构建

​上面提到了责任链构建的地方在DefaultSlotChainBuilder.build里面,方法很简单,如下:

@Override
public ProcessorSlotChain build() {
    ProcessorSlotChain chain = new DefaultProcessorSlotChain();

    // Note: the instances of ProcessorSlot should be different, since they are not stateless.
    List<ProcessorSlot> sortedSlotList = SpiLoader.loadPrototypeInstanceListSorted(ProcessorSlot.class);
    for (ProcessorSlot slot : sortedSlotList) {
        if (!(slot instanceof AbstractLinkedProcessorSlot)) {
            RecordLog.warn("The ProcessorSlot(" + slot.getClass().getCanonicalName() + ") is not an instance of AbstractLinkedProcessorSlot, can't be added into ProcessorSlotChain");
            continue;
        }

        chain.addLast((AbstractLinkedProcessorSlot<?>) slot);
    }

    return chain;
}

​其中DefaultProcessorSlotChain只是一个默认的头结点,里面只有first、end两个元素。通过它的SpiLoader.loadPrototypeInstanceListSorted方法将ProcessorSlot的各个插槽实现类加载进来,然后进行链接。

​其中SpiLoader.loadPrototypeInstanceListSorted对加载的ProcessorSlot进行排序,排序使用到了一个SpiOrderWrapper包装类,里面有一个order属性,加载排序如下:

public static <T> List<T> loadPrototypeInstanceListSorted(Class<T> clazz) {
    try {
        // Not use SERVICE_LOADER_MAP, to make sure the instances loaded are different.
        ServiceLoader<T> serviceLoader = ServiceLoaderUtil.getServiceLoader(clazz);

        List<SpiOrderWrapper<T>> orderWrappers = new ArrayList<>();
        for (T spi : serviceLoader) {
            int order = SpiOrderResolver.resolveOrder(spi);
            // Since SPI is lazy initialized in ServiceLoader, we use online sort algorithm here.
            SpiOrderResolver.insertSorted(orderWrappers, spi, order);
            RecordLog.debug("[SpiLoader] Found {} SPI: {} with order {}", clazz.getSimpleName(),
                    spi.getClass().getCanonicalName(), order);
        }
        List<T> list = new ArrayList<>(orderWrappers.size());
        for (int i = 0; i < orderWrappers.size(); i++) {
            list.add(orderWrappers.get(i).spi);
        }
        return list;
    } catch (Throwable t) {
        RecordLog.error("[SpiLoader] ERROR: loadPrototypeInstanceListSorted failed", t);
        t.printStackTrace();
        return new ArrayList<>();
    }
}

​上面两段就是Sentinel构建责任链的核心代码。整个代码还是非常漂亮,非常清晰。

五、结束语

​问:既然遇到了这个问题,那最后怎么解决呢?

​答:不解决,继续用。

12-26 21:56