完全搞懂 netty 線程模型

編者注:Netty是Java領域有名的開源網絡庫,特色是高性能和高擴展性,所以不少流行的框架都是基於它來構建的,好比咱們熟知的Dubbo、Rocketmq、Hadoop等。本文就netty線程模型展開分析討論下 : )java

IO模型react

  • BIO:同步阻塞IO模型;
  • NIO:基於IO多路複用技術的「非阻塞同步」IO模型。簡單來講,內核將可讀可寫事件通知應用,由應用主動發起讀寫操做;
  • AIO:非阻塞異步IO模型。簡單來講,內核將讀完成事件通知應用,讀操做由內核完成,應用只需操做數據便可;應用作異步寫操做時當即返回,內核會進行寫操做排隊並執行寫操做。

NIO和AIO不一樣之處在於應用是否進行真正的讀寫操做。linux

reactor和proactor模型nginx

  • reactor:基於NIO技術,可讀可寫時通知應用;
  • proactor:基於AIO技術,讀完成時通知應用,寫操做應用通知內核。

netty線程模型

netty的線程模型是基於Reactor模型的。程序員

netty單線程模型

Reactor 單線程模型,是指全部的 I/O 操做都在同一個 NIO 線程上面完成的,此時NIO線程職責包括:接收新建鏈接請求、讀寫操做等。
面試

在一些小容量應用場景下,可使用單線程模型(注意,Redis的請求處理也是單線程模型,爲何Redis的性能會如此之高呢?由於Redis的讀寫操做基本都是內存操做,而且Redis協議比較簡潔,序列化/反序列化耗費性能更低)。可是對於高負載、大併發的應用場景卻不合適,主要緣由以下:數據庫

  • 一個NIO線程同時處理成百上千的鏈接,性能上沒法支撐,即使NIO線程的CPU負荷達到100%,也沒法知足海量消息的編碼、解碼、讀取和發送。
  • 當NIO線程負載太重以後,處理速度將變慢,這會致使大量客戶端鏈接超時,超時以後每每會進行重發,這更加劇了NIO線程的負載,最終會致使大量消息積壓和處理超時,成爲系統的性能瓶頸。
  • 可靠性問題:一旦NIO線程意外跑飛,或者進入死循環,會致使整個系統通訊模塊不可用,不能接收和處理外部消息,形成節點故障。

Reactor多線程模型

Rector 多線程模型與單線程模型最大的區別就是有一組 NIO 線程來處理鏈接讀寫操做,一個NIO線程處理Accept。一個NIO線程能夠處理多個鏈接事件,一個鏈接的事件只能屬於一個NIO線程。後端

在絕大多數場景下,Reactor 多線程模型能夠知足性能需求。可是,在個別特殊場景中,一個 NIO 線程負責監聽和處理全部的客戶端鏈接可能會存在性能問題。例如併發百萬客戶端鏈接,或者服務端須要對客戶端握手進行安全認證,可是認證自己很是損耗性能。在這類場景下,單獨一個 Acceptor 線程可能會存在性能不足的問題,爲了解決性能問題,產生了第三種 Reactor 線程模型——主從Reactor 多線程模型。安全

Reactor主從多線程模型

主從 Reactor 線程模型的特色是:服務端用於接收客戶端鏈接的再也不是一個單獨的 NIO 線程,而是一個獨立的 NIO 線程池。Acceptor 接收到客戶端 TCP鏈接請求並處理完成後(可能包含接入認證等),將新建立的 SocketChannel注 冊 到 I/O 線 程 池(sub reactor 線 程 池)的某個I/O線程上, 由它負責SocketChannel 的讀寫和編解碼工做。Acceptor 線程池僅僅用於客戶端的登陸、握手和安全認證,一旦鏈路創建成功,就將鏈路註冊到後端 subReactor 線程池的 I/O 線程上,由 I/O 線程負責後續的 I/O 操做。網絡

netty線程模型思考

netty 的線程模型並非一成不變的,它實際取決於用戶的啓動參數配置。經過設置不一樣的啓動參數,Netty 能夠同時支持 Reactor 單線程模型、多線程模型。

