RxRetrofit - 終極封裝 - 深刻淺出 & 斷點續傳

背景

斷點續傳下載一直是移動開發中必不可少的一項重要的技術,一樣的RxJavaRetrofit的結合讓這個技術解決起來更加的靈活,咱們徹底能夠封裝一個適合自的下載框架,簡單並且安全!html

效果


實現

下載和以前的http請求能夠相互獨立,因此咱們單獨給download創建一個工程moudel處理java

數據庫

默認項目採用的GreenDao數據庫管理數據,新版GreenDao自動生成的炒做類的方式,因此若是缺失xxxDao文件,檢查是否關聯正確的路徑git

根目錄build.gradlegithub

classpath 'org.greenrobot:greendao-gradle-plugin:+'複製代碼

依賴:數據庫

apply plugin: 'org.greenrobot.greendao'
            xxxxxxxxx
compile 'org.greenrobot:greendao:3.2.0'複製代碼

greenDAO-源碼安全

若是須要替換本身的數據庫框架字須要修改DbDownUtil文件便可app

傳送門-DbDownUtil框架

1.建立service接口

和之前同樣,先寫接口
注意:Streaming是判斷是否寫入內存的標示,若是小文件能夠考慮不寫,通常狀況必須寫;下載地址須要經過@url動態指定(不適固定的),@head標籤是指定下載的起始位置(斷點續傳的位置)dom

/*斷點續傳下載接口*/
    @Streaming/*大文件須要加入這個判斷,防止下載過程當中寫入到內存中*/
    @GET
    Observable<ResponseBody> download(@Header("RANGE") String start, @Url String url);複製代碼

2.複寫ResponseBody

和以前的上傳封裝同樣,下載更加的須要進度,因此咱們一樣覆蓋ResponseBody類,寫入進度監聽回調ide

/** * 自定義進度的body * @author wzg */
public class DownloadResponseBody extends ResponseBody {
    private ResponseBody responseBody;
    private DownloadProgressListener progressListener;
    private BufferedSource bufferedSource;

    public DownloadResponseBody(ResponseBody responseBody, DownloadProgressListener progressListener) {
        this.responseBody = responseBody;
        this.progressListener = progressListener;
    }

    @Override
    public BufferedSource source() {
        if (bufferedSource == null) {
            bufferedSource = Okio.buffer(source(responseBody.source()));
        }
        return bufferedSource;
    }

    private Source source(Source source) {
        return new ForwardingSource(source) {
            long totalBytesRead = 0L;
            @Override
            public long read(Buffer sink, long byteCount) throws IOException {
                long bytesRead = super.read(sink, byteCount);
                // read() returns the number of bytes read, or -1 if this source is exhausted.
                totalBytesRead += bytesRead != -1 ? bytesRead : 0;
                if (null != progressListener) {
                    progressListener.update(totalBytesRead, responseBody.contentLength(), bytesRead == -1);
                }
                return bytesRead;
            }
        };
    }
}複製代碼

3.自定義進度回調接口

/** * 成功回調處理 * Created by WZG on 2016/10/20. */
public interface DownloadProgressListener {
    /** * 下載進度 * @param read * @param count * @param done */
    void update(long read, long count, boolean done);
}複製代碼

4.複寫Interceptor

複寫Interceptor,能夠將咱們的監聽回調經過okhttp的client方法addInterceptor自動加載咱們的監聽回調和ResponseBody

/** * 成功回調處理 * Created by WZG on 2016/10/20. */
public class DownloadInterceptor implements Interceptor {

    private DownloadProgressListener listener;

    public DownloadInterceptor(DownloadProgressListener listener) {
        this.listener = listener;
    }

    @Override
    public Response intercept(Chain chain) throws IOException {
        Response originalResponse = chain.proceed(chain.request());

        return originalResponse.newBuilder()
                .body(new DownloadResponseBody(originalResponse.body(), listener))
                .build();
    }
}複製代碼

5.封裝請求downinfo數據

這個類中的數據可自由擴展,用戶本身選擇須要保持到數據庫中的數據,能夠自由選擇須要數據庫第三方框架,demo採用greenDao框架存儲數據

