咱們都知道TCP是基於字節流的傳輸協議。那麼數據在通訊層傳播其實就像河水同樣並無明顯的分界線,而數據具體表示什麼意思什麼地方有句號什麼地方有分號這個對於TCP底層來講並不清楚。應用層向TCP層發送用於網間傳輸的、用8位字節表示的數據流,而後TCP把數據流分區成適當長度的報文段,以後TCP把結果包傳給IP層,由它來經過網絡將包傳送給接收端實體的TCP層。因此對於這個數據拆分紅大包小包的問題就是咱們今天要講的粘包和拆包的問題。php
粘包和拆包這兩個概念估計你們還不清楚,經過下面這張圖咱們來分析一下:
java
假設客戶端分別發送兩個數據包D1,D2個服務端,可是發送過程當中數據是何種形式進行傳播這個並不清楚,分別有下列4種狀況:web
咱們知道在TCP協議中,應用數據分割成TCP認爲最適合發送的數據塊,這部分是經過「MSS」(最大數據包長度)選項來控制的,一般這種機制也被稱爲一種協商機制,MSS規定了TCP傳往另外一端的最大數據塊的長度。這個值TCP協議在實現的時候每每用MTU值代替(須要減去IP數據包包頭的大小20Bytes和TCP數據段的包頭20Bytes)因此每每MSS爲1460。通信雙方會根據雙方提供的MSS值得最小值肯定爲此次鏈接的最大MSS值。express
tcp爲提升性能,發送端會將須要發送的數據發送到緩衝區,等待緩衝區滿了以後,再將緩衝中的數據發送到接收方。同理,接收方也有緩衝區這樣的機制,來接收數據。apache
發生粘包拆包的緣由主要有如下這些:bootstrap
應用程序寫入數據的字節大小大於套接字發送緩衝區的大小將發生拆包;數組
進行MSS大小的TCP分段。MSS是TCP報文段中的數據字段的最大長度,當TCP報文長度-TCP頭部長度>mss的時候將發生拆包;服務器
應用程序寫入數據小於套接字緩衝區大小,網卡將應用屢次寫入的數據發送到網絡上,將發生粘包;網絡
數據包大於MTU的時候將會進行切片。MTU即(Maxitum Transmission Unit) 最大傳輸單元,因爲以太網傳輸電氣方面的限制,每一個以太網幀都有最小的大小64bytes最大不能超過1518bytes,刨去以太網幀的幀頭14Bytes和幀尾CRC校驗部分4Bytes,那麼剩下承載上層協議的地方也就是Data域最大就只能有1500Bytes這個值咱們就把它稱之爲MTU。這個就是網絡層協議很是關心的地方,由於網絡層協議好比IP協議會根據這個值來決定是否把上層傳下來的數據進行分片。app
咱們知道tcp是無界的數據流,且協議自己沒法避免粘包,拆包的發生,那咱們只能在應用層數據協議上,加以控制。一般在制定傳輸數據時,可使用以下方法:
設置定長消息,服務端每次讀取既定長度的內容做爲一條完整消息;
使用帶消息頭的協議、消息頭存儲消息開始標識及消息長度信息,服務端獲取消息頭的時候解析出消息長度,而後向後讀取該長度的內容;
設置消息邊界,服務端從網絡流中按消息邊界分離出消息內容。好比在消息末尾加上換行符用以區分消息結束。
固然應用層還有更多複雜的方式能夠解決這個問題,這個就屬於網絡層的問題了,咱們仍是用java提供的方式來解決這個問題。咱們先看一個例子看看粘包是如何發生的。
服務端:
public class HelloWordServer {
private int port;
public HelloWordServer(int port) {
this.port = port;
}
public void start(){
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workGroup = new NioEventLoopGroup();
ServerBootstrap server = new ServerBootstrap().group(bossGroup,workGroup)
.channel(NioServerSocketChannel.class)
.childHandler(new ServerChannelInitializer());
try {
ChannelFuture future = server.bind(port).sync();
future.channel().closeFuture().sync();
} catch (InterruptedException e) {
e.printStackTrace();
}finally {
bossGroup.shutdownGracefully();
workGroup.shutdownGracefully();
}
}
public static void main(String[] args) {
HelloWordServer server = new HelloWordServer(7788);
server.start();
}
}
服務端Initializer:
public class ServerChannelInitializer extends ChannelInitializer<SocketChannel> {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
ChannelPipeline pipeline = socketChannel.pipeline();
// 字符串解碼 和 編碼
pipeline.addLast("decoder", new StringDecoder());
pipeline.addLast("encoder", new StringEncoder());
// 本身的邏輯Handler
pipeline.addLast("handler", new HelloWordServerHandler());
}
}
服務端handler:
public class HelloWordServerHandler extends ChannelInboundHandlerAdapter {
private int counter;
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
String body = (String)msg;
System.out.println("server receive order : " + body + ";the counter is: " + ++counter);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
super.exceptionCaught(ctx, cause);
}
}
客戶端:
public class HelloWorldClient {
private int port;
private String address;
public HelloWorldClient(int port,String address) {
this.port = port;
this.address = address;
}
public void start(){
EventLoopGroup group = new NioEventLoopGroup();
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(group)
.channel(NioSocketChannel.class)
.handler(new ClientChannelInitializer());
try {
ChannelFuture future = bootstrap.connect(address,port).sync();
future.channel().closeFuture().sync();
} catch (Exception e) {
e.printStackTrace();
}finally {
group.shutdownGracefully();
}
}
public static void main(String[] args) {
HelloWorldClient client = new HelloWorldClient(7788,"127.0.0.1");
client.start();
}
}
客戶端Initializer:
public class ClientChannelInitializer extends ChannelInitializer<SocketChannel> {
protected void initChannel(SocketChannel socketChannel) throws Exception {
ChannelPipeline pipeline = socketChannel.pipeline();
pipeline.addLast("decoder", new StringDecoder());
pipeline.addLast("encoder", new StringEncoder());
// 客戶端的邏輯
pipeline.addLast("handler", new HelloWorldClientHandler());
}
}
客戶端handler:
public class HelloWorldClientHandler extends ChannelInboundHandlerAdapter {
private byte[] req;
private int counter;
public BaseClientHandler() {
req = ("Unless required by applicable law or agreed to in writing, software\n" +
" distributed under the License is distributed on an \"AS IS\" BASIS,\n" +
" WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\n" +
" See the License for the specific language governing permissions and\n" +
" limitations under the License.This connector uses the BIO implementation that requires the JSSE\n" +
" style configuration. When using the APR/native implementation, the\n" +
" penSSL style configuration is required as described in the APR/native\n" +
" documentation.An Engine represents the entry point (within Catalina) that processes\n" +
" every request. The Engine implementation for Tomcat stand alone\n" +
" analyzes the HTTP headers included with the request, and passes them\n" +
" on to the appropriate Host (virtual host)# Unless required by applicable law or agreed to in writing, software\n" +
"# distributed under the License is distributed on an \"AS IS\" BASIS,\n" +
"# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\n" +
"# See the License for the specific language governing permissions and\n" +
"# limitations under the License.# For example, set the org.apache.catalina.util.LifecycleBase logger to log\n" +
"# each component that extends LifecycleBase changing state:\n" +
"#org.apache.catalina.util.LifecycleBase.level = FINE"
).getBytes();
}
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
ByteBuf message;
//將上面的全部字符串做爲一個消息體發送出去
message = Unpooled.buffer(req.length);
message.writeBytes(req);
ctx.writeAndFlush(message);
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
String buf = (String)msg;
System.out.println("Now is : " + buf + " ; the counter is : "+ (++counter));
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
ctx.close();
}
}
運行客戶端和服務端咱們能看到:
咱們看到這個長長的字符串被截成了2段發送,這就是發生了拆包的現象。一樣粘包咱們也很容易去模擬,咱們把BaseClientHandler中的channelActive方法裏面的:
message = Unpooled.buffer(req.length);
message.writeBytes(req);
ctx.writeAndFlush(message);
這幾行代碼是把咱們上面的一長串字符轉成的byte數組寫進流裏發送出去,那麼咱們能夠在這裏把上面發送消息的這幾行循環幾遍這樣發送的內容增多了就有可能在拆包的時候把上一條消息的一部分分配到下一條消息裏面了,修改以下:
for (int i = 0; i < 3; i++) { message = Unpooled.buffer(req.length); message.writeBytes(req); ctx.writeAndFlush(message); }
改完以後咱們再運行一下,輸出太長很差截圖,咱們在輸出結果中能看到循環3次以後的消息服務端收到的就不是以前的完整的一條了,而是被拆分了4次發送。
對於上面出現的粘包和拆包的問題,Netty已有考慮,而且有實施的方案:LineBasedFrameDecoder。
咱們從新改寫一下ServerChannelInitializer:
public class ServerChannelInitializer extends ChannelInitializer<SocketChannel> {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
ChannelPipeline pipeline = socketChannel.pipeline();
pipeline.addLast(new LineBasedFrameDecoder(2048));
// 字符串解碼 和 編碼
pipeline.addLast("decoder", new StringDecoder());
pipeline.addLast("encoder", new StringEncoder());
// 本身的邏輯Handler
pipeline.addLast("handler", new BaseServerHandler());
}
}
新增:pipeline.addLast(new LineBasedFrameDecoder(2048))。同時,咱們還得對上面發送的消息進行改造BaseClientHandler:
public class BaseClientHandler extends ChannelInboundHandlerAdapter {
private byte[] req;
private int counter;
req = ("Unless required by applicable dfslaw or agreed to in writing, software" +
" distributed under the License is distributed on an \"AS IS\" BASIS," +
" WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied." +
" See the License for the specific language governing permissions and" +
" limitations under the License.This connector uses the BIO implementation that requires the JSSE" +
" style configuration. When using the APR/native implementation, the" +
" penSSL style configuration is required as described in the APR/native" +
" documentation.An Engine represents the entry point (within Catalina) that processes" +
" every request. The Engine implementation for Tomcat stand alone" +
" analyzes the HTTP headers included with the request, and passes them" +
" on to the appropriate Host (virtual host)# Unless required by applicable law or agreed to in writing, software" +
"# distributed under the License is distributed on an \"AS IS\" BASIS," +
"# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied." +
"# See the License for the specific language governing permissions and" +
"# limitations under the License.# For example, set the org.apache.catalina.util.LifecycleBase logger to log" +
"# each component that extends LifecycleBase changing state:" +
"#org.apache.catalina.util.LifecycleBase.level = FINE\n"
).getBytes();
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
ByteBuf message;
message = Unpooled.buffer(req.length);
message.writeBytes(req);
ctx.writeAndFlush(message);
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
String buf = (String)msg;
System.out.println("Now is : " + buf + " ; the counter is : "+ (++counter));
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
ctx.close();
}
}
去掉全部的」\n」,只保留字符串末尾的這一個。緣由稍後再說。channelActive方法中咱們沒必要再用循環屢次發送消息了,只發送一次就好(第一個例子中發送一次的時候是發生了拆包的),而後咱們再次運行,你們會看到這麼長一串字符只發送了一串就發送完畢。程序輸出我就不截圖了。下面來解釋一下LineBasedFrameDecoder。
LineBasedFrameDecoder的工做原理是它依次遍歷ByteBuf 中的可讀字節,判斷看是否有」\n」 或者」 \r\n」,若是有,就以此位置爲結束位置,從可讀索引到結束位置區間的字節就組成了一行。它是以換行符爲結束標誌的解碼器。支持攜帶結束符或者不攜帶結束符兩種解碼方式,同時支持配置單行的最大長度。若是連續讀取到最大長度後仍然沒有發現換行符,就會拋出異常,同時忽略掉以前讀到的異常碼流。這個對於咱們肯定消息最大長度的應用場景仍是頗有幫助。
對於上面的判斷看是否有」\n」 或者」 \r\n」以此做爲結束的標誌咱們可能回想,要是沒有」\n」 或者」 \r\n」那還有什麼別的方式能夠判斷消息是否結束呢。別擔憂,Netty對於此已經有考慮,還有別的解碼器能夠幫助咱們解決問題,下節咱們繼續學習。