封裝一個阻塞隊列,輕鬆實現排隊執行任務功能!

前言

我的以爲隊列的使用在項目開發中挺多地方能夠用到的,因此將如何封裝一個隊列的過程記錄下來,整體來講難度並不大,但畢竟能力有限,若是各位有好的建議或意見歡迎提出來,若是本文能幫到你的話,記得點贊哦。bash

需求背景

在項目開發中,會常常遇到一些須要排隊執行的功能,好比發動態時上傳多張圖片,須要一張一張的上傳,好比直播間動畫連需發送或者收到消息須要展現時,須要一個一個動畫去展現等等場景,這時候會容易想到用隊列去實現,可是我想很多小夥伴會直接弄一個 list,存着要執行的任務,而後經過遞歸的方式去遍歷列表實現,相似下面代碼:ide

//存着一些執行任務的列表
List<String> list = new ArrayList<>();
//執行任務
private void doTask() {
    if (list.size() > 0) {
        String task = list.get(0);
        doSoming(task);
        list.remove(0);
        doTask();
    }
}
複製代碼

首先這種方式實現是能夠完成所須要的功能的,面對一些簡單的場景來講比較容易想到並且實現也簡單。可是面對複雜一點的場景卻有着很多缺點,特別是在一些祖傳項目裏面,誰也說不許代碼有多亂,到時候維護是很困難的(這個本人已在實際項目中深有體會)。因此封裝一個隊列去完成這種功能,是一個比較實用並且必須的手段。函數

首先看看要封裝的隊列須要有什麼功能:

  1. 在實際中,咱們執行的任務大概能夠分兩種,一個是有明確的執行時間的,好比,要連續顯示10個動畫,每一個展現5秒這種。一個是沒明確的執行時間的,好比連續上傳10張圖片,每一個上傳任務的完成時間是須要等到上傳成功回調回來才知道的這種。因此隊列第一個功能是每一個任務均可以兼容這兩種狀況,並且固然是一個執行完再執行下一個,排隊執行。
  2. 既然要排隊執行,固然會有優先級之分,因此每一個任務都能設置優先級,隊列能夠根據優先級去排隊執行任務。

至於隊列選哪個,我這裏選擇的是 PriorityBlockingQueue(阻塞優先級隊列),這個隊列的特色是存儲的對象必須是實現Comparable接口,並且它是阻塞隊列,其餘特色或者不瞭解的同窗能夠自行去了解,接下來都是對它的封裝。oop

1. 定義一個枚舉類TaskPriority,定義任務的優先級。

public enum TaskPriority {
    LOW, //低
    DEFAULT,//普通
    HIGH, //高
}
複製代碼

優先級分爲3種,如註釋所示,他們的關係:LOW<DEFAULT<HIGH動畫

2.隊列任務執行時間肯定和不肯定兩種狀況的實現策略

  1. 針對任務執行時間肯定的這種狀況,比較簡單,能夠給這個任務設置一個時間 duration,等任務開始執行時,便開始阻塞等待,等到時間到達時,才放開執行下一個任務。
  2. 針對任務執行時間不肯定的狀況,咱們則須要在任務執行完成的回調裏面手動放開,因此這裏打算再用一個PriorityBlockingQueue隊列,由於它有這樣的一個特色:若是隊列是空的,調用它的 take()方法時,它就會一直阻塞在那裏,當列表不爲空時,這個方法就不會阻塞。 因此當任務開始執行時,調用take()方法,等到咱們的完成回調來時,再手動給它add一個值,阻塞就放開,再執行下一個任務。

3.肯定了任務兩種狀況的實現策略後,接下來定義一個接口,定義一下每一個任務須要作什麼事情

public interface ITask extends Comparable<ITask> {
    //將該任務插入隊列
    void enqueue();

    //執行具體任務的方法
    void doTask();

    //任務執行完成後的回調方法
    void finishTask();

    //設置任務優先級
    ITask setPriority(TaskPriority mTaskPriority);

    //獲取任務優先級
    TaskPriority getPriority();

    //當優先級相同 按照插入順序 先入先出 該方法用來標記插入順序
    void setSequence(int mSequence);

    //獲取入隊次序
    int getSequence();

    //每一個任務的狀態,就是標記完成和未完成
    boolean getStatus();
    
    //設置每一個任務的執行時間,該方法用於任務執行時間肯定的狀況
    ITask setDuration(int duration);
    
    //獲取每一個任務執行的時間
    int getDuration();

    //阻塞任務執行,該方法用於任務執行時間不肯定的狀況
    void blockTask() throws Exception;

