在前一篇文章從Reactive編程到「好萊塢」中,談到了響應式的一些概念,講的有些發散。 但僅僅仍是停留在概念的層面,對於實戰性的東西並無涉及。
因此你們看了後,或許仍是有些不痛不癢。前端
響應式編程強調的是異步化、面向流的處理方式,這二者也並不是憑空生出,而是從大量的技術實踐中總結提煉出來的概念,就好比:java
咱們談異步化,容易聯想到 Java 異步IO(Asynchronized IO),並且習慣於將其和 BIO、NIO等概念來作對比。 卻不知,老早出現的 Swing 框架(Java UI)就已經將異步化思惟玩的很溜了,不信的能夠看看其內部 Observer模式(觀察者)的實現。react
咱們談流式處理,容易聯想到 時下當紅的 Flink框架。 但幾乎全部的大數據分析、批處理應用都是基於流式進行處理的,好比 ETL,甚至是一個最簡單的 Map Reduce 做業。web
除了前端,Reactive 概念在大數據領域的應用其實很是的普遍了。 可是對於大多數作 Web 後端開發的人來講或許普及程度並不高,以筆者自身的感覺是,碼了這麼些年頭,除了作好代碼分層以外,彷佛也沒有見到 Reactive能夠發揮重大做用的地方。 緣由就在於,在Web 後端開發領域基本是依託 HTTP協議機制實現的,這是一個至關簡單的 請求 -> 應答 交互模式,客戶端在發送請求後,會一直等待結果返回,也就是結果的通知是由客戶端主動獲取而非異步通知的,所以並非 Reactive 的風格。 但這已是符合用戶一向的使用方式了,絕大多數狀況下並不須要作什麼樣的變化,此時咱們對響應式的感知並不深入。編程
更符合Reactive 的另一個場景是 富客戶端(Rich Application),假設在須要大量複雜的前端交互的場景下,咱們能夠選擇將一些邏輯放在前端代碼中實現。 此時的 Web 交互就再也不是整個頁面的刷新,而是演變爲客戶端與服務端的"實時"雙向通信,這類應用也比較廣泛了,好比基於 WebSocket 實現的 聊天應用、小遊戲等等。後端
淺顯的從趨勢上看, Reactive 的前景仍是很明朗的,這裏並非說由於如今多數流行的編程語言中都有它的影子(好比提供了Rx風格的框架)。
而是將來的大數據處理、實時流計算會成爲主流,這是環境決定的。 而這時 Reactive 這種"面向流"的編程模式無疑是很合適的。框架
Java 平臺直到 JDK 9 才提供了對於 Reactive 的完整支持,而在此以前的JDK版本中,也以及存在一些有關聯性的API,好比:dom
這些關聯性API 並非完整的 Reactive,Java 9所支持的 Reactive Stream API 來自於2013年的響應式流規範(Reactive Stream Specification)。異步
https://www.reactive-streams.org/
基於這個規範中主要定義了下面幾個接口:
Java的響應式流接口統必定義在 java.util.concurrent.Flow接口
Publisher
即數據的發佈者。 Publisher 接口定義了一個subscribe方法,用於添加訂閱者:
Subscriber
指數據的訂閱者。 Subscriber 接口定義了4個方法,用於針對不一樣的事件做出響應。
首先,在subscribe方法調用成功後,Subscriber的 onSubscribe(Subscription s) 方法會被觸發(Subscription 表示當前的訂閱關係)。
此後,正常能夠繼續調用 Subscription 的 request(long n) 方法來向發佈者請求數據,n是指最大的數據條目數。
發佈者會產生3種不一樣的消息,分別對應到 Subscriber 的3個回調方法:
數據消息:對應 onNext 方法,表示發佈者產生的數據。
錯誤消息:對應 onError 方法,表示發佈者產生了錯誤。
結束消息:對應 onComplete 方法,表示發佈者已經完成了全部數據的發佈。
在上面的3種通知中,錯誤、結束消息都表示當前的流已經到達了終點,後面再也不會有消息產生。
Subscription
Subscription 表示的是一個訂閱關係。 能夠經過該對象請求數據(request方法),或者取消訂閱(cancel方法)。
Processor
Processor 表示的一種特殊的對象,既是生產者,又是訂閱者。
負壓的支持
負壓是響應式流定義的一種重要的能力,在上述的接口中,實質上已經提供了負壓的支持。
Publisher 只有在收到請求以後,纔會產生數據。 這就保證了 Subscriber 能夠根據本身的處理能力,肯定要向 Publisher 請求的數據量,以此保證自身不會被沖垮。
下面,以一個簡單的代碼示例來演示 Reactive Stream API 是如何使用的。
以某一個制奶廠爲例,爲了提升營收,工廠推出了一個廠家直銷的業務。 顧客能夠直接向廠方訂購必定天數的奶製品,天天則是由工廠的服務人員送奶上門。
爲了模擬這個場景,咱們實現的代碼以下:
public class MilkFactory extends SubmissionPublisher<String> { private final ScheduledFuture<?> periodicTask; private final ScheduledExecutorService scheduler; private static final List<String> milks = Arrays.asList("益力多", "酸牛奶", "原味奶", "低脂蛋奶", "羊奶", "甜牛奶"); public MilkFactory() { super(); //初始化定時器 scheduler = new ScheduledThreadPoolExecutor(1); //每一天生產完牛奶並推送給消費者 periodicTask = scheduler.scheduleAtFixedRate( () -> submit(produceMilk()), 0, 1, TimeUnit.SECONDS); } //隨機生產牛奶 private String produceMilk() { return milks.get((int) (Math.random() * milks.size())); } //關閉流 public void close() { periodicTask.cancel(false); scheduler.shutdown(); super.close(); } }
MilkFactory 集成自SubmissionPublisher(一個提供緩衝的Publisher實現),其內部會啓動一個定時器,用於模擬天天給用戶發放生產的牛奶。
經過submit()方法能夠將數據推送給用戶。
public class MilkCustomer implements Flow.Subscriber<String> { private Flow.Subscription subscription; private AtomicInteger available = new AtomicInteger(0); private int dayCount; public MilkCustomer(int dayCount) { this.dayCount = dayCount; } @Override public void onSubscribe(Flow.Subscription subscription) { this.subscription = subscription; //設置總量 available.set(dayCount); //第一天 subscription.request(1); } @Override public void onNext(String milk) { System.out.println("今天的牛奶到了: " + milk); //若是還有存量,繼續請求 if(available.decrementAndGet() > 0){ subscription.request(1); }else{ System.out.println("牛奶套餐已經派完,歡迎繼續訂購"); this.subscription.cancel(); } } @Override public void onError(Throwable t) { t.printStackTrace(); } @Override public void onComplete() { System.out.println("closed."); } }
MilkCustomer 接受一個dayCount入參,即表示訂購的數量,在首次訂閱時會請求第一天的奶品,此後則每次收到到奶品後再請求下一天的,直到將總量消費完。
執行下面的代碼:
MilkFactory factory = new MilkFactory(); //訂閱1周 MilkCustomer customer = new MilkCustomer(7); factory.subscribe(customer);
輸出:
今天的牛奶到了: 酸牛奶 今天的牛奶到了: 羊奶 今天的牛奶到了: 原味奶 牛奶套餐已經派完,歡迎繼續訂購
在上例中,咱們使用 Java 提供的 Reactive Stream API 實現了一個"送奶上門" 的業務流。
整個過程相對是比較簡單的,最關鍵的地方就在於對流式處理以及訂閱關係的理解。 然而目前的 Reactive 實現尚未徹底的統一,好比 Spring WebFlux(SpringBoot 2支持) 仍然是基於 Reactor 私有API而不是 Reactive Stream API 來構建的,後面有機會再作下介紹。
關於Future和CompletableFuture的區別 https://juejin.im/post/5adbf8226fb9a07aac240a67