源碼解析,如需轉載,請註明做者:Yuloran (t.cn/EGU6c76)java
造輪子者:Season_zlc併發
本文主要講述 RxDownload2
的線程調度異步
顧名思義,就是分發下載任務的線程。該線程運行在 DownloadService
中,從業務上看,DownloadService
應當僅被 start() & bind()
一次。任務分發線程,在 onBind()
時建立:ide
/** * start and bind service. * * @param callback Called when service connected. */
private void startBindServiceAndDo(final ServiceConnectedCallback callback) {
Intent intent = new Intent(context, DownloadService.class);
intent.putExtra(DownloadService.INTENT_KEY, maxDownloadNumber);
context.startService(intent);
context.bindService(intent, new ServiceConnection() {
@Override
public void onServiceConnected(ComponentName name, IBinder binder) {
DownloadService.DownloadBinder downloadBinder
= (DownloadService.DownloadBinder) binder;
downloadService = downloadBinder.getService();
context.unbindService(this);
bound = true;
callback.call();
}
@Override
public void onServiceDisconnected(ComponentName name) {
//注意!!這個方法只會在系統殺掉Service時纔會調用!!
bound = false;
}
}, Context.BIND_AUTO_CREATE);
}
複製代碼
上述代碼有個細節,onServiceConnected()
中立刻調了 unbindService()
。post
@Nullable
@Override
public IBinder onBind(Intent intent) {
log("bind Download Service");
startDispatch();
return mBinder;
}
複製代碼
/** * start dispatch download queue. */
private void startDispatch() {
disposable = Observable
.create(new ObservableOnSubscribe<DownloadMission>() {
@Override
public void subscribe(ObservableEmitter<DownloadMission> emitter) throws Exception {
DownloadMission mission;
while (!emitter.isDisposed()) {
try {
log(WAITING_FOR_MISSION_COME);
mission = downloadQueue.take();
log(Constant.MISSION_COMING);
} catch (InterruptedException e) {
log("Interrupt blocking queue.");
continue;
}
emitter.onNext(mission);
}
emitter.onComplete();
}
})
.subscribeOn(Schedulers.newThread())
.subscribe(new Consumer<DownloadMission>() {
@Override
public void accept(DownloadMission mission) throws Exception {
mission.start(semaphore);
}
});
}
複製代碼
.subscribeOn(Schedulers.newThread())
代表該線程經過 new Thread() 的方式產生的disposable
從新賦值前,沒有先嚐試對其取消訂閱。若是屢次調用 bindService()
,就會出現線程泄露顧名思義,就是下載任務的執行線程。該線程運行在 Schedulers.io()
線程池上。入參信號量用來限制同時下載的最大任務數。ui
@Override
public void start(final Semaphore semaphore) {
disposable = start(bean, semaphore, new MissionCallback() {
@Override
public void start() {
// 回調開始下載
if (callback != null) callback.start();
}
@Override
public void next(DownloadStatus value) {
// 回調下載中
status = value;
processor.onNext(started(value));
if (callback != null) callback.next(value);
}
@Override
public void error(Throwable throwable) {
// 回調下載失敗
processor.onNext(failed(status, throwable));
if (callback != null) callback.error(throwable);
}
@Override
public void complete() {
// 回調下載完成
processor.onNext(completed(status));
if (callback != null) callback.complete();
}
});
}
複製代碼
protected Disposable start(DownloadBean bean, final Semaphore semaphore, final MissionCallback callback) {
return rxdownload.download(bean)
.subscribeOn(Schedulers.io()) // 指定下載任務執行線程
.doOnLifecycle(new Consumer<Disposable>() {
@Override
public void accept(Disposable disposable) throws Exception {
if (canceled.get()) {
dispose(disposable);
}
log(TRY_TO_ACQUIRE_SEMAPHORE);
// 申請信號量
semaphore.acquire();
log(ACQUIRE_SUCCESS);
// 得到信號量後,需再次檢測是否已經暫停下載
if (canceled.get()) {
// 已經暫停,則取消訂閱,釋放信號量
dispose(disposable);
} else {
callback.start();
}
}
}, new Action() {
@Override
public void run() throws Exception {
// 取消訂閱時,須要釋放信號量
semaphore.release();
}
})
.subscribe(new Consumer<DownloadStatus>() {
@Override
public void accept(DownloadStatus value) throws Exception {
// 回調下載進度
callback.next(value);
}
}, new Consumer<Throwable>() {
@Override
public void accept(Throwable throwable) throws Exception {
// 回調下載失敗
callback.error(throwable);
}
}, new Action() {
@Override
public void run() throws Exception {
// 回調下載完成
callback.complete();
}
});
}
複製代碼
顧名思義,就是中斷下載任務的線程。包括暫停、刪除、所有暫停、所有取消四個操做。這些操做也運行在 Schedulers.io()
線程池上。this
/** * Pause download. * <p> * Pause a url or all tasks belonging to missionId. * * @param missionId url or missionId */
public Observable<?> pauseServiceDownload(final String missionId) {
// createGeneralObservable 是一個異步綁定下載服務的Observable,經過資源數爲1的信號量實現強制同步
return createGeneralObservable(new GeneralObservableCallback() {
@Override
public void call() {
// 服務綁定後,調用服務的暫停下載
downloadService.pauseDownload(missionId);
}
}).observeOn(AndroidSchedulers.mainThread());
}
複製代碼
/** * return general observable * * @param callback Called when observable created. * @return Observable */
private Observable<?> createGeneralObservable(final GeneralObservableCallback callback) {
// 方法名起的很差,應該叫 bindService
return Observable.create(new ObservableOnSubscribe<Object>() {
@Override
public void subscribe(final ObservableEmitter<Object> emitter) throws Exception {
if (!bound) {
// 由於 onServiceConnected 是異步回調的,因此這裏用了個資源數爲1的信號量實現強制同步(CountDownLatch也能夠實現強制同步)
semaphore.acquire();
if (!bound) {
startBindServiceAndDo(new ServiceConnectedCallback() {
@Override
public void call() {
// 服務綁定後,回調 callback
doCall(callback, emitter);
// 釋放信號量
semaphore.release();
}
});
} else {
doCall(callback, emitter);
semaphore.release();
}
} else {
doCall(callback, emitter);
}
}
}).subscribeOn(Schedulers.io()); // 指定在 io 線程執行,因此暫停下載也是在這個線程執行
}
複製代碼
同理,刪除下載也會先調用 createGeneralObservable()
,因此刪除操做也是在 Schedulers.io()
上執行的。url
Schedulers.newThread()
Schedulers.io()
Schedulers.io()
RxDownload2 系列文章:spa