ElasticSearch中碰到的C10K問題

Elasticsearch基於Netty解決C10K問題背後的原理是JAVA NIO中的IO多路複用機制,涉及到三大"組件":SelectableChannel、Selector、SelectionKey。普通的"一請求一線程"方式,有一個線程負責accept請求,請求accepted後返回Channel,而後新建一個線程負責處理Channel上的IO事件。顯然當請求量達到C10K時,就得建立10K個線程,這對於一臺服務器是不可接受的。html

ServerSocketChannel ssc = ServerSocketChannel.open( );
ssc.socket( ).bind (new InetSocketAddress (port));
ssc.configureBlocking (false);
while (true) {
    System.out.println ("Waiting for connections");
    SocketChannel sc = ssc.accept( );
    if (sc == null) {
        // no connections, snooze a while
        Thread.sleep (2000);
    }else{
        Socket socket = sc.socket();// an accetped request
        //一請求一線程方式:new Thread processing sc.socket()
        
        //或者採用線程池方式:ExecutorService.execute(...) processing sc.socket()
    }
}

這時候,有人就會提出:accepted鏈接以後,也能夠不建立新線程,使用線程池來處理Channel上的IO事件。有一個線程負責accept請求,請求accepted後返回Channel,而後從線程池中取出一個線程負責處理Channel上的IO事件。這種方式只是當線程池中某個線程處理完Channel上的IO事件後,線程複用,又可讓它處理最新accepted的請求(這裏再也不new Thread了),可是當線程池中線程被耗盡(在10K的請求量下,線程池中有1w個線程嗎?)時,此時也無能爲力了。java

這種模型表示以下:(參考網友的圖:)編程

既然採用線程池並無解決C10K問題,線程池中的線程數量也是有限的,當有大量的IO請求時,IO事件通常都伴隨着阻塞操做,這些阻塞操做佔用了一個線程,但由於IO阻塞,線程就會被掛起,此時CPU卻很空閒。
而假設此時線程池中又沒有空閒線程了(要麼正在執行業務邏輯、要麼IO阻塞操做掛起了),此時就會看到:服務器的CPU利用率並不高,可是卻沒法接受新的鏈接請求,這也是爲何在故障檢查時發現CPU利用率並不高,可是日誌中卻有大量被拒絕的鏈接。服務器

CPU處理的事件有兩種類型:IO密集型、CPU密集型。假設CPU的核數爲16核,針對IO密集型任務,線程池中的線程數量能夠開到64個、128個...(固然不能無限制地達到幾萬個...),正是由於IO密集型任務有阻塞操做,多開線程能夠增長任務處理數量,從而提升CPU的吞吐量和利用率。而對於CPU密集型任務,線程池中線程數量通常設置爲17(CPU核數加1),由於CPU密集型任務,幾乎不會阻塞,一直在佔用CPU運行,這時線程池中建立大量線程反而會使CPU實際利用率(吞吐量)降低了,由於線程上下文切換消耗了大量系統資源。《JAVA併發編程實踐》中提到了CPU核數與線程數量之間的關係。架構

繼續分析,既然線程池的方式也不能解決C10K問題,這裏候就輪到IO多路複用機制了。(這裏引用了Netty中的EventLoopGroup)
原生 JAVA NIO處理、Netty處理的區別就是:Netty中把Channel上發生的IO事件的處理交給了EventLoopGroup來處理,EventLoopGroup實質是個ScheduledThreadPoolExecutor,它管理着若干EventLoop線程,EventLoop在各類文檔/資料中有一個專業名稱:I/O 事件線程。併發

這裏提個問題:爲何Netty裏面建議:不要使用EventLoopGroup處理IO阻塞操做,而是本身建立線程池,把IO阻塞操做代理給本身建立的線程池處理?socket

