本文介绍了多线程读取大量文件的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我仍在围绕Java中的并发性工作。我理解(如果您订阅了OO Java 5并发模型),您实现了任务可调用一个 run() call()方法(分别),你应该尽可能多地并行化方法尽可能。

I'm still in the process of wrapping my brain around how concurrency works in Java. I understand that (if you're subscribing to the OO Java 5 concurrency model) you implement a Task or Callable with a run() or call() method (respectively), and it behooves you to parallelize as much of that implemented method as possible.

但我仍然不理解Java中并发编程的固有内容:

But I'm still not understanding something inherent about concurrent programming in Java:


  • 如何将任务 run()方法分配给适当数量的并发工作要执行吗?

  • How is a Task's run() method assigned the right amount of concurrent work to be performed?

作为一个具体的例子,如果我有一个I / O绑定 readMobyDick()怎么办? 将Herman Melville的 Moby Dick 的全部内容从本地系统上的文件读入内存的方法。我只想说我想要这个 readMobyDick()方法并发并由3个线程处理,其中:

As a concrete example, what if I have an I/O-bound readMobyDick() method that reads the entire contents of Herman Melville's Moby Dick into memory from a file on the local system. And let's just say I want this readMobyDick() method to be concurrent and handled by 3 threads, where:


  • 线程#1将书籍的前1/3读入内存

  • 线程#2将书籍的第二个1/3读入内存

  • 线程#3将书籍的最后1/3读入内存

我需要吗?将Moby Dick分成三个文件并将它们分别传递给自己的任务,或者只是从已实现的运行中调用 readMobyDick()( )方法和(某种程度上)执行者知道如何在线程之间打破工作。

Do I need to chunk Moby Dick up into three files and pass them each to their own task, or do I I just call readMobyDick() from inside the implemented run() method and (somehow) the Executor knows how to break the work up amongst the threads.

我是一个非常直观的学习者,因此非常感谢任何正确方法的代码示例!谢谢!

I am a very visual learner, so any code examples of the right way to approach this are greatly appreciated! Thanks!

推荐答案

你可能偶然选择了并行活动的最糟糕的例子!

You have probably chosen by accident the absolute worst example of parallel activities!

从单个机械磁盘并行读取实际上比使用单个线程读取要慢,因为事实上,当每个线程轮到运行时,你会将机械磁头弹回到磁盘的不同部分。这最好保留为单线程活动。

Reading in parallel from a single mechanical disk is actually slower than reading with a single thread, because you are in fact bouncing the mechanical head to different sections of the disk as each thread gets its turn to run. This is best left as a single threaded activity.

让我们再举一个例子,它类似于你的但实际上可以提供一些好处:假设我想搜索事件在一个巨大的单词列表中的某个单词(这个列表甚至可能来自磁盘文件,但就像我说的,由单个线程读取)。假设我可以在你的例子中使用3个线程,每个线程搜索巨大单词列表的1/3,并保留一个本地计数器,显示搜索单词出现的次数。

Let's take another example, which is similar to yours but can actually offer some benefit: assume I want to search for the occurrences of a certain word in a huge list of words (this list could even have come from a disk file, but like I said, read by a single thread). Assume I can use 3 threads like in your example, each searching on 1/3rd of the huge word list and keeping a local counter of how many times the searched word appeared.

在这种情况下,您需要将列表分为3个部分,将每个部分传递给其类型实现Runnable的不同对象,并在 run 方法中实现搜索。

In this case you'd want to partition the list in 3 parts, pass each part to a different object whose type implements Runnable and have the search implemented in the run method.

运行时本身不知道如何进行分区或类似的事情,你必须自己指定。还有许多其他分区策略,每个都有自己的优点和缺点,但我们现在可以坚持使用静态分区。

The runtime itself has no idea how to do the partitioning or anything like that, you have to specify it yourself. There are many other partitioning strategies, each with its own strengths and weaknesses, but we can stick to the static partitioning for now.

让我们看一些代码:

class SearchTask implements Runnable {
     private int localCounter = 0;
     private int start; // start index of search
     private int end;
     private List<String> words;
     private String token;

     public SearchTask(int start, int end, List<String> words, String token) {
         this.start = start;
         this.end = end;
         this.words = words;
         this.token = token;
     }

     public void run() {
         for(int i = start; i < end; i++) {
              if(words.get(i).equals(token)) localCounter++;
         }
     }

     public int getCounter() { return localCounter; }
}

// meanwhile in main :)

List<String> words = new ArrayList<String>();
// populate words
// let's assume you have 30000 words

// create tasks
SearchTask task1 = new SearchTask(0, 10000, words, "John");
SearchTask task2 = new SearchTask(10000, 20000, words, "John");
SearchTask task3 = new SearchTask(20000, 30000, words, "John");

// create threads for each task
Thread t1 = new Thread(task1);
Thread t2 = new Thread(task2);
Thread t3 = new Thread(task3);

// start threads
t1.start();
t2.start();
t3.start();

// wait for threads to finish
t1.join();
t2.join();
t3.join();

// collect results
int counter = 0;
counter += task1.getCounter();
counter += task2.getCounter();
counter += task3.getCounter();

这应该很好用。请注意,在实际情况下,您将构建更通用的分区方案。您也可以使用 ExecutorService 并实现 Callable 而不是 Runnable 如果你想返回一个结果。

This should work nicely. Note that in practical cases you would build a more generic partitioning scheme. You could alternatively use an ExecutorService and implement Callable instead of Runnable if you wish to return a result.

所以使用更高级结构的另一个例子是:

So an alternative example using more advanced constructs:

class SearchTask implements Callable<Integer> {
     private int localCounter = 0;
     private int start; // start index of search
     private int end;
     private List<String> words;
     private String token;

     public SearchTask(int start, int end, List<String> words, String token) {
         this.start = start;
         this.end = end;
         this.words = words;
         this.token = token;
     }

     public Integer call() {
         for(int i = start; i < end; i++) {
              if(words.get(i).equals(token)) localCounter++;
         }
         return localCounter;
     }
}

// meanwhile in main :)

List<String> words = new ArrayList<String>();
// populate words
// let's assume you have 30000 words

// create tasks
List<Callable> tasks = new ArrayList<Callable>();
tasks.add(new SearchTask(0, 10000, words, "John"));
tasks.add(new SearchTask(10000, 20000, words, "John"));
tasks.add(new SearchTask(20000, 30000, words, "John"));

// create thread pool and start tasks
ExecutorService exec = Executors.newFixedThreadPool(3);
List<Future> results = exec.invokeAll(tasks);

// wait for tasks to finish and collect results
int counter = 0;
for(Future f: results) {
    counter += f.get();
}

这篇关于多线程读取大量文件的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!

09-06 03:08