public class DownInfo {
    /*存儲位置*/
    private String savePath;
    /*下載url*/
    private String url;
    /*基礎url*/
    private String baseUrl;
    /*文件總長度*/
    private long countLength;
    /*下載長度*/
    private long readLength;
    /*下載惟一的HttpService*/
    private HttpService service;
    /*回調監聽*/
    private HttpProgressOnNextListener listener;
    /*超時設置*/
    private  int DEFAULT_TIMEOUT = 6;
    /*下載狀態*/
    private DownState state;
    }複製代碼

6.DownState狀態封裝

很簡單,和大多數封裝框架同樣

public enum  DownState {
    START,
    DOWN,
    PAUSE,
    STOP,
    ERROR,
    FINISH,
}複製代碼

7.請求HttpProgressOnNextListener回調封裝類

注意:這裏和DownloadProgressListener不一樣,這裏是下載這個過程當中的監聽回調,DownloadProgressListener只是進度的監聽
經過抽象類,能夠自由選擇須要覆蓋的類,不須要徹底覆蓋!更加靈活

/** * 下載過程當中的回調處理 * Created by WZG on 2016/10/20. */
public abstract class HttpProgressOnNextListener<T> {
    /** * 成功後回調方法 * @param t */
    public abstract void onNext(T t);

    /** * 開始下載 */
    public abstract void onStart();

    /** * 完成下載 */
    public abstract void onComplete();


    /** * 下載進度 * @param readLength * @param countLength */
    public abstract void updateProgress(long readLength, long countLength);

    /** * 失敗或者錯誤方法 * 主動調用,更加靈活 * @param e */
     public void onError(Throwable e){

     }

    /** * 暫停下載 */
    public void onPuase(){

    }

    /** * 中止下載銷燬 */
    public void onStop(){

    }
}複製代碼

8.封裝回調Subscriber

準備的工做作完,須要將回調和傳入回調的信息統一封裝到sub中,統一判斷;和封裝二的原理同樣,咱們經過自定義Subscriber來提早處理返回的數據,讓用戶字須要關係成功和失敗以及向關心的數據,避免重複多餘的代碼出如今處理類中

  • sub須要繼承DownloadProgressListener,和自帶的回調一塊兒組成咱們須要的回調結果

  • 傳入DownInfo數據,經過回調設置DownInfo的不一樣狀態,保存狀態

  • 經過RxAndroid將進度回調指定到主線程中(若是不須要進度最好去掉該處理避免主線程處理負擔)

  • update進度回調在斷點續傳使用時,須要手動判斷斷點後加載的長度,由於指定斷點下載長度下載後總長度=(物理長度-起始下載長度)

/** * 用於在Http請求開始時,自動顯示一個ProgressDialog * 在Http請求結束是,關閉ProgressDialog * 調用者本身對請求數據進行處理 * Created by WZG on 2016/7/16. */
public class ProgressDownSubscriber<T> extends Subscriber<T> implements DownloadProgressListener {
    //弱引用結果回調
    private WeakReference<HttpProgressOnNextListener> mSubscriberOnNextListener;
    /*下載數據*/
    private DownInfo downInfo;


    public ProgressDownSubscriber(DownInfo downInfo) {
        this.mSubscriberOnNextListener = new WeakReference<>(downInfo.getListener());
        this.downInfo=downInfo;
    }

    /** * 訂閱開始時調用 * 顯示ProgressDialog */
    @Override
    public void onStart() {
        if(mSubscriberOnNextListener.get()!=null){
            mSubscriberOnNextListener.get().onStart();
        }
        downInfo.setState(DownState.START);
    }

    /** * 完成,隱藏ProgressDialog */
    @Override
    public void onCompleted() {
        if(mSubscriberOnNextListener.get()!=null){
            mSubscriberOnNextListener.get().onComplete();
        }
        downInfo.setState(DownState.FINISH);
    }

    /** * 對錯誤進行統一處理 * 隱藏ProgressDialog * * @param e */
    @Override
    public void onError(Throwable e) {
        /*中止下載*/
        HttpDownManager.getInstance().stopDown(downInfo);
        if(mSubscriberOnNextListener.get()!=null){
            mSubscriberOnNextListener.get().onError(e);
        }
        downInfo.setState(DownState.ERROR);
    }

