架構設計:系統間通訊(7)——IO通訊模型和Netty 下篇

接上文《架構設計:系統間通訊(6)——IO通訊模型和Netty 上篇java

五、再次審視爲何使用Netty

上篇文章咱們討論了Netty的基本原理,重要概念,並使用java代碼描述了Netty的基本使用。固然Netty的技術涵蓋點遠遠不是那一篇基礎代碼就能夠所有歸納的,可是至少能夠給讀者一個切入點。讓你們去思考一個咱們一直在討論的問題:爲何有了JAVA NIO框架後咱們還須要有Netty這樣的框架對底層再次進行封裝?python

5-一、IO模型的封裝

5-1-一、再次總結IO模型

在前文中咱們已經提到了,幾種典型的IO模型(參見系統間通訊三、四、5這三篇文章中的介紹,這裏再進行一次總結):linux

  • 阻塞和非阻塞:這個概念是針對應用程序而言,是指應用程序中的線程在向操做系統發送IO請求後,是否一直等待操做系統的IO響應。若是是,那麼就是阻塞式的;若是不是,那麼應用程序通常會以輪詢的方式以必定週期詢問操做系統,直到某次得到了IO響應爲止(輪序間隔應用程序線程能夠作一些其餘工做)。c++

  • 同步和異步:IO操做都是由操做系統進行的(這裏的IO操做是個普遍概念了:磁盤IO、網絡IO都算),不一樣的操做系統對不一樣設備的IO操做都有不一樣的模式。同步和異步這兩個概念都指代的操做系統級別,同步IO是指操做系統和設備進行交互時,必須等待一次完整的請求-響應完成,才能進行下一次操做(固然操做系統和設備自己也有不少技術加快這個反應過程,例如「磁盤預讀」技術、數據緩存技術);異步IO是指操做系統和設備進行交互時,沒必要等待本次獲得響應,就能夠直接進行下一次操做請求。設備處理完某次請求後,會主動給操做系統相應的響應通知web

  • 多路複用IO:多路複用IO,從本質上看仍是一種同步IO,由於它沒有100%消除IO_WAIT,操做系統也沒有爲它提供「主動通知」機制。可是多路複用IO的處理速度已經至關快了,利用設備執行IO操做的時間,操做系統能夠繼續執行IO請求。並一樣採用週期性輪詢的方式,獲取一批IO操做請求的執行響應。操做系統支持的多路複用IO技術主要有select、poll、epoll、kqueue。shell

  • 阻塞式同步IO模型:這個從字面上就很好理解了,應用程序請求IO操做,並一直等待處理結果;操做系統同時也進行IO操做,並等待設備的處理結果;能夠看出,應用程序的請求線程和操做系統的內核線程都是等待狀態。apache

  • 非阻塞式同步IO模型:應用程序請求IO,而且不用一直等待返回結果就去作其餘事情。隔必定的週期,再去詢問操做系統上次IO操做有沒有結果,直到某一次詢問從操做系統拿到IO結果;操做系統內核線程在進行IO操做時,仍是處理一直等待設備返回操做結果的狀態。bootstrap

  • 非阻塞式多路複用IO模型:應用程序請求IO的工做採用非阻塞方式進行;操做系統採用多路複用模式工做。windows

  • 非阻塞式異步IO模型:應用程序請求IO的工做採用非阻塞方式進行,可是不須要輪詢了,由於操做系統異步IO其中一個主要特性就是:能夠在有IO響應結果的時候,主動進行通知緩存

5-1-二、對IO模型的再次封裝

以上這些IO工做模型,在JAVA中都可以找到對應的支持:傳統的JAVA Socket套接字支持阻塞/非阻塞模式下的同步IO(有的技術資料裏面也稱爲OIO或者BIO);JAVA NIO框架在不一樣操做系統下支持不一樣種類的多路複用IO技術(windows下的select模型、Linux下的poll/epoll模型);JAVA AIO框架支持異步IO(windows下的IOCP和Linux使用epoll的模擬AIO)

實際上Netty是對JAVA BIO 、JAVA NIO框架的再次封裝。讓咱們再也不糾結於選用哪一種底層實現。您能夠理解成Netty/MINA 框架是一個面向上層業務實現進行封裝的「業務層」框架。而JAVA Socket框架、JAVA NIO框架、JAVA AIO框架更偏向於對下層技術實現的封裝,是面向「技術層」的框架

5-二、數據信息格式的封裝

