AsyncTask源碼解析

前言

使用AsyncTask可以很容易的實如今子線程執行耗時操做,而後在主線程中更新進度,任務完成後能在主線程中收到結果,其提供瞭如下幾個主要方,咱們先從一個示例開始java

  • onPreExecute 在子線程執行任務前被調用,主線程調用
  • doInBackground 在線程池中執行,子線程調用
  • onPostExecute 在子線程執行完畢後調用,若是Task被取消了將不會被調用
  • publishProgress 用於在doInBackground中調用觸發onProgressUpdate,通常子線程調用
  • onProgressUpdate 由publishProgress觸發

示例

假設應用程序須要進行更新,須要下載新版本APK,在下載過程當中須要實時更新進度,這時咱們就能夠使用AsyncTask,代碼以下,注意:須要在清單文件中加上網絡和寫文件權限android

class MainActivity : AppCompatActivity() {
    override fun onCreate(savedInstanceState: Bundle?) {
        super.onCreate(savedInstanceState)
        setContentView(R.layout.activity_main)
        val pb = findViewById<ProgressBar>(R.id.pb)
        DownloadTask(pb).execute("http://down.360safe.com/360mse/360mse_nh00002.apk")
    }
}
class DownloadTask(pb: ProgressBar): AsyncTask<String, Int, Unit>() {
    private val mPb = WeakReference(pb)
    private val mTag = "download"
    override fun doInBackground(vararg params: String?) {
        val url = URL(params[0])
        val path = "${Environment.getExternalStorageDirectory()}/360safe.apk"
        val conn = url.openConnection() as HttpURLConnection
        if (conn.responseCode.toString().startsWith("20")) {
            var inputStream: InputStream? = null
            var outputStream: OutputStream? = null
            try {
                inputStream = conn.inputStream
                outputStream = BufferedOutputStream(FileOutputStream(File(path)))
                val totalSize = conn.contentLength
                val size = 1024
                val buffer = ByteArray(size)
                var downloadedSize = 0
                var length = inputStream.read(buffer)
                while (length != -1) {
                    downloadedSize += size
                    outputStream.write(buffer, 0, length)
                    onProgressUpdate(totalSize, downloadedSize)
                    length = inputStream.read(buffer)
                }
            } catch (e: Exception) {
                e.printStackTrace()
            } finally {
                inputStream?.close()
                outputStream?.close()
            }
        }
    }
    override fun onProgressUpdate(vararg values: Int?) {
        mPb.get()?.max = values[0]!!
        mPb.get()?.progress = values[1]!!
    }
    override fun onPostExecute(result: Unit?) {
        Log.d(mTag, "下載完成")
    }
}
複製代碼

這樣就能在子線程下載APK,而後實時在主線程更新進度,下面看看AsyncTask的源碼bash

源碼

首先來看看AsyncTask的構造方法網絡

public AsyncTask() {
    this((Looper) null);
}
@hide
public AsyncTask(@Nullable Handler handler) {
    this(handler != null ? handler.getLooper() : null);
}
@hide
public AsyncTask(@Nullable Looper callbackLooper) {
    mHandler = callbackLooper == null || callbackLooper == Looper.getMainLooper()
        ? getMainHandler()
        : new Handler(callbackLooper);
    mWorker = new WorkerRunnable<Params, Result>() {
        public Result call() throws Exception {
            mTaskInvoked.set(true);
            Result result = null;
            try {
                Process.setThreadPriority(Process.THREAD_PRIORITY_BACKGROUND);
                //noinspection unchecked
                result = doInBackground(mParams);
                Binder.flushPendingCommands();
            } catch (Throwable tr) {
                mCancelled.set(true);
                throw tr;
            } finally {
                postResult(result);
            }
            return result;
        }
    };
    mFuture = new FutureTask<Result>(mWorker) {
        @Override
        protected void done() {
            try {
                postResultIfNotInvoked(get());
            } catch (InterruptedException e) {
                android.util.Log.w(LOG_TAG, e);
            } catch (ExecutionException e) {
                throw new RuntimeException("An error occurred while executing doInBackground()",
                        e.getCause());
            } catch (CancellationException e) {
                postResultIfNotInvoked(null);
            }
        }
    };
}
複製代碼

