本文主要研究下java裏頭的reactive streams與觀察者模式。html
reactive編程範式是一個異步編程範式,主要涉及數據流及變化的傳播,能夠看作是觀察者設計模式的擴展。java
java裏頭的iterator是以pull模型,即訂閱者使用next去拉取下一個數據;而reactive streams則是以push模型爲主,訂閱者調用subscribe方法訂閱,發佈者調用訂閱者的onNext通知訂閱者新消息。react
reactive streams定義了4個java api,以下spring
processor既是Subscriber也是Publisher,表明兩者的處理階段編程
publisher是數據的提供者, 將數據發佈給訂閱者設計模式
在調用Publisher.subscribe(Subscriber)以後,Subscriber.onSubscribe(Subscription)將會被調用api
Subscription表明訂閱者與發佈者的一次訂閱週期,一旦調用cancel去掉訂閱,則發佈者不會再推送消息。緩存
觀察者模式的實現有推模型和拉模型bash
即發佈者通知訂閱有新消息,訂閱者再去找發佈者拉取異步
即發佈者通知訂閱者有消息,通知的時候已經帶上了一個新消息
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-core</artifactId>
<version>3.1.2.RELEASE</version>
</dependency>
複製代碼
reactor 3 是java裏頭reactive streams的一個實現,基於reactive streams的java api,是spring 5反應式編程的基礎。
@Test
public void testBackpressure(){
Flux.just(1, 2, 3, 4)
.log()
.subscribe(new Subscriber<Integer>() {
private Subscription s;
int onNextAmount;
@Override
public void onSubscribe(Subscription s) {
this.s = s;
s.request(2);
}
@Override
public void onNext(Integer integer) {
System.out.println(integer);
onNextAmount++;
if (onNextAmount % 2 == 0) {
s.request(2);
}
}
@Override
public void onError(Throwable t) {}
@Override
public void onComplete() {}
});
try {
Thread.sleep(10*1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
複製代碼
從上面的代碼看,reactive streams其實是推拉結合的模式的結合。爲何還要拉呢?
rabbitmq是以推爲主的,若是消費者消費能力跟不上,則消息會堆積在內存隊列中(
必要時可能寫磁盤
)
kafka則是以拉爲主的,生產者推送消息到broker,消費者本身根據本身的能力從broker拉取消息,因爲消息是持久化的,所以無需關心生產消費速率的不平衡
backpressure這個是爲處理生產速率與消費速率不平衡這個問題而衍生出來的,訂閱者能夠在next方法裏頭根據本身的狀況,使用request方法告訴發佈者要取N個數據,發佈者則向訂閱者推送N個數據。經過request達到訂閱者對發佈者的反饋。而對於發佈者而言,爲了實現backpressure,則須要有一個緩存隊列來緩衝訂閱者沒來得及消費的數據。涉及到緩衝,就涉及容量是有界仍是無界,若是是有界則在緩衝慢的時候,處理策略是怎樣等等。