😊 @ 作者: 一恍过去
🎊 @ 社区: Java技术栈交流
🎉 @ 主题: CCyclicBarrier实战应用——批量数据多线程协调异步处理(主线程执行事务回滚)
⏱️ @ 创作时间: 2023年12月03日
前言
通过CyclicBarrier
与CountDownLatch
配合开启多个子线程,由子线程完成数据的处理,最后由主线程进行数据库操作,由主线程进行事务的提交或者回滚;
如果需要由子线程处理完数据,并且由子线程进行事务提交或者回滚,参考:https://lhz1219.blog.csdn.net/article/details/134630794
1、概述
CyclicBarrier是一个同步器工具类,用来协调多个线程之间的同步,通过await()
进行阻塞,直到所有的线程都执行await()
后,所有的线程再继续执行。
2、方法说明:
- public viod await() /int await(long timeout,TimeUnit unit) :使当前线程一直等待,除非线程被中断或超出了指定的等待时间。
当线程会被阻塞,直到下面的情况之一发生才会返回:- 如果每执行一次
await()
计数加一,直到达到初始值。 - 如果当前线程,在进入此方法时已经设置了该线程的中断状态;或者在等待时被中断,则抛出InterruptedException,并且清除当前线程的已中断状态。
- 如果超出了指定的等待时间,则该方法根本不会再进行阻塞。
- 如果每执行一次
3、代码实例
有用到hutool的工具包,pom如下:
<dependency>
<groupId>cn.hutool</groupId>
<artifactId>hutool-all</artifactId>
<version>5.0.7</version>
</dependency>
Controller:
@RestController
@RequestMapping("/test")
@Slf4j
public class TestController {
@Resource
private CyclicService cyclicService;
/**
* CyclicBarrier实现多线程(多个子线程)异步处理数据,再主线程回归处理
*
* @return
*/
@GetMapping("/cyclic/handleData")
public String countDownHandleData() {
cyclicService.handleData();
return "success";
}
Sevice:
@Service
@Slf4j
public class CountDownService {
@Resource
private TestMapper testMapper;
@Resource
private ApplicationContext applicationContext;
/**
* 实现多线程(多个子线程)异步处理数据,再主线程回归处理
*/
@Transactional(rollbackFor = Exception.class)
public void handleData() {
List<TestEntity> testList = getData();
AtomicBoolean errorTag = new AtomicBoolean(false);
long start = System.currentTimeMillis();
// 使用多线程对list集合进行分批次处理,实际情况可以根据具体耗时来决定
// 比如:一万条数据,每条单独处理需要200ms,按批次一个线程处理200条数据,分为50个批次,实际情况根据业务来定
// 需要使用hutool工具类
List<List<TestEntity>> splitList = CollUtil.split(testList, 200);
// 设置CyclicBarrier大小,需要比实际子线程+1,业务主线程需要进行阻塞
CyclicBarrier cyclicBarrier = new CyclicBarrier(splitList.size() + 1);
// 简单创建一个线程池,这里的线程池可以自定义,为了方便直接使用
ExecutorService executorService = Executors.newCachedThreadPool();
splitList.forEach(list -> {
// 线程处理
executorService.execute(() -> {
try {
for (TestEntity entity : list) {
if (errorTag.get()) {
break;
}
// 对实体类的业务处理,此处模拟业务处理,耗时50ms
ThreadUtil.sleep(50);
// 模拟数据处理中,出现了异常
if (entity.getCount().equals(2000)) {
throw new RuntimeException("子线程执行异常");
}
}
} catch (Exception e) {
log.error("子线程异常:{}", e.getMessage(), e);
errorTag.set(true);
} finally {
// 子线程中,业务处理完成后,利用cyclicBarrier的特性,计数器加一操作
try {
cyclicBarrier.await();
} catch (Exception e) {
errorTag.set(true);
}
}
log.info("子线程执行完成");
});
});
executorService.shutdown();
try {
// 主线程阻塞,直到子线程执行完成
cyclicBarrier.await();
// 可以设置最大阻塞时间,防止线程一直挂起,当子线程时间大于当前时间后会抛出TimeOut异常
// cyclicBarrier.await(5, TimeUnit.SECONDS);
// 模拟执行主线程业务逻辑耗时,比如insert、update等
ThreadUtil.sleep(20);
} catch (Exception e) {
errorTag.set(true);
}
long end = System.currentTimeMillis();
log.info("数据处理完成,耗时:{}", (end - start) / 1000);
// 如果出现异常
if (errorTag.get()) {
throw new RuntimeException("异步业务执行出现异常");
}
log.info("主线程执行完成");
}
/**
* 模拟解析的excel等文件的数据
*/
private List<TestEntity> getData() {
List<TestEntity> list = new ArrayList<>();
// 此处模拟一万条数据
for (int i = 1; i <= 10000; i++) {
TestEntity entity = new TestEntity();
entity.setId(new Random().nextInt(999999999));
entity.setCount(i);
entity.setCommodityCode("code-" + i);
entity.setMoney(new Random().nextInt(1000000));
entity.setUserId("user-" + i);
list.add(entity);
}
return list;
}
}