能夠看出雖然有三個構造器可是咱們只能使用無參構造方法,其它兩個構造方法都是隱藏的,其實參數不管是Handler也好Looper也好,最終只是想要一個Looper作爲內部Handler的入參,這裏因爲callbackLooper爲null,因此會調用getMainHandler並將結果賦值給mHandler,而後新建了mWorkermFuture,先來看看getMainLooperide

private static Handler getMainHandler() {
    synchronized (AsyncTask.class) {
        if (sHandler == null) {
            sHandler = new InternalHandler(Looper.getMainLooper());
        }
        return sHandler;
    }
}
複製代碼

繼續看看InternalHandleroop

private static class InternalHandler extends Handler {
    public InternalHandler(Looper looper) {
        super(looper);
    }
    @SuppressWarnings({"unchecked", "RawUseOfParameterizedType"})
    @Override
    public void handleMessage(Message msg) {
        AsyncTaskResult<?> result = (AsyncTaskResult<?>) msg.obj;
        switch (msg.what) {
            case MESSAGE_POST_RESULT:
                // There is only one result
                result.mTask.finish(result.mData[0]);
                break;
            case MESSAGE_POST_PROGRESS:
                result.mTask.onProgressUpdate(result.mData);
                break;
        }
    }
}
複製代碼

這裏能夠看出當進度改變或者任務結束的時候都會發送消息過來在這裏回調,而後看看AsyncTask的execute方法post

public final AsyncTask<Params, Progress, Result> execute(Params... params) {
    return executeOnExecutor(sDefaultExecutor, params);
}
private static volatile Executor sDefaultExecutor = SERIAL_EXECUTOR;
public static final Executor SERIAL_EXECUTOR = new SerialExecutor();
private static class SerialExecutor implements Executor {
    final ArrayDeque<Runnable> mTasks = new ArrayDeque<Runnable>();
    Runnable mActive;
    public synchronized void execute(final Runnable r) {
        mTasks.offer(new Runnable() {
            public void run() {
                try {
                    r.run();
                } finally {
                    scheduleNext();
                }
            }
        });
        if (mActive == null) {
            scheduleNext();
        }
    }
    protected synchronized void scheduleNext() {
        if ((mActive = mTasks.poll()) != null) {
            THREAD_POOL_EXECUTOR.execute(mActive);
        }
    }
}
複製代碼

又調用了executeOnExecutor將params和一個SerialExecutor實例傳入ui

public final AsyncTask<Params, Progress, Result> executeOnExecutor(Executor exec, Params... params) {
    if (mStatus != Status.PENDING) {
        switch (mStatus) {
            case RUNNING:
                throw new IllegalStateException("Cannot execute task:"
                        + " the task is already running.");
            case FINISHED:
                throw new IllegalStateException("Cannot execute task:"
                        + " the task has already been executed "
                        + "(a task can be executed only once)");
        }
    }
    mStatus = Status.RUNNING;
    onPreExecute();
    mWorker.mParams = params;
    exec.execute(mFuture);
    return this;
}
複製代碼

該方法主要作了如下幾件事情this

  1. 判斷了當前Task的狀態若是正在運行或者已經運行結束了就直接拋出異常
  2. 將當前Task的狀態改成運行中,調用onPreExecute該方法是一個空方法交由子類實現能夠在執行任務以前經過該方法作一些操做,
  3. 將參數賦值給mWorker.mParams
  4. mFuture丟給SerialExecutor進行執行
  5. 返回當前AsyncTask實例

接下來看看SerialExecutor.execute作了什麼url

private static class SerialExecutor implements Executor {
    final ArrayDeque<Runnable> mTasks = new ArrayDeque<Runnable>();
    Runnable mActive;
    public synchronized void execute(final Runnable r) {
        mTasks.offer(new Runnable() {
            public void run() {
                try {
                    r.run();
                } finally {
                    scheduleNext();
                }
            }
        });
        if (mActive == null) {
            scheduleNext();
        }
    }
    protected synchronized void scheduleNext() {
        if ((mActive = mTasks.poll()) != null) {
            THREAD_POOL_EXECUTOR.execute(mActive);
        }
    }
}
複製代碼

