使多個對象都有機會處理請求,從而避免了請求的發送者和接收者之間的耦合關係。將這些對象連成一條鏈,並沿着這條鏈傳遞該請求,直到有一個對象處理它爲止。多線程
當業務邏輯比較複雜且是異步請求時,咱們可使用責任鏈模型來劃分職責,分工明確,減輕業務邏輯複雜度;可使用異步模型來優化責任鏈的處理性能。異步
import lombok.Data; /** * <p> * Request * </p> * * @author: kancy * @date: 2019/10/21 9:42 **/ @Data public class Request { private long Id; private String name; }
/** * <p> * ITaskProcesser * </p> * * @author: kancy * @date: 2019/10/21 9:40 **/ public interface ITaskProcesser { void process(Request request); }
import java.util.concurrent.LinkedBlockingQueue; /** * <p> * AbstractThreadTaskProcesser * </p> * * @author: kancy * @date: 2019/10/21 9:40 **/ public abstract class AbstractThreadTaskProcesser extends Thread implements ITaskProcesser { /** * 任務隊列 */ private LinkedBlockingQueue<Request> queue = new LinkedBlockingQueue(); /** * 下一個處理器 */ private ITaskProcesser nextProcesser; /** * 是否中止線程 */ private volatile boolean isStop = false; /** * 線程是否運行 */ private boolean isRun = false; public AbstractThreadTaskProcesser() { setThreadTaskName(); } public AbstractThreadTaskProcesser(ITaskProcesser nextProcesser) { setThreadTaskName(); this.nextProcesser = nextProcesser; } /** * 責任鏈處理 * @param request */ @Override public void process(Request request) { // 添加請求任務 addRequestToQueue(request); // 啓動線程 if (!isRun){ synchronized (this){ if (!isRun){ super.start(); isRun = true; } } } } /** * 異步線程處理:真正的在處理業務邏輯 */ @Override public void run() { while (!isStop){ try { // 取出一個任務 Request request = queue.take(); // 處理當前任務須要作的事情 boolean result = doRequest(request); // 處理成功後,將任務請求交給下一個任務處理器 if (result && nextProcesser != null){ nextProcesser.process(request); } } catch (InterruptedException e) { e.printStackTrace(); } } } /** * 防止子類開啓線程,有父類維護,使用時啓動任務線程 */ @Override public synchronized void start() { // 關閉入口 throw new UnsupportedOperationException("自動啓動線程,無需手動開啓"); } /** * 子類處理請求 * @param request * return boolean 是否執行下一個處理器 */ protected abstract boolean doRequest(Request request); /** * 中止處理器 */ protected void stopProcesser(){ isStop = true; } /** * 獲取任務隊列 */ protected LinkedBlockingQueue<Request> getTaskQueue() { return queue; } /** * 獲取下一個處理器 * @return */ protected ITaskProcesser getNextProcesser() { return nextProcesser; } /** * 添加請求到任務隊列 * @param request */ private void addRequestToQueue(Request request){ queue.add(request); } /** * 線程線程名稱 */ private void setThreadTaskName() { setName(String.format("%s_Thread_%s", getClass().getSimpleName(), getId())); } }
FirstThreadTaskProcesseride
/** * <p> * FirstThreadTaskProcesser * </p> * * @author: kancy * @date: 2019/10/21 9:40 **/ public class FirstThreadTaskProcesser extends AbstractThreadTaskProcesser { public FirstThreadTaskProcesser(ITaskProcesser nextProcesser) { super(nextProcesser); } /** * 子類處理請求 * * @param request */ @Override protected boolean doRequest(Request request) { System.out.println(Thread.currentThread().getName() + " 開始處理!"); return true; } }
PrintRequestThreadTaskProcesser性能
/** * <p> * PrintRequestThreadTaskProcesser * </p> * * @author: kancy * @date: 2019/10/21 10:06 **/ public class PrintRequestThreadTaskProcesser extends AbstractThreadTaskProcesser { public PrintRequestThreadTaskProcesser(ITaskProcesser nextProcesser) { super(nextProcesser); } /** * 子類處理請求 * * @param request */ @Override protected boolean doRequest(Request request) { System.out.println("request: " + request); return true; } }
BusinessThreadTaskProcesser測試
/** * <p> * BusinessThreadTaskProcesser * 針對耗時比較久的邏輯,能夠用線程池多線程來優化性能 * </p> * * @author: kancy * @date: 2019/10/21 12:28 **/ public class BusinessThreadTaskProcesser implements ITaskProcesser { private final ExecutorService executorService = Executors.newFixedThreadPool(10); private final ITaskProcesser nextProcesser; public BusinessThreadTaskProcesser(ITaskProcesser nextProcesser) { this.nextProcesser = nextProcesser; } @Override public void process(Request request) { BusinessRequestHandler handler = new BusinessRequestHandler(request); executorService.submit(handler); // callNextProcesser(request); } class BusinessRequestHandler implements Runnable{ private Request request; public BusinessRequestHandler(Request request) { this.request = request; } @Override public void run() { try { TimeUnit.SECONDS.sleep(3); System.out.println(Thread.currentThread().getName() + " 處理成功!"); callNextProcesser(request); } catch (InterruptedException e) { e.printStackTrace(); } } } private void callNextProcesser(Request request) { if (nextProcesser != null){ nextProcesser.process(request); } } }
LastThreadTaskProcesser優化
/** * <p> * FirstThreadTaskProcesser * </p> * * @author: kancy * @date: 2019/10/21 9:40 **/ public class LastThreadTaskProcesser extends AbstractThreadTaskProcesser { /** * 子類處理請求 * * @param request */ @Override protected boolean doRequest(Request request) { System.out.println(Thread.currentThread().getName() + " 處理完成!"); return true; } }
import java.util.concurrent.TimeUnit; /** * <p> * ChainTests * </p> * * @author: kancy * @date: 2019/10/21 9:39 **/ public class ChainTests { public static void main(String[] args) { ITaskProcesser lastThreadTaskProcesser = new LastThreadTaskProcesser(); BusinessThreadTaskProcesser businessThreadTaskProcesser = new BusinessThreadTaskProcesser(lastThreadTaskProcesser); ITaskProcesser printRequestThreadTaskProcesser = new PrintRequestThreadTaskProcesser(businessThreadTaskProcesser); ITaskProcesser firstThreadTaskProcesser = new FirstThreadTaskProcesser(printRequestThreadTaskProcesser); Thread r1 = new CreateRequestThread(firstThreadTaskProcesser); r1.start(); Thread r2 = new CreateRequestThread(firstThreadTaskProcesser); r2.start(); Thread r3 = new CreateRequestThread(firstThreadTaskProcesser); r3.start(); } static class CreateRequestThread extends Thread { ITaskProcesser iTaskProcesser; public CreateRequestThread(ITaskProcesser iTaskProcesser) { this.iTaskProcesser = iTaskProcesser; } @Override public void run() { for (int i = 0; i < 10; i++) { try { iTaskProcesser.process(new Request()); TimeUnit.MILLISECONDS.sleep(500); } catch (InterruptedException e) { e.printStackTrace(); } } } } }
注:本文只作拋磚引玉,在實際開發過程當中,能夠自行擴展。this