我有一个程序可以处理通过网络传入的大量数据流(不是java.util.stream
,而是InputStream
)。流由对象组成,每个对象都有一种子流标识符。现在,整个处理是在单个线程中完成的,但是这会占用大量CPU时间,并且每个子流都可以轻松地独立进行处理,因此我正在考虑对其进行多线程处理。
但是,每个子流都需要保持大量的状态,包括各种缓冲区,哈希映射等。由于子流是彼此独立的,因此没有特殊的原因使其并发或同步。此外,每个子流都要求按照其到达的顺序处理其对象,这意味着每个子流可能应该有一个线程(但是可能有一个线程处理多个子流)。
我正在考虑几种解决方法,但它们并不十分优雅。
为所有任务创建一个ThreadPoolExecutor
。每个任务将包含下一个要处理的对象以及对Processor
实例的引用,该实例保留所有状态。这将确保必要的事前发生关系,从而确保处理线程将看到此子流的最新状态。据我所知,这种方法无法确保在同一线程中处理同一子流的下一个对象。此外,还需要保证对象将按照它们进入的顺序进行处理,这将需要Processor
对象的额外同步,从而引入不必要的延迟。
手动创建多个单线程执行程序,以及一种将子流标识符映射到执行程序的哈希映射。这种方法需要手动管理执行器,在新的子流开始或结束时创建或关闭执行器,并在它们之间相应地分配任务。
创建一个自定义执行程序,该执行程序处理任务的特殊子类,每个子类都有一个子流ID。该执行程序将其用作提示,以使用与上一个具有相同ID的线程相同的线程来执行此任务。但是,我看不到实现这种执行程序的简便方法。不幸的是,似乎不可能扩展任何现有的执行程序类,并且从头实现执行程序有点过头了。
创建单个ThreadPoolExecutor
,而不是为每个传入的对象创建任务,而是为每个子流创建单个长期运行的任务,该任务将阻塞并发队列,等待下一个对象。然后根据对象的子流ID将对象放入队列中。这种方法需要与子流一样多的线程,因为任务将被阻塞。子流的预期数量约为30-60,因此可以接受。
或者,继续执行第4步,但限制线程数,将多个子流分配给单个任务。这是2到4之间的混合体。据我所知,这是其中的最佳方法,但仍需要在任务之间进行某种手动子流分配,并需要一些方法来关闭额外的任务,例如子流结束。
确保每个子流在自己的线程中进行处理而又没有大量容易出错的代码的最佳方法是什么?这样以下伪代码将起作用:
// loop {
Item next = stream.read();
int id = next.getSubstreamID();
Processor processor = getProcessor(id);
SubstreamTask task = new SubstreamTask(processor, next, id);
executor.submit(task); // This makes sure that the task will
// be executed in the same thread as the
// previous task with the same ID.
// } // loop
最佳答案
我建议使用一组单线程执行器。如果可以为子流设计一致的哈希策略,则可以将子流映射到各个线程。例如
final ExecutorsService[] es = ...
public void submit(int id, Runnable run) {
es[(id & 0x7FFFFFFF) % es.length].submit(run);
}
密钥可以是
String
或long
,但是可以使用某种方式来标识子流。如果您知道特定的子流非常昂贵,则可以为其分配一个专用线程。