「技術層」框架自己只對IO模型技術實現進行了封裝,並不關心IO模型中流淌的數據格式;「業務層」框架對數據格式也進行了處理,讓咱們能夠抽出精力關注業務自己。

  • Protobuf數據協議的集成:Netty利用自身的Channelpipeline的設計(在《架構設計:系統間通訊(6)——IO通訊模型和Netty 上篇》中講過),對Protobuf數據協議進行了無縫結合。

  • JBoss Marshalling數據協議的集成:JBoss Marshalling 是一個Java對象的序列化API包,修正了JDK自帶的序列化包的不少問題,又保持跟 java.io.Serializable 接口的兼容。Netty經過封裝這個協議,能夠幫助咱們在客戶端-服務端簡便的進行對象系列化和反序列化。

  • HTTP Request / HTTP Response 協議的集成:在Netty中,能夠方便的接受和發送Http協議。也就是說,咱們可使用Netty搭建本身的WEB服務器,固然您還能夠根據本身的業務要求,方便的設計相似於Struts、Spring MVC這樣的WEB框架。

下面是一個使用Netty的Http編碼/解碼處理器,設計的一個簡單的WEB服務器:

package testNetty;

import java.net.InetSocketAddress;
import java.nio.channels.spi.SelectorProvider;
import java.util.concurrent.ThreadFactory;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.ChannelHandler.Sharable;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.http.DefaultFullHttpResponse;
import io.netty.handler.codec.http.FullHttpResponse;
import io.netty.handler.codec.http.HttpContent;
import io.netty.handler.codec.http.HttpHeaders;
import io.netty.handler.codec.http.HttpMethod;
import io.netty.handler.codec.http.HttpRequest;
import io.netty.handler.codec.http.HttpRequestDecoder;
import io.netty.handler.codec.http.HttpResponseEncoder;
import io.netty.handler.codec.http.HttpResponseStatus;
import io.netty.handler.codec.http.HttpVersion;
import io.netty.util.AttributeKey;
import io.netty.util.concurrent.DefaultThreadFactory;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.log4j.BasicConfigurator;

public class TestHTTPNetty {
    static {
        BasicConfigurator.configure();
    }

    public static void main(String[] args) throws Exception {
        //這就是主要的服務啓動器
        ServerBootstrap serverBootstrap = new ServerBootstrap();

        //=======================下面咱們設置線程池(代碼已經詳細講解過,就再也不贅述了)
        EventLoopGroup bossLoopGroup = new NioEventLoopGroup(1);
        ThreadFactory threadFactory = new DefaultThreadFactory("work thread pool");
        int processorsNumber = Runtime.getRuntime().availableProcessors();
        EventLoopGroup workLoogGroup = new NioEventLoopGroup(processorsNumber * 2, threadFactory, SelectorProvider.provider());
        serverBootstrap.group(bossLoopGroup , workLoogGroup);

        //========================下面咱們設置咱們服務的通道類型(代碼已經詳細講解過,就再也不贅述了)
        serverBootstrap.channel(NioServerSocketChannel.class);

        //========================設置處理器
        serverBootstrap.childHandler(new ChannelInitializer<NioSocketChannel>() {
            /* (non-Javadoc) * @see io.netty.channel.ChannelInitializer#initChannel(io.netty.channel.Channel) */
            @Override
            protected void initChannel(NioSocketChannel ch) throws Exception {
                //咱們在socket channel pipeline中加入http的編碼和解碼器
                ch.pipeline().addLast(new HttpResponseEncoder());
                ch.pipeline().addLast(new HttpRequestDecoder());
                ch.pipeline().addLast(new HTTPServerHandler());
            }
        });

        serverBootstrap.option(ChannelOption.SO_BACKLOG, 128);
        serverBootstrap.childOption(ChannelOption.SO_KEEPALIVE, true);
        serverBootstrap.bind(new InetSocketAddress("0.0.0.0", 83));
    }
}

/** * @author yinwenjie */
@Sharable
class HTTPServerHandler extends ChannelInboundHandlerAdapter {
    /** * 日誌 */
    private static Log LOGGER = LogFactory.getLog(HTTPServerHandler.class);

    /** * 因爲一次httpcontent可能沒有傳輸徹底部的請求信息。因此這裏要作一個連續的記錄 * 而後在channelReadComplete方法中(執行了這個方法說明此次全部的http內容都傳輸完了)進行處理 */
    private static AttributeKey<StringBuffer> CONNTENT = AttributeKey.valueOf("content");

