本文連接java
下載文件是一個比較常見的需求。給定一個url,咱們能夠使用URLConnection下載文件。 使用OkHttp也能夠經過流來下載文件。 給OkHttp中添加攔截器,便可實現下載進度的監聽功能。android
代碼能夠參考:github.com/RustFisher/…git
獲取並使用字節流,須要注意兩個要點,一個是服務接口方法的 @Streaming 註解,另外一個是獲取到ResponseBody。github
獲取流(Stream)。先定義一個服務ApiService。給方法添加上@Streaming的註解。網絡
private interface ApiService {
@Streaming
@GET
Observable<ResponseBody> download(@Url String url);
}
複製代碼
初始化OkHttp。記得填入你的baseUrl。app
OkHttpClient okHttpClient = new OkHttpClient.Builder()
.connectTimeout(8, TimeUnit.SECONDS)
.build();
retrofit = new Retrofit.Builder()
.client(okHttpClient)
.addCallAdapterFactory(RxJava2CallAdapterFactory.create())
.baseUrl("https://yourbaseurl.com")
.build();
複製代碼
發起網絡請求。獲取到ResponseBody。socket
String downUrl = "xxx.com/aaa.apk";
retrofit.create(ApiService.class)
.download(downUrl)
.subscribeOn(Schedulers.io())
.observeOn(Schedulers.io())
.doOnNext(new Consumer<ResponseBody>() {
@Override
public void accept(ResponseBody responseBody) throws Exception {
// 處理 ResponseBody 中的流
}
})
.doOnError(new Consumer<Throwable>() {
@Override
public void accept(Throwable throwable) throws Exception {
Log.e(TAG, "accept on error: " + downUrl, throwable);
}
})
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Observer<ResponseBody>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(ResponseBody responseBody) {
}
@Override
public void onError(Throwable e) {
Log.e(TAG, "Download center retrofit onError: ", e);
}
@Override
public void onComplete() {
}
});
複製代碼
經過ResponseBody拿到字節流 body.byteStream()。這裏會先建立一個臨時文件tmpFile,把數據寫到臨時文件裏。 下載完成後再重命名成目標文件targetFile。ide
public void saveFile(ResponseBody body) {
state = DownloadTaskState.DOWNLOADING;
byte[] buf = new byte[2048];
int len;
FileOutputStream fos = null;
try {
Log.d(TAG, "saveFile: body content length: " + body.contentLength());
srcInputStream = body.byteStream();
File dir = tmpFile.getParentFile();
if (dir == null) {
throw new FileNotFoundException("target file has no dir.");
}
if (!dir.exists()) {
boolean m = dir.mkdirs();
onInfo("Create dir " + m + ", " + dir);
}
File file = tmpFile;
if (!file.exists()) {
boolean c = file.createNewFile();
onInfo("Create new file " + c);
}
fos = new FileOutputStream(file);
long time = System.currentTimeMillis();
while ((len = srcInputStream.read(buf)) != -1 && !isCancel) {
fos.write(buf, 0, len);
int duration = (int) (System.currentTimeMillis() - time);
int overBytes = len - downloadBytePerMs() * duration;
if (overBytes > 0) {
try {
Thread.sleep(overBytes / downloadBytePerMs());
} catch (Exception e) {
e.printStackTrace();
}
}
time = System.currentTimeMillis();
if (isCancel) {
state = DownloadTaskState.CLOSING;
srcInputStream.close();
break;
}
}
if (!isCancel) {
fos.flush();
boolean rename = tmpFile.renameTo(targetFile);
if (rename) {
setState(DownloadTaskState.DONE);
onSuccess(url);
} else {
setState(DownloadTaskState.ERROR);
onError(url, new Exception("Rename file fail. " + tmpFile));
}
}
} catch (FileNotFoundException e) {
Log.e(TAG, "saveFile: FileNotFoundException ", e);
setState(DownloadTaskState.ERROR);
onError(url, e);
} catch (Exception e) {
Log.e(TAG, "saveFile: IOException ", e);
setState(DownloadTaskState.ERROR);
onError(url, e);
} finally {
try {
if (srcInputStream != null) {
srcInputStream.close();
}
if (fos != null) {
fos.close();
}
} catch (IOException e) {
Log.e(TAG, "saveFile", e);
}
if (isCancel) {
onCancel(url);
}
}
}
複製代碼
每次讀數據的循環,計算讀了多少數據和用了多少時間。超過限速後主動sleep一下,達到控制下載速度的效果。 要注意不能sleep過久,以避免socket關閉。 這裏控制的是網絡數據流與本地文件的讀寫速度。ui
OkHttp實現下載進度監聽,能夠從字節流的讀寫那裏入手。也能夠使用攔截器,參考官方的例子。 這裏用攔截器的方式實現網絡下載進度監聽功能。this
先定義回調。
public interface ProgressListener {
void update(String url, long bytesRead, long contentLength, boolean done);
}
複製代碼
自定義ProgressResponseBody。
public class ProgressResponseBody extends ResponseBody {
private final ResponseBody responseBody;
private final ProgressListener progressListener;
private BufferedSource bufferedSource;
private final String url;
ProgressResponseBody(String url, ResponseBody responseBody, ProgressListener progressListener) {
this.responseBody = responseBody;
this.progressListener = progressListener;
this.url = url;
}
@Override
public MediaType contentType() {
return responseBody.contentType();
}
@Override
public long contentLength() {
return responseBody.contentLength();
}
@Override
public BufferedSource source() {
if (bufferedSource == null) {
bufferedSource = Okio.buffer(source(responseBody.source()));
}
return bufferedSource;
}
private Source source(final Source source) {
return new ForwardingSource(source) {
long totalBytesRead = 0L;
@Override
public long read(Buffer sink, long byteCount) throws IOException {
long bytesRead = super.read(sink, byteCount);
// read() returns the number of bytes read, or -1 if this source is exhausted.
totalBytesRead += bytesRead != -1 ? bytesRead : 0;
progressListener.update(url, totalBytesRead, responseBody.contentLength(), bytesRead == -1);
return bytesRead;
}
};
}
}
複製代碼
定義攔截器。從Response中獲取信息。
public class ProgressInterceptor implements Interceptor {
private ProgressListener progressListener;
public ProgressInterceptor(ProgressListener progressListener) {
this.progressListener = progressListener;
}
@NotNull
@Override
public Response intercept(@NotNull Chain chain) throws IOException {
Response originalResponse = chain.proceed(chain.request());
return originalResponse.newBuilder()
.body(new ProgressResponseBody(chain.request().url().url().toString(), originalResponse.body(), progressListener))
.build();
}
}
複製代碼
在建立OkHttpClient時添加ProgressInterceptor。
OkHttpClient okHttpClient = new OkHttpClient.Builder()
.connectTimeout(8, TimeUnit.SECONDS)
.addInterceptor(new ProgressInterceptor(new ProgressListener() {
@Override
public void update(String url, long bytesRead, long contentLength, boolean done) {
// tellProgress(url, bytesRead, contentLength, done);
}
}))
.build();
複製代碼
值得注意的是這裏的進度更新很是頻繁。並不必定每次回調都要去更新UI。