責任鏈異步處理設計模型


簡介

Avoid coupling the sender of a request to its receiver by giving more than one object a chance to handle the request. Chain the receiving objects and pass the request along the chain until an object handles it.

使多個對象都有機會處理請求,從而避免了請求的發送者和接收者之間的耦合關係。將這些對象連成一條鏈,並沿着這條鏈傳遞該請求,直到有一個對象處理它爲止。


異步處理模型

當業務邏輯比較複雜且是異步請求時,我們可以使用責任鏈模型來劃分職責,分工明確,減輕業務邏輯複雜度;也可以使用異步模型來優化責任鏈的處理性能。


1.定義一個業務請求

import lombok.Data;

/**
 * Business request that is passed to the processors of the chain.
 *
 * <p>Lombok's {@code @Data} is applied to the class, so no accessors are written by hand here.
 *
 * @author: kancy
 * @date: 2019/10/21 9:42
 **/
@Data
public class Request {
    // Identifier of the request.
    // NOTE(review): the capitalised field name departs from lowerCamelCase; renaming it would
    // change the field name Lombok prints in toString(), so it is left as is.
    private long Id;
    // Name carried by the request.
    private String name;
}


2.定義責任鏈接口

/**
 * One link of the processing chain: anything that can accept a {@link Request}.
 *
 * @author: kancy
 * @date: 2019/10/21 9:40
 **/
public interface ITaskProcesser {
    /**
     * Hands a request to this processor.
     *
     * @param request the request to handle
     */
    void process(Request request);
}


3.定義責任鏈異步處理模板

import java.util.concurrent.LinkedBlockingQueue;

/**
 * Template for a chain link that handles its requests on its own thread.
 *
 * <p>{@link #process(Request)} only enqueues the request and returns. The worker thread,
 * started lazily on the first call, takes requests off the queue, delegates to
 * {@link #doRequest(Request)} and forwards the request to the next processor when that
 * method returns {@code true}.
 *
 * @author: kancy
 * @date: 2019/10/21 9:40
 **/
public abstract class AbstractThreadTaskProcesser extends Thread implements ITaskProcesser {

    /** Requests waiting to be handled by the worker thread. */
    private final LinkedBlockingQueue<Request> queue = new LinkedBlockingQueue<>();

    /** Next processor in the chain; {@code null} when there is none. */
    private final ITaskProcesser nextProcesser;

    /** Set once the processor has been asked to stop. */
    private volatile boolean isStop = false;

    /** Whether the worker thread has been started; read outside the lock, hence volatile. */
    private volatile boolean isRun = false;

    /**
     * Creates a processor with no successor.
     */
    public AbstractThreadTaskProcesser() {
        this(null);
    }

    /**
     * Creates a processor that forwards successfully handled requests.
     *
     * @param nextProcesser the processor to forward to, or {@code null} for none
     */
    public AbstractThreadTaskProcesser(ITaskProcesser nextProcesser) {
        setThreadTaskName();
        this.nextProcesser = nextProcesser;
    }

    /**
     * Enqueues the request and, on the first call, starts the worker thread.
     *
     * @param request the request to handle asynchronously
     */
    @Override
    public void process(Request request) {
        addRequestToQueue(request);
        // Check, lock, check again: only the first caller starts the thread.
        if (!isRun) {
            synchronized (this) {
                if (!isRun) {
                    // super.start() bypasses the overridden start(), which always throws.
                    super.start();
                    isRun = true;
                }
            }
        }
    }

    /**
     * Worker loop: takes requests off the queue until the processor is stopped or the
     * thread is interrupted.
     */
    @Override
    public void run() {
        while (!isStop) {
            try {
                // Blocks until a request is available.
                Request request = queue.take();
                boolean result = doRequest(request);
                // Forward only when the subclass reported success and a successor exists.
                if (result && nextProcesser != null) {
                    nextProcesser.process(request);
                }
            } catch (InterruptedException e) {
                // Interruption is how stopProcesser() wakes a worker blocked in take();
                // keep the interrupt status visible and leave the loop.
                Thread.currentThread().interrupt();
                break;
            }
        }
    }

