看過dubbo源碼的同窗應該都清楚,使用dubbo協議的底層通訊是使用的netty進行交互,而最近看了dubbo的Netty部分後,本身寫了個簡單的Netty通訊例子。
html
本文源地址:實現Netty進行通訊java
工程截圖git
模塊詳解github
rpc-common做爲各個模塊都需使用的模塊,工程中出現的是一些通訊時請求的參數以及返回的參數,還有一些序列化的工具。apache
rpc-client中目前只是單單的一個NettyClient啓動類。json
rpc-client中目前也只是單單的一個NettyServer服務啓動類。bootstrap
須要的依賴api
目前全部的依賴項都出如今 rpc-common 下的 pom.xml中。服務器
<dependencies> <!-- Netty --> <dependency> <groupId>io.netty</groupId> <artifactId>netty-all</artifactId> <version>4.1.10.Final</version> </dependency> <dependency> <groupId>org.slf4j</groupId> <artifactId>slf4j-log4j12</artifactId> <version>1.7.25</version> </dependency> <!-- Protostuff --> <dependency> <groupId>com.dyuproject.protostuff</groupId> <artifactId>protostuff-core</artifactId> <version>1.0.9</version> </dependency> <dependency> <groupId>com.dyuproject.protostuff</groupId> <artifactId>protostuff-runtime</artifactId> <version>1.0.9</version> </dependency> <!-- Objenesis --> <dependency> <groupId>org.objenesis</groupId> <artifactId>objenesis</artifactId> <version>2.1</version> </dependency> <!-- fastjson --> <dependency> <groupId>com.alibaba</groupId> <artifactId>fastjson</artifactId> <version>1.2.38</version> </dependency> </dependencies>
首先咱們在common中先定義本次的Request和Response的基類對象。框架
public class Request { private String requestId; private Object parameter; public String getRequestId() { return requestId; } public void setRequestId(String requestId) { this.requestId = requestId; } public Object getParameter() { return parameter; } public void setParameter(Object parameter) { this.parameter = parameter; } } public class Response { private String requestId; private Object result; public String getRequestId() { return requestId; } public void setRequestId(String requestId) { this.requestId = requestId; } public Object getResult() { return result; } public void setResult(Object result) { this.result = result; } }
使用fastJson進行本次序列化
Netty對象的序列化轉換很好懂, ByteToMessageDecoder
和 MessageToByteEncoder
分別只要繼承它們,重寫方法後,獲取到Object和Byte,各自轉換就OK。
不過若是是有要用到生產上的同窗,建議不要使用 fastJson
,由於它的漏洞補丁真的是太多了,可使用google的 protostuff
。
public class RpcDecoder extends ByteToMessageDecoder { // 目標對象類型進行解碼 private Class<?> target; public RpcDecoder(Class target) { this.target = target; } @Override protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception { if (in.readableBytes() < 4) { // 不夠長度丟棄 return; } in.markReaderIndex(); // 標記一下當前的readIndex的位置 int dataLength = in.readInt(); // 讀取傳送過來的消息的長度。ByteBuf 的readInt()方法會讓他的readIndex增長4 if (in.readableBytes() < dataLength) { // 讀到的消息體長度若是小於咱們傳送過來的消息長度,則resetReaderIndex. 這個配合markReaderIndex使用的。把readIndex重置到mark的地方 in.resetReaderIndex(); return; } byte[] data = new byte[dataLength]; in.readBytes(data); Object obj = JSON.parseObject(data, target); // 將byte數據轉化爲咱們須要的對象 out.add(obj); } } public class RpcEncoder extends MessageToByteEncoder { //目標對象類型進行編碼 private Class<?> target; public RpcEncoder(Class target) { this.target = target; } @Override protected void encode(ChannelHandlerContext ctx, Object msg, ByteBuf out) throws Exception { if (target.isInstance(msg)) { byte[] data = JSON.toJSONBytes(msg); // 使用fastJson將對象轉換爲byte out.writeInt(data.length); // 先將消息長度寫入,也就是消息頭 out.writeBytes(data); // 消息體中包含咱們要發送的數據 } } }
NetyServer
public class NettyServerHandler extends ChannelInboundHandlerAdapter { @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { Request request = (Request) msg; System.out.println("Client Data:" + JSON.toJSONString(request)); Response response = new Response(); response.setRequestId(request.getRequestId()); response.setResult("Hello Client !"); // client接收到信息後主動關閉掉鏈接 ctx.writeAndFlush(response).addListener(ChannelFutureListener.CLOSE); } @Override public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { ctx.flush(); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { ctx.close(); } } public class NettyServer { private static final Logger logger = LoggerFactory.getLogger(NettyServer.class); private String ip; private int port; public NettyServer(String ip, int port) { this.ip = ip; this.port = port; } public void server() throws Exception { EventLoopGroup bossGroup = new NioEventLoopGroup(); EventLoopGroup workerGroup = new NioEventLoopGroup(); try { final ServerBootstrap serverBootstrap = new ServerBootstrap(); serverBootstrap.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) .option(ChannelOption.SO_BACKLOG, 1024) .option(ChannelOption.SO_SNDBUF, 32 * 1024) .option(ChannelOption.SO_RCVBUF, 32 * 1024) .option(ChannelOption.SO_KEEPALIVE, true) .childHandler(new ChannelInitializer<SocketChannel>() { protected void initChannel(SocketChannel socketChannel) throws Exception { socketChannel.pipeline().addLast(new RpcDecoder(Request.class)) .addLast(new RpcEncoder(Response.class)) .addLast(new NettyServerHandler()); } }); serverBootstrap.childOption(ChannelOption.SO_KEEPALIVE, true); // 開啓長鏈接 ChannelFuture future = serverBootstrap.bind(ip, port).sync(); // if (future.isSuccess()) { // // new Register().register("/yanzhenyidai/com.yanzhenyidai.server", ip + ":" + port); // } future.channel().closeFuture().sync(); } finally { bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); } } public static void main(String[] args) throws Exception { new NettyServer("127.0.0.1", 20000).server(); } }
關鍵名詞:
EventLoopGroup
Server端的EventLoopGroup分爲兩個,通常workerGroup做爲處理請求,bossGroup做爲接收請求。
ChannelOption
以上四個常量做爲TCP鏈接中的屬性。
ctx.writeAndFlush(response).addListener(ChannelFutureListener.CLOSE);
NettyServerHandler中出現的 ChannelFutureListener.CLOSE
,做爲Server端主動關閉與Client端的通訊,若是沒有主動Close,那麼NettyClient將會一直處於阻塞狀態,得不到NettyServer的返回信息。
NettyClient
public class NettyClient extends SimpleChannelInboundHandler<Response> { private final String ip; private final int port; private Response response; public NettyClient(String ip, int port) { this.ip = ip; this.port = port; } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { ctx.close(); } @Override protected void channelRead0(ChannelHandlerContext channelHandlerContext, Response response) throws Exception { this.response = response; } public Response client(Request request) throws Exception { EventLoopGroup group = new NioEventLoopGroup(); try { // 建立並初始化 Netty 客戶端 Bootstrap 對象 Bootstrap bootstrap = new Bootstrap(); bootstrap.group(group); bootstrap.channel(NioSocketChannel.class); bootstrap.handler(new ChannelInitializer<SocketChannel>() { @Override public void initChannel(SocketChannel channel) throws Exception { ChannelPipeline pipeline = channel.pipeline(); pipeline.addLast(new RpcDecoder(Response.class)); pipeline.addLast(new RpcEncoder(Request.class)); pipeline.addLast(NettyClient.this); } }); bootstrap.option(ChannelOption.TCP_NODELAY, true); // String[] discover = new Discover().discover("/yanzhenyidai/com.yanzhenyidai.server").split(":"); // 鏈接 RPC 服務器 ChannelFuture future = bootstrap.connect(ip, port).sync(); // 寫入 RPC 請求數據並關閉鏈接 Channel channel = future.channel(); channel.writeAndFlush(request).sync(); channel.closeFuture().sync(); return response; } finally { group.shutdownGracefully(); } } public static void main(String[] args) throws Exception { Request request = new Request(); request.setRequestId(UUID.randomUUID().toString()); request.setParameter("Hello Server !"); System.out.println(JSON.toJSONString(new NettyClient("127.0.0.1", 30000).client(request))); } }
測試
若是以上全部內容都準備就緒,那麼就能夠進行調試了。
啓動順序,先啓動NettyServer,再啓動NettyClient。
記得剛出來工做時,有工做不少年的同事問我了不瞭解Netty,當時工做過短,直說聽過Putty,如今回想起來真的挺丟人的,哈哈。😋
Netty做爲通訊框架,若是你瞭解TCP,並且項目中有相似傳輸信息的需求,又不想集成HTTP或者Socket,那麼Netty真的挺實用的。
參考資料:
本項目Github地址:Netty-RPC