springboot2 已經發布,其中最亮眼的非webflux響應式編程莫屬了!響應式的weblfux能夠支持高吞吐量,意味着使用相同的資源能夠處理更加多的請求,毫無疑問將會成爲將來技術的趨勢,是必學的技術!不少人都看過相關的入門教程,但看完以後總以爲很迷糊,知其然不知道其因此然,包括我本人也有相同的疑惑。後面在研究和學習中發現,是個人學習路徑不對,不少基本概念不熟悉,以前公司主打的jdk版本仍是1.6/1.7,直接跳到運行在jdk8上的webflux,跨度太大,迷惑是在所不免的!html
在這裏我我的推薦的學習途徑以下:先學習jdk8的lambda表達式和stream流編程,瞭解函數式編程的知識點和思想,接着學習jdk9的響應式流flux,理解響應式流概念,理解背壓和實現機制。這2者學好以後,很容易理解webflux的基石reactor,再學習webflux就水到渠成了!java
這裏我記錄了本身的學習之路,列出了每一塊的學習重點,除了API的知識點學習以外,更加劇要的瞭解底層運行機制和實現原理。對於我我的來講,學習技術若是不瞭解原理,知識點須要死記硬背,而瞭解了底層機制以後,不但不須要死記硬背,還能夠把本身的技術點連成面融會貫通,很容易觸類旁通,知識點也不會忘記,也能和別人扯扯技術的底層實現了。react
下面只講解重點/高級知識和底層原理,入門教程請自行搜索學習web
lambda表達式最終會返回一個實現了指定接口的實例,看上去和內部匿名類很像,但有一個最大的區別就是代碼裏面的this,內部匿名類this指向的就是匿名類,而lambda表達式裏面的this指向的當前類。面試
package jdk8.lambda; /** * lambda表達式的this * * @author 曉風輕 * */ public class ThisDemo { private String name = "ThisDemo"; public void test() { // 匿名類實現 new Thread(new Runnable() { private String name = "Runnable"; @Override public void run() { System.out.println("這裏的this指向匿名類:" + this.name); } }).start(); // lambda實現 new Thread(() -> { System.out.println("這裏的this指向當前的ThisDemo類:" + this.name); }).start(); } public static void main(String[] args) { ThisDemo demo = new ThisDemo(); demo.test(); } }
輸出spring
這裏的this指向匿名類:Runnable
這裏的this指向當前的ThisDemo類:ThisDemomongodb
lambda表達式裏面,會把lambda表達式在本類中生成一個以lambda$+數字的方法。關鍵點:該方法不必定是static的方法,是static仍是非static,取決於lambda表達式裏面是否引用了this。這就是爲何lambda表達式裏面的this指向的是本地,由於他在本類裏面建立了一個方法,而後把lambda表達式裏面的代碼放進去。數據庫
// lambda實現 // 下面會自動生成lambda$0方法,因爲使用了this,因此是非static方法 new Thread(() -> { System.out.println("這裏的this指向當前的ThisDemo類:" + this.name); }).start(); // lambda實現 // 下面會自動生成lambda$1方法,因爲使用了this,因此是static方法 new Thread(() -> { System.out.println("這裏沒有引用this,生成的lambda1方法是static的"); }).start();
上面代碼會自動生成2個lambda$方法編程
使用javap -s -p 類名, 能夠看出一個是static,一個是非staic的tomcat
這就是爲何lambda表達式裏面的this指向當前類的底層機制!由於代碼就是在本類的一個方法裏面執行的。
額外說一句,自動生成的方法是否帶參數取決於lambda是否有參數,例子中表達式沒有參數(箭頭左邊是空的),因此自動生成的也沒有。
方法引用有多種,靜態方法的方法引用很好理解,但實例對象的方法引用一開始確實讓我有點費解,這和靜態方法引用由啥區別?看上去很像啊。
class DemoClass { /** * 這裏是一個靜態方法 */ public static int staticMethod(int i) { return i * 2; } /** * 這裏是一個實例方法 */ public int normalMethod(int i) { System.out.println("實例方法能夠訪問this:" + this); return i * 3; } } public class MethodRefrenceDemo { public static void main(String[] args) { // 靜態方法的方法引用 IntUnaryOperator methodRefrence1 = DemoClass::staticMethod; System.out.println(methodRefrence1.applyAsInt(111)); DemoClass demo = new DemoClass(); // 實例方法的方法引用 IntUnaryOperator methodRefrence2 = demo::normalMethod; System.out.println(methodRefrence2.applyAsInt(111)); } }
這裏牽涉到不一樣的語言裏面對this的實現方法。咱們知道靜態方法和實例方法的區別是實例方法有this,靜態方法沒有。java裏面是怎麼樣實現this的呢?
java裏面在默認把this做爲參數,放到實例方法的第一個參數。
就是說:
/** * 這裏是一個實例方法 */ public int normalMethod(int i) { System.out.println("實例方法能夠訪問this:" + this); return i * 2; }
編譯以後和下面這樣的代碼編譯以後是同樣的!
/** * 這裏是一個實例方法 */ public int normalMethod(DemoClass this,int i) { System.out.println("實例方法能夠訪問this:" + this); return i * 2; }
第1個證據,看反編譯裏面的本地變量表。
靜態方法:
而實例方法
第2個證據,下面這樣的代碼能正確執行。
class DemoCl2{ /** * 這裏是一個實例方法, 代碼上2個參數 * 而咱們調用的時候只有一個參數 */ public int normalMethod(DemoClass2 this,int i) { return i * 2; } } public class MethodRefrenceDemo { public static void main(String[] args) { DemoClass2 demo2 = new DemoClass2(); // 代碼定義上有2個參數, 第一個參數爲this // 但實際上調用的時候只須要一個參數 demo2.normalMethod(1); } }
因此,個人理解,java裏面的全部方法都是靜態方法,只是有些方法有this變量,有些沒有。
因此,成員方法咱們也能夠寫成靜態方法的方法引用。以下:
public class MethodRefrenceDemo { public static void main(String[] args) { // 靜態方法的方法引用 IntUnaryOperator methodRefrence1 = DemoClass::staticMethod; System.out.println(methodRefrence1.applyAsInt(111)); DemoClass demo = new DemoClass(); // 實例方法normalMethod的方法引用 IntUnaryOperator methodRefrence2 = demo::normalMethod; System.out.println(methodRefrence2.applyAsInt(111)); // 對同一個實例方法normalMethod也可使用靜態引用 // 代碼上normalMethod雖然只有一個參數,但實際上有一個隱含的this函數 // 因此使用的是2個參數bifunction函數接口 BiFunction<DemoClass, Integer, Integer> methodRefrence3 = DemoClass::normalMethod; System.out.println(methodRefrence3.apply(demo, 111)); } }
上面代碼裏面。對同一個實例方法normalMethod,咱們既可使用實例方法引用(實例::方法名),也可使用靜態方法引用(類名::方法名)。
惰性求值在lambda裏面很是重要,也很是有用。
舉例,編程規範裏面有一條規範,是打印日誌前須要判斷日誌級別(性能要求高的時候)。以下
// 打印日誌前須要先判斷日誌級別 if (logger.isLoggable(Level.FINE)) { logger.fine("打印一些日誌:" + this); }
爲何要加判斷呢?不加判斷會有問題呢? 看以下代碼:
package jdk8.lambda; import java.util.concurrent.TimeUnit; import java.util.logging.Level; import java.util.logging.Logger; /** * lambda的惰性求值 * * @author 曉風輕 */ public class LogDemo { private static final Logger logger = Logger .getLogger(LogDemo.class.getName()); @Override public String toString() { System.out.println("這個方法執行了, 耗時1秒鐘"); try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { } return "LogDemo"; } public void test() { // 若是不加判斷直接打印, 會有額外多餘的開銷, 就算最終日誌並無打印 logger.fine("打印一些日誌:" + this); } public static void main(String[] args) { LogDemo demo = new LogDemo(); demo.test(); } }
執行代碼,發現雖然日誌沒有打印,但toString方法仍是執行了,屬於多餘浪費的開銷。
每個日誌打印都加判斷,看着很彆扭,如今有了lambda表達式以後,可使用lambda的惰性求值,就能夠去掉if判斷,以下
// 使用lambda表達式的惰性求值,不須要判斷日誌級別 logger.fine(() -> "打印一些日誌:" + this);
這個現象很好理解,簡單講解一下。就是沒有使用表達式的時候,至關於
String msg = "打印一些日誌:" + this logger.fine(msg);
雖然最後沒有打印,但字符串拼接的工做仍是執行了。而使用了lambda表達式以後,字符串的拼接放到一個函數裏面,fine日誌須要打印的時候纔去調用這個方法才真正執行!從而實現了惰性求值。
後面咱們學習的jdk8的stream流編程裏面,沒有調用最終操做的時候,中間操做的方法都不會執行,這也是惰性求值。
stream編程主要是學習API的使用,但前提是學好lambda,基礎好了,看這些方法定義很是簡單,要是沒有打好基礎,你會有不少東西須要記憶。
通常來講,咱們以前的編碼方法,叫外部迭代,stream的寫法叫內部迭代。內部迭代代碼更加可讀更加優雅,關注點是作什麼(外部迭代關注是怎麼樣作),也很容易讓咱們養成編程小函數的好習慣!這點在編程習慣裏面很是重要!看例子:
import java.util.stream.IntStream; public class StreamDemo1 { public static void main(String[] args) { int[] nums = { 1, 2, 3 }; // 外部迭代 int sum = 0; for (int i : nums) { sum += i; } System.out.println("結果爲:" + sum); // 使用stream的內部迭代 // map就是中間操做(返回stream的操做) // sum就是終止操做 int sum2 = IntStream.of(nums).map(StreamDemo1::doubleNum).sum(); System.out.println("結果爲:" + sum2); System.out.println("惰性求值就是終止沒有調用的狀況下,中間操做不會執行"); IntStream.of(nums).map(StreamDemo1::doubleNum); } public static int doubleNum(int i) { System.out.println("執行了乘以2"); return i * 2; } }
操做類型概念要理清楚。有幾個維度。
首先分爲 中間操做 和 最終操做,在最終操做沒有調用的狀況下,全部的中級操做都不會執行。那麼那些是中間操做那些是最終操做呢? 簡單來講,返回stream流的就是中間操做,能夠繼續鏈式調用下去,不是返回stream的就是最終操做。這點很好理解。
最終操做裏面分爲短路操做和非短路操做,短路操做就是limit/findxxx/xxxMatch這種,就是找了符合條件的就終止,其餘的就是非短路操做。在無限流裏面須要調用短路操做,不然像炫邁口香糖同樣根本停不下來!
中間操做又分爲 有狀態操做 和 無狀態操做,怎麼樣區分呢? 一開始不少同窗須要死記硬背,其實你主要掌握了狀態這個關鍵字就不須要死記硬背。狀態就是和其餘數據有關係。咱們能夠看方法的參數,若是是一個參數的,就是無狀態操做,由於只和本身有關,其餘的就是有狀態操做。如map/filter方法,只有一個參數就是本身,就是無狀態操做;而distinct/sorted就是有狀態操做,由於去重和排序都須要和其餘數據比較,理解了這點,就不須要死記硬背了!
爲何要知道有狀態和無狀態操做呢?在多個操做的時候,咱們須要把無狀態操做寫在一塊兒,有狀態操做放到最後,這樣效率會更加高。
咱們能夠經過下面的代碼來理解stream的運行機制
package stream; import java.util.Random; import java.util.concurrent.TimeUnit; import java.util.stream.Stream; /** * 驗證stream運行機制 * * 1. 全部操做是鏈式調用, 一個元素只迭代一次 * 2. 每個中間操做返回一個新的流. 流裏面有一個屬性sourceStage * 指向同一個 地方,就是Head * 3. Head->nextStage->nextStage->... -> null * 4. 有狀態操做會把無狀態操做階段,單獨處理 * 5. 並行環境下, 有狀態的中間操做不必定能並行操做. * * 6. parallel/ sequetial 這2個操做也是中間操做(也是返回stream) * 可是他們不建立流, 他們只修改 Head的並行標誌 * * @author 曉風輕 * */ public class RunStream { public static void main(String[] args) { Random random = new Random(); // 隨機產生數據 Stream<Integer> stream = Stream.generate(() -> random.nextInt()) // 產生500個 ( 無限流須要短路操做. ) .limit(500) // 第1個無狀態操做 .peek(s -> print("peek: " + s)) // 第2個無狀態操做 .filter(s -> { print("filter: " + s); return s > 1000000; }) // 有狀態操做 .sorted((i1, i2) -> { print("排序: " + i1 + ", " + i2); return i1.compareTo(i2); }) // 又一個無狀態操做 .peek(s -> { print("peek2: " + s); }).parallel(); // 終止操做 stream.count(); } /** * 打印日誌並sleep 5 毫秒 * * @param s */ public static void print(String s) { // System.out.println(s); // 帶線程名(測試並行狀況) System.out.println(Thread.currentThread().getName() + " > " + s); try { TimeUnit.MILLISECONDS.sleep(5); } catch (InterruptedException e) { } } }
你們本身測試一下代碼,能發現stream的調用方法,就像現實中的流水線同樣,一個元素只會迭代一次,但若是中間有無狀態操做,先後的操做會單獨處理(元素就會被屢次迭代)。
就是reactive stream,也就是flow。其實和jdk8的stream沒有一點關係。說白了就一個發佈-訂閱模式,一共只有4個接口,3個對象,很是簡單清晰。寫一個入門例子就能夠掌握。
package jdk9; import java.util.concurrent.Flow.Processor; import java.util.concurrent.Flow.Subscriber; import java.util.concurrent.Flow.Subscription; import java.util.concurrent.SubmissionPublisher; /** * 帶 process 的 flow demo */ /** * Processor, 須要繼承SubmissionPublisher並實現Processor接口 * * 輸入源數據 integer, 過濾掉小於0的, 而後轉換成字符串發佈出去 */ class MyProcessor extends SubmissionPublisher<String> implements Processor<Integer, String> { private Subscription subscription; @Override public void onSubscribe(Subscription subscription) { // 保存訂閱關係, 須要用它來給發佈者響應 this.subscription = subscription; // 請求一個數據 this.subscription.request(1); } @Override public void onNext(Integer item) { // 接受到一個數據, 處理 System.out.println("處理器接受到數據: " + item); // 過濾掉小於0的, 而後發佈出去 if (item > 0) { this.submit("轉換後的數據:" + item); } // 處理完調用request再請求一個數據 this.subscription.request(1); // 或者 已經達到了目標, 調用cancel告訴發佈者再也不接受數據了 // this.subscription.cancel(); } @Override public void onError(Throwable throwable) { // 出現了異常(例如處理數據的時候產生了異常) throwable.printStackTrace(); // 咱們能夠告訴發佈者, 後面不接受數據了 this.subscription.cancel(); } @Override public void onComplete() { // 所有數據處理完了(發佈者關閉了) System.out.println("處理器處理完了!"); // 關閉發佈者 this.close(); } } public class FlowDemo2 { public static void main(String[] args) throws Exception { // 1. 定義發佈者, 發佈的數據類型是 Integer // 直接使用jdk自帶的SubmissionPublisher SubmissionPublisher<Integer> publiser = new SubmissionPublisher<Integer>(); // 2. 定義處理器, 對數據進行過濾, 並轉換爲String類型 MyProcessor processor = new MyProcessor(); // 3. 發佈者 和 處理器 創建訂閱關係 publiser.subscribe(processor); // 4. 定義最終訂閱者, 消費 String 類型數據 Subscriber<String> subscriber = new Subscriber<String>() { private Subscription subscription; @Override public void onSubscribe(Subscription subscription) { // 保存訂閱關係, 須要用它來給發佈者響應 this.subscription = subscription; // 請求一個數據 this.subscription.request(1); } @Override public void onNext(String item) { // 接受到一個數據, 處理 System.out.println("接受到數據: " + item); // 處理完調用request再請求一個數據 this.subscription.request(1); // 或者 已經達到了目標, 調用cancel告訴發佈者再也不接受數據了 // this.subscription.cancel(); } @Override public void onError(Throwable throwable) { // 出現了異常(例如處理數據的時候產生了異常) throwable.printStackTrace(); // 咱們能夠告訴發佈者, 後面不接受數據了 this.subscription.cancel(); } @Override public void onComplete() { // 所有數據處理完了(發佈者關閉了) System.out.println("處理完了!"); } }; // 5. 處理器 和 最終訂閱者 創建訂閱關係 processor.subscribe(subscriber); // 6. 生產數據, 併發布 // 這裏忽略數據生產過程 publiser.submit(-111); publiser.submit(111); // 7. 結束後 關閉發佈者 // 正式環境 應該放 finally 或者使用 try-resouce 確保關閉 publiser.close(); // 主線程延遲中止, 不然數據沒有消費就退出 Thread.currentThread().join(1000); } }
背壓依個人理解來講,是指訂閱者能和發佈者交互(經過代碼裏面的調用request和cancel方法交互),能夠調節發佈者發佈數據的速率,解決把訂閱者壓垮的問題。關鍵在於上面例子裏面的訂閱關係Subscription這個接口,他有request和cancel 2個方法,用於通知發佈者須要數據和通知發佈者再也不接受數據。
咱們重點理解背壓在jdk9裏面是如何實現的。關鍵在於發佈者Publisher的實現類SubmissionPublisher的submit方法是block方法。訂閱者會有一個緩衝池,默認爲Flow.defaultBufferSize() = 256。當訂閱者的緩衝池滿了以後,發佈者調用submit方法發佈數據就會被阻塞,發佈者就會停(慢)下來;訂閱者消費了數據以後(調用Subscription.request方法),緩衝池有位置了,submit方法就會繼續執行下去,就是經過這樣的機制,實現了調節發佈者發佈數據的速率,消費得快,生成就快,消費得慢,發佈者就會被阻塞,固然就會慢下來了。
怎麼樣實現發佈者和多個訂閱者之間的阻塞和同步呢?使用的jdk7的Fork/Join的ManagedBlocker,有興趣的請本身查找相關資料。
spring webflux是基於reactor來實現響應式的。那麼reactor是什麼呢?我是這樣理解的
reactor = jdk8的stream + jdk9的flow響應式流。理解了這句話,reactor就很容易掌握。
reactor裏面Flux和Mono就是stream,他的最終操做就是 subscribe/block 2種。reactor裏面說的不訂閱將什麼也不會方法就是咱們最開始學習的惰性求值。
咱們來看一段代碼,理解一下:
package com.imooc; import java.util.concurrent.TimeUnit; import org.reactivestreams.Subscriber; import org.reactivestreams.Subscription; import reactor.core.publisher.Flux; public class ReactorDemo { public static void main(String[] args) { // reactor = jdk8 stream + jdk9 reactive stream // Mono 0-1個元素 // Flux 0-N個元素 String[] strs = { "1", "2", "3" }; // 2. 定義訂閱者 Subscriber<Integer> subscriber = new Subscriber<Integer>() { private Subscription subscription; @Override public void onSubscribe(Subscription subscription) { // 保存訂閱關係, 須要用它來給發佈者響應 this.subscription = subscription; // 請求一個數據 this.subscription.request(1); } @Override public void onNext(Integer item) { // 接受到一個數據, 處理 System.out.println("接受到數據: " + item); try { TimeUnit.SECONDS.sleep(3); } catch (InterruptedException e) { e.printStackTrace(); } // 處理完調用request再請求一個數據 this.subscription.request(1); // 或者 已經達到了目標, 調用cancel告訴發佈者再也不接受數據了 // this.subscription.cancel(); } @Override public void onError(Throwable throwable) { // 出現了異常(例如處理數據的時候產生了異常) throwable.printStackTrace(); // 咱們能夠告訴發佈者, 後面不接受數據了 this.subscription.cancel(); } @Override public void onComplete() { // 所有數據處理完了(發佈者關閉了) System.out.println("處理完了!"); } }; // 這裏就是jdk8的stream Flux.fromArray(strs).map(s -> Integer.parseInt(s)) // 最終操做 // 這裏就是jdk9的reactive stream .subscribe(subscriber); } }
上面的例子裏面,咱們能夠把jdk9裏面flowdemo的訂閱者代碼原封不動的copy過來,直接就能夠用在reactor的subscribe方法上。訂閱就是至關於調用了stream的最終操做。有了 reactor = jdk8 stream + jdk9 reactive stream 概念後,在掌握了jdk8的stream和jkd9的flow以後,reactor也不難掌握。
上面的基礎和原理掌握以後,學習webflux就水到渠成了!webflux的關鍵是本身編寫的代碼裏面返回流(Flux/Mono),spring框架來負責處理訂閱。 spring框架提供2種開發模式來編寫響應式代碼,使用mvc以前的註解模式和使用router function模式,都須要咱們的代碼返回流,spring的響應式數據庫spring data jpa,如使用mongodb,也是返回流,訂閱都須要交給框架,本身不能訂閱。而編寫響應式代碼以前,咱們還須要瞭解2個重要的概念,就是異步servlet和SSE。
學習異步servlet咱們最重要的瞭解同步servlet阻塞了什麼?爲何須要異步servlet?異步servlet能支持高吞吐量的原理是什麼?
servlet容器(如tomcat)裏面,每處理一個請求會佔用一個線程,同步servlet裏面,業務代碼處理多久,servlet容器的線程就會等(阻塞)多久,而servlet容器的線程是由上限的,當請求多了的時候servlet容器線程就會所有用完,就沒法再處理請求(這個時候請求可能排隊也可能丟棄,得看如何配置),就會限制了應用的吞吐量!
而異步serlvet裏面,servlet容器的線程不會傻等業務代碼處理完畢,而是直接返回(繼續處理其餘請求),給業務代碼一個回調函數(asyncContext.complete()),業務代碼處理完了再通知我!這樣就可使用少許的線程處理更加高的請求,從而實現高吞吐量!
咱們看示例代碼:
import java.io.IOException; import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; import javax.servlet.AsyncContext; import javax.servlet.ServletException; import javax.servlet.ServletRequest; import javax.servlet.ServletResponse; import javax.servlet.annotation.WebServlet; import javax.servlet.http.HttpServlet; import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletResponse; /** * Servlet implementation class AsyncServlet */ @WebServlet(asyncSupported = true, urlPatterns = { "/AsyncServlet" }) public class AsyncServlet extends HttpServlet { private static final long serialVersionUID = 1L; /** * @see HttpServlet#HttpServlet() */ public AsyncServlet() { super(); } /** * @see HttpServlet#doGet(HttpServletRequest request, HttpServletResponse * response) */ protected void doGet(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException { long t1 = System.currentTimeMillis(); // 開啓異步 AsyncContext asyncContext = request.startAsync(); // 執行業務代碼 CompletableFuture.runAsync(() -> doSomeThing(asyncContext, asyncContext.getRequest(), asyncContext.getResponse())); System.out.println("async use:" + (System.currentTimeMillis() - t1)); } private void doSomeThing(AsyncContext asyncContext, ServletRequest servletRequest, ServletResponse servletResponse) { // 模擬耗時操做 try { TimeUnit.SECONDS.sleep(5); } catch (InterruptedException e) { } // try { servletResponse.getWriter().append("done"); } catch (IOException e) { e.printStackTrace(); } // 業務代碼處理完畢, 通知結束 asyncContext.complete(); } /** * @see HttpServlet#doPost(HttpServletRequest request, HttpServletResponse * response) */ protected void doPost(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException { doGet(request, response); } }
你們能夠運行上面代碼,業務代碼花了5秒,但servlet容器的線程幾乎沒有任何耗時。而若是是同步servlet的,線程就會傻等5秒,這5秒內這個線程只處理了這一個請求。
響應式流裏面,能夠屢次返回數據(其實和響應式沒有關係),使用的技術就是H5的SSE。咱們學習技術,API的使用只是最初級也是最簡單的,更加劇要的是須要知其然並知其因此然,不然你只能死記硬背不用就忘!咱們不知足在spring裏面能實現sse效果,更加須要知道spring是如何作到的。其實SSE很簡單,咱們花一點點時間就能夠掌握,咱們在純servlet環境裏面實現。咱們看代碼,這裏一個最簡單的示例。
import java.io.IOException; import java.util.concurrent.TimeUnit; import javax.servlet.ServletException; import javax.servlet.annotation.WebServlet; import javax.servlet.http.HttpServlet; import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletResponse; /** * Servlet implementation class SSE */ @WebServlet("/SSE") public class SSE extends HttpServlet { private static final long serialVersionUID = 1L; /** * @see HttpServlet#HttpServlet() */ public SSE() { super(); } /** * @see HttpServlet#doGet(HttpServletRequest request, HttpServletResponse * response) */ protected void doGet(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException { response.setContentType("text/event-stream"); response.setCharacterEncoding("utf-8"); for (int i = 0; i < 5; i++) { // 指定事件標識 response.getWriter().write("event:me\n"); // 格式: data: + 數據 + 2個回車 response.getWriter().write("data:" + i + "\n\n"); response.getWriter().flush(); try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { } } } /** * @see HttpServlet#doPost(HttpServletRequest request, HttpServletResponse * response) */ protected void doPost(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException { doGet(request, response); } }
關鍵是ContentType 是 「text/event-stream」,而後返回的數據有固定的要求格式便可。
通過上面的一步一個腳印的學習,咱們的基礎已經打牢,障礙已經掃清,如今能夠進入輕鬆愉快的spring flux學習之旅了!Enjoy!
我的認爲,spring的weblfux響應式編程的高吞吐量特性,將會逐步會成爲技術趨勢,成爲咱們對系統進行垂直擴展的首選。那麼應該如何進行spring的weblfux響應式編程呢?請參考我這篇文章:springboot2 webflux 響應式編程學習路徑,我的建議把基礎夯實了在學習,直接學習步子扯的太大會有太多疑惑,這些疑惑早晚你要退回來補。固然更加高效的觀看個人實戰課程 SpringBoot2.0不容錯過的新特性 WebFlux響應式編程,海量的知識點,從簡到難,一個一個知識點底層原理運行機制的講解,最後還直播講解使用IoC/AOP編寫相似feign的聲明式的全響應式框架,相信你必定有能有所獲!聽完課程後,你也能夠和麪試官扯扯相關知識點的底層實現了!7小時只須要128元,良心課程,絕對物超所值!!:) 去看看吧