    /**
     * Disabled on purpose: the thread is started by {@link #process(Request)}.
     *
     * @throws UnsupportedOperationException always
     */
    @Override
    public synchronized void start() {
        throw new UnsupportedOperationException("自動啓動線程,無需手動開啓");
    }

    /**
     * Handles one request; implemented by subclasses.
     *
     * @param request the request taken from the queue
     * @return {@code true} if the request should be passed to the next processor
     */
    protected abstract boolean doRequest(Request request);

    /**
     * Asks the worker thread to stop.
     *
     * <p>Besides raising the stop flag, the thread is interrupted so that a worker
     * currently blocked waiting for a request wakes up and exits instead of hanging.
     */
    protected void stopProcesser() {
        isStop = true;
        interrupt();
    }

    /**
     * Returns the queue of pending requests.
     *
     * @return the task queue
     */
    protected LinkedBlockingQueue<Request> getTaskQueue() {
        return queue;
    }

    /**
     * Returns the next processor in the chain.
     *
     * @return the successor, or {@code null} if there is none
     */
    protected ITaskProcesser getNextProcesser() {
        return nextProcesser;
    }

    /**
     * Adds a request to the task queue.
     *
     * @param request the request to enqueue
     */
    private void addRequestToQueue(Request request) {
        queue.add(request);
    }

    /**
     * Names the thread after the concrete class and the thread id.
     */
    private void setThreadTaskName() {
        setName(String.format("%s_Thread_%s", getClass().getSimpleName(), getId()));
    }
}


4.編寫具體任務處理器

  • FirstThreadTaskProcesser

    /**
     * Chain link that announces the start of processing for each request.
     *
     * @author: kancy
     * @date: 2019/10/21 9:40
     **/
    public class FirstThreadTaskProcesser extends AbstractThreadTaskProcesser {

        /**
         * @param nextProcesser the processor that receives the request afterwards
         */
        public FirstThreadTaskProcesser(ITaskProcesser nextProcesser) {
            super(nextProcesser);
        }

        /**
         * Prints the current thread's name followed by a start message.
         *
         * @param request the request being handled (not inspected here)
         * @return always {@code true}
         */
        @Override
        protected boolean doRequest(Request request) {
            final String workerName = Thread.currentThread().getName();
            System.out.println(workerName + " 開始處理!");
            return true;
        }
    }
  • PrintRequestThreadTaskProcesser

    /**
     * Chain link that prints each request it receives.
     *
     * @author: kancy
     * @date: 2019/10/21 10:06
     **/
    public class PrintRequestThreadTaskProcesser extends AbstractThreadTaskProcesser {

        /**
         * @param nextProcesser the processor that receives the request afterwards
         */
        public PrintRequestThreadTaskProcesser(ITaskProcesser nextProcesser) {
            super(nextProcesser);
        }

        /**
         * Writes the request's string form to standard output.
         *
         * @param request the request to print
         * @return always {@code true}
         */
        @Override
        protected boolean doRequest(Request request) {
            final String line = "request: " + request;
            System.out.println(line);
            return true;
        }
    }
  • BusinessThreadTaskProcesser

    /**
     * Chain link for a slow business step; each request is handled on a fixed pool of
     * ten threads rather than on a single worker.
     *
     * @author: kancy
     * @date: 2019/10/21 12:28
     **/
    public class BusinessThreadTaskProcesser implements ITaskProcesser {
        private final ExecutorService executorService = Executors.newFixedThreadPool(10);

        /** Next processor in the chain; {@code null} when there is none. */
        private final ITaskProcesser nextProcesser;

        /**
         * @param nextProcesser the processor that receives the request afterwards, or {@code null}
         */
        public BusinessThreadTaskProcesser(ITaskProcesser nextProcesser) {
            this.nextProcesser = nextProcesser;
        }

        /**
         * Schedules the request on the pool and returns immediately.
         *
         * @param request the request to handle
         */
        @Override
        public void process(Request request) {
            // execute() rather than submit(): with submit() and a discarded Future, any
            // exception thrown by the task would be captured and never reported.
            executorService.execute(new BusinessRequestHandler(request));
        }

        /**
         * Pool task that performs the (simulated) business step for one request and then
         * forwards it.
         */
        class BusinessRequestHandler implements Runnable {
            private final Request request;

            public BusinessRequestHandler(Request request) {
                this.request = request;
            }

            @Override
            public void run() {
                try {
                    // Simulates three seconds of work.
                    TimeUnit.SECONDS.sleep(3);
                } catch (InterruptedException e) {
                    // Keep the interrupt status for the pool and skip forwarding,
                    // since the step did not complete.
                    Thread.currentThread().interrupt();
                    return;
                }
                System.out.println(Thread.currentThread().getName() + " 處理成功!");
                callNextProcesser(request);
            }
        }

        /**
         * Forwards the request to the next processor, if there is one.
         *
         * @param request the request to forward
         */
        private void callNextProcesser(Request request) {
            if (nextProcesser != null) {
                nextProcesser.process(request);
            }
        }

    }
  • LastThreadTaskProcesser

    /**
     * Chain link that reports completion; it declares no constructor, so it is built
     * without a next processor.
     *
     * @author: kancy
     * @date: 2019/10/21 9:40
     **/
    public class LastThreadTaskProcesser extends AbstractThreadTaskProcesser {

        /**
         * Prints the current thread's name followed by a completion message.
         *
         * @param request the request being handled (not inspected here)
         * @return always {@code true}
         */
        @Override
        protected boolean doRequest(Request request) {
            final String workerName = Thread.currentThread().getName();
            System.out.println(workerName + " 處理完成!");
            return true;
        }
    }