首先往mTasks裏面添加了一個Runnable實例,而後判斷當前是否有任務在執行,若是沒有就調用scheduleNext執行任務,該方法會從隊列中取出第一個Runnable實例丟給THREAD_POOL_EXECUTOR進行執行,而當一個Runnable執行完畢後又會調用scheduleNext執行下一個Runnable,因此說同一時間只會有一個Runnable執行,下面來看看THREAD_POOL_EXECUTOR

public static final Executor THREAD_POOL_EXECUTOR;
static {
    ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
            CORE_POOL_SIZE, MAXIMUM_POOL_SIZE, KEEP_ALIVE_SECONDS, TimeUnit.SECONDS,
            sPoolWorkQueue, sThreadFactory);
    threadPoolExecutor.allowCoreThreadTimeOut(true);
    THREAD_POOL_EXECUTOR = threadPoolExecutor;
}
private static final int CPU_COUNT = Runtime.getRuntime().availableProcessors();
private static final int CORE_POOL_SIZE = Math.max(2, Math.min(CPU_COUNT - 1, 4));
private static final int MAXIMUM_POOL_SIZE = CPU_COUNT * 2 + 1;
private static final int KEEP_ALIVE_SECONDS = 30;
private static final ThreadFactory sThreadFactory = new ThreadFactory() {
    private final AtomicInteger mCount = new AtomicInteger(1);
    public Thread newThread(Runnable r) {
        return new Thread(r, "AsyncTask #" + mCount.getAndIncrement());
    }
};
private static final BlockingQueue<Runnable> sPoolWorkQueue =
        new LinkedBlockingQueue<Runnable>(128);
複製代碼

咱們看到THREAD_POOL_EXECUTOR是一個核心線程爲2-4,最大線程數爲處理器數*2+1,線程超時時間30秒,容許核心線程超時的線程池,那麼咱們有個疑問剛纔不是說是串行執行Runnable的嗎?既然是一個個執行的搞個SingleThreadExecutor不就好了?答案是若是咱們調用execute方法確實是串行執行的,可是咱們也能夠直接調用executeOnExecutor方法將THREAD_POOL_EXECUTOR當作入參這樣咱們的Task就能夠並行執行的了,下面代碼調用到了mFuture.run,咱們直接來看看FutureTask的構造方法

public FutureTask(Callable<V> callable) {
    if (callable == null)
        throw new NullPointerException();
    this.callable = callable;
    this.state = NEW;       // ensure visibility of callable
}
複製代碼

這裏的callable其實就是在AsyncTask的構造方法中建立的mWorker,這裏將其賦值給了callable變量,而後將狀態變動爲NEW,而後來看看其run方法

public void run() {
    if (state != NEW ||
        !U.compareAndSwapObject(this, RUNNER, null, Thread.currentThread()))
        return;
    try {
        Callable<V> c = callable;
        if (c != null && state == NEW) {
            V result;
            boolean ran;
            try {
                result = c.call();
                ran = true;
            } catch (Throwable ex) {
                result = null;
                ran = false;
                setException(ex);
            }
            if (ran)
                set(result);
        }
    } finally {
        runner = null;
        int s = state;
        if (s >= INTERRUPTING)
            handlePossibleCancellationInterrupt(s);
    }
}
複製代碼

首先判斷當前狀態是否是NEW不是就直接返回,而後調用mWorker.call,若是執行成功就調用set設置結果,執行失敗就調用setException設置異常,先來看看mWorker.call

public Result call() throws Exception {
    mTaskInvoked.set(true);
    Result result = null;
    try {
        Process.setThreadPriority(Process.THREAD_PRIORITY_BACKGROUND);
        result = doInBackground(mParams);
        Binder.flushPendingCommands();
    } catch (Throwable tr) {
        mCancelled.set(true);
        throw tr;
    } finally {
        postResult(result);
    }
    return result;
}
複製代碼

