RxJava是ReactiveX在JVM上的一個實現,使用可觀察序列來編寫異步和基於事件的程序的庫。它擴展了觀察者模式以支持數據/事件序列,並添加了容許您以聲明方式組合序列的運算符,同時抽象出對低級線程,同步,線程安全和併發數據結構等問題的關注。java
觀察者模式也被稱爲發佈-訂閱(Publish/Subscribe)模式,它屬於行爲型模式的一種。觀察者模式定義了一種一對多的依賴關係,一個主題對象可被多個觀察者對象同時監聽。當這個主題對象狀態變化時,會通知全部觀察者對象並做出相應處理邏輯。react
1.建立抽象被觀察者(Subject):git
public interface Star {
/**
* 添加粉絲
*/
void addFan(Fan fan);
/**
* 取消粉絲
*/
void removeFan(Fan fan);
/**
* 分享動態
*/
void notifyFan(String message);
}
複製代碼
2.建立抽象觀察者(Observer)github
public interface Fan {
/**
* 更新動態
*/
void update(String message);
}
複製代碼
3.建立具體被觀察者(Concrete Subject 具體明星)安全
public class AStar implements Star{
private List<Fan> fanList = null;
public AStar(){
fanList = new ArrayList<Fan>();
}
@Override
public void addFan(Fan fan){
fanList.add(fan);
}
@Override
public void removeFan(Fan fan){
fanList.remove(fan);
}
@Override
public void notifyFan(String message){
for(Fan fan : fanList){
fan.update("AStar 發佈了 ** 信息");
}
}
}
複製代碼
4.建立具體觀察者(Concrere Observer 具體粉絲)bash
public class AFan implements Fan{
private String fanName;
public AFan(String fanName){
this.fanName = fanName;
}
@Override
public void update(String message){
Log.d("AFan 收到了 AStar 發佈的消息");
}
}
複製代碼
Observable(被觀察者),Observer(觀察者),subscribe(訂閱)。數據結構
Observable 是一個抽象類,實現了ObservableSource抽象接口。併發
public abstract class Observable<T> implements ObservableSource<T> {
......
}
複製代碼
ObservableSource中subscribe()用來訂閱觀察者,因此ObservableSource至關於抽象被觀察者。異步
public interface ObservableSource<T> {
/**
* Subscribes the given Observer to this ObservableSource instance.
* @param observer the Observer, not null
* @throws NullPointerException if {@code observer} is null
*/
void subscribe(@NonNull Observer<? super T> observer);
}
複製代碼
經過ObservableSource的subscribe()方法可知抽象觀察者爲裏面的參數對象Observer。async
public interface Observer<T> {
/**
* Provides the Observer with the means of cancelling (disposing) the
* connection (channel) with the Observable in both
* synchronous (from within {@link #onNext(Object)}) and asynchronous manner.
* @param d the Disposable instance whose {@link Disposable#dispose()} can
* be called anytime to cancel the connection
* @since 2.0
*/
void onSubscribe(@NonNull Disposable d);
/**
* Provides the Observer with a new item to observe.
* <p>
* The {@link Observable} may call this method 0 or more times.
* <p>
* The {@code Observable} will not call this method again after it calls either {@link #onComplete} or
* {@link #onError}.
*
* @param t
* the item emitted by the Observable
*/
void onNext(@NonNull T t);
/**
* Notifies the Observer that the {@link Observable} has experienced an error condition.
* <p>
* If the {@link Observable} calls this method, it will not thereafter call {@link #onNext} or
* {@link #onComplete}.
*
* @param e
* the exception encountered by the Observable
*/
void onError(@NonNull Throwable e);
/**
* Notifies the Observer that the {@link Observable} has finished sending push-based notifications.
* <p>
* The {@link Observable} will not call this method if it calls {@link #onError}.
*/
void onComplete();
}
複製代碼
/**
* 建立Observable
*/
Observable observable = Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> emitter) throws Exception {
emitter.onNext("Hello");
}
});
/**
* Provides an API (via a cold Observable) that bridges the reactive world with the callback-style world.
* @param <T> the element type
* @param source the emitter that is called when an Observer subscribes to the returned {@code Observable}
* @return the new Observable instance
* @see ObservableOnSubscribe
* @see ObservableEmitter
* @see Cancellable
*/
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
ObjectHelper.requireNonNull(source, "source is null");
return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
}
複製代碼
經過create方法源碼可知ObservableCreate爲具體被觀察者。
Observer<String> observer = new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(String s) {
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
};
複製代碼
上述實現observer接口的observer爲具體觀察者。
Rxjava訂閱實現
observable.subscribe(observer);
複製代碼
@SchedulerSupport(SchedulerSupport.NONE)
@Override
public final void subscribe(Observer<? super T> observer) {
ObjectHelper.requireNonNull(observer, "observer is null");
try {
observer = RxJavaPlugins.onSubscribe(this, observer);
ObjectHelper.requireNonNull(observer, "The RxJavaPlugins.onSubscribe hook returned a null Observer. Please change the handler provided to RxJavaPlugins.setOnObservableSubscribe for invalid null returns. Further reading: https://github.com/ReactiveX/RxJava/wiki/Plugins");
subscribeActual(observer);
} catch (NullPointerException e) { // NOPMD
throw e;
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
// can't call onError because no way to know if a Disposable has been set or not // can't call onSubscribe because the call might have set a Subscription already
RxJavaPlugins.onError(e);
NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS");
npe.initCause(e);
throw npe;
}
}
複製代碼