Observable<String> observable = Observable.just("hello world"); Consumer<String> consumer2 = new Consumer<String>(){ @Override public void accept(String s) throws Exception { System.out.println("Test1" + s); } }; observable.subscribe(consumer2);
咱們從最簡單的一段代碼入手()ide
Observable.just
實際上just
方法就是返回了一個ObservableJust
spa
ObservableJust
Observable.subscribe
建立了一個LambdaObserver
觀察者code
LambdaObserver
主要有onSubscribe
,onNext
,onError
,onComplete
方法對應的執行的邏輯是對應的是類構造參數的回調接口server
調用了ObservableJust.subscribeActual
入參是LambdaObserver
建立了一個ScalarDisposable
接口
ScalarDisposable
先會調用LambdaObserver
的onSubscribe
而後調用ScalarDisposable.run
根據上圖其實就是調用LambdaObserver.onNext
ip