    /* (non-Javadoc) * @see io.netty.channel.ChannelInboundHandlerAdapter#channelRead(io.netty.channel.ChannelHandlerContext, java.lang.Object) */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        /* * 在測試中,咱們首先取出客戶端傳來的參數、URL信息,而且返回給一個確認信息。 * 要使用HTTP服務,咱們首先要了解Netty中http的格式,以下: * ---------------------------------------------- * | http request | http content | http content | * ---------------------------------------------- * * 因此經過HttpRequestDecoder channel handler解碼後的msg多是兩種類型: * HttpRquest:裏面包含了請求head、請求的url等信息 * HttpContent:請求的主體內容 * */
        if(msg instanceof HttpRequest) {
            HttpRequest request = (HttpRequest)msg;
            HttpMethod method = request.getMethod();

            String methodName = method.name();
            String url = request.getUri();
            HTTPServerHandler.LOGGER.info("methodName = " + methodName + " && url = " + url);
        } 

        //若是條件成立,則在這個代碼段實現http請求內容的累加
        if(msg instanceof HttpContent) {
            StringBuffer content = ctx.attr(HTTPServerHandler.CONNTENT).get();
            if(content == null) {
                content = new StringBuffer();
                ctx.attr(HTTPServerHandler.CONNTENT).set(content);
            }

            HttpContent httpContent = (HttpContent)msg;
            ByteBuf contentBuf = httpContent.content();
            String preContent = contentBuf.toString(io.netty.util.CharsetUtil.UTF_8);
            content.append(preContent);
        }
    } 

    /* (non-Javadoc) * @see io.netty.channel.ChannelInboundHandlerAdapter#channelReadComplete(io.netty.channel.ChannelHandlerContext) */
    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        HTTPServerHandler.LOGGER.info("super.channelReadComplete(ChannelHandlerContext ctx)");

        /* * 一旦本次http請求傳輸完成,則能夠進行業務處理了。 * 而且返回響應 * */
        StringBuffer content = ctx.attr(HTTPServerHandler.CONNTENT).get();
        HTTPServerHandler.LOGGER.info("http客戶端傳來的信息爲:" + content);

        //開始返回信息了
        String returnValue = "return response";
        FullHttpResponse response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK);
        HttpHeaders httpHeaders = response.headers();
        //這些就是http response 的head信息咯,參見http規範。另外您還能夠設置本身的head屬性
        httpHeaders.add("param", "value");
        response.headers().set(HttpHeaders.Names.CONTENT_TYPE, "text/plain");
        //必定要設置長度,不然http客戶端會一直等待(由於返回的信息長度客戶端不知道)
        response.headers().set(HttpHeaders.Names.CONTENT_LENGTH, returnValue.length());

        ByteBuf responseContent = response.content();
        responseContent.writeBytes(returnValue.getBytes("UTF-8"));

        //開始返回
        ctx.writeAndFlush(response);
    } 
}

因爲上篇文章中已經介紹了Netty的基本使用方法,因此以上的代碼將其餘沒必要要的註釋、方法都去掉了,只作了實現web服務器的最簡代碼。可是這段代碼本省是能夠運行的。下面是運行效果:

這裏寫圖片描述

5-三、解決了「技術層」框架中的技術問題

經過閱讀Netty框架的代碼,咱們知道了Netty框架至少解決了JAVA NIO框架中的一些Bug:

sun.nio.ch.Util contains code which is not thread safe and can throw a NullPointerException:
private static String bugLevel = null;

    static boolean atBugLevel(String bl) {      // package-private
        if (bugLevel == null) {
            if (!sun.misc.VM.isBooted())
                return false;
            java.security.PrivilegedAction pa =
                new GetPropertyAction("sun.nio.ch.bugLevel");
// the next line can reset bugLevel to null
            bugLevel = (String)AccessController.doPrivileged(pa);
            if (bugLevel == null)
                bugLevel = "";
        }
        return (bugLevel != null) && bugLevel.equals(bl);
    }

Suppose that two threads enter the "if (buglevel == null)" body at the same time. The first one runs until the return line and gets scheduled out right after the (buglevel != null) check. The second one then runs until right after the doPrivileged() call, sets bugLevel to null and gets scheduled out. The first one continues and hits a NullPointerException while calling bugLevel.equals() with bugLevel being null.

這個問題在Netty框架中,負責進行JAVA NIO Selector的NioEventLoop類中獲得瞭解決。

  • workaround the infamous epoll 100% CPU bug。http://bugs.java.com/bugdatabase/view_bug.do?bug_id=6403933。這個Bug出如今linux系統環境,大體是說JAVA NIO 框架在實現 Linux內核 kernel 2.6+中的epoll模型時。Selector.select(timeout)方法不能阻塞指定的timeout時間,致使CPU 100%的狀況:
