ES 源代碼閱讀(三)

elasticsearch 1.7.5 中 的maven依賴  io.netty netty 3.10.5.Final
NettyTransport.java
首先使用 ServerSocketChannelFactory 來建立 ServerBootstrap 實例, 而後設置
piplineFactory.ServerSocketChannelFactory 包括兩種,oio/nio兩種實現.
對於ServerSocketChannelFactory須要兩種線程池,boss和worker.boss負責監聽端口,
每個端口都有專門的線程複製處理,當一個請求來臨時,boss線程會打開socket,封
裝成channel,而後將channel交給worker來完成,而後boss線程繼續監聽.
worker 線程則是因爲oio/nio 的兩種實現來決定.
對於oio方式,channel被交由worker後,以後直到socket都關閉的過程當中都有這個worker
線程處理.
而對於nio方式,每個worker能夠處理不一樣的channel.可使用少許的worker線程處
理大量channel. worker線程在處理channel時,交給channelPipline來處理.在channelPipline中
能夠有不少channelHandler.通常狀況下Pipline中的handler是由相關worker線程
來完成,所以爲了節約worker線程處理時間,一般另外建立線程和使用額外的線程池來
完成邏輯的處理.
netty3和4仍是有些差異的.
參考:
http://blog.163.com/wm_at163/blog/static/132173490201271154751439/
http://ifeve.com/channel-pipeline/java

NettyTransport.java 關鍵代碼
// 建立ServerBootstrap方法
serverBootstrap = new ServerBootstrap(new NioServerSocketChannelFactory(Executors.newCachedThreadPool(bossFactory),Executors.newCachedThreadPool(workerFactory),workerCount));
// 加載pipline
serverBootstrap.setPipelineFactory(configureServerChannelPipelineFactory(name, settings));
ServerChannelPipelineFactory 做爲setPipelineFactory的參數. ServerChannelPipelineFactory 實現 org.jboss.netty.channel.ChannelPipelineFactory接口.
在生成getPipline方法中socket

            ChannelPipeline channelPipeline = Channels.pipeline();
            channelPipeline.addLast("openChannels", nettyTransport.serverOpenChannels);
            SizeHeaderFrameDecoder sizeHeader = new SizeHeaderFrameDecoder();
            if (nettyTransport.maxCumulationBufferCapacity != null) {
                if (nettyTransport.maxCumulationBufferCapacity.bytes() > Integer.MAX_VALUE) {
                    sizeHeader.setMaxCumulationBufferCapacity(Integer.MAX_VALUE);
                } else {
                    sizeHeader.setMaxCumulationBufferCapacity((int) nettyTransport.maxCumulationBufferCapacity.bytes());
                }
            }
            if (nettyTransport.maxCompositeBufferComponents != -1) {
                sizeHeader.setMaxCumulationBufferComponents(nettyTransport.maxCompositeBufferComponents);
            }
            channelPipeline.addLast("size", sizeHeader);
            channelPipeline.addLast("dispatcher", new MessageChannelHandler(nettyTransport, nettyTransport.logger, name));
            return channelPipeline;elasticsearch

這就表示:任何請求符合條件的都會通過pipline來處理.會交給MessageChannelHandler來完成處理.
最後bind開始提供服務.maven

相關文章
相關標籤/搜索