這段時間看了看工做室的工具庫的下載組件,發現其存在一些問題:git
1.下載核心邏輯有 bug,在暫停下載或下載失敗等狀況時有機率沒法順利完成下載。
2.雖然原來的設計是採用多線程斷點續傳的設計,但打了一下日誌發現其實下載任務都是在同一個線程下串行執行,並無起到加快下載速度的做用。
考慮到原來的代碼並不複雜,所以對這部分下載組件進行了重寫。這裏記錄一下里面的多線程斷點續傳功能的實現。
github
請查看完整的PDF版
(更多完整項目下載。未完待續。源碼。圖文知識後續上傳github。)
能夠點擊關於我聯繫我獲取完整PDF
(VX:mm14525201314)數據庫
首先咱們談一談,多線程下載的意義。安全
在平常的場景下,網絡中不可能只有下載方與服務器之間這樣一條鏈接,爲了不在這樣的場景下的網絡擁塞,TCP 協議經過調節窗口的大小來避免出現擁塞,但這個窗口的大小可能沒辦法達到咱們預期的效果:充分利用咱們的帶寬。所以咱們能夠採用多個 TCP 鏈接的形式來提升咱們帶寬的利用率,從而加快下載速度。服務器
打個比喻就是咱們要從一個水缸中用抽水機經過水管抽水,因爲管子的直徑等等的限制,咱們單條管子沒法徹底利用咱們的抽水機的抽水動力。所以咱們就將這些抽水的任務分紅了多份,分攤到多個管子上,這樣就能夠更充分的利用咱們的抽水機動力,從而提升抽水的速度。網絡
所以,咱們使用多線程下載的主要意義就是——提升下載速度。多線程
前面提到了咱們主要的目的是將一個總的下載任務分攤到多個子任務中,好比假設咱們用 5 個線程下載這個文件,那麼咱們就能夠對一個長度爲 N 的任務進行以下圖的均分:
但真實場景下每每 N 都不是恰好爲 5 的倍數的,所以對於最後一個任務還須要加上剩餘的任務量,也就是 N/5+N%5。框架
上面的任務分配咱們已經瞭解了,看起來很理想,但有一個問題,咱們如何實現向服務器只請求這個文件的某一段而不是所有呢?dom
咱們能夠經過在請求頭中加入 Range 字段來指定請求的範圍,從而實現指定某一段的數據。ide
如:RANGE bytes=10000-19999
就指定了 10000-19999 這段字節的數據
因此咱們的核心思想就是經過它拿到文件對應字節段的 InputStream,而後對它讀取並寫入文件。
下面再講講文件寫入問題,因爲咱們是多線程下載,所以文件並非每次都是從前日後一個個字節寫入的,隨時可能在文件的任何一個地方寫入數據。所以咱們須要可以在文件的指定位置寫入數據。這裏咱們用到了RandomAccessFile
來實現這個功能。
RandomAccessFile
是一個隨機訪問文件類,同時整合了 FileOutputStream
和 FileInputStream
,支持從文件的任何字節處讀寫數據。經過它咱們就能夠在文件的任何字節處寫入數據。
接下來簡單講講咱們這裏是如何使用 RandomAccessFile
的。咱們對於每一個子任務來講都有一個開始和結束的位置。每一個任務均可以經過 RandomAccessFile::seek
跳轉到文件的對應字節位置,而後從該位置開始讀取 InputStream
並寫入。
這樣,就實現了不一樣線程對文件的隨機寫入。
因爲咱們在真正開始下載以前,咱們須要先將任務分配到各個線程,所以咱們須要先了解到文件的大小。
爲了獲取到文件的大小,咱們用到 Response Headers
中的 Content-Length
字段。
以下圖所示,能夠看到,打開該下載請求的連接後,Response Headers
中包含了咱們須要的 Content-Length
,也就是該文件的大小,單位是字節。
對於多個子任務,咱們如何實現它們的斷點續傳呢?
其實原理很簡單,只須要保證每一個子任務的下載進度可以被即時地記錄便可。這樣繼續下載時只須要讀取這些下載記錄,從上次下載結束的位置開始下載便可。
它的實現有不少方式,只要能作到數據持久化便可。這裏我使用的是數據庫來實現。
這樣,咱們的子任務須要擁有一些必要的信息
completedSize
:當前下載完成大小taskSize
:子任務總大小startPos
:子任務開始位置currentPos
:子任務進行到的位置endPos
:子任務結束位置經過這些信息,咱們就可以記錄子任務的下載進度從而恢復咱們以前的下載,實現斷點續傳。
下面咱們用代碼來實現這樣一個多線程下載功能。
首先,咱們定義一下下載中的各個狀態:
public class DownloadStatus { public static final int IDLE = 233; // 空閒,默認狀態 public static final int COMPLETED = 234; // 完成 public static final int DOWNLOADING = 235; // 下載中 public static final int PAUSE = 236; // 暫停 public static final int ERROR = 237; // 出錯 }
能夠看到,這裏定義瞭如上的五種狀態。
這裏須要用到如數據庫及 HTTP 請求的功能,咱們這裏定義其接口以下,具體實現各位能夠根據須要本身實現:
public interface DownloadDbHelper { /** * 從數據庫中刪除子任務記錄 * @param task 子任務記錄 */ void delete(SubDownloadTask task); /** * 向數據庫中插入子任務記錄 * @param task 子任務記錄 */ void insert(SubDownloadTask task); /** * 在數據庫中更新子任務記錄 * @param task 子任務記錄 */ void update(SubDownloadTask task); /** * 獲取全部指定Task下的子任務記錄 * @param taskTag Task的Tag * @return 子任務記錄 */ List<SubDownloadTask> queryByTaskTag(String taskTag); }
public interface DownloadHttpHelper { /** * 獲取文件總長度 * @param url 下載url * @param callback 獲取文件長度CallBack */ void getTotalSize(String url, NetCallback<Long> callback); /** * 獲取InputStream * @param url 下載url * @param start 開始位置 * @param end 結束位置 * @param callback 獲取字節流的CallBack */ void getStreamByRange(String url, long start, long end, NetCallback<InputStream> callback); }
咱們先從上到下,從子任務開始實現。在個人設計中,它具備以下的成員變量:
@Entity public class SubDownloadTask implements Runnable { public static final int BUFFER_SIZE = 1024 * 1024; private static final String TAG = SubDownloadTask.class.getSimpleName(); @Id private Long id; private String url; // 文件下載的 url private String taskTag; // 父任務的 Tag private long taskSize; // 子任務大小 private long completedSize; // 子任務完成大小 private long startPos; // 開始位置 private long currentPos; // 當前位置 private long endPos; // 結束位置 private volatile int status; // 當前下載狀態 @Transient private SubDownloadListener listener; // 子任務下載監聽,主要用於提示父任務 @Transient private File saveFile; // 要保存到的文件 ... }
因爲這裏的數據庫的操做是用 GreenDao
實現,所以這裏有一些相關注解,各位能夠忽略。
InputStream
獲取能夠看到,子任務是一個 Runnable,咱們能夠經過其 run 方法開始下載,這樣就能夠經過如 ExecutorService 來開啓多個線程執行子任務。
咱們看到其 run 方法:
@Override public void run() { status = DownloadStatus.DOWNLOADING; DownloadManager.getInstance() .getHttpHelper() .getStreamByRange(url, currentPos, endPos, new NetCallback<InputStream>() { @Override public void onResult(InputStream inputStream) { listener.onSubStart(); writeFile(inputStream); } @Override public void onError(String message) { listener.onSubError("文件流獲取失敗"); status = DownloadStatus.ERROR; } }); }
能夠看到,咱們獲取了其從 currentPos
到 endPos
端的字節流,經過其 Response Body 拿到了它的 InputStream
,而後調用了 writeFile(InputStream)
方法進行文件的寫入。
文件寫入
接下來看到 writeFile
方法:
private void writeFile(InputStream in) { try { RandomAccessFile file = new RandomAccessFile(saveFile, "rwd"); // 經過 saveFile 創建RandomAccessFile file.seek(currentPos); // 跳轉到對應位置 byte[] buffer = new byte[BUFFER_SIZE]; while (true) { // 循環讀取 InputStream,直到暫停或讀取結束 if (status != DownloadStatus.DOWNLOADING) { // 狀態不爲 DOWNLOADING,中止下載 break; } int offset = in.read(buffer, 0, BUFFER_SIZE); if (offset == -1) { // 讀取不到數據,說明讀取結束 break; } // 將讀取到的數據寫入文件 file.write(buffer, 0, offset); // 下載數據並在數據庫中更新 currentPos += offset; completedSize += offset; DownloadManager.getInstance() .getDbHelper() .update(this); // 通知父任務下載進度 listener.onSubDownloading(offset); } if(status == DownloadStatus.DOWNLOADING) { // 下載完成 status = DownloadStatus.COMPLETED; // 通知父任務下載完成 listener.onSubComplete(completedSize); } file.close(); in.close(); } catch (IOException e) { e.printStackTrace(); listener.onSubError("文件下載失敗"); status = DownloadStatus.ERROR; resetTask(); } }
具體流程能夠看代碼中的註釋。能夠看到,子任務實際上就是循環讀取 InputStream
,並寫入文件,同時將下載進度同步到數據庫。
父任務也就是咱們具體的下載任務,咱們一樣先看到成員變量:
public class DownloadTask implements SubDownloadListener { private static final String TAG = DownloadTask.class.getSimpleName(); private String tag; // 下載任務的 Tag,用於區分不一樣下載任務 private String url; // 下載 url private String savePath; // 保存路徑 private String fileName; // 保存文件名 private DownloadListener listener; // 下載監聽 private long completeSize; // 下載完成大小 private long totalSize; // 下載任務總大小 private int status; // 當前下載進度 private int threadNum; // 線程數(由外部設置的每一個任務的下載線程數) private File file; // 保存文件 private List<SubDownloadTask> subTasks; // 子任務列表 private ExecutorService mExecutorService; // 線程池,用於執行子任務 ... }
對於一個下載任務,能夠經過 download 方法開始執行:
public void download() { listener.onStart(); subTasks = querySubTasks(); status = DownloadStatus.DOWNLOADING; if (subTasks.isEmpty()) { // 是新任務 downloadNewTask(); } else if (subTasks.size() == threadNum) { // 不是新任務 downloadExistTask(); } else { // 不是新任務,但下載線程數有誤 listener.onError("斷點數據有誤"); resetTask(); } }
能夠看到,咱們先將子任務列表從數據庫中讀取出來。
downloadNewTask
方法。downloadExistTask
方法。咱們先看到 downloadNewTask
方法:
DownloadManager.getInstance() .getHttpHelper() .getTotalSize(url, new NetCallback<Long>() { @Override public void onResult(Long total) { completeSize = 0L; totalSize = total; initSubTasks(); startAsyncDownload(); } @Override public void onError(String message) { error("獲取文件長度失敗"); } });
能夠看到,獲取到總長度後,經過調用 initSubTasks
方法,對子任務列表進行了初始化(計算子任務長度等),而後調用了 startAsyncDownload
方法後經過 ExecutorService
運行子任務進入子任務進行下載。
咱們看到 initSubTasks
方法:
private void initSubTasks() { long averageSize = totalSize / threadNum; for (int taskIndex = 0; taskIndex < threadNum; taskIndex++) { long taskSize = averageSize; if (taskIndex == threadNum - 1) { // 最後一個任務,則 size 還須要加入剩餘量 taskSize += totalSize % threadNum; } long start = 0L; int index = taskIndex; while (index > 0) { start += subTasks.get(index - 1).getTaskSize(); index--; } long end = start + taskSize - 1; // 注意這裏 SubDownloadTask subTask = new SubDownloadTask(); subTask.setUrl(url); subTask.setStatus(DownloadStatus.IDLE); subTask.setTaskTag(tag); subTask.setCompletedSize(0); subTask.setTaskSize(taskSize); subTask.setStartPos(start); subTask.setCurrentPos(start); subTask.setEndPos(end); subTask.setSaveFile(file); subTask.setListener(this); DownloadManager.getInstance() .getDbHelper() .insert(subTask); subTasks.add(subTask); } }
能夠看到就是計算每一個任務的大小及開始及結束點的位置,這裏要注意的是 endPos 須要 -1,不然各個任務的下載位置會重疊,而且最後一個任務會多下載一個字節致使如文件損壞等影響。具體緣由就是好比一個大小爲 500 的文件,則應當是 0-499 而不是 0-500。
接下來咱們看看 downloadExistTask
方法:
private void downloadExistTask() { // 不是新任務,且下載線程數無誤,計算已下載大小 completeSize = countCompleteSize(); totalSize = countTotalSize(); startAsyncDownload(); }
這裏其實很簡單,遍歷子任務列表計算已下載量及總任務量,並調用 startAsyncDownload 開始多線程下載。
具體執行子任務咱們能夠看到 startAsyncDownload
方法:
private void startAsyncDownload() { for (SubDownloadTask subTask : subTasks) { if (subTask.getCompletedSize() < subTask.getTaskSize()) { // 只下載沒有下載結束的子任務 mExecutorService.execute(subTask); } } }
能夠看到,這裏其實只是經過 ExecutorService 執行對應子任務(Runnable)而已。
咱們接下來看到 pause 方法:
public void pause() { stopAsyncDownload(); status = DownloadStatus.PAUSE; listener.onPause(); }
能夠看到,這裏只是調用了 stopAsyncDownload
方法中止子任務。
看到 stopAsyncDownload
方法:
private void stopAsyncDownload() { for (SubDownloadTask subTask : subTasks) { if (subTask.getStatus() != DownloadStatus.COMPLETED) { // 下載完成的再也不取消 subTask.cancel(); } } }
能夠看到,調用了子任務的 cancel
方法。
繼續看到子任務的 cancel
方法:
void cancel() { status = DownloadStatus.PAUSE; listener.onSubCancel(); }
這裏很簡單,僅僅是將下載狀態設置爲了 PAUSE,這樣在寫入文件的下一次 while 循環時便會停止循環從而結束 Runnable
的執行。
看到 cancel
方法:
public void cancel() { stopAsyncDownload(); resetTask(); listener.onCancel(); }
能夠看到和暫停的邏輯差很少,只是在暫停後還須要對子任務重置從而使得下次下載從頭開始。
前面提到,外部能夠經過 DownloadListener
監聽下載的進度,下面是 DownloadListener
接口的定義:
public interface DownloadListener { default void onStart() {} default void onDownloading(long progress, long total) {} default void onPause() {} default void onCancel() {} default void onComplete() {} default void onError(String message) {} }
咱們實時的下載進度實際上是在子任務的保存文件過程當中才能體現出來的,一樣,子任務的下載失敗也須要通知到 DownloadListener
,這是怎麼作到的呢?
前面提到了,咱們還定義了一個 SubDownloadListener
,其監聽者就是子任務的父任務。經過監聽咱們能夠將子任務狀態反饋到父任務,父任務再根據具體狀況反饋數據給 DownloadListener
。
public interface SubDownloadListener { void onSubStart(); void onSubDownloading(int offset); void onSubCancel(); void onSubComplete(long completeSize); void onSubError(String message); }
好比以前看到,每次下載失敗咱們都會調用 onSubError
,每次讀取 offset 的數據都會調用 onSubDownload(offset)
,每一個任務下載失敗都會調用 onSubComplete(completeSize)
。這樣,咱們子任務的下載狀態就成功返回給了上層。
咱們接着看看上層是如何處理的:
@Override public void onSubStart() {} @Override public void onSubDownloading(int offset) { synchronized (this) { completeSize = completeSize + offset; listener.onDownloading(completeSize, totalSize); } } @Override public void onSubCancel() {} @Override public void onSubComplete(long completeSize) { checkComplete(); } @Override public void onSubError(String message) { error(message); }
能夠看到,每次下載到一段數據,它都會把數據量返回上來,此時 completeSize
就加上了對應的 offset,而後再將新的 completeSize
通知給監聽者,這樣就實現了下載進度的監聽。這裏之因此加鎖是由於會有多個線程(子任務線程)對 completeSize
進行操做,加鎖保證線程安全。
而每次有子任務完成,它都會調用 checkComplete
方法檢查是否下載完成,若每一個子任務都下載完成,則說明任務下載完成,而後通知監聽者。
一樣的,每次子任務出現錯誤,都會通知監聽者出現錯誤,並作一些錯誤狀況下的處理。
到這裏,這篇文章就結束了,咱們成功實現了多線程斷點續傳下載功能。基於這個原理,咱們能夠作一些上層的封裝實現一個文件下載框架。
請查看完整的PDF版
(更多完整項目下載。未完待續。源碼。圖文知識後續上傳github。)
能夠點擊關於我聯繫我獲取完整PDF
(VX:mm14525201314)