A DESCRIPTION OF THE PROBLEM :
Trying to get all bindings from the transient nameserver brings orbd into a state where it consumes 100% CPU. Its interesting to note that the problem only occurs if the client is programmed in c++. I was not able to reproduce the problem with a client programmed in Java.

STEPS TO FOLLOW TO REPRODUCE THE PROBLEM :
1) Get omniORB 4.1 from http://omniorb.sourceforge.net

2) Compile omniORB (requires python devel package installed)
cd /tmp
mkdir omni_local
tar xvzf omniORB-4.1.0.tar.gz
cd omniORB-4.1.0
./configure --prefix=/tmp/omni_local
make
make install

3) Compile the test program (binding_browser, source attached to this report)
g++ -I/tmp/omni_local/include -L/tmp/omni_local/lib -lomniORB4 -lomniDynamic4 -lomnithread -lpthread -lrt BindingBrowser.cc -o binding_browser

4) Start orbd
<JAVA_HOME>/bin/orbd -ORBInitialPort 12345

5) Start binding_browser (from another shell)
5a) export LD_LIBRARY_PATH=/tmp/omni_local/lib:$LD_LIBRARY_PATH
5b) ./binding_browser -ORBInitRef NameService=corbaloc::1.2@localhost:12345/TNameService

Repeat step 5b until orbd consumes 100% cpu.

這個問題從官方的Bug Database中的描述看,是在JDK7的版本中被解決的。Netty框架在JDK 6+的環境下在JAVA NIO框架封裝之上解決了這個Bug。

5-四、半包和粘包問題

咱們考慮一下這樣的狀況:咱們編寫了一個機器人控制程序,經過一個遙控器(客戶端)向機器人(服務器)創建了一個長鏈接,並經過這個鏈接接二連三的從遙控器發送控制指令給機器人。因爲是連續控制指令,因此指令與指令之間沒有間隔(實際上您還能夠想一想不少相似場景,例如:開發的Online對戰遊戲)。

咱們使用JSON格式做爲指令數據的承載格式。那麼發送方和接收方的數據發送-接受過程可能以下圖所示。

這裏寫圖片描述

經過上圖咱們看到了接收方爲了接受這兩條連貫的指令,一共作了三次接受,第二次接收的時候,收到了一部分message1的內容和一部分message2的內容。這裏要說明幾個注意事項:

  • MSS:MSS屬性是TCP鏈接雙方在三次握手時所確認的每個TCP報文段中數據字段的最大長度。注意,一是鏈接雙方協商出來的;二是隻是數據段的最大長度,不包括IP協議頭和TCP協議頭的最大長度。

  • 半包是指接收方應用程序在接收信息時,沒有接收到一個完成的信息格式塊;粘包是指,接收方應用程序在接受信息時,除了接收到發送方應用程序發送的某一個完整數據信息描述外,還接受到了一下發送方應用程序發送的下一個數據信息的一部分。

  • 半包和粘包是針對應用程序來講的,這個問題只會發生在TCP一些進行連續發送數據時(TCP長鏈接)。UDP不會出現這個問題,由於UDP都是有邊界的數據報;TCP短鏈接也不會出現,由於發送完一個指令信息後鏈接就斷開了,不會發送第二個指令數據。

  • 半包和粘包問題產生的根本是由於TCP本質上沒有「數據塊」的概念,而是一連串的數據流。在應用程序層面上咱們所定義的「數據塊」在TCP層面上並不被協議承認

  • 半包/粘包是一個應用層問題。要解決半包/粘包問題,就是在應用程序層面創建協商一致的信息還原依據。常見的有兩種方式:一是消息定長,即保證每個完整的信息描述的長度都是必定的,這樣不管TCP/IP協議如何進行分片,數據接收方均可以按照固定長度進行消息的還原。二是在完整的一塊數據結束後增長協商一致的分隔符(例如增長一個回車符)。

在JAVA NIO技術框架中,半包和粘包問題咱們須要本身解決,若是使用Netty框架,它其中提供了多種解碼器的封裝幫助咱們解決半包和粘包問題。甚至針對不一樣的數據格式,Netty都提供了半包和粘包問題的現成解決方式,例如以前咱們提到的ProtobufVarint32FrameDecoder解碼器,就是專門解決Protobuf數據格式在TCP長鏈接傳輸時的半包問題的。

下文中咱們會介紹FixedLengthFrameDecoder、DelimiterBasedFrameDecoder、LineBasedFrameDecoder來解決半包/粘包的問題。

