通过CyclicBarrier+线程池的方式,同步的方式分页分批次并发高效处理逻辑,将总页数分成多个批次并发执行每页逻辑,每个批次处理DO_MAX_SIZE个页,每个批次等待DO_MAX_SIZE个页数处理完成后才执行下一个批次,并等待所有批次执行完成才处理后续逻辑
以下代码只需要在TODO处添加上自己的逻辑就可以达到处理效果
/**
* 线程池初始化,也可用其它初始化方式
*/
private ExecutorService threadPool = new ThreadPoolExecutor(, , 10L, TimeUnit.SECONDS, new LinkedBlockingDeque<Runnable>()); /**
* 最大并发数量
*/
final static int DO_MAX_SIZE = ; /**
* 线程批量并发执行数据,分页分线程处理,每页处理分配一个线程处理相关逻辑
* 最大并发线程{@code DO_MAX_SIZE}
* @param total 总条数
* @author wl
* @date 2019-08-20 18:21
*/
private void executorMultiThreadPool (Long total) throws Exception {
int startPage = , endPage = , pageSize = ;
// 按每页pageSize条算出总页数
Long pageTotal = (total + pageSize - ) / pageSize;
// 循环总页数然后按一组DO_MAX_SIZE个页来并发处理
for (int i = ; i <= pageTotal; i++) {
endPage++;
if (i % DO_MAX_SIZE != && i != pageTotal) {
continue;
}
long timeStart = System.currentTimeMillis();
int cbNum = endPage - startPage + ;
// 参数parties+1是因为给主线程添加了await,需要等待整个批次完成才执行后续逻辑;
final CyclicBarrier cb = new CyclicBarrier(cbNum + );
// 当前批次下需要处理的页数
for (int page = startPage; page <= endPage; page++) {
final int pageNum = page;
threadPool.execute(new Runnable() {
@Override
public void run() {
try {
// TODO: 处理当前页 pageNum 逻辑,此处可以查询pageNumb的数据,然后处理相关逻辑
} catch (Exception e) {
// TODO: 异常处理
} finally {
try {
// 设置等待
cb.await();
} catch (Exception e) {
logger.info(">>>>>Finally处理异常", e);
}
}
}
});
}
// 等待全部处理
try {
// 等待本批次全部处理完成
cb.await();
} catch (Exception e) {
throw e;
} finally {
logger.info(">>>>>线程池批次执行-批次完成用时[{}]", System.currentTimeMillis() - timeStart);
}
// 本批次处理完后,将结束页赋给开始
startPage = endPage + ;
}
logger.info(">>>>>线程池处理所有批次-执行完成");
}