    //解除阻塞任務,該方法用於任務執行時間不肯定的狀況
    void unLockBlock();
}
複製代碼

接口基本定義了一些基本須要的方法,由於PriorityBlockingQueue的特色,因此接口繼承了Comparable,用於實現優先級排隊功能。具體方法功能請看註釋。ui

4.封裝一下PriorityBlockingQueue的基本功能

public class BlockTaskQueue {
    private String TAG = "BlockTaskQueue";
    private AtomicInteger mAtomicInteger = new AtomicInteger();
    //阻塞隊列
    private final BlockingQueue<ITask> mTaskQueue = new PriorityBlockingQueue<>();

    private BlockTaskQueue() {
    }
    //單例模式
    private static class BlockTaskQueueHolder {
        private final static BlockTaskQueue INSTANCE = new BlockTaskQueue();
    }

    public static BlockTaskQueue getInstance() {
        return BlockTaskQueueHolder.INSTANCE;
    }

    /**
     * 插入時 由於每個Task都實現了comparable接口 因此隊列會按照Task複寫的compare()方法定義的優先級次序進行插入
     * 當優先級相同時,使用AtomicInteger原子類自增 來爲每個task 設置sequence,
     * sequence的做用是標記兩個相同優先級的任務入隊的次序
     */
    public <T extends ITask> int add(T task) {
        if (!mTaskQueue.contains(task)) {
            task.setSequence(mAtomicInteger.incrementAndGet());
            mTaskQueue.add(task);
            Log.d(TAG, "\n add task " + task.toString());
        }
        return mTaskQueue.size();
    }

    public <T extends ITask> void remove(T task) {
        if (mTaskQueue.contains(task)) {
            Log.d(TAG, "\n" + "task has been finished. remove it from task queue");
            mTaskQueue.remove(task);
        }
        if (mTaskQueue.size() == 0) {
            mAtomicInteger.set(0);
        }
    }

    public ITask poll() {
        return mTaskQueue.poll();
    }

    public ITask take() throws InterruptedException {
        return mTaskQueue.take();
    }

    public void clear() {
        mTaskQueue.clear();
    }

    public int size() {
        return mTaskQueue.size();
    }
}
複製代碼

這裏是一個簡單的封裝,具體解釋請看註釋。this

5. 寫一個類記錄下當前執行的任務信息,方便獲取時使用

public class CurrentRunningTask {
    private static ITask sCurrentShowingTask;

    public static void setCurrentShowingTask(ITask task) {
        sCurrentShowingTask = task;
    }

    public static void removeCurrentShowingTask() {
        sCurrentShowingTask = null;
    }

    public static ITask getCurrentShowingTask() {
        return sCurrentShowingTask;
    }

    public static boolean getCurrentShowingStatus() {
        return sCurrentShowingTask != null && sCurrentShowingTask.getStatus();
    }
}
複製代碼

有時候要獲取一些正在執行的任務的信息,因此這裏弄類一個類來將正在執行的任務存儲起來。spa

6. 基礎須要的東西都寫好後,下面開始封裝一個基礎的任務類了

public class BaseTask implements ITask {
    private final String TAG = getClass().getSimpleName();
    private TaskPriority mTaskPriority = TaskPriority.DEFAULT; //默認優先級
    private int mSequence;// 入隊次序
    private Boolean mTaskStatus = false; // 標誌任務狀態,是否仍在展現
    protected WeakReference<BlockTaskQueue> taskQueue;//阻塞隊列
    protected int duration = 0; //任務執行時間
    //此隊列用來實現任務時間不肯定的隊列阻塞功能
    private PriorityBlockingQueue<Integer> blockQueue;
    //構造函數
    public BaseTask() {
        taskQueue = new WeakReference<>(BlockTaskQueue.getInstance());
        blockQueue = new PriorityBlockingQueue<>();
    }
    //入隊實現
    @Override
    public void enqueue() {
        TaskScheduler.getInstance().enqueue(this);
    }
    //執行任務方法,此時標記爲設爲true,而且將當前任務記錄下來
    @Override
    public void doTask() {
        mTaskStatus = true;
        CurrentRunningTask.setCurrentShowingTask(this);
    }
    //任務執行完成,改變標記位,將任務在隊列中移除,而且把記錄清除
    @Override
    public void finishTask() {
        this.mTaskStatus = false;
        this.taskQueue.get().remove(this);
        CurrentRunningTask.removeCurrentShowingTask();
        Log.d(TAG, taskQueue.get().size() + "");
    }
    //設置任務優先級實現
    @Override
    public ITask setPriority(TaskPriority mTaskPriority) {
        this.mTaskPriority = mTaskPriority;
        return this;
    }
    //設置任務執行時間
    public ITask setDuration(int duration) {
        this.duration = duration;
        return this;
    }
    //獲取任務優先級
    @Override
    public TaskPriority getPriority() {
        return mTaskPriority;
    }
    //獲取任務執行時間
    @Override
    public int getDuration() {
        return duration;
    }
    //設置任務次序
    @Override
    public void setSequence(int mSequence) {
        this.mSequence = mSequence;
    }
    //獲取任務次序
    @Override
    public int getSequence() {
        return mSequence;
    }
    獲取任務狀態
    @Override
    public boolean getStatus() {
        return mTaskStatus;
    }
    //阻塞任務執行
    @Override
    public void blockTask() throws Exception {
        blockQueue.take(); //若是隊列裏面沒數據,就會一直阻塞
    }
    //解除阻塞
     @Override
     public void unLockBlock() {
        blockQueue.add(1); //往裏面隨便添加一個數據,阻塞就會解除
     }