    /** * 將onNext方法中的返回結果交給Activity或Fragment本身處理 * * @param t 建立Subscriber時的泛型類型 */
    @Override
    public void onNext(T t) {
        if (mSubscriberOnNextListener.get() != null) {
            mSubscriberOnNextListener.get().onNext(t);
        }
    }

    @Override
    public void update(long read, long count, boolean done) {
        if(downInfo.getCountLength()>count){
            read=downInfo.getCountLength()-count+read;
        }else{
            downInfo.setCountLength(count);
        }
        downInfo.setReadLength(read);
        if (mSubscriberOnNextListener.get() != null) {
            /*接受進度消息,形成UI阻塞,若是不須要顯示進度可去掉實現邏輯,減小壓力*/
            rx.Observable.just(read).observeOn(AndroidSchedulers.mainThread())
                    .subscribe(new Action1<Long>() {
                @Override
                public void call(Long aLong) {
                      /*若是暫停或者中止狀態延遲,不須要繼續發送回調,影響顯示*/
                    if(downInfo.getState()==DownState.PAUSE||downInfo.getState()==DownState.STOP)return;
                    downInfo.setState(DownState.DOWN);
                    mSubscriberOnNextListener.get().updateProgress(aLong,downInfo.getCountLength());
                }
            });
        }
    }

}複製代碼

9.下載管理類封裝HttpDownManager

1. 單利獲取

/** * 獲取單例 * @return */
    public static HttpDownManager getInstance() {
        if (INSTANCE == null) {
            synchronized (HttpDownManager.class) {
                if (INSTANCE == null) {
                    INSTANCE = new HttpDownManager();
                }
            }
        }
        return INSTANCE;
    }複製代碼

2. 由於單利因此須要記錄正在下載的數據和回到sub

/*回調sub隊列*/
    private HashMap<String,ProgressDownSubscriber> subMap;
    /*單利對象*/
    private volatile static HttpDownManager INSTANCE;

    private HttpDownManager(){
        downInfos=new HashSet<>();
        subMap=new HashMap<>();
    }複製代碼

3.開始下載須要記錄下載的service避免每次都重複建立,而後請求sercie接口,獲得ResponseBody數據後將數據流寫入到本地文件中(6.0系統後須要提早申請權限)

/** * 開始下載 */
    public void startDown(DownInfo info){
        /*正在下載不處理*/
        if(info==null||subMap.get(info.getUrl())!=null){
            return;
        }
        /*添加回調處理類*/
        ProgressDownSubscriber subscriber=new ProgressDownSubscriber(info);
        /*記錄回調sub*/
        subMap.put(info.getUrl(),subscriber);
        /*獲取service,屢次請求公用一個sercie*/
        HttpService httpService;
        if(downInfos.contains(info)){
            httpService=info.getService();
        }else{
            DownloadInterceptor interceptor = new DownloadInterceptor(subscriber);
            OkHttpClient.Builder builder = new OkHttpClient.Builder();
            //手動建立一個OkHttpClient並設置超時時間
            builder.connectTimeout(info.getConnectionTime(), TimeUnit.SECONDS);
            builder.addInterceptor(interceptor);

            Retrofit retrofit = new Retrofit.Builder()
                    .client(builder.build())
                    .addConverterFactory(GsonConverterFactory.create())
                    .addCallAdapterFactory(RxJavaCallAdapterFactory.create())
                    .baseUrl(info.getBaseUrl())
                    .build();
            httpService= retrofit.create(HttpService.class);
            info.setService(httpService);
        }
        /*獲得rx對象-上一次下載的位置開始下載*/
        httpService.download("bytes=" + info.getReadLength() + "-",info.getUrl())
                /*指定線程*/
                .subscribeOn(Schedulers.io())
                .unsubscribeOn(Schedulers.io())
                   /*失敗後的retry配置*/
                .retryWhen(new RetryWhenNetworkException())
                /*讀取下載寫入文件*/
                .map(new Func1<ResponseBody, DownInfo>() {
                    @Override
                    public DownInfo call(ResponseBody responseBody) {
                        try {
                            writeCache(responseBody,new File(info.getSavePath()),info);
                        } catch (IOException e) {
                            /*失敗拋出異常*/
                            throw new HttpTimeException(e.getMessage());
                        }
                        return info;
                    }
                })
                /*回調線程*/
                .observeOn(AndroidSchedulers.mainThread())
                /*數據回調*/
                .subscribe(subscriber);

    }複製代碼

