reactive streams與觀察者模式

本文主要研究下java裏頭的reactive streams與觀察者模式。html

reactive streams

reactive編程範式是一個異步編程範式,主要涉及數據流及變化的傳播,能夠看作是觀察者設計模式的擴展。java

java裏頭的iterator是以pull模型,即訂閱者使用next去拉取下一個數據;而reactive streams則是以push模型爲主,訂閱者調用subscribe方法訂閱,發佈者調用訂閱者的onNext通知訂閱者新消息。react

reactive streams java api

reactive streams定義了4個java api,以下spring

Processor<T,R>

processor既是Subscriber也是Publisher,表明兩者的處理階段編程

Publisher

publisher是數據的提供者, 將數據發佈給訂閱者設計模式

Subscriber

在調用Publisher.subscribe(Subscriber)以後,Subscriber.onSubscribe(Subscription)將會被調用api

Subscription

Subscription表明訂閱者與發佈者的一次訂閱週期,一旦調用cancel去掉訂閱,則發佈者不會再推送消息。緩存

觀察者模式

觀察者模式的實現有推模型和拉模型bash

  • 拉模型

即發佈者通知訂閱有新消息,訂閱者再去找發佈者拉取異步

  • 推模型

即發佈者通知訂閱者有消息,通知的時候已經帶上了一個新消息

reactor實例

maven

<dependency>
			<groupId>io.projectreactor</groupId>
			<artifactId>reactor-core</artifactId>
			<version>3.1.2.RELEASE</version>
		</dependency>
複製代碼

reactor 3 是java裏頭reactive streams的一個實現,基於reactive streams的java api,是spring 5反應式編程的基礎。

Flux實例

@Test
    public void testBackpressure(){
        Flux.just(1, 2, 3, 4)
                .log()
                .subscribe(new Subscriber<Integer>() {
                    private Subscription s;
                    int onNextAmount;

                    @Override
                    public void onSubscribe(Subscription s) {
                        this.s = s;
                        s.request(2);
                    }

                    @Override
                    public void onNext(Integer integer) {
                        System.out.println(integer);
                        onNextAmount++;
                        if (onNextAmount % 2 == 0) {
                            s.request(2);
                        }
                    }

                    @Override
                    public void onError(Throwable t) {}

                    @Override
                    public void onComplete() {}
                });

        try {
            Thread.sleep(10*1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
複製代碼

小結

從上面的代碼看,reactive streams其實是推拉結合的模式的結合。爲何還要拉呢?

rabbitmq vs kafka

rabbitmq是以推爲主的,若是消費者消費能力跟不上,則消息會堆積在內存隊列中(必要時可能寫磁盤)

kafka則是以拉爲主的,生產者推送消息到broker,消費者本身根據本身的能力從broker拉取消息,因爲消息是持久化的,所以無需關心生產消費速率的不平衡

backpressure

backpressure這個是爲處理生產速率與消費速率不平衡這個問題而衍生出來的,訂閱者能夠在next方法裏頭根據本身的狀況,使用request方法告訴發佈者要取N個數據,發佈者則向訂閱者推送N個數據。經過request達到訂閱者對發佈者的反饋。而對於發佈者而言,爲了實現backpressure,則須要有一個緩存隊列來緩衝訂閱者沒來得及消費的數據。涉及到緩衝,就涉及容量是有界仍是無界,若是是有界則在緩衝慢的時候,處理策略是怎樣等等。

doc

相關文章
相關標籤/搜索