    /**
     * 排隊實現
     * 優先級的標準以下:
     * TaskPriority.LOW < TaskPriority.DEFAULT < TaskPriority.HIGH
     * 當優先級相同 按照插入次序排隊
     */
    @Override
    public int compareTo(ITask another) {
        final TaskPriority me = this.getPriority();
        final TaskPriority it = another.getPriority();
        return me == it ? this.getSequence() - another.getSequence() :
                it.ordinal() - me.ordinal();
    }
    //輸出一些信息
    @Override
    public String toString() {
        return "task name : " + getClass().getSimpleName() + " sequence : " + mSequence + " TaskPriority : " + mTaskPriority;
    }
}
複製代碼

代碼並不難,看註釋應該都會懂,這裏看下入隊方法,並無直接 add 到 taskQueue 裏面,並且經過 TaskScheduler 的 enqueue 方法入隊,TaskScheduler 是一個任務調度類,裏面封裝了入隊以及對隊列操做的一些功能,下面看看它是如何實現的。code

7. TaskScheduler

public class TaskScheduler {
    private final String TAG = "TaskScheduler";
    private BlockTaskQueue mTaskQueue = BlockTaskQueue.getInstance();
    private ShowTaskExecutor mExecutor;

    private static class ShowDurationHolder {
        private final static TaskScheduler INSTANCE = new TaskScheduler();
    }

    private TaskScheduler() {
        initExecutor();
    }

    private void initExecutor() {
        mExecutor = new ShowTaskExecutor(mTaskQueue);
        mExecutor.start();
    }

    public static TaskScheduler getInstance() {
        return ShowDurationHolder.INSTANCE;
    }

     public void enqueue(ITask task) {
        //由於TaskScheduler這裏寫成單例,若是isRunning改爲false的話,不判斷一下,就會一直都是false
        if (!mExecutor.isRunning()) {
            mExecutor.startRunning();
        }
        //按照優先級插入隊列 依次播放
        mTaskQueue.add(task);
    }

    public void resetExecutor() {
        mExecutor.resetExecutor();
    }

    public void clearExecutor() {
        mExecutor.clearExecutor();
    }
}
複製代碼

ShowTaskExecutor是一個任務排隊執行器,裏面主要是一個死循環,不斷的在隊列裏面取出任務,而且執行任務,下面看看它的實現。cdn

8.ShowTaskExecutor

public class ShowTaskExecutor {
    private final String TAG = "ShowTaskExecutor";
    private BlockTaskQueue taskQueue;
    private TaskHandler mTaskHandler;
    private boolean isRunning = true;
    private static final int MSG_EVENT_DO = 0;
    private static final int MSG_EVENT_FINISH = 1;

