現在咱們使用通常目的的應用和類庫來相互交流。例如,咱們常用HTTP客戶端庫來從Web服務器端獲取信息和經過RPC的 方式來調用WebService。然而,通常目的的協議或實現並不能很好地伸縮。就像咱們不會使用常規的HTTP協議來交換大文件,電子郵件以及實時的消息如金融信息或多用戶的遊戲數據。這些都是針對特殊目的實現的高度優化的協議。例如,你可能會針對基於ajax的聊天應用,媒體流以及大文件傳輸場景使用優化的協議。甚至你會根據你本身的須要設計並實現一個全新的協議。另一種不可避免的狀況是你必須處理遺留下來的與老系統進行交互的協議。那種狀況下須要關心的是在不犧牲最終應用穩定性和性能的狀況如何快速地實現那個協議。html
Netty項目是在盡最大努力提供一個異步的,基於事件驅動的網絡應用框架以及爲可維護的,高性能的,高擴展性的協議服務器和客戶端的快速部署提供工具支持。java
換句話說,Netty是一個NIO的客戶端和服務端框架,經過它能使咱們快速簡單地開發網絡應用(如協議服務端與客戶端)。它極大地簡化和流水線化了網絡編程,如TCP和UDP網絡服務器的開發。ajax
簡單快速並不意味着會致使應用程序出現可維護性與性能問題。Netty在設計時就考慮到了許多協議的實現如FTP,SMTP,HTTP以及一些二進制和文本的遺留協議的一些經驗。做爲結果,Netty成功地找到了一種在不妥協的前提下實現開發簡單,高性能,高穩定性,靈活性的方式。編程
一些用戶可能已經發現一些其它的網絡應用框架聲明有一樣的優點,而且你可能會問Netty與其它框架的不一樣以外在哪裏。答案就在於它所基於的哲理。Netty設計的初衷就是爲了在API的角度和實現的第一天開始就提供最好的用戶體驗。這不是有開有的東西,但你會意識到,當你閱讀這篇教程並與Netty玩時,你的生活將變得更加容易。bootstrap
這個章節圍繞Netty的核心結構,經過簡單的例子讓你快速入門。當你在本章末尾的時候,你能夠快速寫一個客戶端和服務器程序。緩存
若是你喜歡自頂向下的方式學習一些東西,你可能會喜歡從第二章開始,再回到這裏。bash
運行本章中引入的例子的最低要求有兩個:最新的Netty版本以及JDK1.6或以上。最新版本的Netty在下載頁進行下載。爲了下載正確版本的JDK,請到JDK供應商的網站上進行下載。服務器
正如你所讀到的,你可能會對本章中所引入的類在點疑惑。請在須要知道API明細的時候查看對應的API。爲方便起見,在這文章的類都連接到了在線的文檔。另外,請不要猶豫向Netty Project Community發送郵件,並讓咱們知道是否有不正確的信息,語法錯誤,打印錯誤以及提升文檔的一些好的想法。網絡
現實中的最簡單的協議並非"Hello,World!",而是DISCARD。這是一個不進行響應,直接丟棄收到的數據的協議。app
爲了實現DISCARD協議,你惟一須要作的事情是忽略任何接收到的數據。咱們直接從控制器的實現開始,這個控制器用來處理Netty產生的IO事件:
package io.netty.example.discard;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
/**
* Handles a server-side channel.
*/
public class DiscardServerHandler extends ChannelInboundHandlerAdapter { // (1)
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) { // (2)
// Discard the received data silently.
((ByteBuf) msg).release(); // (3)
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { // (4)
// Close the connection when an exception is raised.
cause.printStackTrace();
ctx.close();
}
}
複製代碼
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
try {
// Do something with msg
} finally {
ReferenceCountUtil.release(msg);
}
}
複製代碼
到目前爲止,一切正常。咱們已經實現了DISCARD服務器的前半部分。接下來咱們來運行DiscardServer中的main方法來啓動服務器。
package io.netty.example.discard;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
/**
* Discards any incoming data.
*/
public class DiscardServer {
private int port;
public DiscardServer(int port) {
this.port = port;
}
public void run() throws Exception {
EventLoopGroup bossGroup = new NioEventLoopGroup(); // (1)
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap b = new ServerBootstrap(); // (2)
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class) // (3)
.childHandler(new ChannelInitializer<SocketChannel>() { // (4)
@Override
public void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new DiscardServerHandler());
}
})
.option(ChannelOption.SO_BACKLOG, 128) // (5)
.childOption(ChannelOption.SO_KEEPALIVE, true); // (6)
// Bind and start to accept incoming connections.
ChannelFuture f = b.bind(port).sync(); // (7)
// Wait until the server socket is closed.
// In this example, this does not happen, but you can do that to gracefully
// shut down your server.
f.channel().closeFuture().sync();
} finally {
workerGroup.shutdownGracefully();
bossGroup.shutdownGracefully();
}
}
public static void main(String[] args) throws Exception {
int port;
if (args.length > 0) {
port = Integer.parseInt(args[0]);
} else {
port = 8080;
}
new DiscardServer(port).run();
}
}
複製代碼
既然咱們已經寫了咱們的第一個服務器,咱們須要測試來檢測這工做是否正常。最簡單的方式是使用telnet來進行測試。例如,你能夠在命令行輸入telnet 8080,並輸入一些內容。
然而,咱們能夠說服務器工做正常嗎?咱們並不可以肯定,由於它是一個Discard服務器。咱們不會獲得任何響應。爲了驗證它正在工做,咱們修改服務器讓它輸出所接收到的數據。
咱們如今已經知道channelReload()方法會在接收到數據時被調用。咱們在DidcardServerHandler類中的channelReload方法中添加一些代碼。
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
ByteBuf in = (ByteBuf) msg;
try {
while (in.isReadable()) { // (1)
System.out.print((char) in.readByte());
System.out.flush();
}
} finally {
ReferenceCountUtil.release(msg); // (2)
}
}
複製代碼
若是你再次運行telnet命令,你會發現服務器輸入你所接收到的數據。 完整的discard服務器的代碼在io.netty.example.discard包下。
到目前爲止,咱們已經消費了數據但並無一點響應。一個服務器,然而通常都是用來響應請求的。讓咱們經過實現g 一個Echo協議來學習如何向客戶端響應請求,在這個例子裏,服務器返回接收到的數據。
與前一小節咱們實現的discard服務器不一樣的是它返回客戶端接收到的數據,而不是在控制檯輸出。所以,修改channelReload()方法已經足夠了。
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
ctx.write(msg); // (1)
ctx.flush(); // (2)
}
複製代碼
若是你再次運行telnet命令,你會發現服務器向你發回你所發送的數據。
echo服務器的完整代碼在io.netty.example.echo包下。
這個小節須要實現的協議是TIME協議。這跟之前的例子有點不一樣。在這個例子中,它會發送一個32位的整數,同時並不會接收任何請求。當消息發送完成以後會關閉鏈接。在這個例子中,你將會學習到如何構造和發送消息,在完成發送時關閉鏈接。
因爲咱們會忽略掉在鏈接創建好之後接收到的任何數據,咱們此次不使用channelReload方法。做爲替代,咱們重寫了channelActive方法。如下是具體實現:
package io.netty.example.time;
public class TimeServerHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelActive(final ChannelHandlerContext ctx) { // (1)
final ByteBuf time = ctx.alloc().buffer(4); // (2)
time.writeInt((int) (System.currentTimeMillis() / 1000L + 2208988800L));
final ChannelFuture f = ctx.writeAndFlush(time); // (3)
f.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) {
assert f == future;
ctx.close();
}
}); // (4)
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
cause.printStackTrace();
ctx.close();
}
}
複製代碼
可是請等一下,flip去哪裏了?在NIO中咱們發送數據以前不須要調用java.nio.ByteBuffer.flip()方法嗎?ByteBuf沒有這樣的方法,由於這有兩個指針,一個是用來讀的,一個是用來寫的。寫的指針會隨着數據寫入的增長而增長,而讀指針不會發生變化。
相反,NIO Buffer在沒有調用flip方法這前,並不會提供一個清除方式來計算消息是從哪裏開始與結束的。當你沒有調用flip方法,你會遇到一些麻煩。由於沒有數據或錯誤的數據將會被髮送。這種錯誤不會發生在Netty中,由於咱們針對不一樣的類型有不一樣的指針。你會發現隨着你熟悉netty,你的生活將會變得更加容易-一個沒有flipping out的生活。
另一個須要注意的點是ChannelHandlerContext.write()方法和writeAndFlush()方法,它們會返回一個ChannelFutrue。ChannelFuture代碼一個尚未出現IO操做。這意味着,任何請求操做可能尚未被操做,由於全部的操做在Netty中都是異步的。例如,下面的代碼可能在發送消息以前就關閉鏈接了。
Channel ch = ...;
ch.writeAndFlush(message);
ch.close();
複製代碼
所以,你須要在write()方法返回的ChannelFuture方法完成以後調用close()方法。它會通知它的監聽器全部的操做都已經完成了。請注意,close()方法也不會馬上關閉鏈接,它會返回一下ChannelFuture。
在寫請求完成以後咱們要怎樣被通知?這隻須要簡單地爲返回的ChannelFuture對象添加一個ChannelFutrueListener。在這裏,咱們建立了一個匿名的ChannelFutureListener,在這個Listener中會在操做完成以後關閉通道。
可選的,你可使用預約義的監聽器:
f.addListener(ChannelFutureListener.CLOSE);
複製代碼
爲了檢測咱們的時間服務器是否預期的工做,可使用unix的rdate命令:
rdate -o <port> -p <host>
複製代碼
端口號是main函數是指定的端口號,通常狀況下是localohst。
不像DISCARD和ECHO服務器,咱們須要爲TIME協議提供一個客戶端。由於用戶沒法將一個整數轉換爲日曆中的某個日期。在這個小節中,咱們將討論如何讓服務器工做正常,並學習如何用Netty寫一個客戶端。
服務端和客戶端最大的也是惟一的不一樣之處是Netty中的客戶端使用了不一樣的Bootstrap和Channel實現。咱們來看一下如下的代碼:
package io.netty.example.time;
public class TimeClient {
public static void main(String[] args) throws Exception {
String host = args[0];
int port = Integer.parseInt(args[1]);
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
Bootstrap b = new Bootstrap(); // (1)
b.group(workerGroup); // (2)
b.channel(NioSocketChannel.class); // (3)
b.option(ChannelOption.SO_KEEPALIVE, true); // (4)
b.handler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new TimeClientHandler());
}
});
// Start the client.
ChannelFuture f = b.connect(host, port).sync(); // (5)
// Wait until the connection is closed.
f.channel().closeFuture().sync();
} finally {
workerGroup.shutdownGracefully();
}
}
}
複製代碼
正如你所看到的,這跟服務端的代碼不是徹底不同。那麼ChannelHandler的實現是怎麼樣的呢?它應該接收一個32個字節的整數,並將它轉換爲可供人類閱讀的格式,打印轉換後的時間,最後關閉鏈接。
package io.netty.example.time;
import java.util.Date;
public class TimeClientHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
ByteBuf m = (ByteBuf) msg; // (1)
try {
long currentTimeMillis = (m.readUnsignedInt() - 2208988800L) * 1000L;
System.out.println(new Date(currentTimeMillis));
ctx.close();
} finally {
m.release();
}
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
cause.printStackTrace();
ctx.close();
}
}
複製代碼
這看上去很簡單,看上去跟服務端的例子差很少。然而,這個handler有時會拒絕工做,並搜出IndexOutOfBoundsException,這個會在下一小節進行詳細介紹。
在基於流的傳輸通道(如TCP/IP)中,接收到的數據是放在Socket接收緩存中的。不幸的是,基於流的傳輸通道不是數據包的隊列而是基於字節的隊列。這意味着,即便你是經過兩個獨立的包發送數據的,操做系統並不會把它們當成兩條消息,而是一堆字節。所以,你所讀取到的數據並不必定是遠程的其它節點寫入的數據。例如,咱們想像一下操做系統的TCP/IP棧接收到了三個包。
讓咱們回到TIME客戶端的例子。咱們也有相同的問題。一個32位的整數是一個很是小的數據,它不太可能會被分割。然而,問題是它能夠被分割,分割的可能性會致使流量增長。
最簡單的方法是建立一個內部的累計的緩存,而且等到全部的四個字節都寫入到緩存。如下是修改過的TimeClientHandler實現,這個實現修復了這個問題。
package io.netty.example.time;
import java.util.Date;
public class TimeClientHandler extends ChannelInboundHandlerAdapter {
private ByteBuf buf;
@Override
public void handlerAdded(ChannelHandlerContext ctx) {
buf = ctx.alloc().buffer(4); // (1)
}
@Override
public void handlerRemoved(ChannelHandlerContext ctx) {
buf.release(); // (1)
buf = null;
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
ByteBuf m = (ByteBuf) msg;
buf.writeBytes(m); // (2)
m.release();
if (buf.readableBytes() >= 4) { // (3)
long currentTimeMillis = (buf.readUnsignedInt() - 2208988800L) * 1000L;
System.out.println(new Date(currentTimeMillis));
ctx.close();
}
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
cause.printStackTrace();
ctx.close();
}
}
複製代碼
儘管第一個方案已經解決了TIME客戶端所遇到的問題,修改後的handler看上去並非那樣的乾淨。想像一下一個更加複雜的協議,它由多個字段組成,如一個可變的長度字段。你的ChannelInboundHandler的實現類立刻會變得不可維護。
正如你所看到的,你能夠添加不止一個Channel到ChannelPipeLine。所以,你能夠將一個總體的ChannelHandler分割成多個模塊化的Handler來下降系統的複雜性。例如,你能夠將TimeClinetHandler分割成兩個Handler。
幸運的是,Netty提供了一個要擴展的類來幫助你編寫程序。
package io.netty.example.time;
public class TimeDecoder extends ByteToMessageDecoder { // (1)
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) { // (2)
if (in.readableBytes() < 4) {
return; // (3)
}
out.add(in.readBytes(4)); // (4)
}
}
複製代碼
既然咱們已經往ChannelPipeLine中插入了一個新的handler,咱們應該修改ChannelInitializer實現:
b.handler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new TimeDecoder(), new TimeClientHandler());
}
});
複製代碼
若是你是一個勇於冒險的人,你可能會想要嘗試ReplayingDecoder,它使得解碼變得更加容易。你須要查閱API來得到更多信息。
public class TimeDecoder extends ReplayingDecoder<Void> {
@Override
protected void decode(
ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
out.add(in.readBytes(4));
}
}
複製代碼
另外,Netty也提供了開箱即用的解碼器來幫助你實現你的協議更加容易,避免出現總體的,不可維護的handler實現。請查看如下包來得到更多的例子:
到目前爲止的全部例子使用了ByteBuf做爲協議消息的主要結構。在這個例子中,咱們會實現TIME協議的客戶端和服務端的例子,這些例子中使用了POJO代替了ByteBuf。
在ChannelHandler中使用POJO的優點是很是明顯的。個人Handler將變得更加可維護與可重用,而且能夠將咱們的代碼從解析ByteBuf中的數據分離出來。在TIME的客戶端和服務端的例子中,咱們只讀取了一個32位的整數,而且這並非一個直接使用ByteBuf的主要問題。然而在現實中的協議咱們發現頗有必要進行分離。
首先,咱們定義了一個新的類型叫作UnixTime。
package io.netty.example.time;
import java.util.Date;
public class UnixTime {
private final long value;
public UnixTime() {
this(System.currentTimeMillis() / 1000L + 2208988800L);
}
public UnixTime(long value) {
this.value = value;
}
public long value() {
return value;
}
@Override
public String toString() {
return new Date((value() - 2208988800L) * 1000L).toString();
}
}
複製代碼
咱們如今能夠修改TimeDecoder來產生一個UnixTime來代替ByteBuf。
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
if (in.readableBytes() < 4) {
return;
}
out.add(new UnixTime(in.readUnsignedInt()));
}
複製代碼
在更新後的Decoder中,TimeClientHandler再也不使用ByteBuf了。
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
UnixTime m = (UnixTime) msg;
System.out.println(m);
ctx.close();
}
複製代碼
代碼變得更加簡單與優雅了,是否是?一樣的技術也能夠應用在服務端,此次咱們先更新一下TimeServerHandler。
@Override
public void channelActive(ChannelHandlerContext ctx) {
ChannelFuture f = ctx.writeAndFlush(new UnixTime());
f.addListener(ChannelFutureListener.CLOSE);
}
複製代碼
如今,少了的部分是一個編碼器,它是一個ChannelInboundHandler的實現類。如今編寫解碼器變得更加簡單了,由於如今沒有必要處理包分段和編碼消息時進行裝配。
@Override
public void channelActive(ChannelHandlerContext ctx) {
ChannelFuture f = ctx.writeAndFlush(new UnixTime());
f.addListener(ChannelFutureListener.CLOSE);
}
複製代碼