4.寫入文件

注意:一開始調用進度回調是第一次寫入在進度回調以前,因此須要判斷一次DownInfo是否獲取到下載總長度,沒有這選擇當前ResponseBody 讀取長度爲總長度

/** * 寫入文件 * @param file * @param info * @throws IOException */
    public void writeCache(ResponseBody responseBody,File file,DownInfo info) throws IOException{
        if (!file.getParentFile().exists())
            file.getParentFile().mkdirs();
        long allLength;
        if (info.getCountLength()==0){
            allLength=responseBody.contentLength();
        }else{
            allLength=info.getCountLength();
        }
            FileChannel channelOut = null;
            RandomAccessFile randomAccessFile = null;
            randomAccessFile = new RandomAccessFile(file, "rwd");
            channelOut = randomAccessFile.getChannel();
            MappedByteBuffer mappedBuffer = channelOut.map(FileChannel.MapMode.READ_WRITE,
                    info.getReadLength(),allLength-info.getReadLength());
            byte[] buffer = new byte[1024*8];
            int len;
            int record = 0;
            while ((len = responseBody.byteStream().read(buffer)) != -1) {
                mappedBuffer.put(buffer, 0, len);
                record += len;
            }
            responseBody.byteStream().close();
                if (channelOut != null) {
                    channelOut.close();
                }
                if (randomAccessFile != null) {
                    randomAccessFile.close();
                }
    }複製代碼

5.中止下載

調用 subscriber.unsubscribe()解除監聽,而後remove記錄的下載數據和sub回調,而且設置下載狀態(同步數據庫本身添加)

/** * 中止下載 */
    public void stopDown(DownInfo info){
        if(info==null)return;
        info.setState(DownState.STOP);
        info.getListener().onStop();
        if(subMap.containsKey(info.getUrl())) {
            ProgressDownSubscriber subscriber=subMap.get(info.getUrl());
            subscriber.unsubscribe();
            subMap.remove(info.getUrl());
        }
        /*保存數據庫信息和本地文件*/
        db.save(info);
    }複製代碼

6.暫停下載

原理和中止下載原理同樣

/** * 暫停下載 * @param info */
    public void pause(DownInfo info){
        if(info==null)return;
        info.setState(DownState.PAUSE);
        info.getListener().onPuase();
        if(subMap.containsKey(info.getUrl())){
            ProgressDownSubscriber subscriber=subMap.get(info.getUrl());
            subscriber.unsubscribe();
            subMap.remove(info.getUrl());
        }
        /*這裏須要講info信息寫入到數據中,可自由擴展,用本身項目的數據庫*/
        db.update(info);
    }複製代碼

7.暫停所有和中止所有下載任務

/** * 中止所有下載 */
    public void stopAllDown(){
        for (DownInfo downInfo : downInfos) {
            stopDown(downInfo);
        }
        subMap.clear();
        downInfos.clear();
    }

    /** * 暫停所有下載 */
    public void pauseAll(){
        for (DownInfo downInfo : downInfos) {
            pause(downInfo);
        }
        subMap.clear();
        downInfos.clear();
    }複製代碼

8.整合代碼HttpDownManager

一樣使用了封裝二中的retry處理和運行時異常自定義處理封裝(不復述了)

傳送門-HttpDownManager

總結

到此咱們的Rxjava+ReTrofit+okHttp深刻淺出-封裝就基本完成了,已經能夠徹底勝任開發和學習的所有工做,若是後續再使用過程當中有任何問題歡迎留言給我,會一直維護!

1.Retrofit+Rxjava+okhttp基本使用方法
    2.統一處理請求數據格式
    3.統一的ProgressDialog和回調Subscriber處理
    4.取消http請求
    5.預處理http請求
    6.返回數據的統一判斷
    7.失敗後的retry封裝處理
    8.RxLifecycle管理生命週期,防止泄露
    9.文件上傳和文件下載(支持多文件斷點續傳)複製代碼

終極封裝專欄

RxJava+Retrofit+OkHttp深刻淺出-終極封裝專欄)


源碼

傳送門-下載封裝源碼

傳送門-所有封裝源碼


建議

若是你對這套封裝有任何的問題和建議歡迎加入QQ羣告訴我!

相關文章
相關標籤/搜索