Reactor深度探索

技術點html

  • 反應器模式(Reactor) 同步非阻塞,每一個事情能夠分爲幾個步驟,每一個步驟能夠相應去作,每一個步驟是不會相互影響的,可是作起來是串行的。有關Netty的具體實現,能夠參考Netty整理
  • Proactor模式 異步非阻塞,每一個事情同時作,或者說是異步的去作,
  • 觀察者模式(Observer) JDK的實現能夠參考使用JDK的觀察者接口進行消息推送  觀察者模式是一個推的模式
  • 迭代器模式(Iterator) 是一種拉的模式,數據準備好後,進行一個循環拉取。
  • Java併發模型

Reactivereact

Reactive是一種編程方式,由不一樣的方式來實現git

  • RxJava : Reactive Extensions
  • Reactor : Spring WebFlux Reactive類庫
  • Flow API : Java 9 Flow API實現

阻塞的弊端和並行的複雜github

在Reactor官方的網站上,指出了現有編程的一些不足https://projectreactor.io/docs/core/release/reference/index.html#_blocking_can_be_wasteful編程

Reactor認爲阻塞多是浪費的瀏覽器

概括安全

  • 阻塞致使性能瓶頸和浪費資源
  • 增長線程可能會引發資源競爭和併發問題(可見性問題,原子性問題)
  • 並行的方式不是銀彈(不能解決全部問題)

阻塞的弊端併發

由如下場景來講明app

public class DataLoader {
    public final void load() {
        long startTime = System.currentTimeMillis();
        doLoad();
        long costTime = System.currentTimeMillis() - startTime;
        System.out.println("load()總耗時:" + costTime + "毫秒");
    }

    protected void doLoad() {
        loadConfigurations();
        loadUsers();
        loadOrders();
    }

    protected final void loadConfigurations() {
        loadMock("loadConfigurations()",1);
    }

    protected final void loadUsers() {
        loadMock("loadUsers",2);
    }

    protected final void loadOrders() {
        loadMock("loadOrders()",3);
    }

