一、前面我们简单的说了一下,Python中的协程原理。这里补充Java的协程实现过程。有需要可以查看python之协程。
二、Java协程,其实做Java这么久我也没有怎么听过Java协程的东西,但是一直有有听到微线程/协程的概念,这不在学习Python的时候接触到了协程一词。然后返回来去了解Java的协程问题,但是看了很多资料,发现官网以及很多地方都没有涉及到协程的东西,没有办法,只能通过强大的社区来学习协程的相关东西。
三、这里主要关注的是:quasar。
1)协程的目的:当我们在使用多线程的时候,如果存在长时间的I/O操作。这个时候线程一直处于阻塞状态,如果线程很多的时候,会存在很多线程处于空闲状态,造成了资源应用不彻底。相对的协程不一样了,在单线程中多个任务来回自行如果出现长时间的I/O操作,让其让出目前的协程调度,执行下一个任务。当然可能所有任务,全部卡在同一个点上,但是这只是针对于单线程而言,当所有数据正常返回时,会同时处理当前的I/O操作。
2)多线程测试(这里使用100万个线程,来测试内存占用)
for (int i = 0; i < 1000000; i++) {
new Thread(() -> {
try {
Thread.sleep(100000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}).start();
}
结果:
直接卡死了,内存溢出
可想而知,如果存在100万个线程,开销是有多大。
3)协程测试
a、阿里云搜索到的依赖包
<dependency>
<groupId>co.paralleluniverse</groupId>
<artifactId>quasar-core</artifactId>
<version>0.7.9</version>
<classifier>jdk8</classifier>
</dependency>
b、测试内存占用量
public static void main(String[] args) throws Exception {
//使用阻塞队列来获取结果。
LinkedBlockingQueue<Fiber<Integer>> fiberQueue = new LinkedBlockingQueue<>();
DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
for (int i = 0; i < 1000000; i++) {
int finalI = i;
//这里的Fiber有点像Callable,可以返回数据
Fiber<Integer> fiber = new Fiber<>((SuspendableCallable<Integer>) () -> {
//这里用于测试内存占用量
Fiber.sleep(100000);
System.out.println("in-" + finalI + "-" + LocalDateTime.now().format(formatter));
return finalI;
});
//开始执行
fiber.start();
//加入队列
fiberQueue.add(fiber);
}
while (true) {
//阻塞
Fiber<Integer> fiber = fiberQueue.take();
System.out.println("out-" + fiber.get() + "-" + LocalDateTime.now().format(formatter));
}
}
结果:
堆:
估计:1个G左右。
内存:
估计:1个G左右,也就是每一个fiber占用1Kb左右。
c、正常测试
修改一下参数:
public static void main(String[] args) throws Exception {
//使用阻塞队列来获取结果。
LinkedBlockingQueue<Fiber<Integer>> fiberQueue = new LinkedBlockingQueue<>();
DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
for (int i = 0; i < 10; i++) {
int finalI = i;
//这里的Fiber有点像Callable,可以返回数据
Fiber<Integer> fiber = new Fiber<>((SuspendableCallable<Integer>) () -> {
//这里用于测试内存占用量
Fiber.sleep(1000);
System.out.println("in-" + finalI + "-" + LocalDateTime.now().format(formatter));
return finalI;
});
//开始执行
fiber.start();
//加入队列
fiberQueue.add(fiber);
}
while (true) {
//阻塞
Fiber<Integer> fiber = fiberQueue.take();
System.out.println("out-" + fiber.get() + "-" + LocalDateTime.now().format(formatter));
}
}
结果:
4)可以看出并发的状态还是很不错的,当然这只是多个任务执行而已。
四、通过上面的测试,可以看出,quasar中Fiber,很像Callable的用法、而且在内存占用上面减少了很多,当然,堆的数量确实不少,但是可以接受。
还是要说明:协程的方式更多用来做I/O密集型的操作。计算密集型的还是使用线程更加合理。
五、原理,我估么着看了一下源码,复杂度很高,这里不做深究。
1、Quasar里的Fiber其实是一个continuation,他可以被Quasar定义的scheduler调度,一个continuation记录着运行实例的状态,而且会被随时中断,并且也会随后在他被中断的地方恢复。Quasar其实是通过修改bytecode来达到这个目的,所以运行Quasar程序的时候,你需要先通过java-agent在运行时修改你的代码,当然也可以在编译期间这么干。golang的内置了自己的调度器,Quasar则默认使用ForkJoinPool这个JDK7以后才有的,具有work-stealing功能的线程池来当调度器。work-stealing非常重要,因为你不清楚哪个Fiber会先执行完,而work-stealing可以动态的从其他的等等队列偷一个context过来,这样可以最大化使用CPU资源。
2、那这里你会问了,Quasar怎么知道修改哪些字节码呢,其实也很简单,Quasar会通过java-agent在运行时扫描哪些方法是可以中断的,同时会在方法被调用前和调度后的方法内插入一些continuation逻辑,如果你在方法上定义了@Suspendable注解,那Quasar会对调用该注解的方法做类似下面的事情。
3、这里假设你在方法f上定义了@Suspendable,同时去调用了有同样注解的方法g,那么所有调用f的方法会插入一些字节码,这些字节码的逻辑就是记录当前Fiber栈上的状态,以便在未来可以动态的恢复。(Fiber类似线程也有自己的栈)。在suspendable方法链内Fiber的父类会调用Fiber.park,这样会抛出SuspendExecution异常,从而来停止线程的运行,好让Quasar的调度器执行调度。这里的SuspendExecution会被Fiber自己捕获,业务层面上不应该捕获到。如果Fiber被唤醒了(调度器层面会去调用Fiber.unpark),那么f会在被中断的地方重新被调用(这里Fiber会知道自己在哪里被中断),同时会把g的调用结果(g会return结果)插入到f的恢复点,这样看上去就好像g的return是f的local variables了,从而避免了callback嵌套。
4、上面啰嗦了一大堆,其实简单点讲就是,想办法让运行中的线程栈停下来,好让Quasar的调度器介入。JVM线程中断的条件只有两个,一个是抛异常,另外一个就是return。这里Quasar就是通过抛异常的方式来达到的,所以你会看到我上面的代码会抛出SuspendExecution。但是如果你真捕获到这个异常,那就说明有问题了,所以一般会这么写。