爲了儘量地提高性能,Netty 在不少地方進行了無鎖化的設計,例如在 I/O 線程內部進行串行操做,避免多線程競爭致使的性能降低問題。表面上看,串行化設計彷佛 CPU 利用率不高,併發程度不夠。可是,經過調整 NIO 線程池的線程參數,能夠同時啓動多個串行化的線程並行運行,這種局部無鎖化的串行線程設計相比一個隊列多個工做線程的模型性能更優。(小夥伴們後續多線程併發流程可參考該類實現方案

Netty 的 NioEventLoop 讀取到消息以後,直接調用 ChannelPipeline 的fireChannelRead (Object msg)。 只要用戶不主動切換線程, 一直都是由NioEventLoop 調用用戶的 ChannelHandler,期間不進行線程切換。這種串行化處理方式避免了多線程操做致使的鎖的競爭,從性能角度看是最優的。

Netty擁有兩個NIO線程池,分別是bossGroupworkerGroup,前者處理新建鏈接請求,而後將新創建的鏈接輪詢交給workerGroup中的其中一個NioEventLoop來處理,後續該鏈接上的讀寫操做都是由同一個NioEventLoop來處理。注意,雖然bossGroup也能指定多個NioEventLoop(一個NioEventLoop對應一個線程),可是默認狀況下只會有一個線程,由於通常狀況下應用程序只會使用一個對外監聽端口。

這裏試想一下,難道不能使用多線程來監聽同一個對外端口麼,即多線程epoll_wait到同一個epoll實例上?

epoll相關的主要兩個方法是epoll_wait和epoll_ctl,多線程同時操做同一個epoll實例,那麼首先須要確認epoll相關方法是否線程安全:簡單來講,epoll是經過鎖來保證線程安全的, epoll中粒度最小的自旋鎖ep->lock(spinlock)用來保護就緒的隊列, 互斥鎖ep->mtx用來保護epoll的重要數據結構紅黑樹

看到這裏,可能有的小夥伴想到了Nginx多進程針對監聽端口的處理策略,Nginx是經過accept_mutex機制來保證的。accept_mutex是nginx的(新建鏈接)負載均衡鎖,讓多個worker進程輪流處理與client的新鏈接。當某個worker進程的鏈接數達到worker_connections配置(單個worker進程的最大處理鏈接數)的最大鏈接數的7/8時,會大大減少獲取該worker獲取accept鎖的機率,以此實現各worker進程間的鏈接數的負載均衡。accept鎖默認打開,關閉它時nginx處理新建鏈接耗時會更短,可是worker進程之間可能鏈接不均衡,而且存在「驚羣」問題。只有在使能accept_mutex而且當前系統不支持原子鎖時,纔會用文件實現accept鎖。注意,accept_mutex加鎖失敗時不會阻塞當前線程,相似tryLock。

現代linux中,多個socker同時監聽同一個端口也是可行的,nginx 1.9.1也支持這一行爲。linux 3.9以上內核支持SO_REUSEPORT選項,容許多個socker bind/listen在同一端口上。這樣,多個進程能夠各自申請socker監聽同一端口,當鏈接事件來臨時,內核作負載均衡,喚醒監聽的其中一個進程來處理,reuseport機制有效的解決了epoll驚羣問題。

再回到剛纔提出的問題,java中多線程來監聽同一個對外端口,epoll方法是線程安全的,這樣就可使用使用多線程監聽epoll_wait了麼,固然是不建議這樣乾的,除了epoll的驚羣問題以外,還有一個就是,通常開發中咱們使用epoll設置的是LT模式(水平觸發方式,與之相對的是ET默認,前者只要鏈接事件未被處理就會在epoll_wait時始終觸發,後者只會在真正有事件來時在epoll_wait觸發一次),這樣的話,多線程epoll_wait時就會致使第一個線程epoll_wait以後還未處理完畢已發生的事件時,第二個線程也會epoll_wait返回,顯然這不是咱們想要的,關於java nio的測試demo以下:

public class NioDemo {
    private static AtomicBoolean flag = new AtomicBoolean(true);
    public static void main(String[] args) throws Exception {
        ServerSocketChannel serverChannel = ServerSocketChannel.open();
        serverChannel.socket().bind(new InetSocketAddress(8080));
        // non-block io
        serverChannel.configureBlocking(false);
        Selector selector = Selector.open();
        serverChannel.register(selector, SelectionKey.OP_ACCEPT);

        // 多線程執行
        Runnable task = () -> {
            try {
                while (true) {
                    if (selector.select(0) == 0) {
                        System.out.println("selector.select loop... " + Thread.currentThread().getName());
                        Thread.sleep(1);
                        continue;
                    }

                    if (flag.compareAndSet(true, false)) {
                        System.out.println(Thread.currentThread().getName() + " over");
                        return;
                    }

                    Iterator<SelectionKey> iter = selector.selectedKeys().iterator();
                    while (iter.hasNext()) {
                        SelectionKey key = iter.next();

                        // accept event
                        if (key.isAcceptable()) {
                            handlerAccept(selector, key);
                        }

                        // socket event
                        if (key.isReadable()) {
                            handlerRead(key);
                        }

                        /**
                         * Selector不會本身從已選擇鍵集中移除SelectionKey實例,必須在處理完通道時手動移除。
                         * 下次該通道變成就緒時,Selector會再次將其放入已選擇鍵集中。
                         */
                        iter.remove();
                    }
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
        };

        List<Thread> threadList = new ArrayList<>();
        for (int i = 0; i < 2; i++) {
            Thread thread = new Thread(task);
            threadList.add(thread);
            thread.start();
        }
        for (Thread thread : threadList) {
            thread.join();
        }
        System.out.println("main end");
    }

    static void handlerAccept(Selector selector, SelectionKey key) throws Exception {
        System.out.println("coming a new client... " + Thread.currentThread().getName());
        Thread.sleep(10000);
        SocketChannel channel = ((ServerSocketChannel) key.channel()).accept();
        channel.configureBlocking(false);
        channel.register(selector, SelectionKey.OP_READ, ByteBuffer.allocate(1024));
    }

    static void handlerRead(SelectionKey key) throws Exception {
        SocketChannel channel = (SocketChannel) key.channel();
        ByteBuffer buffer = (ByteBuffer) key.attachment();
        buffer.clear();

        int num = channel.read(buffer);
        if (num <= 0) {
            // error or fin
            System.out.println("close " + channel.getRemoteAddress());
            channel.close();
        } else {
            buffer.flip();
            String recv = Charset.forName("UTF-8").newDecoder().decode(buffer).toString();
            System.out.println("recv: " + recv);

            buffer = ByteBuffer.wrap(("server: " + recv).getBytes());
            channel.write(buffer);
        }
    }
}

netty線程模型實踐

(1) 時間可控的簡單業務直接在 I/O 線程上處理

時間可控的簡單業務直接在 I/O 線程上處理,若是業務很是簡單,執行時間很是短,不須要與外部網絡交互、訪問數據庫和磁盤,不須要等待其它資源,則建議直接在業務 ChannelHandler 中執行,不須要再啓業務的線程或者線程池。避免線程上下文切換,也不存在線程併發問題。

(2) 複雜和時間不可控業務建議投遞到後端業務線程池統一處理

複雜度較高或者時間不可控業務建議投遞到後端業務線程池統一處理,對於此類業務,不建議直接在業務 ChannelHandler 中啓動線程或者線程池處理,建議將不一樣的業務統一封裝成 Task,統一投遞到後端的業務線程池中進行處理。過多的業務ChannelHandler 會帶來開發效率和可維護性問題,不要把 Netty 看成業務容器,對於大多數複雜的業務產品,仍然須要集成或者開發本身的業務容器,作好和Netty 的架構分層。

(3) 業務線程避免直接操做 ChannelHandler

業務線程避免直接操做 ChannelHandler,對於 ChannelHandler,IO 線程和業務線程均可能會操做,由於業務一般是多線程模型,這樣就會存在多線程操做ChannelHandler。爲了儘可能避免多線程併發問題,建議按照 Netty 自身的作法,經過將操做封裝成獨立的 Task 由 NioEventLoop 統一執行,而不是業務線程直接操做,相關代碼以下所示:

若是你確認併發訪問的數據或者併發操做是安全的,則無需畫蛇添足,這個須要根據具體的業務場景進行判斷,靈活處理。

推薦閱讀

歡迎小夥伴關注【TopCoder】閱讀更多精彩好文。

img

相關文章
相關標籤/搜索