灵感来源于一个猪队友给我的题目
看到这个,我抓住的关键字是:任何子任务失败,要通知所有子任务执行取消逻辑。
这不就是消息广播吗?观察者模式!
干活
首先是收听者
package com.example.broadcast; /** * 每个节点即是广播者,也是收听者 */ public interface Listener { /** * 设置调度中心 */ void setCenter(DispatchCenter center); /** * 主动通知其它收听者 */ void notice(String msg); /** * 自己收到通知的处理逻辑 * @param msg */ void whenReceived(String msg); /** * 收听者标志:唯一 * @return */ String identify(); }
然后是调度中心
package com.example.broadcast; /** * 调度中心 */ public interface DispatchCenter { /** * 广播 * @param own 广播的时候,要排除自己 * @param msg 广播消息 */ void broadcast(String own, String msg); /** * 添加收听者 * @param listener */ void addListener(Listener listener); }
调度中心实现
package com.example.broadcast; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; public class DispatchCenterImpl implements DispatchCenter { private static final Map<String, Listener> MAP = new ConcurrentHashMap<>(); @Override public void broadcast(String own, String msg) { MAP.forEach((k,v) -> { // 不用给自己发通知 if (!k.equals(own)){ v.whenReceived(msg); } }); } @Override public void addListener(Listener listener) { listener.setCenter(this); MAP.put(listener.identify(), listener); } }
剩下三个收听者
package com.example.broadcast; import java.util.UUID; public class ListenerA implements Listener { private DispatchCenter center; private String identify; public ListenerA() { identify = UUID.randomUUID().toString(); } @Override public void setCenter(DispatchCenter center) { this.center = center; } @Override public void notice(String msg) { center.broadcast(identify, msg); } @Override public void whenReceived(String msg) { System.out.println(this.getClass().getName() + "收到消息:" + msg); } @Override public String identify() { return identify; } }
B和C除了类名不一样,其他都一样,不再赘述。目录如下
测试
package com.example.broadcast; import java.util.Random; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; public class Main { public static void main(String[] args) { DispatchCenter center = new DispatchCenterImpl(); ListenerA listenerA = new ListenerA(); ListenerB listenerB = new ListenerB(); ListenerC listenerC = new ListenerC(); center.addListener(listenerA); center.addListener(listenerB); center.addListener(listenerC); ExecutorService executorService = Executors.newFixedThreadPool(3); // A触发1条事件 executorService.submit(() -> { int i = 1; while (i > 0){ listenerA.notice(listenerA.getClass().getName() + "说:我有" + new Random().nextInt(1000000) + "元"); i--; } }); // B触发2条事件 executorService.submit(() -> { int i = 2; while (i > 0){ listenerB.notice(listenerB.getClass().getName() + "说:我有" + new Random().nextInt(1000000) + "元"); i--; } }); // C触发3条事件 executorService.submit(() -> { int i = 3; while (i > 0){ listenerC.notice(listenerC.getClass().getName() + "说:我有" + new Random().nextInt(1000000) + "元"); i--; } }); executorService.shutdown(); } }
输出:
流程图
关于停止任务
修改三个收听者代码和测试类
package com.example.broadcast; import lombok.SneakyThrows; import java.util.Random; import java.util.UUID; public class ListenerA implements Listener,Runnable { private DispatchCenter center; private String identify; public ListenerA() { identify = UUID.randomUUID().toString(); } @Override public void setCenter(DispatchCenter center) { this.center = center; } @Override public void notice(String msg) { center.broadcast(identify, msg); } @Override public void whenReceived(String msg) { System.out.println(this.getClass().getName() + "收到消息:" + msg); } @Override public String identify() { return identify; } @SneakyThrows @Override public void run() { // 5秒之后,模拟发生异常 Thread.sleep(5000); notice(this.getClass().getName() + "说:我有" + new Random().nextInt(1000000) + "元"); System.out.println(this.getClass().getName() + "程序异常,并已经传播了消息..."); } }
package com.example.broadcast; import lombok.SneakyThrows; import java.util.UUID; public class ListenerB implements Listener,Runnable { private DispatchCenter center; private String identify; private volatile Boolean stopFlag = false; public ListenerB() { identify = UUID.randomUUID().toString(); } @Override public void setCenter(DispatchCenter center) { this.center = center; } @Override public void notice(String msg) { center.broadcast(identify, msg); } @Override public void whenReceived(String msg) { System.out.println(this.getClass().getName() + "_" + Thread.currentThread().getName() + "收到消息:" + msg); // 停止当前线程 stopFlag = true; } @Override public String identify() { return identify; } @SneakyThrows @Override public void run() { while (!stopFlag){ Thread.sleep(1000); System.out.println(this.getClass().getName() + "_" + Thread.currentThread().getName() + "__B在执行任务"); } System.out.println(this.getClass().getName() + "_" + Thread.currentThread().getName() + "__B Dead"); } }
package com.example.broadcast; import lombok.SneakyThrows; import java.util.UUID; public class ListenerC implements Listener,Runnable { private DispatchCenter center; private String identify; private volatile Boolean stopFlag = false; public ListenerC() { identify = UUID.randomUUID().toString(); } @Override public void setCenter(DispatchCenter center) { this.center = center; } @Override public void notice(String msg) { center.broadcast(identify, msg); } @Override public void whenReceived(String msg) { System.out.println(this.getClass().getName() + "_" + Thread.currentThread().getName() + "收到消息:" + msg); // 停止当前线程 stopFlag = true; } @Override public String identify() { return identify; } @SneakyThrows @Override public void run() { while (!stopFlag){ Thread.sleep(1000); System.out.println(this.getClass().getName() + "_" + Thread.currentThread().getName() + "__C在执行任务"); } System.out.println(this.getClass().getName() + "_" + Thread.currentThread().getName() + "__C Dead"); } }
测试
package com.example.broadcast; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; public class Main { public static void main(String[] args) { DispatchCenter center = new DispatchCenterImpl(); ListenerA listenerA = new ListenerA(); ListenerB listenerB = new ListenerB(); ListenerC listenerC = new ListenerC(); center.addListener(listenerA); center.addListener(listenerB); center.addListener(listenerC); ExecutorService executorService = Executors.newFixedThreadPool(3); // A executorService.submit(listenerA); // B executorService.submit(listenerB); // C executorService.submit(listenerC); executorService.shutdown(); } }
这个是这么多年第一个发到首页的,就是想问下大家怎样解决这种情况下的线程停止问题