響應式編程系列(一):什麼是響應式編程?reactor入門

響應式編程 系列文章目錄

(一)什麼是響應式編程?reactor入門html

(二)Flux入門學習:流的概念,特性和基本操做react

(三)Flux深刻學習:流的高級特性和進階用法web

(四)reactor-core響應式api如何測試和調試?數據庫

(五)Spring reactive: Spring WebFlux的使用編程

(六)Spring reactive: webClient的使用api

引言

  Spring framework 5 的一大新特性:響應式編程(Reactive Programming)。那麼什麼是響應式?他能給咱們帶來什麼?如何優雅地使用?本系列會從最基礎的概念和簡單的api講起,再慢慢深刻探討響應式的一些高級特性,最後講解實戰內容,例如WebFlux和WebClient等在Spring boot中的使用,如何測試和調試。網絡

  想要了解原理的話,美團點評的這篇博客 Java NIO淺析 很是適合入門。多線程

簡單地說:異步

  當咱們調用socket.read()、socket.write()這類阻塞函數的時候,這類函數不能當即返回,也沒法中斷,須要等待socket可讀或者可寫,纔會返回,所以一個線程只能處理一個請求。在這等待的過程當中,cpu並不幹活,(即阻塞住了),那麼cpu的資源就沒有很好地利用起來。所以對於這種狀況,咱們使用多線程來提升cpu資源的利用率:在等待的這段時間,就能夠切換到別的線程去處理事件,直到socket可讀或可寫了,經過中斷信號通知cpu,再切換回來繼續處理數據。例如線程A正在等待socket可讀,而線程B已經就緒了,那麼就能夠先切換到線程B去處理。雖然上下文切換也會花一些時間,可是遠比阻塞在線程A這裏空等要好。固然計算機內部實際的狀況比這複雜得多。socket

  而NIO的讀寫函數能夠馬上返回,這就給了咱們不開線程利用CPU的最好機會:若是一個鏈接不能讀寫(socket.read()返回0或者socket.write()返回0),咱們能夠把這件事記下來。所以只須要一個線程不斷地輪詢這些事件,一旦有就緒的時間,處理便可。不須要多線程。

 

阻塞型IO

  • 須要多線程,即須要很大的線程池。
  • 每一個請求都要有一個單獨的線程去處理。

 

非阻塞型IO

  • 只須要數量很是少的線程。
  • 固定的幾個工做線程去處理事件。

 

使用NIO咱們能獲得什麼?

  • 事件驅動模型
  • 避免多線程
  • 單線程處理多任務
  • 非阻塞I/O,I/O讀寫再也不阻塞,而是返回0
  • 基於block的傳輸,一般比基於流的傳輸更高效
  • 更高級的IO函數,zero-copy
  • IO多路複用大大提升了Java網絡應用的可伸縮性和實用性

響應式編程入門

  響應式編程就是基於reactor的思想,當你作一個帶有必定延遲的纔可以返回的io操做時,不會阻塞,而是馬上返回一個流,而且訂閱這個流,當這個流上產生了返回數據,能夠馬上獲得通知並調用回調函數處理數據。

 

基本模型

咱們首先須要理解響應式編程的基本模型:

 

Flux

  Reactor中的發佈者(Publisher)由FluxMono兩個類定義,它們都提供了豐富的操做符(operator)。一個Flux對象表明一個包含0..N個元素的響應式序列,元素能夠是普通對象、數據庫查詢的結果、http響應體,甚至是異常。而一個Mono對象表明一個包含零/一個(0..1)元素的結果。上圖就是一個Flux類型的數據流,Flux往流上發送了3個元素,Subscriber經過訂閱這個流來接收通知。

如何建立一個流?最簡單的方式有如下幾種:

//建立一個流,並直接往流上發佈一個值爲value數據
Flux.just(value);

//經過list建立一個流,往流上依次發佈list中的數據
Flux.fromIterable(list);

//建立一個流,並向流上從i開始連續發佈n個數據,數據類型爲Integer
Flux.range(i, n);

//建立一個流,並定時向流上發佈一個數據,數據從0開始遞增,數據類型爲Long
Flux.interval(Duration.ofSeconds(n));

既然是「數據流」的發佈者,Flux和Mono均可以發出三種「數據信號」:元素值、錯誤信號、完成信號,錯誤信號和完成信號都是終止信號,完成信號用於告知下游訂閱者該數據流正常結束,錯誤信號終止數據流的同時將錯誤傳遞給下游訂閱者。

Subscriber 

subscriber是一個訂閱者,他只有很是簡單的4個接口:

public interface Subscriber<T> {
    void onSubscribe(Subscription var1);

    //收到下一個元素值信號時的行爲
    void onNext(T var1);

    //收到錯誤信號時的行爲
    void onError(Throwable var1);

    //收到終止信號時的行爲
    void onComplete();
}

Subscriber必需要訂閱一個Flux纔可以接收通知:

flux.subscribe(
    value -> handleData(value),
    error -> handleError(error),
    () -> handleComplete()
);

上面這個例子經過lambda表達式,定義了Subscriber分別在收到消息,收到錯誤,和消息流結束時的行爲,當Subscriber接收到一個新數據,就會異步地執行handleData方法處理數據。

 

簡單例子:

接下來咱們建立幾個最簡單的流來試一下:

首先咱們新建一個maven項目,引入reactor的類庫:

<dependencies>
    <dependency>
        <groupId>io.projectreactor</groupId>
        <artifactId>reactor-core</artifactId>
        <version>3.2.3.RELEASE</version>
    </dependency>
    <dependency>
        <groupId>io.projectreactor</groupId>
        <artifactId>reactor-test</artifactId>
        <version>3.2.3.RELEASE</version>
        <scope>test</scope>
    </dependency>
</dependencies>

 

編寫代碼以下:

public class ReactorTests {

    @After
    public void after() {
        sleep(30_000);
    }

    @Test
    public void testJust() {
        Flux.just("hello", "world")
            .subscribe(System.out::println);
    }

    @Test
    public void testList() {
        List<String> words = Arrays.asList(
            "hello",
            "reactive",
            "world"
        );

        Flux.fromIterable(words)
            .subscribe(System.out::println);
    }

    @Test
    public void testRange() {
        Flux.range(1, 10)
            .subscribe(System.out::println);
    }

    @Test
    public void testInterval() {
        Flux.interval(Duration.ofSeconds(1))
            .subscribe(System.out::println);
    }
}

訂閱這些流,收到數據以後只是簡單地把它打印出來,運行這些Test,就可以看到訂閱者在接收到流上的數據時,異步地去處理這些數據。

相關文章
相關標籤/搜索