    private void loadMock(String source,int seconds) {
        try {
            long startTime = System.currentTimeMillis();
            long milliseconds = TimeUnit.SECONDS.toMillis(seconds);
            Thread.sleep(milliseconds);
            long costTime = System.currentTimeMillis() - startTime;
            System.out.printf("[線程: %s] %s 耗時: %d 毫秒\n",
                    Thread.currentThread().getName(),source,costTime );
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    public static void main(String[] args) {
        new DataLoader().load();
    }
}

運行結果框架

[線程: main] loadConfigurations() 耗時: 1001 毫秒
[線程: main] loadUsers 耗時: 2001 毫秒
[線程: main] loadOrders() 耗時: 3003 毫秒
load()總耗時:6025毫秒

由結果可知,咱們在依次執行loadConfigurations()、loadUsers()、loadOrders()中,loadUsers()被loadConfigurations()阻塞了,loadOrders() 被loadUsers()阻塞了,它們都是main的主線程中的執行。因爲加載過程串行執行的關係,致使消耗實現線性累加。串行執行即Blocking模式。

並行的複雜

由如下場景來講明

public class ParalleDataLoader extends DataLoader {
    protected void doLoad() {
        ExecutorService executorService = Executors.newFixedThreadPool(3);
        //CompletionService是一個接口,ExecutorCompletionService爲其實現類
        //ExecutorCompletionService在構造函數中會建立一個BlockingQueue
        // (使用的基於鏈表的無界隊列LinkedBlockingQueue),
        // 該BlockingQueue的做用是保存Executor執行的結果。
        // 當計算完成時,調用FutureTask的done方法。
        // 當提交一個任務到ExecutorCompletionService時,
        // 首先將任務包裝成QueueingFuture,它是FutureTask的一個子類,
        // 而後改寫FutureTask的done方法,以後把Executor執行的計算結果放入BlockingQueue中。
        CompletionService completionService = new ExecutorCompletionService(executorService);
        completionService.submit(super::loadConfigurations,null);
        completionService.submit(super::loadUsers,null);
        completionService.submit(super::loadOrders,null);

        int count = 0;
        while (count < 3) {
            if (completionService.poll() != null) {
                count++;
            }
        }
        executorService.shutdown();
    }

    public static void main(String[] args) {
        new ParalleDataLoader().load();
    }
}

這裏大概解釋一下ExecutorCompletionService,它的構造器會初始化一個線程池以及一個BlockingQueue

public ExecutorCompletionService(Executor executor) {
    if (executor == null)
        throw new NullPointerException();
    this.executor = executor;
    this.aes = (executor instanceof AbstractExecutorService) ?
        (AbstractExecutorService) executor : null;
    this.completionQueue = new LinkedBlockingQueue<Future<V>>();
}

提交線程的時候,會初始化一個FutureTask,並放入QueueingFuture中,交給線程池去執行。

public Future<V> submit(Runnable task, V result) {
    if (task == null) throw new NullPointerException();
    RunnableFuture<V> f = newTaskFor(task, result);
    executor.execute(new QueueingFuture(f));
    return f;
}
private RunnableFuture<V> newTaskFor(Runnable task, V result) {
    if (aes == null)
        return new FutureTask<V>(task, result);
    else
        return aes.newTaskFor(task, result);
}

咱們看一下QueueingFuture的繼承圖

由圖可知,不管QueueingFuture,FutureTask,RunnableFuture其實都是一個Runnable。而在線程執行完畢後會執行一個done()方法,將結果放入BlockingQueue中。

private class QueueingFuture extends FutureTask<Void> {
    QueueingFuture(RunnableFuture<V> task) {
        super(task, null);
        this.task = task;
    }
    protected void done() { completionQueue.add(task); }
    private final Future<V> task;
}
private final BlockingQueue<Future<V>> completionQueue;

BlockingQueue是在ExecutorCompletionService被初始化了的,有關BlockingQueue的介紹能夠參考從BlockingQueue到無鎖Disruptor的性能提高

最後咱們用到了completionService.poll()

public Future<V> poll() {
    return completionQueue.poll();
}

將Future結果從BlockingQueue隊列中彈出。固然咱們示例中並無什麼結果須要彈出。

如今咱們回到示例代碼,運行結果

[線程: pool-1-thread-1] loadConfigurations() 耗時: 1002 毫秒
[線程: pool-1-thread-2] loadUsers 耗時: 2002 毫秒
[線程: pool-1-thread-3] loadOrders() 耗時: 3003 毫秒
load()總耗時:3059毫秒

由結果可知,程序改造爲並行加載後,性能和資源利用率獲得提高,消耗時間取最大者。但因爲以上三個方法之間沒有數據依賴關係,因此執行方式由串行調整爲並行後,可以達到性能提高的效果。若是方法之間存在依賴關係時,那麼提高效果是否還會如此明顯,而且若是確保它們的執行循序。問題如(線程安全性,原子性,可見性),由此問題能夠參考Fork/Join框架原理和使用探祕 ,在這篇博客中就能夠看到爲了保證線程安全性,性能已經不如單線程。

Reactor認爲異步不必定可以救贖

概括

  • Callbacks是解決非阻塞的方案,而後它們之間很難組合,而且快速地將代碼引導至"Callback Hell"的不歸路
  • Futures相對於Callbacks好一點,不過仍是沒法組合,不過ComletableFuture可以提高這方面的不足。好比在上面的示例中,若是loadUsers要傳遞數據到loadOrders中也是極其困難的。

Callback Hell

咱們來看這樣一段代碼

public class JavaGUI {
    public static void main(String[] args) {
        final JFrame jFrame = new JFrame("GUI 示例");
        jFrame.setBounds(500,300,400,300);
        LayoutManager layoutManager = new BorderLayout(400,300);
        jFrame.setLayout(layoutManager);
        jFrame.addMouseListener(new MouseAdapter() {
            @Override
            public void mouseClicked(MouseEvent e) {
                System.out.printf("[線程 : %s] 鼠標點擊,座標(X : %d,Y : %d)\n",
                        currentThreadName(),e.getX(),e.getY());
            }
        });
        jFrame.addWindowListener(new WindowAdapter() {
            @Override
            public void windowClosing(WindowEvent e) {
                System.out.printf("[線程 : %s] 清除 jFrame...\n",currentThreadName());
                jFrame.dispose();
            }

            @Override
            public void windowClosed(WindowEvent e) {
                System.out.printf("[線程 : %s] 退出程序... \n",currentThreadName());
                System.exit(0);
            }
        });
        System.out.println("當前線程:" + currentThreadName());
        jFrame.setVisible(true);
    }

    private static String currentThreadName() {
        return Thread.currentThread().getName();
    }
}

當咱們執行了main方法之後,會打印當前線程,而且顯示window窗體。

咱們能夠看到打印了當前線程爲main的主線程。當咱們在窗體內用鼠標點擊的時候會打印以下內容

[線程 : AWT-EventQueue-0] 鼠標點擊,座標(X : 218,Y : 167)
[線程 : AWT-EventQueue-0] 鼠標點擊,座標(X : 130,Y : 120)


由打印的內容可知,咱們鼠標點擊並非main的主線程來執行的,說明它是一個異步的Callback,並且是非阻塞的,當咱們點擊鼠標產生鼠標事件時,沒有任何線程會阻塞該線程的執行。當咱們關閉窗口的時候,會打印以下內容

[線程 : AWT-EventQueue-0] 清除 jFrame...
[線程 : AWT-EventQueue-0] 退出程序...

說明關閉也是由同一個異步線程來執行的。由此能夠看出Java GUI以及事件/監聽模式基本採用匿名內置類,即回調實現。當監聽的維度增多,Callback實現也隨之增多。同時,事件/監聽者模式(觀察者模式)的併發模型可爲同步或異步。這裏說的同步、異步是線程模型;阻塞、非阻塞是編程模型。在Spring中,於這種GUI回調相似的有Spring Boot的消息事件機制 ,這裏面也有同步,異步,阻塞,非阻塞的說明。

Future阻塞問題

咱們來修改一下ParalleDataLoader的代碼造成一個Future的阻塞

public class FutureBlockingDataLoader extends DataLoader {
    protected void doLoad() {
        ExecutorService executorService = Executors.newFixedThreadPool(3);
        CompletionService completionService = new ExecutorCompletionService(executorService);
        runComletely(completionService.submit(super::loadConfigurations,null));
        runComletely(completionService.submit(super::loadUsers,null));
        runComletely(completionService.submit(super::loadOrders,null));
        executorService.shutdown();
    }

    private void runComletely(Future<?> future) {
        try {
            future.get();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        }
    }

    public static void main(String[] args) {
        new FutureBlockingDataLoader().load();
    }
}

運行結果

[線程: pool-1-thread-1] loadConfigurations() 耗時: 1000 毫秒
[線程: pool-1-thread-2] loadUsers 耗時: 2002 毫秒
[線程: pool-1-thread-3] loadOrders() 耗時: 3001 毫秒
load()總耗時:6073毫秒

由結果可知,future.get()成爲future阻塞的源泉。該方法不得不等待任務執行完成,換言之,若是多個任務提交後,返回多個Future逐一調用get()方法時,將會依次blocking,任務的執行從並行變成串行。

Future鏈式問題

因爲Future沒法異步執行結果鏈式處理,儘管FutureBlockingDataLoader可以解決方法數據依賴以及順序執行的問題,不過它將並行執行帶回了阻塞(串行)執行。因此,它不是一個理想實現。不過CompletableFuture能夠幫助提高Future限制。

public class ChainDataLoader extends DataLoader {
    protected void doLoad() {
        CompletableFuture
                .runAsync(super::loadConfigurations)
                .thenRun(super::loadUsers)
                .thenRun(super::loadOrders)
                .whenComplete((result,throwable) ->
                    System.out.println("[線程 :" + Thread.currentThread().getName() + "] 加載完成")
                ).join();
    }

    public static void main(String[] args) {
        new ChainDataLoader().load();
    }
}

運行結果

[線程: ForkJoinPool.commonPool-worker-9] loadConfigurations() 耗時: 1004 毫秒
[線程: ForkJoinPool.commonPool-worker-9] loadUsers 耗時: 2004 毫秒
[線程: ForkJoinPool.commonPool-worker-9] loadOrders() 耗時: 3001 毫秒
[線程 :ForkJoinPool.commonPool-worker-9] 加載完成
load()總耗時:6079毫秒

由結果可知,當異步執行時,它並非由3個線程去執行,而是由同一個線程進行鏈式執行的,之因此加入join,是爲了讓主線程等待返回。它跟第一個DataLoader的不一樣在於,DataLoader是所有由主線程去阻塞執行的,而這裏若是不使用join()則確定爲非阻塞的,只不過join()會阻塞,這個是線程相關的常識,具體能夠參考線程,JVM鎖整理 。也就是說,若是去掉join(),因爲CompletableFuture都是守護線程,主線程執行完,它是不會執行的,如今咱們把代碼稍做修改以下。

public class ChainDataLoader extends DataLoader {
    protected void doLoad() {
        CompletableFuture
                .runAsync(super::loadConfigurations)
                .thenRun(super::loadUsers)
                .thenRun(super::loadOrders)
                .whenComplete((result,throwable) ->
                    System.out.println("[線程 :" + Thread.currentThread().getName() + "] 加載完成")
                );
        System.out.println("[線程 :" + Thread.currentThread().getName() +"】後續執行");
    }

    public static void main(String[] args) {
        new ChainDataLoader().load();
    }
}

運行結果

[線程 :main】後續執行
load()總耗時:60毫秒

證實CompletableFuture還未啓動,並未執行。但若是咱們把new ChainDataLoader().load();這段代碼放入Controller中

@RestController
public class TestController {
    @GetMapping("/future")
    public void findfuture() {
        new ChainDataLoader().load();
    }
}

經過瀏覽器訪問

能夠看到後臺打印

[線程 :reactor-http-nio-2】後續執行
load()總耗時:3毫秒
[線程: ForkJoinPool.commonPool-worker-9] loadConfigurations() 耗時: 1003 毫秒
[線程: ForkJoinPool.commonPool-worker-9] loadUsers 耗時: 2004 毫秒
[線程: ForkJoinPool.commonPool-worker-9] loadOrders() 耗時: 3005 毫秒
[線程 :ForkJoinPool.commonPool-worker-9] 加載完成

證實異步線程是非阻塞而且執行了的。

這裏咱們能夠看到CompletableFuture屬於異步操做,若是強制等待結束的話,又回到了阻塞編程的方式,而且讓咱們明白到非阻塞不必定提高性能,由於即使是非阻塞,在異步線程中,它同樣要使用6秒才能完成,相比於ParalleDataLoader的並行執行,只須要3秒完成來講,非阻塞的好處是讓主方法線程及時完成,讓主方法線程池能夠及時釋放。不過同理,在ParalleDataLoader中若是不進行completionService.poll()的阻塞操做,主線程一樣會率先返回,因爲線程池中的線程並不是守護線程,它在主線程完成後會繼續執行。

public class ParalleDataLoader extends DataLoader {
    protected void doLoad() {
        ExecutorService executorService = Executors.newFixedThreadPool(3);
        CompletionService completionService = new ExecutorCompletionService(executorService);
        completionService.submit(super::loadConfigurations,null);
        completionService.submit(super::loadUsers,null);
        completionService.submit(super::loadOrders,null);

//        int count = 0;
//        while (count < 3) {
//            if (completionService.poll() != null) {
//                count++;
//            }
//        }
        executorService.shutdown();
    }

    public static void main(String[] args) {
        new ParalleDataLoader().load();
    }
}

運行結果

load()總耗時:59毫秒
[線程: pool-1-thread-1] loadConfigurations() 耗時: 1004 毫秒
[線程: pool-1-thread-2] loadUsers 耗時: 2004 毫秒
[線程: pool-1-thread-3] loadOrders() 耗時: 3002 毫秒

一樣咱們把new ParalleDataLoader().load()放入Controller中

@RestController
public class TestController {
    @GetMapping("/future")
    public void findfuture() {
        new ParalleDataLoader().load();
    }
}

經過瀏覽器訪問,後臺打印

load()總耗時:1毫秒
[線程: pool-1-thread-1] loadConfigurations() 耗時: 1000 毫秒
[線程: pool-1-thread-2] loadUsers 耗時: 2004 毫秒
[線程: pool-1-thread-3] loadOrders() 耗時: 3005 毫秒

這裏一樣爲異步非阻塞,而且併發了3個線程,異步線程總耗時是3秒。可是這樣會形成異步線程池的線程數併發量比較大。

Reactive Stream JVM認爲異步系統和資源消費須要特殊處理,在Reactor的github的官網上,有這樣一段描述https://github.com/reactive-streams/reactive-streams-jvm

  • 流式數據容量難以預判
  • 異步編程複雜
  • 數據源和消費端之間資源消費難以平衡

Reactive的理解,須要從不少方便

  • 維基百科
  • The Reactive Mainifesto : Resactive組織
  • Spring Framework
  • ReactiveX : RxJava
  • Reactor : WebFlux底層
  • @andrestaltz :著名做者

Reactive Programming定義

The Reactive Mainifesto認爲:官網https://www.reactivemanifesto.org/

  • 響應性 (Responsive)
  • 適應性強的 (Resilient)
  • 彈性的 (Elastic)
  • 消息驅動的 (Message Driven)

側重點

  • 面向Reactive系統
  • Reactive系統原則

WebFlux的線程觀察

public class FluxTest {
    public static void main(String[] args) {
        Flux.just("a","b","c")
                .subscribe(FluxTest::println);
        System.out.println("你好");
    }

    private static void println(Object object) {
        String threadName = Thread.currentThread().getName();
        System.out.println("[線程: " + threadName + "] " + object);
    }
}

運行結果

[線程: main] a
[線程: main] b
[線程: main] c
你好

根據結果,咱們能夠看到這並非一個異步的,而是一個同步非阻塞的主線程執行。如今咱們來修改一下代碼

public class FluxTest {
    public static void main(String[] args) {
        Flux.just("a","b","c")
                .publishOn(Schedulers.elastic())
                .subscribe(FluxTest::println);
        System.out.println("你好");
    }

    private static void println(Object object) {
        String threadName = Thread.currentThread().getName();
        System.out.println("[線程: " + threadName + "] " + object);
    }
}

運行結果

你好 [線程: elastic-2] a [線程: elastic-2] b [線程: elastic-2] c

相關文章
相關標籤/搜索