技術點html
Reactivereact
Reactive是一種編程方式,由不一樣的方式來實現git
阻塞的弊端和並行的複雜github
在Reactor官方的網站上,指出了現有編程的一些不足https://projectreactor.io/docs/core/release/reference/index.html#_blocking_can_be_wasteful編程
Reactor認爲阻塞多是浪費的瀏覽器
概括安全
阻塞的弊端併發
由如下場景來講明app
public class DataLoader { public final void load() { long startTime = System.currentTimeMillis(); doLoad(); long costTime = System.currentTimeMillis() - startTime; System.out.println("load()總耗時:" + costTime + "毫秒"); } protected void doLoad() { loadConfigurations(); loadUsers(); loadOrders(); } protected final void loadConfigurations() { loadMock("loadConfigurations()",1); } protected final void loadUsers() { loadMock("loadUsers",2); } protected final void loadOrders() { loadMock("loadOrders()",3); } private void loadMock(String source,int seconds) { try { long startTime = System.currentTimeMillis(); long milliseconds = TimeUnit.SECONDS.toMillis(seconds); Thread.sleep(milliseconds); long costTime = System.currentTimeMillis() - startTime; System.out.printf("[線程: %s] %s 耗時: %d 毫秒\n", Thread.currentThread().getName(),source,costTime ); } catch (InterruptedException e) { e.printStackTrace(); } } public static void main(String[] args) { new DataLoader().load(); } }
運行結果框架
[線程: main] loadConfigurations() 耗時: 1001 毫秒
[線程: main] loadUsers 耗時: 2001 毫秒
[線程: main] loadOrders() 耗時: 3003 毫秒
load()總耗時:6025毫秒
由結果可知,咱們在依次執行loadConfigurations()、loadUsers()、loadOrders()中,loadUsers()被loadConfigurations()阻塞了,loadOrders() 被loadUsers()阻塞了,它們都是main的主線程中的執行。因爲加載過程串行執行的關係,致使消耗實現線性累加。串行執行即Blocking模式。
並行的複雜
由如下場景來講明
public class ParalleDataLoader extends DataLoader { protected void doLoad() { ExecutorService executorService = Executors.newFixedThreadPool(3); //CompletionService是一個接口,ExecutorCompletionService爲其實現類 //ExecutorCompletionService在構造函數中會建立一個BlockingQueue // (使用的基於鏈表的無界隊列LinkedBlockingQueue), // 該BlockingQueue的做用是保存Executor執行的結果。 // 當計算完成時,調用FutureTask的done方法。 // 當提交一個任務到ExecutorCompletionService時, // 首先將任務包裝成QueueingFuture,它是FutureTask的一個子類, // 而後改寫FutureTask的done方法,以後把Executor執行的計算結果放入BlockingQueue中。 CompletionService completionService = new ExecutorCompletionService(executorService); completionService.submit(super::loadConfigurations,null); completionService.submit(super::loadUsers,null); completionService.submit(super::loadOrders,null); int count = 0; while (count < 3) { if (completionService.poll() != null) { count++; } } executorService.shutdown(); } public static void main(String[] args) { new ParalleDataLoader().load(); } }
這裏大概解釋一下ExecutorCompletionService,它的構造器會初始化一個線程池以及一個BlockingQueue
public ExecutorCompletionService(Executor executor) { if (executor == null) throw new NullPointerException(); this.executor = executor; this.aes = (executor instanceof AbstractExecutorService) ? (AbstractExecutorService) executor : null; this.completionQueue = new LinkedBlockingQueue<Future<V>>(); }
提交線程的時候,會初始化一個FutureTask,並放入QueueingFuture中,交給線程池去執行。
public Future<V> submit(Runnable task, V result) { if (task == null) throw new NullPointerException(); RunnableFuture<V> f = newTaskFor(task, result); executor.execute(new QueueingFuture(f)); return f; }
private RunnableFuture<V> newTaskFor(Runnable task, V result) { if (aes == null) return new FutureTask<V>(task, result); else return aes.newTaskFor(task, result); }
咱們看一下QueueingFuture的繼承圖
由圖可知,不管QueueingFuture,FutureTask,RunnableFuture其實都是一個Runnable。而在線程執行完畢後會執行一個done()方法,將結果放入BlockingQueue中。
private class QueueingFuture extends FutureTask<Void> { QueueingFuture(RunnableFuture<V> task) { super(task, null); this.task = task; } protected void done() { completionQueue.add(task); } private final Future<V> task; }
private final BlockingQueue<Future<V>> completionQueue;
BlockingQueue是在ExecutorCompletionService被初始化了的,有關BlockingQueue的介紹能夠參考從BlockingQueue到無鎖Disruptor的性能提高
最後咱們用到了completionService.poll()
public Future<V> poll() { return completionQueue.poll(); }
將Future結果從BlockingQueue隊列中彈出。固然咱們示例中並無什麼結果須要彈出。
如今咱們回到示例代碼,運行結果
[線程: pool-1-thread-1] loadConfigurations() 耗時: 1002 毫秒
[線程: pool-1-thread-2] loadUsers 耗時: 2002 毫秒
[線程: pool-1-thread-3] loadOrders() 耗時: 3003 毫秒
load()總耗時:3059毫秒
由結果可知,程序改造爲並行加載後,性能和資源利用率獲得提高,消耗時間取最大者。但因爲以上三個方法之間沒有數據依賴關係,因此執行方式由串行調整爲並行後,可以達到性能提高的效果。若是方法之間存在依賴關係時,那麼提高效果是否還會如此明顯,而且若是確保它們的執行循序。問題如(線程安全性,原子性,可見性),由此問題能夠參考Fork/Join框架原理和使用探祕 ,在這篇博客中就能夠看到爲了保證線程安全性,性能已經不如單線程。
Reactor認爲異步不必定可以救贖
概括
Callback Hell
咱們來看這樣一段代碼
public class JavaGUI { public static void main(String[] args) { final JFrame jFrame = new JFrame("GUI 示例"); jFrame.setBounds(500,300,400,300); LayoutManager layoutManager = new BorderLayout(400,300); jFrame.setLayout(layoutManager); jFrame.addMouseListener(new MouseAdapter() { @Override public void mouseClicked(MouseEvent e) { System.out.printf("[線程 : %s] 鼠標點擊,座標(X : %d,Y : %d)\n", currentThreadName(),e.getX(),e.getY()); } }); jFrame.addWindowListener(new WindowAdapter() { @Override public void windowClosing(WindowEvent e) { System.out.printf("[線程 : %s] 清除 jFrame...\n",currentThreadName()); jFrame.dispose(); } @Override public void windowClosed(WindowEvent e) { System.out.printf("[線程 : %s] 退出程序... \n",currentThreadName()); System.exit(0); } }); System.out.println("當前線程:" + currentThreadName()); jFrame.setVisible(true); } private static String currentThreadName() { return Thread.currentThread().getName(); } }
當咱們執行了main方法之後,會打印當前線程,而且顯示window窗體。
咱們能夠看到打印了當前線程爲main的主線程。當咱們在窗體內用鼠標點擊的時候會打印以下內容
[線程 : AWT-EventQueue-0] 鼠標點擊,座標(X : 218,Y : 167)
[線程 : AWT-EventQueue-0] 鼠標點擊,座標(X : 130,Y : 120)
由打印的內容可知,咱們鼠標點擊並非main的主線程來執行的,說明它是一個異步的Callback,並且是非阻塞的,當咱們點擊鼠標產生鼠標事件時,沒有任何線程會阻塞該線程的執行。當咱們關閉窗口的時候,會打印以下內容
[線程 : AWT-EventQueue-0] 清除 jFrame...
[線程 : AWT-EventQueue-0] 退出程序...
說明關閉也是由同一個異步線程來執行的。由此能夠看出Java GUI以及事件/監聽模式基本採用匿名內置類,即回調實現。當監聽的維度增多,Callback實現也隨之增多。同時,事件/監聽者模式(觀察者模式)的併發模型可爲同步或異步。這裏說的同步、異步是線程模型;阻塞、非阻塞是編程模型。在Spring中,於這種GUI回調相似的有Spring Boot的消息事件機制 ,這裏面也有同步,異步,阻塞,非阻塞的說明。
Future阻塞問題
咱們來修改一下ParalleDataLoader的代碼造成一個Future的阻塞
public class FutureBlockingDataLoader extends DataLoader { protected void doLoad() { ExecutorService executorService = Executors.newFixedThreadPool(3); CompletionService completionService = new ExecutorCompletionService(executorService); runComletely(completionService.submit(super::loadConfigurations,null)); runComletely(completionService.submit(super::loadUsers,null)); runComletely(completionService.submit(super::loadOrders,null)); executorService.shutdown(); } private void runComletely(Future<?> future) { try { future.get(); } catch (InterruptedException e) { e.printStackTrace(); } catch (ExecutionException e) { e.printStackTrace(); } } public static void main(String[] args) { new FutureBlockingDataLoader().load(); } }
運行結果
[線程: pool-1-thread-1] loadConfigurations() 耗時: 1000 毫秒
[線程: pool-1-thread-2] loadUsers 耗時: 2002 毫秒
[線程: pool-1-thread-3] loadOrders() 耗時: 3001 毫秒
load()總耗時:6073毫秒
由結果可知,future.get()成爲future阻塞的源泉。該方法不得不等待任務執行完成,換言之,若是多個任務提交後,返回多個Future逐一調用get()方法時,將會依次blocking,任務的執行從並行變成串行。
Future鏈式問題
因爲Future沒法異步執行結果鏈式處理,儘管FutureBlockingDataLoader可以解決方法數據依賴以及順序執行的問題,不過它將並行執行帶回了阻塞(串行)執行。因此,它不是一個理想實現。不過CompletableFuture能夠幫助提高Future限制。
public class ChainDataLoader extends DataLoader { protected void doLoad() { CompletableFuture .runAsync(super::loadConfigurations) .thenRun(super::loadUsers) .thenRun(super::loadOrders) .whenComplete((result,throwable) -> System.out.println("[線程 :" + Thread.currentThread().getName() + "] 加載完成") ).join(); } public static void main(String[] args) { new ChainDataLoader().load(); } }
運行結果
[線程: ForkJoinPool.commonPool-worker-9] loadConfigurations() 耗時: 1004 毫秒
[線程: ForkJoinPool.commonPool-worker-9] loadUsers 耗時: 2004 毫秒
[線程: ForkJoinPool.commonPool-worker-9] loadOrders() 耗時: 3001 毫秒
[線程 :ForkJoinPool.commonPool-worker-9] 加載完成
load()總耗時:6079毫秒
由結果可知,當異步執行時,它並非由3個線程去執行,而是由同一個線程進行鏈式執行的,之因此加入join,是爲了讓主線程等待返回。它跟第一個DataLoader的不一樣在於,DataLoader是所有由主線程去阻塞執行的,而這裏若是不使用join()則確定爲非阻塞的,只不過join()會阻塞,這個是線程相關的常識,具體能夠參考線程,JVM鎖整理 。也就是說,若是去掉join(),因爲CompletableFuture都是守護線程,主線程執行完,它是不會執行的,如今咱們把代碼稍做修改以下。
public class ChainDataLoader extends DataLoader { protected void doLoad() { CompletableFuture .runAsync(super::loadConfigurations) .thenRun(super::loadUsers) .thenRun(super::loadOrders) .whenComplete((result,throwable) -> System.out.println("[線程 :" + Thread.currentThread().getName() + "] 加載完成") ); System.out.println("[線程 :" + Thread.currentThread().getName() +"】後續執行"); } public static void main(String[] args) { new ChainDataLoader().load(); } }
運行結果
[線程 :main】後續執行
load()總耗時:60毫秒
證實CompletableFuture還未啓動,並未執行。但若是咱們把new ChainDataLoader().load();這段代碼放入Controller中
@RestController public class TestController { @GetMapping("/future") public void findfuture() { new ChainDataLoader().load(); } }
經過瀏覽器訪問
能夠看到後臺打印
[線程 :reactor-http-nio-2】後續執行
load()總耗時:3毫秒
[線程: ForkJoinPool.commonPool-worker-9] loadConfigurations() 耗時: 1003 毫秒
[線程: ForkJoinPool.commonPool-worker-9] loadUsers 耗時: 2004 毫秒
[線程: ForkJoinPool.commonPool-worker-9] loadOrders() 耗時: 3005 毫秒
[線程 :ForkJoinPool.commonPool-worker-9] 加載完成
證實異步線程是非阻塞而且執行了的。
這裏咱們能夠看到CompletableFuture屬於異步操做,若是強制等待結束的話,又回到了阻塞編程的方式,而且讓咱們明白到非阻塞不必定提高性能,由於即使是非阻塞,在異步線程中,它同樣要使用6秒才能完成,相比於ParalleDataLoader的並行執行,只須要3秒完成來講,非阻塞的好處是讓主方法線程及時完成,讓主方法線程池能夠及時釋放。不過同理,在ParalleDataLoader中若是不進行completionService.poll()的阻塞操做,主線程一樣會率先返回,因爲線程池中的線程並不是守護線程,它在主線程完成後會繼續執行。
public class ParalleDataLoader extends DataLoader { protected void doLoad() { ExecutorService executorService = Executors.newFixedThreadPool(3); CompletionService completionService = new ExecutorCompletionService(executorService); completionService.submit(super::loadConfigurations,null); completionService.submit(super::loadUsers,null); completionService.submit(super::loadOrders,null); // int count = 0; // while (count < 3) { // if (completionService.poll() != null) { // count++; // } // } executorService.shutdown(); } public static void main(String[] args) { new ParalleDataLoader().load(); } }
運行結果
load()總耗時:59毫秒
[線程: pool-1-thread-1] loadConfigurations() 耗時: 1004 毫秒
[線程: pool-1-thread-2] loadUsers 耗時: 2004 毫秒
[線程: pool-1-thread-3] loadOrders() 耗時: 3002 毫秒
一樣咱們把new ParalleDataLoader().load()放入Controller中
@RestController public class TestController { @GetMapping("/future") public void findfuture() { new ParalleDataLoader().load(); } }
經過瀏覽器訪問,後臺打印
load()總耗時:1毫秒
[線程: pool-1-thread-1] loadConfigurations() 耗時: 1000 毫秒
[線程: pool-1-thread-2] loadUsers 耗時: 2004 毫秒
[線程: pool-1-thread-3] loadOrders() 耗時: 3005 毫秒
這裏一樣爲異步非阻塞,而且併發了3個線程,異步線程總耗時是3秒。可是這樣會形成異步線程池的線程數併發量比較大。
Reactive Stream JVM認爲異步系統和資源消費須要特殊處理,在Reactor的github的官網上,有這樣一段描述https://github.com/reactive-streams/reactive-streams-jvm
Reactive的理解,須要從不少方便
Reactive Programming定義
The Reactive Mainifesto認爲:官網https://www.reactivemanifesto.org/
側重點
WebFlux的線程觀察
public class FluxTest { public static void main(String[] args) { Flux.just("a","b","c") .subscribe(FluxTest::println); System.out.println("你好"); } private static void println(Object object) { String threadName = Thread.currentThread().getName(); System.out.println("[線程: " + threadName + "] " + object); } }
運行結果
[線程: main] a
[線程: main] b
[線程: main] c
你好
根據結果,咱們能夠看到這並非一個異步的,而是一個同步非阻塞的主線程執行。如今咱們來修改一下代碼
public class FluxTest { public static void main(String[] args) { Flux.just("a","b","c") .publishOn(Schedulers.elastic()) .subscribe(FluxTest::println); System.out.println("你好"); } private static void println(Object object) { String threadName = Thread.currentThread().getName(); System.out.println("[線程: " + threadName + "] " + object); } }
運行結果
你好 [線程: elastic-2] a [線程: elastic-2] b [線程: elastic-2] c