    public ShowTaskExecutor(BlockTaskQueue taskQueue) {
        this.taskQueue = taskQueue;
        mTaskHandler = new TaskHandler();
    }
    //開始遍歷任務隊列
    public void start() {
        AsyncTask.THREAD_POOL_EXECUTOR.execute(new Runnable() {
            @Override
            public void run() {
                try {
                    while (isRunning) { //死循環
                        ITask iTask;
                        iTask = taskQueue.take(); //取任務
                        if (iTask != null) {
                            //執行任務
                            TaskEvent doEvent = new TaskEvent();
                            doEvent.setTask(iTask);
                            doEvent.setEventType(TaskEvent.EventType.DO);
                            mTaskHandler.obtainMessage(MSG_EVENT_DO, doEvent).sendToTarget();
                            //一直阻塞,直到任務執行完
                            if (iTask.getDuration()!=0) {
                                TimeUnit.MICROSECONDS.sleep(iTask.getDuration());
                            }else {
                                iTask.blockTask();
                            }
                            //完成任務
                            TaskEvent finishEvent = new TaskEvent();
                            finishEvent.setTask(iTask);
                            finishEvent.setEventType(TaskEvent.EventType.FINISH);
                            mTaskHandler.obtainMessage(MSG_EVENT_FINISH, finishEvent).sendToTarget();
                        }
                    }
                } catch (Exception ex) {
                    ex.printStackTrace();
                }
            }
        });
    }
    //根據不一樣的消息回調不一樣的方法。
    private static class TaskHandler extends Handler {
        TaskHandler() {
            super(Looper.getMainLooper());
        }

        @Override
        public void handleMessage(Message msg) {
            super.handleMessage(msg);
            TaskEvent taskEvent = (TaskEvent) msg.obj;
            if (msg.what == MSG_EVENT_DO && taskEvent.getEventType() == TaskEvent.EventType.DO) {
                taskEvent.getTask().doTask();
            }
            if (msg.what == MSG_EVENT_FINISH && taskEvent.getEventType() == TaskEvent.EventType.FINISH) {
                taskEvent.getTask().finishTask();
            }
        }
    }

    public void startRunning() {
        isRunning = true;
    }

    public void pauseRunning() {
        isRunning = false;
    }

    public boolean isRunning() {
        return isRunning;
    }

    public void resetExecutor() {
        isRunning = true;
        taskQueue.clear();
    }

    public void clearExecutor() {
        pauseRunning();
        taskQueue.clear();
    }
}
複製代碼
public class TaskEvent {
    private WeakReference<ITask> mTask;
    int mEventType;

    public ITask getTask() {
        return mTask.get();
    }

    public void setTask(ITask mTask) {
        this.mTask = new WeakReference<>(mTask);
    }

    public int getEventType() {
        return mEventType;
    }

    public void setEventType(int mEventType) {
        this.mEventType = mEventType;
    }

    public static class EventType {
        public static final int DO = 0X00;
        public static final int FINISH = 0X01;
    }
}
複製代碼

好了整個隊列已經封裝好了,下面看看如何使用吧:

使用方法

  1. 定義一個Task,繼承 BaseTask,並實現對應的方法
public class LogTask extends BaseTask {
    String name;

    public LogTask(String name) {
        this.name = name;
    }

    //執行任務方法,在這裏實現你的任務具體內容
    @Override
    public void doTask() {
        super.doTask();
        Log.i("LogTask", "--doTask-" + name);
        
        //若是這個Task的執行時間是不肯定的,好比上傳圖片,那麼在上傳成功後須要手動調用
        //unLockBlock方法解除阻塞,例如:
        uploadImage(new UploadListener{
           void onSuccess(){
                unLockBlock();
            }
        });
    }

    //任務執行完的回調,在這裏你能夠作些釋放資源或者埋點之類的操做
    @Override
    public void finishTask() {
        super.finishTask();
        Log.i("LogTask", "--finishTask-" + name);
    }
}
複製代碼
  1. 而後依次入隊使用
findViewById(R.id.btn1).setOnClickListener(new View.OnClickListener() {
    @Override
    public void onClick(View v) {
        new LogTask("DEFAULT")
                .setDuration(5000) //設置了時間,表明這個任務時間是肯定的,若是不肯定,則不用設置
                .setPriority(TaskPriority.DEFAULT) //設置優先級,默認是DEFAULT
                .enqueue(); //入隊
    }
});
findViewById(R.id.btn2).setOnClickListener(new View.OnClickListener() {
    @Override
    public void onClick(View v) {
        new LogTask("LOW")
                .setDuration(4000)
                .setPriority(TaskPriority.LOW)
                .enqueue();
    }
});
findViewById(R.id.btn3).setOnClickListener(new View.OnClickListener() {
    @Override
    public void onClick(View v) {
        new LogTask("HIGH")
                .setDuration(3000)
                .setPriority(TaskPriority.HIGH)
                .enqueue();
    }
});
複製代碼

就這樣就能夠了,是否是很簡單,隨便依次點擊按鈕多下,你會發現任務都在根據優先級排隊執行了。

獲取當前運行任務的方法:

LogTask task = (LogTask) CurrentRunningTask.getCurrentShowingTask();
複製代碼
相關文章
相關標籤/搜索