斷點續傳下載一直是移動開發中必不可少的一項重要的技術,一樣的RxJava
和Retrofit
的結合讓這個技術解決起來更加的靈活,咱們徹底能夠封裝一個適合自的下載框架,簡單並且安全!html
下載和以前的http
請求能夠相互獨立,因此咱們單獨給download
創建一個工程moudel
處理java
默認項目採用的GreenDao
數據庫管理數據,新版GreenDao
自動生成的炒做類的方式,因此若是缺失xxxDao
文件,檢查是否關聯正確的路徑git
根目錄build.gradle
github
classpath 'org.greenrobot:greendao-gradle-plugin:+'複製代碼
依賴:數據庫
apply plugin: 'org.greenrobot.greendao'
xxxxxxxxx
compile 'org.greenrobot:greendao:3.2.0'複製代碼
若是須要替換本身的數據庫框架字須要修改DbDownUtil
文件便可app
和之前同樣,先寫接口
注意:Streaming是判斷是否寫入內存的標示,若是小文件能夠考慮不寫,通常狀況必須寫;下載地址須要經過@url動態指定(不適固定的),@head標籤是指定下載的起始位置(斷點續傳的位置)dom
/*斷點續傳下載接口*/
@Streaming/*大文件須要加入這個判斷,防止下載過程當中寫入到內存中*/
@GET
Observable<ResponseBody> download(@Header("RANGE") String start, @Url String url);複製代碼
和以前的上傳封裝同樣,下載更加的須要進度,因此咱們一樣覆蓋ResponseBody類,寫入進度監聽回調ide
/** * 自定義進度的body * @author wzg */
public class DownloadResponseBody extends ResponseBody {
private ResponseBody responseBody;
private DownloadProgressListener progressListener;
private BufferedSource bufferedSource;
public DownloadResponseBody(ResponseBody responseBody, DownloadProgressListener progressListener) {
this.responseBody = responseBody;
this.progressListener = progressListener;
}
@Override
public BufferedSource source() {
if (bufferedSource == null) {
bufferedSource = Okio.buffer(source(responseBody.source()));
}
return bufferedSource;
}
private Source source(Source source) {
return new ForwardingSource(source) {
long totalBytesRead = 0L;
@Override
public long read(Buffer sink, long byteCount) throws IOException {
long bytesRead = super.read(sink, byteCount);
// read() returns the number of bytes read, or -1 if this source is exhausted.
totalBytesRead += bytesRead != -1 ? bytesRead : 0;
if (null != progressListener) {
progressListener.update(totalBytesRead, responseBody.contentLength(), bytesRead == -1);
}
return bytesRead;
}
};
}
}複製代碼
/** * 成功回調處理 * Created by WZG on 2016/10/20. */
public interface DownloadProgressListener {
/** * 下載進度 * @param read * @param count * @param done */
void update(long read, long count, boolean done);
}複製代碼
複寫Interceptor,能夠將咱們的監聽回調經過okhttp的client方法addInterceptor自動加載咱們的監聽回調和ResponseBody
/** * 成功回調處理 * Created by WZG on 2016/10/20. */
public class DownloadInterceptor implements Interceptor {
private DownloadProgressListener listener;
public DownloadInterceptor(DownloadProgressListener listener) {
this.listener = listener;
}
@Override
public Response intercept(Chain chain) throws IOException {
Response originalResponse = chain.proceed(chain.request());
return originalResponse.newBuilder()
.body(new DownloadResponseBody(originalResponse.body(), listener))
.build();
}
}複製代碼
這個類中的數據可自由擴展,用戶本身選擇須要保持到數據庫中的數據,能夠自由選擇須要數據庫第三方框架,demo採用greenDao框架存儲數據
public class DownInfo {
/*存儲位置*/
private String savePath;
/*下載url*/
private String url;
/*基礎url*/
private String baseUrl;
/*文件總長度*/
private long countLength;
/*下載長度*/
private long readLength;
/*下載惟一的HttpService*/
private HttpService service;
/*回調監聽*/
private HttpProgressOnNextListener listener;
/*超時設置*/
private int DEFAULT_TIMEOUT = 6;
/*下載狀態*/
private DownState state;
}複製代碼
很簡單,和大多數封裝框架同樣
public enum DownState {
START,
DOWN,
PAUSE,
STOP,
ERROR,
FINISH,
}複製代碼
注意:這裏和DownloadProgressListener不一樣,這裏是下載這個過程當中的監聽回調,DownloadProgressListener只是進度的監聽
經過抽象類,能夠自由選擇須要覆蓋的類,不須要徹底覆蓋!更加靈活
/** * 下載過程當中的回調處理 * Created by WZG on 2016/10/20. */
public abstract class HttpProgressOnNextListener<T> {
/** * 成功後回調方法 * @param t */
public abstract void onNext(T t);
/** * 開始下載 */
public abstract void onStart();
/** * 完成下載 */
public abstract void onComplete();
/** * 下載進度 * @param readLength * @param countLength */
public abstract void updateProgress(long readLength, long countLength);
/** * 失敗或者錯誤方法 * 主動調用,更加靈活 * @param e */
public void onError(Throwable e){
}
/** * 暫停下載 */
public void onPuase(){
}
/** * 中止下載銷燬 */
public void onStop(){
}
}複製代碼
準備的工做作完,須要將回調和傳入回調的信息統一封裝到sub中,統一判斷;和封裝二的原理同樣,咱們經過自定義Subscriber來提早處理返回的數據,讓用戶字須要關係成功和失敗以及向關心的數據,避免重複多餘的代碼出如今處理類中
sub須要繼承DownloadProgressListener,和自帶的回調一塊兒組成咱們須要的回調結果
傳入DownInfo數據,經過回調設置DownInfo的不一樣狀態,保存狀態
經過RxAndroid將進度回調指定到主線程中(若是不須要進度最好去掉該處理避免主線程處理負擔)
update進度回調在斷點續傳使用時,須要手動判斷斷點後加載的長度,由於指定斷點下載長度下載後總長度=(物理長度-起始下載長度)
/** * 用於在Http請求開始時,自動顯示一個ProgressDialog * 在Http請求結束是,關閉ProgressDialog * 調用者本身對請求數據進行處理 * Created by WZG on 2016/7/16. */
public class ProgressDownSubscriber<T> extends Subscriber<T> implements DownloadProgressListener {
//弱引用結果回調
private WeakReference<HttpProgressOnNextListener> mSubscriberOnNextListener;
/*下載數據*/
private DownInfo downInfo;
public ProgressDownSubscriber(DownInfo downInfo) {
this.mSubscriberOnNextListener = new WeakReference<>(downInfo.getListener());
this.downInfo=downInfo;
}
/** * 訂閱開始時調用 * 顯示ProgressDialog */
@Override
public void onStart() {
if(mSubscriberOnNextListener.get()!=null){
mSubscriberOnNextListener.get().onStart();
}
downInfo.setState(DownState.START);
}
/** * 完成,隱藏ProgressDialog */
@Override
public void onCompleted() {
if(mSubscriberOnNextListener.get()!=null){
mSubscriberOnNextListener.get().onComplete();
}
downInfo.setState(DownState.FINISH);
}
/** * 對錯誤進行統一處理 * 隱藏ProgressDialog * * @param e */
@Override
public void onError(Throwable e) {
/*中止下載*/
HttpDownManager.getInstance().stopDown(downInfo);
if(mSubscriberOnNextListener.get()!=null){
mSubscriberOnNextListener.get().onError(e);
}
downInfo.setState(DownState.ERROR);
}
/** * 將onNext方法中的返回結果交給Activity或Fragment本身處理 * * @param t 建立Subscriber時的泛型類型 */
@Override
public void onNext(T t) {
if (mSubscriberOnNextListener.get() != null) {
mSubscriberOnNextListener.get().onNext(t);
}
}
@Override
public void update(long read, long count, boolean done) {
if(downInfo.getCountLength()>count){
read=downInfo.getCountLength()-count+read;
}else{
downInfo.setCountLength(count);
}
downInfo.setReadLength(read);
if (mSubscriberOnNextListener.get() != null) {
/*接受進度消息,形成UI阻塞,若是不須要顯示進度可去掉實現邏輯,減小壓力*/
rx.Observable.just(read).observeOn(AndroidSchedulers.mainThread())
.subscribe(new Action1<Long>() {
@Override
public void call(Long aLong) {
/*若是暫停或者中止狀態延遲,不須要繼續發送回調,影響顯示*/
if(downInfo.getState()==DownState.PAUSE||downInfo.getState()==DownState.STOP)return;
downInfo.setState(DownState.DOWN);
mSubscriberOnNextListener.get().updateProgress(aLong,downInfo.getCountLength());
}
});
}
}
}複製代碼
/** * 獲取單例 * @return */
public static HttpDownManager getInstance() {
if (INSTANCE == null) {
synchronized (HttpDownManager.class) {
if (INSTANCE == null) {
INSTANCE = new HttpDownManager();
}
}
}
return INSTANCE;
}複製代碼
/*回調sub隊列*/
private HashMap<String,ProgressDownSubscriber> subMap;
/*單利對象*/
private volatile static HttpDownManager INSTANCE;
private HttpDownManager(){
downInfos=new HashSet<>();
subMap=new HashMap<>();
}複製代碼
/** * 開始下載 */
public void startDown(DownInfo info){
/*正在下載不處理*/
if(info==null||subMap.get(info.getUrl())!=null){
return;
}
/*添加回調處理類*/
ProgressDownSubscriber subscriber=new ProgressDownSubscriber(info);
/*記錄回調sub*/
subMap.put(info.getUrl(),subscriber);
/*獲取service,屢次請求公用一個sercie*/
HttpService httpService;
if(downInfos.contains(info)){
httpService=info.getService();
}else{
DownloadInterceptor interceptor = new DownloadInterceptor(subscriber);
OkHttpClient.Builder builder = new OkHttpClient.Builder();
//手動建立一個OkHttpClient並設置超時時間
builder.connectTimeout(info.getConnectionTime(), TimeUnit.SECONDS);
builder.addInterceptor(interceptor);
Retrofit retrofit = new Retrofit.Builder()
.client(builder.build())
.addConverterFactory(GsonConverterFactory.create())
.addCallAdapterFactory(RxJavaCallAdapterFactory.create())
.baseUrl(info.getBaseUrl())
.build();
httpService= retrofit.create(HttpService.class);
info.setService(httpService);
}
/*獲得rx對象-上一次下載的位置開始下載*/
httpService.download("bytes=" + info.getReadLength() + "-",info.getUrl())
/*指定線程*/
.subscribeOn(Schedulers.io())
.unsubscribeOn(Schedulers.io())
/*失敗後的retry配置*/
.retryWhen(new RetryWhenNetworkException())
/*讀取下載寫入文件*/
.map(new Func1<ResponseBody, DownInfo>() {
@Override
public DownInfo call(ResponseBody responseBody) {
try {
writeCache(responseBody,new File(info.getSavePath()),info);
} catch (IOException e) {
/*失敗拋出異常*/
throw new HttpTimeException(e.getMessage());
}
return info;
}
})
/*回調線程*/
.observeOn(AndroidSchedulers.mainThread())
/*數據回調*/
.subscribe(subscriber);
}複製代碼
注意:一開始調用進度回調是第一次寫入在進度回調以前,因此須要判斷一次DownInfo是否獲取到下載總長度,沒有這選擇當前ResponseBody 讀取長度爲總長度
/** * 寫入文件 * @param file * @param info * @throws IOException */
public void writeCache(ResponseBody responseBody,File file,DownInfo info) throws IOException{
if (!file.getParentFile().exists())
file.getParentFile().mkdirs();
long allLength;
if (info.getCountLength()==0){
allLength=responseBody.contentLength();
}else{
allLength=info.getCountLength();
}
FileChannel channelOut = null;
RandomAccessFile randomAccessFile = null;
randomAccessFile = new RandomAccessFile(file, "rwd");
channelOut = randomAccessFile.getChannel();
MappedByteBuffer mappedBuffer = channelOut.map(FileChannel.MapMode.READ_WRITE,
info.getReadLength(),allLength-info.getReadLength());
byte[] buffer = new byte[1024*8];
int len;
int record = 0;
while ((len = responseBody.byteStream().read(buffer)) != -1) {
mappedBuffer.put(buffer, 0, len);
record += len;
}
responseBody.byteStream().close();
if (channelOut != null) {
channelOut.close();
}
if (randomAccessFile != null) {
randomAccessFile.close();
}
}複製代碼
調用 subscriber.unsubscribe()解除監聽,而後remove記錄的下載數據和sub回調,而且設置下載狀態(同步數據庫本身添加)
/** * 中止下載 */
public void stopDown(DownInfo info){
if(info==null)return;
info.setState(DownState.STOP);
info.getListener().onStop();
if(subMap.containsKey(info.getUrl())) {
ProgressDownSubscriber subscriber=subMap.get(info.getUrl());
subscriber.unsubscribe();
subMap.remove(info.getUrl());
}
/*保存數據庫信息和本地文件*/
db.save(info);
}複製代碼
原理和中止下載原理同樣
/** * 暫停下載 * @param info */
public void pause(DownInfo info){
if(info==null)return;
info.setState(DownState.PAUSE);
info.getListener().onPuase();
if(subMap.containsKey(info.getUrl())){
ProgressDownSubscriber subscriber=subMap.get(info.getUrl());
subscriber.unsubscribe();
subMap.remove(info.getUrl());
}
/*這裏須要講info信息寫入到數據中,可自由擴展,用本身項目的數據庫*/
db.update(info);
}複製代碼
/** * 中止所有下載 */
public void stopAllDown(){
for (DownInfo downInfo : downInfos) {
stopDown(downInfo);
}
subMap.clear();
downInfos.clear();
}
/** * 暫停所有下載 */
public void pauseAll(){
for (DownInfo downInfo : downInfos) {
pause(downInfo);
}
subMap.clear();
downInfos.clear();
}複製代碼
一樣使用了封裝二中的retry處理和運行時異常自定義處理封裝(不復述了)
到此咱們的Rxjava+ReTrofit+okHttp深刻淺出-封裝就基本完成了,已經能夠徹底勝任開發和學習的所有工做,若是後續再使用過程當中有任何問題歡迎留言給我,會一直維護!
1.Retrofit+Rxjava+okhttp基本使用方法
2.統一處理請求數據格式
3.統一的ProgressDialog和回調Subscriber處理
4.取消http請求
5.預處理http請求
6.返回數據的統一判斷
7.失敗後的retry封裝處理
8.RxLifecycle管理生命週期,防止泄露
9.文件上傳和文件下載(支持多文件斷點續傳)複製代碼