RxJava因爲使用了多個回調,一開始理解起來可能有點難度,其實多看幾遍也就明白了,它的招式套路都是同樣的:api
首先就是建立Observable,建立Observable有不少種方式,這裏使用了Observable.create的方式;Observable.create()須要傳入一個參數,這個參數實際上是一個回調接口,在這個接口方法裏咱們處理開網絡請求和解析xml的工做,並在最後經過onNext()、onCompleted()和onError()通知Subscriber(訂閱者);
而後就是調用Observable.subscribe()方法對Observable進行訂閱。這裏要注意,若是不調用Observable.subscribe()方法,剛纔在Observable.create()處理的網絡請求和解析xml的代碼是不會執行的,這也就解釋了本文開頭所說的「若是沒有觀察者(即Subscribers),Observables是不會發出任何事件的」
說了那麼多,好像也沒有開線程處理網絡請求啊,這樣不會報錯嗎?別急,認真看上面的代碼,我還寫了兩個方法subscribeOn(Schedulers.newThread())和observeOn(AndroidSchedulers.mainThread()),沒錯,奧妙就在於此:
3.1 subscribeOn(Schedulers.newThread())表示開一個新線程處理Observable.create()方法裏的邏輯,也就是處理網絡請求和解析xml工做
3.2 observeOn(AndroidSchedulers.mainThread())表示subscriber所運行的線程是在UI線程上,也就是更新控件的操做是在UI線程上
3.3 若是這裏只有subscribeOn()方法而沒有observeOn()方法,那麼Observable.create()和subscriber()都是運行在subscribeOn()所指定的線程中;
3.4 若是這裏只有observeOn()方法而沒有subscribeOn()方法,那麼Observable.create()運行在主線程(UI線程)中,而subscriber()是運行在observeOn()所指定的線程中(本例的observeOn()剛好是指定主線程而已)網絡
Observable<String> myObservable = Observable.create( new Observable.OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> sub) {
sub.onNext("Hello, world!");
sub.onCompleted(); }
});ide
Subscriber<String> mySubscriber = new Subscriber<String>() {
@Override public void onNext(String s) { System.out.println(s); }
@Override public void onCompleted() { }
@Override public void onError(Throwable e) { }
};線程
myObservable.subscribe(mySubscriber);// Outputs "Hello, world!"server
public static Observable<WeatherData> getWeatherData(final String city) {
return Observable.create(new Observable.OnSubscribeFunc<WeatherData>() {
@Override
public Subscription onSubscribe(Observer<? super WeatherData> observer) {
try {
observer.onNext(apiManager.getWeather(city, "metric"));
observer.onCompleted();
} catch (Exception e) {
observer.onError(e);
}
return Subscriptions.empty();
}
}).subscribeOn(Schedulers.threadPoolForIO());
public final Subscription subscribe(final Action1<? super T> onNext, final Action1<Throwable> onError, final Action0 onComplete) {
if (onNext == null) {
throw new IllegalArgumentException("onNext can not be null");
}
if (onError == null) {
throw new IllegalArgumentException("onError can not be null");
}
if (onComplete == null) {
throw new IllegalArgumentException("onComplete can not be null");
}
return subscribe(new Subscriber<T>() {
@Override
public final void onCompleted() {
onComplete.call();
}
@Override
public final void onError(Throwable e) {
onError.call(e);
}
@Override
public final void onNext(T args) {
onNext.call(args);
}
});
} xml