Note that it is documented that parallel streams use a ForkJoinPool and that ForkJoinPool and Semaphore belong to the same package - java.util.concurrent (so one would expect that they interoperate nicely)./* * (c) Copyright Christian P. Fries, Germany. All rights reserved. Contact: email@christian-fries.de. * * Created on 03.05.2014 */package net.finmath.experiments.concurrency;import java.util.concurrent.Semaphore;import java.util.stream.IntStream;/** * This is a test of Java 8 parallel streams. * * The idea behind this code is that the Semaphore concurrentExecutions * should limit the parallel executions of the outer forEach (which is an * <code>IntStream.range(0,numberOfTasks).parallel().forEach</code> (for example: * the parallel executions of the outer forEach should be limited due to a * memory constrain). * * Inside the execution block of the outer forEach we use another parallel stream * to create an inner forEach. The number of concurrent * executions of the inner forEach is not limited by us (it is however limited by a * system property "java.util.concurrent.ForkJoinPool.common.parallelism"). * * Problem: If the semaphore is used AND the inner forEach is active, then * the execution will be DEADLOCKED. * * Note: A practical application is the implementation of the parallel * LevenbergMarquardt optimizer in * {@link http://finmath.net/java/finmath-lib/apidocs/net/finmath/optimizer/LevenbergMarquardt.html} * In one application the number of tasks in the outer and inner loop is very large (>1000) * and due to memory limitation the outer loop should be limited to a small (5) number * of concurrent executions. * * @author Christian Fries */public class ForkJoinPoolTest { public static void main(String[] args) { // Any combination of the booleans works, except (true,true) final boolean isUseSemaphore = true; final boolean isUseInnerStream = true; final int numberOfTasksInOuterLoop = 20; // In real applications this can be a large number (e.g. > 1000). final int numberOfTasksInInnerLoop = 100; // In real applications this can be a large number (e.g. > 1000). final int concurrentExecusionsLimitInOuterLoop = 5; final int concurrentExecutionsLimitForStreams = 10; final Semaphore concurrentExecutions = new Semaphore(concurrentExecusionsLimitInOuterLoop); System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism",Integer.toString(concurrentExecutionsLimitForStreams)); System.out.println("java.util.concurrent.ForkJoinPool.common.parallelism = " + System.getProperty("java.util.concurrent.ForkJoinPool.common.parallelism")); IntStream.range(0,numberOfTasksInOuterLoop).parallel().forEach(i -> { if(isUseSemaphore) { concurrentExecutions.acquireUninterruptibly(); } try { System.out.println(i + "" + concurrentExecutions.availablePermits() + "" + Thread.currentThread()); if(isUseInnerStream) { runCodeWhichUsesParallelStream(numberOfTasksInInnerLoop); } else { try { Thread.sleep(10*numberOfTasksInInnerLoop); } catch (Exception e) { } } } finally { if(isUseSemaphore) { concurrentExecutions.release(); } } }); System.out.println("D O N E"); } /** * Runs code in a parallel forEach using streams. * * @param numberOfTasksInInnerLoop Number of tasks to execute. */ private static void runCodeWhichUsesParallelStream(int numberOfTasksInInnerLoop) { IntStream.range(0,numberOfTasksInInnerLoop).parallel().forEach(j -> { try { Thread.sleep(10); } catch (Exception e) { } }); }}推荐答案每当您将问题分解为任务时,这些任务可能会被其他任务阻塞,并尝试在有限线程池中执行它们,您就是面临池引起的死锁的风险.请参阅Java 并发实践 8.1.Any time you are decomposing a problem into tasks, where those tasks could be blocked on other tasks, and try and execute them in a finite thread pool, you are at risk for pool-induced deadlock. See Java Concurrency in Practice 8.1.这无疑是一个错误——在您的代码中.您正在用将阻塞等待同一池中其他任务结果的任务填满 FJ 池.有时你很幸运,事情不会死锁(就像不是所有的锁顺序错误都会导致死锁),但从根本上说,你在这里滑冰.This is unquestionably a bug -- in your code. You're filling up the FJ pool with tasks that are going to block waiting for the results of other tasks in the same pool. Sometimes you get lucky and things manage to not deadlock (just like not all lock-ordering errors result in deadlock all the time), but fundamentally you're skating on some very thin ice here. 这篇关于在嵌套的 Java 8 并行流操作中使用信号量可能会死锁.这是一个错误吗?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持! 上岸,阿里云!
08-03 21:36
查看更多