IO多路複用機制爲何能解決C10K問題?下面詳細分析why?
當新請求到來時,有一個單獨的線程負責accept請求,請求 accepted 後返回一個Channel,"使用"Selector在Channel上註冊它感興趣的事件,就是與前面2種方式的本質區別。這樣,無論請求量有多大(C10K的請求量),Server 都可以將之accepted,而後僅僅只是在建立的Channel上註冊了感興趣的事件而已(真正的IO事件可能還沒有發生)。
經過Selector輪詢,檢查哪一個Channel上註冊的事件發生了,若是事件發生了,才"開動"線程去處理(這個線程能夠來自EventLoopGroup線程池,也能夠是本身 new Thread ,也能夠是自已 new 一個ThreadPool中的線程)。這就是IO多路複用機制原理。因此,真正解決C10K問題的緣由是基於Selector的IO多路複用機制。oop

// Allocate an unbound server socket channel
ServerSocketChannel serverChannel = ServerSocketChannel.open( );
// Get the associated ServerSocket to bind it with
ServerSocket serverSocket = serverChannel.socket( );
// Create a new Selector for use below
Selector selector = Selector.open( );
// Set the port the server channel will listen to
serverSocket.bind (new InetSocketAddress (port));
// Set nonblocking mode for the listening socket
serverChannel.configureBlocking (false);
// Register the ServerSocketChannel with the Selector
serverChannel.register (selector, SelectionKey.OP_ACCEPT);
while (true) {
// This may block for a long time. Upon returning, the selected set contains keys of the ready channels.
    int n = selector.select();
    if (n == 0) {
        continue; // nothing to do
    }
    // Get an iterator over the set of selected keys
    Iterator it = selector.selectedKeys().iterator( );
    // Look at each key in the selected set
    while (it.hasNext( )) {
        SelectionKey key = (SelectionKey) it.next( );
        // Is a new connection coming in?
        if (key.isAcceptable( )) {
            ServerSocketChannel server = (ServerSocketChannel) key.channel();
            SocketChannel channel = server.accept();
            registerChannel (selector, channel,SelectionKey.OP_READ);
            sayHello (channel);
        }
        // Is there data to read on this channel?
        if (key.isReadable( )) {
            readDataFromSocket (key);
        }
    //.....

在IO多路複用機制下,Server accepted 鏈接後返回一個Channel,並在Channel上註冊感興趣的事件(好比讀操做對應着讀事件)。在實際TCP鏈接中,創建了鏈接並不表明就當即發送數據了,IO多路複用基於Selector輪詢(epoll),只有當數據發送過來了,底層OS把事件"通知"給Selector,數據就緒後,才"開動"EventLoopGroup中的EventLoop線程去處理數據。(readiness selection),這樣Server處理C10K的鏈接就成爲可能了。以下圖:每一個Socket(Channel)上的相應的事件都註冊到Selector,而後有一個線程輪詢Selector selector.select(),當某個Socket上的事件發生了時,再進行相應處理。post

只是在原生的JAVA NIO下,咱們須要本身編寫代碼如何處理每一個就緒選擇的事件。而基於Netty,已經幫咱們封裝好了這些處理邏輯,每一個Channel上的事件直接交由EventLoopGroup處理,示例圖以下:this

在這裏EventLoopGroup相當重要,由於已就緒的IO事件是交給它來處理的(take EventLoop-n and bind EventLoop-n to Channel),若是EventLoopGroup中的線程執行某種"阻塞"操做(EventLoop-n process IO),那就會影響可以處理已就緒的IO事件數量,進而影響Server能接受/處理多少鏈接。所以,能夠本身再建立一個線程池,把阻塞操做交給該線程池執行,就能保證EventLoopGroup高效地處理已發生的IO事件而不發生阻塞。

實際應用

Kafka Borker處理Client的請求是基於Reactor模式

  • acceptor 線程監聽Client的鏈接請求。
  • 請求創建後,生成SocketChanel(可理解爲Client與Broker之間發消息通道),processor 線程將SocketChannel上的發生的"事件"放到一個請求隊列中(queued.max.requests參數),processor 線程 就是 IO事件線程,而IO事件線程最好是不能阻塞的。
  • KafkaRequestHandler線程池,這是真正的執行業務邏輯處理的線程。processor線程將 業務邏輯處理(如可能發生的IO阻塞操做)代理給KafkaRequestHandler線程池來處理。該線程池中線程數量由broker參數 num.io.threads指定。

我的理解,可能有錯誤。

參考資料:

原文:http://www.javashuo.com/article/p-waoliekm-dw.html

相關文章
相關標籤/搜索