5.編寫測試類

import java.util.concurrent.TimeUnit;

/**
 * Demo entry point: wires the four processors into a chain and feeds it from three
 * producer threads.
 *
 * @author: kancy
 * @date: 2019/10/21 9:39
 **/
public class ChainTests {

    /** Number of producer threads started by {@link #main(String[])}. */
    private static final int PRODUCER_COUNT = 3;

    /** Number of requests each producer submits. */
    private static final int REQUESTS_PER_PRODUCER = 10;

    public static void main(String[] args) {
        // Built back to front, because each link needs its successor at construction time.
        ITaskProcesser lastThreadTaskProcesser = new LastThreadTaskProcesser();
        BusinessThreadTaskProcesser businessThreadTaskProcesser = new BusinessThreadTaskProcesser(lastThreadTaskProcesser);
        ITaskProcesser printRequestThreadTaskProcesser = new PrintRequestThreadTaskProcesser(businessThreadTaskProcesser);
        ITaskProcesser firstThreadTaskProcesser = new FirstThreadTaskProcesser(printRequestThreadTaskProcesser);

        for (int i = 0; i < PRODUCER_COUNT; i++) {
            new CreateRequestThread(firstThreadTaskProcesser).start();
        }
    }

    /**
     * Producer that submits a fixed number of new requests to the head of the chain,
     * pausing 500 ms after each one.
     */
    static class CreateRequestThread extends Thread {
        private final ITaskProcesser iTaskProcesser;

        /**
         * @param iTaskProcesser the processor that receives the generated requests
         */
        public CreateRequestThread(ITaskProcesser iTaskProcesser) {
            this.iTaskProcesser = iTaskProcesser;
        }

        @Override
        public void run() {
            for (int i = 0; i < REQUESTS_PER_PRODUCER; i++) {
                iTaskProcesser.process(new Request());
                try {
                    TimeUnit.MILLISECONDS.sleep(500);
                } catch (InterruptedException e) {
                    // Stop producing when interrupted, keeping the interrupt status set.
                    Thread.currentThread().interrupt();
                    return;
                }
            }
        }
    }
}

注:本文只作拋磚引玉,在實際開發過程中,可以自行擴展。

相關文章
相關標籤/搜索