因爲上文中咱們已經經過完整的代碼演示了Netty的基本使用,因此下面的示例代碼中爲了節約篇幅,我只會列出重要的代碼片斷。

5-4-一、使用FixedLengthFrameDecoder解決問題

FixedLengthFrameDecoder解碼處理器將TCP/IP的數據按照指定的長度進行從新拆分,若是接收到的數據不知足設置的固定長度,Netty將等待新的數據到達:

serverBootstrap.childHandler(new ChannelInitializer<NioSocketChannel>() {
    @Override
    protected void initChannel(NioSocketChannel ch) throws Exception {
        ch.pipeline().addLast(new ByteArrayEncoder());
        ch.pipeline().addLast(new FixedLengthFrameDecoder(20));
        ch.pipeline().addLast(new TCPServerHandler());
        ch.pipeline().addLast(new ByteArrayDecoder());
    }
});

Netty上層的channelRead事件方法將在Channel接收到20個字符的狀況下被觸發;而若是剩餘的內容不到20個字符,channelRead方法將不會被觸發(但注意channelReadComplete方法會觸發的啦)。

5-4-二、使用LineBasedFrameDecoder解決問題

LineBasedFrameDecoder解碼器,基於最簡單的「換行符」進行接收到的信息的再組織。windows和linux兩個操做系統中的「換行符」是不同的,LineBasedFrameDecoder解碼器都支持。固然這個解碼器沒有咱們後面介紹的DelimiterBasedFrameDecoder解碼器靈活。

serverBootstrap.childHandler(new ChannelInitializer<NioSocketChannel>() {
    @Override
    protected void initChannel(NioSocketChannel ch) throws Exception {
        ch.pipeline().addLast(new ByteArrayEncoder());
        ch.pipeline().addLast(new LineBasedFrameDecoder(100));
        ch.pipeline().addLast(new TCPServerHandler());
        ch.pipeline().addLast(new ByteArrayDecoder());
    }
});

那麼若是客戶端發送的數據是:

this is 0 client \r\n request 1 \r\n」

那麼接收方從新經過「換行符」從新組織後,將分兩次接受到數據:

this is 0 client
request 1

5-4-三、使用DelimiterBasedFrameDecoder解決問題

DelimiterBasedFrameDecoder是按照「自定義」分隔符(也能夠是「回車符」或者「空字符」注意windows系統中和linux系統中「回車符」的表示是不同的)進行信息的從新拆分。

serverBootstrap.childHandler(new ChannelInitializer<NioSocketChannel>() {
    @Override
    protected void initChannel(NioSocketChannel ch) throws Exception {
        ch.pipeline().addLast(new ByteArrayEncoder());
        ch.pipeline().addLast(new DelimiterBasedFrameDecoder(1500, false, Delimiters.lineDelimiter()));
        ch.pipeline().addLast(new TCPServerHandler());
        ch.pipeline().addLast(new ByteArrayDecoder());
    }
});

DelimiterBasedFrameDecoder有三個參數,這裏介紹一下:

DelimiterBasedFrameDecoder(int maxFrameLength, boolean stripDelimiter, ByteBuf... delimiters)
  • maxFrameLength:最大分割長度,若是接收方在一段長度 大於maxFrameLength的數據段中,沒有找到指定的分隔符,那麼這個處理器會拋出TooLongFrameException異常。

  • stripDelimiter:這個是一個布爾型參數,指代是否保留指定的分隔符。

  • delimiters:設置的分隔符。通常使用Delimiters.lineDelimiter()或者Delimiters.nulDelimiter()。固然您也能夠自定義分隔符,定義成bytebuf的類型就好了。

5-五、專一於業務

Netty框架的特性,使咱們不須要關心下層所工做的IO模型,利用Netty提供的面向事件驅動的方法結構,使咱們更能集中精力關注應用層業務。

在這5篇文章中,咱們重點介紹了幾種典型的IO模型,而且介紹了JAVA語言對這幾種IO模型的實現,最後咱們簡單介紹了一下Netty框架,而且比較了JAVA NIO框架和Netty框架側重點。實際上這幾篇文章咱們講述的問題只有一個「信息如何進行傳遞」。

從下一篇文章開始,咱們開始介紹JAVA RMI技術,並從JAVA RMI技術引出一項系統間通訊重要的技術RPC,咱們還會降到RPC的重要實現 TaoBao-Dubbo框架,最後咱們講解ESB技術和幾個典型的ESB實現。這些技術要解決的問題是「傳遞過程若是進行統籌管理」。

相關文章
相關標籤/搜索