我有一个包含数千行的数据文件。我正在读取它们并将它们保存在数据库中。我想在说50的批次中多线程处理此过程
行。在读取文件时,有10行被提交给ExecutorService。

ExecutorService executor = Executors.newFixedThreadPool(5);`

我可以在一段时间内循环执行以下操作,直到行结束...。
 Future<Integer> future = executor.submit(callableObjectThatSaves10RowsAtOneTime);

但是,如果处理10行需要花费时间,我不想将整个文件读入内存。我只想提交5,直到其中一个线程返回,然后再提交下一个。

假设一个线程需要20秒来保存10条记录,我不希望ExecutorService馈入数千行,因为读取过程仍在继续读取并提交给ExecutorService

实现此目标的最佳方法是什么?

最佳答案

您可以使用LinkedList<Future<?>>来存储期货,直到达到预定大小为止。以下是一些基本代码,可以帮助您实现大部分目标:

int threads = 5;
ExecutorService service = Executors.newFixedThreadPool(threads);
LinkedList<Future<?>> futures = new LinkedList<>();

//As long as there are rows to save:
while(moreRowsLeft()){
    //dump another callable onto the queue:
    futures.addLast(service.submit(new RowSavingCallable());

    //if the queue is "full", wait for the next one to finish before
    //reading any more of the file:
    while(futures.size() >= 2*threads) futures.removeFirst().get();
}

//All rows have been submitted but some may still be writing to the DB:
for(Future<?> f : futures) future.get();

//All rows have been saved at this point

您可能想知道为什么我允许期货数量达到机器上线程数量的两倍-这使得执行程序服务线程可以在数据库保存上进行工作,而主线程正在创建更多工作。这可以帮助隐藏与工作线程正在忙于执行数据库写入时使更多可调用项可用于处理有关的任何I / O成本。

07-24 17:07
查看更多