該方法先將mTaskInvoked設置爲true,代表該Task已經獲得執行,而後設置優先級爲後臺線程,再而後調用doInBackground獲取到結果,若是執行失敗會將mCancelled設置爲true,而後再拋出異常,最後調用postResult

private Result postResult(Result result) {
    @SuppressWarnings("unchecked")
    Message message = getHandler().obtainMessage(MESSAGE_POST_RESULT,
            new AsyncTaskResult<Result>(this, result));
    message.sendToTarget();
    return result;
}
複製代碼

該方法只是發了一個消息,根據前面講到的InternalHandler的處理邏輯會調用finish

private void finish(Result result) {
    if (isCancelled()) {
        onCancelled(result);
    } else {
        onPostExecute(result);
    }
    mStatus = Status.FINISHED;
}
複製代碼

這裏判斷若是任務執行失敗就調用onCancelled執行成功就調用onPostExecute,最後將狀態置爲FINISHED,咱們再回到FutureTask的run方法,還沒分析執行call方法拋出異常的狀況來看看setException

protected void setException(Throwable t) {
    if (U.compareAndSwapInt(this, STATE, NEW, COMPLETING)) {
        outcome = t;
        U.putOrderedInt(this, STATE, EXCEPTIONAL); // final state
        finishCompletion();
    }
}
複製代碼

判斷狀態是不是NEW或者COMPLETING是的話就把當前的狀態改爲EXCEPTIONAL而後調用finishCompletion

private void finishCompletion() {
    // assert state > COMPLETING;
    for (WaitNode q; (q = waiters) != null;) {
        if (U.compareAndSwapObject(this, WAITERS, q, null)) {
            for (;;) {
                Thread t = q.thread;
                if (t != null) {
                    q.thread = null;
                    LockSupport.unpark(t);
                }
                WaitNode next = q.next;
                if (next == null)
                    break;
                q.next = null; // unlink to help gc
                q = next;
            }
            break;
        }
    }
    done();
    callable = null;        // to reduce footprint
}
複製代碼

該方法將阻塞隊列狀況而後調用done方法其實一個空實現,mFuture實現了它咱們來看看

mFuture = new FutureTask<Result>(mWorker) {
    @Override
    protected void done() {
        try {
            postResultIfNotInvoked(get());
        } catch (InterruptedException e) {
            android.util.Log.w(LOG_TAG, e);
        } catch (ExecutionException e) {
            throw new RuntimeException("An error occurred while executing doInBackground()",
                    e.getCause());
        } catch (CancellationException e) {
            postResultIfNotInvoked(null);
        }
    }
};
private void postResultIfNotInvoked(Result result) {
    final boolean wasTaskInvoked = mTaskInvoked.get();
    if (!wasTaskInvoked) {
        postResult(result);
    }
}
複製代碼

若是Task還沒獲得執行就被取消掉了那麼會調用postResult,到此爲止基本流程已經走完了,可是還有一個方法在這個流程中沒講到那就是publishProgress

protected final void publishProgress(Progress... values) {
    if (!isCancelled()) {
        getHandler().obtainMessage(MESSAGE_POST_PROGRESS,
                new AsyncTaskResult<Progress>(this, values)).sendToTarget();
    }
}
public void handleMessage(Message msg) {
    AsyncTaskResult<?> result = (AsyncTaskResult<?>) msg.obj;
    switch (msg.what) {
        case MESSAGE_POST_RESULT:
            // There is only one result
            result.mTask.finish(result.mData[0]);
            break;
        case MESSAGE_POST_PROGRESS:
            result.mTask.onProgressUpdate(result.mData);
            break;
    }
}
複製代碼

根據InternalHandler咱們能夠看出實際上是回調了onPregressUpdate,至此所有流程都走完了,來總結一下

總結

  • AsyncTask內部切換線程的原理也是線程池加Handler
  • 默認狀況下AsyncTask是串行執行的,也就是說假設你同時開啓三個AsyncTask,它們會一個個執行,而不會同時執行要想作到並行執行須要手動調用executeOnExecutor
相關文章
相關標籤/搜索