介紹:
初步學習了一下netty,都知道Netty是由JBOSS提供的一個java開源框架。Netty提供異步的、事件驅動的網絡應用程序框架和工具,用以快速開發高性能、高可靠性的網絡服務器和客戶端程序。按照 http://netty.io/wiki/user-guide-for-4.x.html所作的介紹 嘗試編寫了幾個demo 最後加入本身寫的pojo對象,這裏只是一些入門介紹 ,對於原理以及它如何高性能,高可靠性 後續會繼續瞭解。後面會參考一個簡單rpc框架的搭建。這裏的版本html
<dependency> <groupId>io.netty</groupId> <artifactId>netty-all</artifactId> <version>4.1.6.Final</version> </dependency>
第一個:咱們嘗試寫一個簡單的服務端程序 而且經過telnet 在服務端打印一些東西java
server端:bootstrap
package com.manyi.iw.agentcall.soa.server.service; import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelOption; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel; /** * Created by huhaosumail on 16/12/22. */ public class Server { private int port; public Server(int port){ this.port=port; } public void run() throws Exception{ EventLoopGroup bossGroup = new NioEventLoopGroup(); EventLoopGroup workGroup=new NioEventLoopGroup(); try{ ServerBootstrap b= new ServerBootstrap(); b.group(bossGroup, workGroup).channel(NioServerSocketChannel.class).childHandler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel socketChannel) throws Exception { // socketChannel.pipeline().addLast(new TimeEncoder(),new TimeServerHandler()); socketChannel.pipeline().addLast(new TimeServerHandler()); } }).option(ChannelOption.SO_BACKLOG,128).childOption(ChannelOption.SO_KEEPALIVE,true); //綁定和開始接受連接 ChannelFuture f= b.bind(port).sync(); f.channel().closeFuture().sync(); }finally { //關閉 workGroup.shutdownGracefully(); bossGroup.shutdownGracefully(); } } public static void main(String[] args) throws Exception { int port=1080; new Server(port).run(); } }
serverhandler:服務器
package com.manyi.iw.agentcall.soa.server.service; import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFutureListener; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; import io.netty.util.ReferenceCountUtil; /** * Created by huhaosumail on 16/12/23. */ public class TimeServerHandler extends ChannelInboundHandlerAdapter { @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { ByteBuf in = (ByteBuf) msg; try { while (in.isReadable()) { System.out.print((char) in.readByte()); System.out.flush(); } } finally { ReferenceCountUtil.release(msg); } } // @Override // public void channelActive(final ChannelHandlerContext ctx) throws Exception { //// final ByteBuf time= ctx.alloc().buffer(4); //// time.writeInt((int)(System.currentTimeMillis()/1000L+2208988800L)); //// final ChannelFuture f= ctx.writeAndFlush(time); //// f.addListener(new ChannelFutureListener() { //// @Override //// public void operationComplete(ChannelFuture channelFuture) throws Exception { //// assert f==channelFuture; //// ctx.close(); //// } //// }); // // ChannelFuture f=ctx.writeAndFlush(new Person(26,"胡浩是傻逼")); // f.addListener(ChannelFutureListener.CLOSE); // } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { super.exceptionCaught(ctx, cause); } }
這裏啓動以後 終端經過telnet localhost 1080 能夠在終端輸入信息 並在控制檯打印 網絡
第一個:咱們嘗試編寫一個服務端 寫一些內容發送給客戶端 並打印這個pojo對象 不過這裏我用到序列化框架
不太規範 優先達到效果異步
server:socket
package com.manyi.iw.agentcall.soa.server.service; import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelOption; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel; /** * Created by huhaosumail on 16/12/22. */ public class Server { private int port; public Server(int port){ this.port=port; } public void run() throws Exception{ EventLoopGroup bossGroup = new NioEventLoopGroup(); EventLoopGroup workGroup=new NioEventLoopGroup(); try{ ServerBootstrap b= new ServerBootstrap(); b.group(bossGroup, workGroup).channel(NioServerSocketChannel.class).childHandler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel socketChannel) throws Exception { socketChannel.pipeline().addLast(new TimeEncoder(),new TimeServerHandler()); // socketChannel.pipeline().addLast(new TimeServerHandler()); } }).option(ChannelOption.SO_BACKLOG,128).childOption(ChannelOption.SO_KEEPALIVE,true); //綁定和開始接受連接 ChannelFuture f= b.bind(port).sync(); f.channel().closeFuture().sync(); }finally { //關閉 workGroup.shutdownGracefully(); bossGroup.shutdownGracefully(); } } public static void main(String[] args) throws Exception { int port=8080; new Server(port).run(); } }
TimeServerHandler:
package com.manyi.iw.agentcall.soa.server.service; import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFutureListener; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; import io.netty.util.ReferenceCountUtil; /** * Created by huhaosumail on 16/12/23. */ public class TimeServerHandler extends ChannelInboundHandlerAdapter { @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { ByteBuf in = (ByteBuf) msg; try { while (in.isReadable()) { System.out.print((char) in.readByte()); System.out.flush(); } } finally { ReferenceCountUtil.release(msg); } } @Override public void channelActive(final ChannelHandlerContext ctx) throws Exception { // final ByteBuf time= ctx.alloc().buffer(4); // time.writeInt((int)(System.currentTimeMillis()/1000L+2208988800L)); // final ChannelFuture f= ctx.writeAndFlush(time); // f.addListener(new ChannelFutureListener() { // @Override // public void operationComplete(ChannelFuture channelFuture) throws Exception { // assert f==channelFuture; // ctx.close(); // } // }); ChannelFuture f=ctx.writeAndFlush(new Person(26,"胡浩是傻逼")); f.addListener(ChannelFutureListener.CLOSE); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { super.exceptionCaught(ctx, cause); } }
TimeEncoder:
package com.manyi.iw.agentcall.soa.server.service; import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelHandlerContext; import io.netty.handler.codec.MessageToByteEncoder; import java.io.ByteArrayOutputStream; import java.io.ObjectOutputStream; import java.io.Serializable; /** * Created by huhaosumail on 16/12/23. */ public class TimeEncoder extends MessageToByteEncoder<Person> implements Serializable { @Override protected void encode(ChannelHandlerContext channelHandlerContext, Person person, ByteBuf byteBuf) throws Exception { // byteBuf.writeInt(person.getAge()); // byteBuf.writeBytes(person.getName().getBytes()); ByteArrayOutputStream bo=new ByteArrayOutputStream(); ObjectOutputStream oo=new ObjectOutputStream(bo); oo.writeObject(person); bo.close(); oo.close(); byteBuf.writeBytes(bo.toByteArray()); } }
client端:ide
package com.manyi.iw.agentcall.soa.server.service; import io.netty.bootstrap.Bootstrap; import io.netty.channel.*; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioSocketChannel; import sun.misc.Unsafe; import java.lang.reflect.Field; import java.nio.ByteBuffer; import java.nio.channels.Selector; /** * Created by huhaosumail on 16/12/23. */ public class TimeClient { public static void main(String[] args) throws Exception{ EventLoopGroup workerGroup=new NioEventLoopGroup(); try{ Bootstrap b=new Bootstrap(); b.group(workerGroup); b.channel(NioSocketChannel.class); b.option(ChannelOption.SO_KEEPALIVE,true); b.handler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel socketChannel) throws Exception { socketChannel.pipeline().addLast(new TimeDecoder(),new TimeClientHandler()); } }); //開啓客戶端 ChannelFuture f=b.connect("localhost",8080).sync(); //等待直到鏈接關閉 f.channel().closeFuture().sync(); }finally { workerGroup.shutdownGracefully(); } } }
TimeDecoder:
package com.manyi.iw.agentcall.soa.server.service; import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelHandlerContext; import io.netty.handler.codec.ByteToMessageDecoder; import java.io.ByteArrayInputStream; import java.io.ObjectInputStream; import java.io.Serializable; import java.util.List; /** * Created by huhaosumail on 16/12/23. */ public class TimeDecoder extends ByteToMessageDecoder implements Serializable { @Override protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf, List<Object> list) throws Exception { if(byteBuf.readableBytes()<4){ return; } // list.add(byteBuf.readBytes(4)); // list.add(new Person(byteBuf.readInt())); // // byte[] req=new byte[byteBuf.readableBytes()]; // byteBuf.readBytes(req); // list.add(new Person(new String(req,"UTF-8"))); byte[] req=new byte[byteBuf.readableBytes()]; byteBuf.readBytes(req); ByteArrayInputStream bi = new ByteArrayInputStream(req); ObjectInputStream oi = new ObjectInputStream(bi); Person p=(Person)oi.readObject(); bi.close(); oi.close(); list.add(p); } }
TimeClientHandler:
package com.manyi.iw.agentcall.soa.server.service; import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; import java.util.Date; /** * Created by huhaosumail on 16/12/23. */ public class TimeClientHandler extends ChannelInboundHandlerAdapter{ @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { // ByteBuf f= (ByteBuf)msg; // try{ // long currentTimeMills=(f.readUnsignedInt()-2208988800L)*1000L; // System.out.println(new Date(currentTimeMills)); // ctx.close(); // }finally { // f.release(); // } Person p=(Person)msg; System.out.println("name:"+p.getName()+",age:"+p.getAge()); ctx.close(); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { super.exceptionCaught(ctx, cause); ctx.close(); } }
效果工具