Problem description
I wanted to try out ForkJoinPool in Java 8, so I wrote a small program that searches a given directory for all files whose names contain a specific keyword.
Program:
import java.io.File;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

public class DirectoryService {
    public static void main(String[] args) {
        FileSearchRecursiveTask task = new FileSearchRecursiveTask("./DIR");
        // In the JDK implementation the returned ExecutorService is a ForkJoinPool, hence the cast
        ForkJoinPool pool = (ForkJoinPool) Executors.newWorkStealingPool();
        List<String> files = pool.invoke(task);
        pool.shutdown();
        System.out.println("Total no of files with hello " + files.size());
    }
}

class FileSearchRecursiveTask extends RecursiveTask<List<String>> {
    private String path;

    public FileSearchRecursiveTask(String path) {
        this.path = path;
    }

    @Override
    protected List<String> compute() {
        File mainDirectory = new File(path);
        List<String> filetedFileList = new ArrayList<>();
        List<FileSearchRecursiveTask> recursiveTasks = new ArrayList<>();
        if (mainDirectory.isDirectory()) {
            System.out.println(Thread.currentThread() + " - Directory is " + mainDirectory.getName());
            if (mainDirectory.canRead()) {
                File[] fileList = mainDirectory.listFiles();
                for (File file : fileList) {
                    System.out.println(Thread.currentThread() + "Looking into:" + file.getAbsolutePath());
                    if (file.isDirectory()) {
                        // Every subdirectory is forked, without any limit
                        FileSearchRecursiveTask task = new FileSearchRecursiveTask(file.getAbsolutePath());
                        recursiveTasks.add(task);
                        task.fork();
                    } else {
                        if (file.getName().contains("hello")) {
                            System.out.println(file.getName());
                            filetedFileList.add(file.getName());
                        }
                    }
                }
            }
            // join() waits for each forked subtask; while waiting, the pool may start compensation threads
            for (FileSearchRecursiveTask task : recursiveTasks) {
                filetedFileList.addAll(task.join());
            }
        }
        return filetedFileList;
    }
}
This program works fine when the directory doesn't have too many sub-directories and files, but if it is really big, it throws an OutOfMemoryError.
My understanding is that the maximum number of threads (including compensation threads) is bounded, so why do I get this error? Am I missing anything in my program?
Caused by: java.lang.OutOfMemoryError: unable to create new native thread
at java.lang.Thread.start0(Native Method)
at java.lang.Thread.start(Thread.java:714)
at java.util.concurrent.ForkJoinPool.createWorker(ForkJoinPool.java:1486)
at java.util.concurrent.ForkJoinPool.tryCompensate(ForkJoinPool.java:2020)
at java.util.concurrent.ForkJoinPool.awaitJoin(ForkJoinPool.java:2057)
at java.util.concurrent.ForkJoinTask.doJoin(ForkJoinTask.java:390)
at java.util.concurrent.ForkJoinTask.join(ForkJoinTask.java:719)
at FileSearchRecursiveTask.compute(DirectoryService.java:51)
at FileSearchRecursiveTask.compute(DirectoryService.java:20)
at java.util.concurrent.RecursiveTask.exec(RecursiveTask.java:94)
at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
at java.util.concurrent.ForkJoinPool$WorkQueue.tryRemoveAndExec(ForkJoinPool.java:1107)
at java.util.concurrent.ForkJoinPool.awaitJoin(ForkJoinPool.java:2046)
at java.util.concurrent.ForkJoinTask.doJoin(ForkJoinTask.java:390)
at java.util.concurrent.ForkJoinTask.join(ForkJoinTask.java:719)
at FileSearchRecursiveTask.compute(DirectoryService.java:51)
at FileSearchRecursiveTask.compute(DirectoryService.java:20)
at java.util.concurrent.RecursiveTask.exec(RecursiveTask.java:94)
at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
Recommended answer
You should not fork new tasks without limit. Basically, you should fork only as long as there's a chance that another worker thread can pick up the forked job; otherwise, evaluate it locally. Then, once you have forked a task, don't call join() right afterwards. While the underlying framework will start compensation threads to ensure that your jobs keep making progress instead of having all threads blocked waiting for a sub-task, this creates a large number of threads that can exceed the system's capabilities.
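To illustrate the "fork, do local work, then join" idea in isolation, here is a minimal sketch that is not part of the original answer. The hypothetical SumTask sums an int array: it forks one half, computes the other half in the current thread, and calls join() only after that local work is done. The THRESHOLD of 1,000 elements is an arbitrary assumption.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

public class ForkComputeJoinSketch {

    // Hypothetical task: sums data[from..to) (half-open range).
    static final class SumTask extends RecursiveTask<Long> {
        private static final int THRESHOLD = 1_000; // arbitrary cutoff below which nothing is forked
        private final int[] data;
        private final int from, to;

        SumTask(int[] data, int from, int to) {
            this.data = data;
            this.from = from;
            this.to = to;
        }

        @Override
        protected Long compute() {
            if (to - from <= THRESHOLD) {
                long sum = 0;
                for (int i = from; i < to; i++) sum += data[i];
                return sum;
            }
            int mid = (from + to) >>> 1;
            SumTask left = new SumTask(data, from, mid);
            left.fork();                                       // offer one half to other workers
            long right = new SumTask(data, mid, to).compute(); // do the other half in this thread
            return right + left.join();                        // join only after the local work
        }
    }

    static long sum(int[] data) {
        return ForkJoinPool.commonPool().invoke(new SumTask(data, 0, data.length));
    }

    public static void main(String[] args) {
        int[] data = new int[100_000];
        for (int i = 0; i < data.length; i++) data[i] = i;
        System.out.println(sum(data)); // 4999950000
    }
}
```

By the time join() is called, the current thread has already done useful work, so there is a good chance the forked half is finished (or was never stolen and can be run directly) instead of forcing a blocking wait.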
Here is a revised version of your code:
import java.io.File;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.RecursiveTask;

public class DirectoryService {
    public static void main(String[] args) {
        FileSearchRecursiveTask task = new FileSearchRecursiveTask(new File("./DIR"));
        // invoke() called outside any pool runs the task using the common ForkJoinPool
        List<String> files = task.invoke();
        System.out.println("Total no of files with hello " + files.size());
    }
}

class FileSearchRecursiveTask extends RecursiveTask<List<String>> {
    // Stop forking once this many locally queued tasks are still unstolen
    private static final int TARGET_SURPLUS = 3;
    private File path;

    public FileSearchRecursiveTask(File file) {
        this.path = file;
    }

    @Override
    protected List<String> compute() {
        File directory = path;
        if (directory.isDirectory() && directory.canRead()) {
            System.out.println(Thread.currentThread() + " - Directory is " + directory.getName());
            return scan(directory);
        }
        return Collections.emptyList();
    }

    // Scans any directory, not only the one this task was created for
    private List<String> scan(File directory) {
        File[] fileList = directory.listFiles();
        if (fileList == null || fileList.length == 0) return Collections.emptyList();
        List<FileSearchRecursiveTask> recursiveTasks = new ArrayList<>();
        List<String> filteredFileList = new ArrayList<>();
        for (File file : fileList) {
            System.out.println(Thread.currentThread() + " Looking into: " + file.getAbsolutePath());
            if (file.isDirectory()) {
                if (getSurplusQueuedTaskCount() < TARGET_SURPLUS) {
                    // Few unstolen tasks queued: fork so another worker can pick this one up
                    FileSearchRecursiveTask task = new FileSearchRecursiveTask(file);
                    recursiveTasks.add(task);
                    task.fork();
                } else {
                    // Enough work queued already: process the subdirectory locally
                    filteredFileList.addAll(scan(file));
                }
            } else if (file.getName().contains("hello")) {
                filteredFileList.add(file.getAbsolutePath());
            }
        }
        // Newest first: reclaim forked tasks that no other worker has stolen and run them here
        for (int ix = recursiveTasks.size() - 1; ix >= 0; ix--) {
            FileSearchRecursiveTask task = recursiveTasks.get(ix);
            if (task.tryUnfork()) task.complete(scan(task.path));
        }
        // Join only at the end
        for (FileSearchRecursiveTask task : recursiveTasks) {
            filteredFileList.addAll(task.join());
        }
        return filteredFileList;
    }
}
The code doing the processing has been factored out into a method that receives the directory as a parameter, so we can use it locally for arbitrary directories that are not necessarily associated with a FileSearchRecursiveTask instance.
Then, the method uses getSurplusQueuedTaskCount() to estimate the number of locally enqueued tasks that have not been picked up by other worker threads. Keeping a few of them around helps work balancing. But once this number reaches the threshold, the processing is done locally without forking more jobs.
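The surplus check can be tried without touching the file system. The following sketch is an assumption, not the answer's code: it replaces directories with a hypothetical in-memory Node tree and counts node names containing a keyword. It forks a child only while getSurplusQueuedTaskCount() is below TARGET_SURPLUS and otherwise recurses in the current thread; for brevity it leaves out the tryUnfork() step.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

public class SurplusThresholdSketch {

    // Hypothetical stand-in for a directory: a name plus child nodes.
    static final class Node {
        final String name;
        final List<Node> children;

        Node(String name, Node... children) {
            this.name = name;
            this.children = Arrays.asList(children);
        }
    }

    static final class CountTask extends RecursiveTask<Integer> {
        private static final int TARGET_SURPLUS = 3; // same value as in the answer
        private final Node root;
        private final String keyword;

        CountTask(Node root, String keyword) {
            this.root = root;
            this.keyword = keyword;
        }

        @Override
        protected Integer compute() {
            return count(root);
        }

        private int count(Node node) {
            int found = node.name.contains(keyword) ? 1 : 0;
            List<CountTask> forked = new ArrayList<>();
            for (Node child : node.children) {
                if (getSurplusQueuedTaskCount() < TARGET_SURPLUS) {
                    // Few unstolen tasks queued locally: offer this subtree to other workers
                    CountTask task = new CountTask(child, keyword);
                    forked.add(task);
                    task.fork();
                } else {
                    // Enough work already queued: process the subtree in this thread
                    found += count(child);
                }
            }
            for (CountTask task : forked) {
                found += task.join();
            }
            return found;
        }
    }

    // Builds a full tree: inner nodes are named "dir", leaves are named "hello.txt".
    static Node build(int depth, int fanout) {
        if (depth == 0) return new Node("hello.txt");
        Node[] children = new Node[fanout];
        for (int i = 0; i < fanout; i++) children[i] = build(depth - 1, fanout);
        return new Node("dir", children);
    }

    static int countMatches(Node root, String keyword) {
        return ForkJoinPool.commonPool().invoke(new CountTask(root, keyword));
    }

    public static void main(String[] args) {
        System.out.println(countMatches(build(4, 5), "hello")); // 5^4 leaves -> 625
    }
}
```

The result does not depend on how many tasks were actually forked; the threshold only decides how the work is split between forking and local recursion.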
After the local processing, it iterates over the forked tasks and uses tryUnfork() to identify jobs that have not been stolen by other worker threads, and processes them locally. Iterating backwards, starting with the youngest jobs, raises the chances of finding some.
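A stripped-down sketch of the unfork-then-join sequence, using a hypothetical SquareSumTask over an integer range instead of directories. The split into at most four chunks and the 1,000-element cutoff are arbitrary choices. As in the answer, a reclaimed task is computed in the current thread and its result is stored with complete(), so the later join() returns it immediately. tryUnfork() is only documented to succeed typically, so the result does not depend on it.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

public class TryUnforkSketch {

    // Hypothetical task: sums i * i for i in the half-open range [from, to).
    static final class SquareSumTask extends RecursiveTask<Long> {
        private static final int CUTOFF = 1_000;
        private final int from, to;

        SquareSumTask(int from, int to) {
            this.from = from;
            this.to = to;
        }

        @Override
        protected Long compute() {
            if (to - from <= CUTOFF) {
                long sum = 0;
                for (int i = from; i < to; i++) sum += (long) i * i;
                return sum;
            }
            // Fork at most four chunks
            int chunk = (to - from + 3) / 4;
            List<SquareSumTask> forked = new ArrayList<>();
            for (int start = from; start < to; start += chunk) {
                SquareSumTask task = new SquareSumTask(start, Math.min(start + chunk, to));
                forked.add(task);
                task.fork();
            }
            // Newest first: reclaim tasks no other worker has taken and run them in this thread
            for (int ix = forked.size() - 1; ix >= 0; ix--) {
                SquareSumTask task = forked.get(ix);
                if (task.tryUnfork()) task.complete(task.compute());
            }
            // Join last: reclaimed tasks are already complete; join() waits for or helps with the rest
            long total = 0;
            for (SquareSumTask task : forked) total += task.join();
            return total;
        }
    }

    static long sumOfSquares(int n) {
        return ForkJoinPool.commonPool().invoke(new SquareSumTask(0, n));
    }

    public static void main(String[] args) {
        System.out.println(sumOfSquares(100_000)); // 333328333350000
    }
}
```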
Only afterwards does it call join() on all sub-jobs, which by now are either completed or currently being processed by another worker thread.
Note that I changed the initiating code to use the default (common) pool. It uses "number of CPU cores minus one" worker threads, plus the initiating thread, i.e. the main thread in this example.
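To see what the default pool looks like on a particular machine, this small sketch prints the common pool's target parallelism and the name of the thread that ran a root task started with invoke() from main. WhoRunsMe is a hypothetical task; the printed values depend on the machine, the JDK, and the java.util.concurrent.ForkJoinPool.common.parallelism system property.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

public class CommonPoolSketch {

    // Hypothetical task that only reports the name of the thread running compute().
    static final class WhoRunsMe extends RecursiveTask<String> {
        @Override
        protected String compute() {
            return Thread.currentThread().getName();
        }
    }

    // Target parallelism of the common pool: typically cores - 1 (at least 1),
    // unless java.util.concurrent.ForkJoinPool.common.parallelism is set.
    static int commonParallelism() {
        return ForkJoinPool.getCommonPoolParallelism();
    }

    // Calls invoke() from a thread outside any pool, as the revised main() does.
    static String runnerOfRootTask() {
        return new WhoRunsMe().invoke();
    }

    public static void main(String[] args) {
        System.out.println("available processors: " + Runtime.getRuntime().availableProcessors());
        System.out.println("common pool parallelism: " + commonParallelism());
        // In common JDK implementations this typically prints "main": the caller runs the root task itself.
        System.out.println("root task ran in: " + runnerOfRootTask());
    }
}
```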