靈感來源於一個豬隊友給個人題目java
看到這個,我抓住的關鍵字是:任何子任務失敗,要通知全部子任務執行取消邏輯。dom
這不就是消息廣播嗎?觀察者模式!ide
首先是收聽者測試
package com.example.broadcast; /** * 每一個節點便是廣播者,也是收聽者 */ public interface Listener { /** * 設置調度中心 */ void setCenter(DispatchCenter center); /** * 主動通知其它收聽者 */ void notice(String msg); /** * 本身收到通知的處理邏輯 * @param msg */ void whenReceived(String msg); /** * 收聽者標誌:惟一 * @return */ String identify(); }
而後是調度中心this
package com.example.broadcast; /** * 調度中心 */ public interface DispatchCenter { /** * 廣播 * @param own 廣播的時候,要排除本身 * @param msg 廣播消息 */ void broadcast(String own, String msg); /** * 添加收聽者 * @param listener */ void addListener(Listener listener); }
調度中心實現spa
package com.example.broadcast; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; public class DispatchCenterImpl implements DispatchCenter { private static final Map<String, Listener> MAP = new ConcurrentHashMap<>(); @Override public void broadcast(String own, String msg) { MAP.forEach((k,v) -> { // 不用給本身發通知 if (!k.equals(own)){ v.whenReceived(msg); } }); } @Override public void addListener(Listener listener) { listener.setCenter(this); MAP.put(listener.identify(), listener); } }
剩下三個收聽者線程
package com.example.broadcast; import java.util.UUID; public class ListenerA implements Listener { private DispatchCenter center; private String identify; public ListenerA() { identify = UUID.randomUUID().toString(); } @Override public void setCenter(DispatchCenter center) { this.center = center; } @Override public void notice(String msg) { center.broadcast(identify, msg); } @Override public void whenReceived(String msg) { System.out.println(this.getClass().getName() + "收到消息:" + msg); } @Override public String identify() { return identify; } }
B和C除了類名不同,其餘都同樣,再也不贅述。目錄以下3d
package com.example.broadcast; import java.util.Random; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; public class Main { public static void main(String[] args) { DispatchCenter center = new DispatchCenterImpl(); ListenerA listenerA = new ListenerA(); ListenerB listenerB = new ListenerB(); ListenerC listenerC = new ListenerC(); center.addListener(listenerA); center.addListener(listenerB); center.addListener(listenerC); ExecutorService executorService = Executors.newFixedThreadPool(3); // A觸發1條事件 executorService.submit(() -> { int i = 1; while (i > 0){ listenerA.notice(listenerA.getClass().getName() + "說:我有" + new Random().nextInt(1000000) + "元"); i--; } }); // B觸發2條事件 executorService.submit(() -> { int i = 2; while (i > 0){ listenerB.notice(listenerB.getClass().getName() + "說:我有" + new Random().nextInt(1000000) + "元"); i--; } }); // C觸發3條事件 executorService.submit(() -> { int i = 3; while (i > 0){ listenerC.notice(listenerC.getClass().getName() + "說:我有" + new Random().nextInt(1000000) + "元"); i--; } }); executorService.shutdown(); } }
輸出:code
當其中的B節點,發生了錯誤,除了把本身處理好以外blog
1. 向調度中心發送廣播請求,並攜帶須要的消息
2. 調度中心遍歷收聽者,挨個通知(執行)每個收聽者接受消息的邏輯
由於題目要求,【快速取消】全部子任務
關於線程中止的方法也有不少:
1. 優雅退出run方法
2. 暴力stop
3. run方法拋出異常
若是說要求,A異常了,B和C收到消息以後,線程當即中止,不能有一點遲疑,說實話我還沒想到該怎麼作。由於你要知道,實際上的任務的run方法內部,不太多是個while循環,人家可能就是個順序執行,因此中止標誌位的方式,並不適用。
而其它的方法,我也沒想到很好的。我只能寫個按照標誌位中止的「玩具」
修改三個收聽者代碼和測試類
package com.example.broadcast; import lombok.SneakyThrows; import java.util.Random; import java.util.UUID; public class ListenerA implements Listener,Runnable { private DispatchCenter center; private String identify; public ListenerA() { identify = UUID.randomUUID().toString(); } @Override public void setCenter(DispatchCenter center) { this.center = center; } @Override public void notice(String msg) { center.broadcast(identify, msg); } @Override public void whenReceived(String msg) { System.out.println(this.getClass().getName() + "收到消息:" + msg); } @Override public String identify() { return identify; } @SneakyThrows @Override public void run() { // 5秒以後,模擬發生異常 Thread.sleep(5000); notice(this.getClass().getName() + "說:我有" + new Random().nextInt(1000000) + "元"); System.out.println(this.getClass().getName() + "程序異常,並已經傳播了消息..."); } }
package com.example.broadcast; import lombok.SneakyThrows; import java.util.UUID; public class ListenerB implements Listener,Runnable { private DispatchCenter center; private String identify; private volatile Boolean stopFlag = false; public ListenerB() { identify = UUID.randomUUID().toString(); } @Override public void setCenter(DispatchCenter center) { this.center = center; } @Override public void notice(String msg) { center.broadcast(identify, msg); } @Override public void whenReceived(String msg) { System.out.println(this.getClass().getName() + "_" + Thread.currentThread().getName() + "收到消息:" + msg); // 中止當前線程 stopFlag = true; } @Override public String identify() { return identify; } @SneakyThrows @Override public void run() { while (!stopFlag){ Thread.sleep(1000); System.out.println(this.getClass().getName() + "_" + Thread.currentThread().getName() + "__B在執行任務"); } System.out.println(this.getClass().getName() + "_" + Thread.currentThread().getName() + "__B Dead"); } }
package com.example.broadcast; import lombok.SneakyThrows; import java.util.UUID; public class ListenerC implements Listener,Runnable { private DispatchCenter center; private String identify; private volatile Boolean stopFlag = false; public ListenerC() { identify = UUID.randomUUID().toString(); } @Override public void setCenter(DispatchCenter center) { this.center = center; } @Override public void notice(String msg) { center.broadcast(identify, msg); } @Override public void whenReceived(String msg) { System.out.println(this.getClass().getName() + "_" + Thread.currentThread().getName() + "收到消息:" + msg); // 中止當前線程 stopFlag = true; } @Override public String identify() { return identify; } @SneakyThrows @Override public void run() { while (!stopFlag){ Thread.sleep(1000); System.out.println(this.getClass().getName() + "_" + Thread.currentThread().getName() + "__C在執行任務"); } System.out.println(this.getClass().getName() + "_" + Thread.currentThread().getName() + "__C Dead"); } }
測試
package com.example.broadcast; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; public class Main { public static void main(String[] args) { DispatchCenter center = new DispatchCenterImpl(); ListenerA listenerA = new ListenerA(); ListenerB listenerB = new ListenerB(); ListenerC listenerC = new ListenerC(); center.addListener(listenerA); center.addListener(listenerB); center.addListener(listenerC); ExecutorService executorService = Executors.newFixedThreadPool(3); // A executorService.submit(listenerA); // B executorService.submit(listenerB); // C executorService.submit(listenerC); executorService.shutdown(); } }
這個是這麼多年第一個發到首頁的,就是想問下你們怎樣解決這種狀況下的線程中止問題