Netty是基於JDK NIO的網絡框架javascript
簡化了NIO編程, 不用程序本身維護selector, 將網絡通訊和數據處理的部分作了分離html
多用於作底層的數據通訊, 心跳檢測(keepalived)java
public class Server { public static void main(String[] args) throws Exception { // 1 建立線兩個事件循環組 // 一個是用於處理服務器端接收客戶端鏈接的 // 一個是進行網絡通訊的(網絡讀寫的) EventLoopGroup pGroup = new NioEventLoopGroup(); EventLoopGroup cGroup = new NioEventLoopGroup(); // 2 建立輔助工具類ServerBootstrap,用於服務器通道的一系列配置 ServerBootstrap b = new ServerBootstrap(); b.group(pGroup, cGroup) // 綁定倆個線程組 .channel(NioServerSocketChannel.class) // 指定NIO的模式.NioServerSocketChannel對應TCP, NioDatagramChannel對應UDP .option(ChannelOption.SO_BACKLOG, 1024) // 設置TCP緩衝區 .option(ChannelOption.SO_SNDBUF, 32 * 1024) // 設置發送緩衝大小 .option(ChannelOption.SO_RCVBUF, 32 * 1024) // 這是接收緩衝大小 .option(ChannelOption.SO_KEEPALIVE, true) // 保持鏈接 .childHandler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel sc) throws Exception { //SocketChannel創建鏈接後的管道 // 3 在這裏配置 通訊數據的處理邏輯, 能夠addLast多個... sc.pipeline().addLast(new ServerHandler()); } }); // 4 綁定端口, bind返回future(異步), 加上sync阻塞在獲取鏈接處 ChannelFuture cf1 = b.bind(8765).sync(); //ChannelFuture cf2 = b.bind(8764).sync(); //能夠綁定多個端口 // 5 等待關閉, 加上sync阻塞在關閉請求處 cf1.channel().closeFuture().sync(); //cf2.channel().closeFuture().sync(); pGroup.shutdownGracefully(); cGroup.shutdownGracefully(); } }
SO_BACKLOG詳解:
服務器的TCP內核維護兩個隊列A和B
客戶端向服務端請求connect時, 發送SYN(第一次握手)
服務端收到SYN後, 向客戶端發送SYN ACK(第二次握手), TCP內核將鏈接放入隊列A
客戶端收到後向服務端發送ACK(第三次握手), TCP內核將鏈接從A->B, accept返回, 鏈接完成
A/B隊列的長度和即爲BACKLOG, 當accept速度跟不上, A/B隊列使得BACKLOG滿了, 客戶端鏈接就會被TCP內核拒絕
能夠調大backlog緩解這一現象, 經驗值~100web
public class ServerHandler extends ChannelHandlerAdapter { @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { System.out.println("server channel active... "); } @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { ByteBuf buf = (ByteBuf) msg; byte[] req = new byte[buf.readableBytes()]; buf.readBytes(req); String body = new String(req, "utf-8"); System.out.println("Server :" + body ); String response = "返回給客戶端的響應:" + body ; ctx.writeAndFlush(Unpooled.copiedBuffer(response.getBytes())); // future完成後觸發監聽器, 此處是寫完即關閉(短鏈接). 所以須要關閉鏈接時, 要經過server端關閉. 直接關閉用方法ctx[.channel()].close() //.addListener(ChannelFutureListener.CLOSE); } @Override public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { System.out.println("讀完了"); ctx.flush(); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable t) throws Exception { ctx.close(); } }
public class Client { public static void main(String[] args) throws Exception { EventLoopGroup group = new NioEventLoopGroup(); Bootstrap b = new Bootstrap(); b.group(group) .channel(NioSocketChannel.class) .handler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel sc) throws Exception { sc.pipeline().addLast(new ClientHandler()); } }); ChannelFuture cf1 = b.connect("127.0.0.1", 8765).sync(); //ChannelFuture cf2 = b.connect("127.0.0.1", 8764).sync(); //可使用多個端口 //發送消息, Buffer類型. write須要flush才發送, 可用writeFlush代替 cf1.channel().writeAndFlush(Unpooled.copiedBuffer("777".getBytes())); cf1.channel().writeAndFlush(Unpooled.copiedBuffer("666".getBytes())); Thread.sleep(2000); cf1.channel().writeAndFlush(Unpooled.copiedBuffer("888".getBytes())); //cf2.channel().writeAndFlush(Unpooled.copiedBuffer("999".getBytes())); cf1.channel().closeFuture().sync(); //cf2.channel().closeFuture().sync(); group.shutdownGracefully(); } }
public class ClientHandler extends ChannelHandlerAdapter{ @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { } @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { try { ByteBuf buf = (ByteBuf) msg; byte[] req = new byte[buf.readableBytes()]; buf.readBytes(req); String body = new String(req, "utf-8"); System.out.println("Client :" + body ); } finally { // 記得釋放xxxHandler裏面的方法的msg參數: 寫(write)數據, msg引用將被自動釋放不用手動處理; 但只讀數據時,!必須手動釋放引用數 ReferenceCountUtil.release(msg); } } @Override public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { ctx.close(); } }
TCP/IP確保了包的傳送, 包的順序等, 但編程中還須要解決拆包粘包問題正則表達式
-> 接收的一連串包中的數據, 處理的分隔在哪裏? 基本解決方案:編程
1)特殊字符做爲結束分隔符數組
2)消息定長. 固定包的長度, 長度不夠用空格補全. 接收方須要trim, 效率不高不推薦瀏覽器
3)自定義協議. 在消息頭中包含消息總長度的字段. 須要安全性時能夠考慮.緩存
public class Server { public static void main(String[] args) throws Exception { EventLoopGroup pGroup = new NioEventLoopGroup(); EventLoopGroup cGroup = new NioEventLoopGroup(); ServerBootstrap b = new ServerBootstrap(); b.group(pGroup, cGroup) .channel(NioServerSocketChannel.class) .option(ChannelOption.SO_BACKLOG, 1024) .option(ChannelOption.SO_SNDBUF, 32*1024) .option(ChannelOption.SO_RCVBUF, 32*1024) .childHandler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel sc) throws Exception { // 使用DelimiterBasedFrameDecoder設置結尾分隔符$_ ByteBuf buf = Unpooled.copiedBuffer("$_".getBytes()); sc.pipeline().addLast(new DelimiterBasedFrameDecoder(1024, buf)); // 設置字符串形式的解碼. 通過StringDecoder, Handler回調方法中接收的msg的具體類型就是String了(再也不是ByteBuffer). 但寫時仍須要傳入ByteBuffer sc.pipeline().addLast(new StringDecoder()); // 通訊數據的處理邏輯 sc.pipeline().addLast(new ServerHandler()); } }); //4 綁定鏈接 ChannelFuture cf = b.bind(8765).sync(); //等待服務器監聽端口關閉 cf.channel().closeFuture().sync(); pGroup.shutdownGracefully(); cGroup.shutdownGracefully(); } }
public class ServerHandler extends ChannelHandlerAdapter { @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { System.out.println(" server channel active... "); } @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { System.out.println("Server :" + msg); String response = "服務器響應: " + msg + "$_"; ctx.writeAndFlush(Unpooled.copiedBuffer(response.getBytes())); } @Override public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable t) throws Exception { ctx.close(); } }
public class Client { public static void main(String[] args) throws Exception { EventLoopGroup group = new NioEventLoopGroup(); Bootstrap b = new Bootstrap(); b.group(group) .channel(NioSocketChannel.class) .handler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel sc) throws Exception { ByteBuf buf = Unpooled.copiedBuffer("$_".getBytes()); sc.pipeline().addLast(new DelimiterBasedFrameDecoder(1024, buf)); sc.pipeline().addLast(new StringDecoder()); sc.pipeline().addLast(new ClientHandler()); } }); ChannelFuture cf = b.connect("127.0.0.1", 8765).sync(); cf.channel().writeAndFlush(Unpooled.wrappedBuffer("bbbb$_".getBytes())); cf.channel().writeAndFlush(Unpooled.wrappedBuffer("cccc$_".getBytes())); cf.channel().closeFuture().sync(); group.shutdownGracefully(); } }
public class ClientHandler extends ChannelHandlerAdapter { @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { System.out.println("client channel active... "); } @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { try { String response = (String) msg; System.out.println("Client: " + response); } finally { ReferenceCountUtil.release(msg); } } @Override public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { ctx.close(); } }
public class Server { public static void main(String[] args) throws Exception{ EventLoopGroup pGroup = new NioEventLoopGroup(); EventLoopGroup cGroup = new NioEventLoopGroup(); ServerBootstrap b = new ServerBootstrap(); b.group(pGroup, cGroup) .channel(NioServerSocketChannel.class) .option(ChannelOption.SO_BACKLOG, 1024) .option(ChannelOption.SO_SNDBUF, 32*1024) .option(ChannelOption.SO_RCVBUF, 32*1024) .childHandler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel sc) throws Exception { //設置定長字符串接收, 定長爲5, 積累到5個字節纔會把數據發出去 sc.pipeline().addLast(new FixedLengthFrameDecoder(5)); //設置字符串形式的解碼 sc.pipeline().addLast(new StringDecoder()); sc.pipeline().addLast(new ServerHandler()); } }); ChannelFuture cf = b.bind(8765).sync(); cf.channel().closeFuture().sync(); pGroup.shutdownGracefully(); cGroup.shutdownGracefully(); } }
public class ServerHandler extends ChannelHandlerAdapter { @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { System.out.println(" server channel active... "); } @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { String request = (String)msg; System.out.println("Server :" + msg); String response = request ; ctx.writeAndFlush(Unpooled.copiedBuffer(response.getBytes())); } @Override public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable t) throws Exception { } }
public class Client { public static void main(String[] args) throws Exception { EventLoopGroup group = new NioEventLoopGroup(); Bootstrap b = new Bootstrap(); b.group(group) .channel(NioSocketChannel.class) .handler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel sc) throws Exception { sc.pipeline().addLast(new FixedLengthFrameDecoder(5)); sc.pipeline().addLast(new StringDecoder()); sc.pipeline().addLast(new ClientHandler()); } }); ChannelFuture cf = b.connect("127.0.0.1", 8765).sync(); cf.channel().writeAndFlush(Unpooled.wrappedBuffer("aaa".getBytes())); cf.channel().writeAndFlush(Unpooled.copiedBuffer("bbccccc".getBytes())); cf.channel().closeFuture().sync(); group.shutdownGracefully(); } }
public class ClientHandler extends ChannelHandlerAdapter { @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { System.out.println("client channel active... "); } @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { String response = (String) msg; System.out.println("Client: " + response); } @Override public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { } }
即對象序列化技術, 目的是爲了實現對象的網絡傳輸和本地持久化
若是使用java的序列化, 碼流較大. 所以多用Marshalling, Kyro(基於Protobuf)安全
下面的例子, 使用編解碼傳輸javabean(Marshalling的javabean須要實現serializable), 並將message進行gzip壓縮
自定義編解碼器
public final class MarshallingCodeCFactory { // 解碼 public static MarshallingDecoder buildMarshallingDecoder() { //建立工廠對象, 參數serial指建立的是java對象序列化的工廠對象 final MarshallerFactory marshallerFactory = Marshalling.getProvidedMarshallerFactory("serial"); //建立配置對象,版本號爲5 final MarshallingConfiguration configuration = new MarshallingConfiguration(); configuration.setVersion(5); //根據工廠對象和配置對象建立解碼provider UnmarshallerProvider provider = new DefaultUnmarshallerProvider(marshallerFactory, configuration); //建立解碼器對象. 第一個參數是provider, 第二個參數是單個消息序列化後的最大長度, 超事後拒絕處理 MarshallingDecoder decoder = new MarshallingDecoder(provider, 1024 * 1024 * 1); return decoder; } // 編碼 public static MarshallingEncoder buildMarshallingEncoder() { final MarshallerFactory marshallerFactory = Marshalling.getProvidedMarshallerFactory("serial"); final MarshallingConfiguration configuration = new MarshallingConfiguration(); configuration.setVersion(5); MarshallerProvider provider = new DefaultMarshallerProvider(marshallerFactory, configuration); //建立編碼器對象. 用於將實現Serializable接口的JavaBean序列化爲二進制數組 MarshallingEncoder encoder = new MarshallingEncoder(provider); return encoder; } }
javabean
public class Request implements Serializable { // 標記Serializable接口 private String id ; private String name ; private String requestMessage ; private byte[] attachment; public String getId() { return id; } public void setId(String id) { this.id = id; } public String getName() { return name; } public void setName(String name) { this.name = name; } public String getRequestMessage() { return requestMessage; } public void setRequestMessage(String requestMessage) { this.requestMessage = requestMessage; } public byte[] getAttachment() { return attachment; } public void setAttachment(byte[] attachment) { this.attachment = attachment; } }
public class Response implements Serializable { // 標記Serializable接口 private String id; private String name; private String responseMessage; public String getId() { return id; } public void setId(String id) { this.id = id; } public String getName() { return name; } public void setName(String name) { this.name = name; } public String getResponseMessage() { return responseMessage; } public void setResponseMessage(String responseMessage) { this.responseMessage = responseMessage; } }
GZip壓縮的Util
public class GzipUtils { public static byte[] gzip(byte[] data) throws Exception { ByteArrayOutputStream bos = new ByteArrayOutputStream(); GZIPOutputStream gzip = new GZIPOutputStream(bos); gzip.write(data); gzip.finish(); gzip.close(); byte[] ret = bos.toByteArray(); bos.close(); return ret; } public static byte[] ungzip(byte[] data) throws Exception{ ByteArrayInputStream bis = new ByteArrayInputStream(data); GZIPInputStream gzip = new GZIPInputStream(bis); byte[] buf = new byte[1024]; int num = -1; ByteArrayOutputStream bos = new ByteArrayOutputStream(); while((num = gzip.read(buf)) != -1 ){ bos.write(buf, 0, num); } gzip.close(); bis.close(); byte[] ret = bos.toByteArray(); bos.close(); return ret; } }
服務端與客戶端
public class Server { public static void main(String[] args) throws Exception{ EventLoopGroup pGroup = new NioEventLoopGroup(); EventLoopGroup cGroup = new NioEventLoopGroup(); ServerBootstrap b = new ServerBootstrap(); b.group(pGroup, cGroup) .channel(NioServerSocketChannel.class) .option(ChannelOption.SO_BACKLOG, 1024) //設置日誌 .handler(new LoggingHandler(LogLevel.INFO)) .childHandler(new ChannelInitializer<SocketChannel>() { protected void initChannel(SocketChannel sc) throws Exception { // 添加編解碼. 發送自定義的類型, 而Handler的方法接收的msg參數的實際類型也是相應的自定義類了 sc.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingDecoder()); sc.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingEncoder()); sc.pipeline().addLast(new ServerHandler()); } }); ChannelFuture cf = b.bind(8765).sync(); cf.channel().closeFuture().sync(); pGroup.shutdownGracefully(); cGroup.shutdownGracefully(); }
public class ServerHandler extends ChannelHandlerAdapter{ @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { } @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { Request req = (Request)msg; System.out.println("Server : " + req.getId() + ", " + req.getName() + ", " + req.getRequestMessage()); byte[] attachment = GzipUtils.ungzip(req.getAttachment()); String path = System.getProperty("user.dir") + File.separatorChar + "receive" + File.separatorChar + "001.jpg"; FileOutputStream fos = new FileOutputStream(path); fos.write(attachment); fos.close(); Response resp = new Response(); resp.setId(req.getId()); resp.setName("resp" + req.getId()); resp.setResponseMessage("響應內容" + req.getId()); ctx.writeAndFlush(resp);//.addListener(ChannelFutureListener.CLOSE); } @Override public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { ctx.close(); } }
public class Client { public static void main(String[] args) throws Exception{ EventLoopGroup group = new NioEventLoopGroup(); Bootstrap b = new Bootstrap(); b.group(group) .channel(NioSocketChannel.class) .handler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel sc) throws Exception { sc.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingDecoder()); sc.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingEncoder()); sc.pipeline().addLast(new ClientHandler()); } }); ChannelFuture cf = b.connect("127.0.0.1", 8765).sync(); for(int i = 0; i < 5; i++){ Request req = new Request(); req.setId("" + i); req.setName("req" + i); req.setRequestMessage("數據信息" + i); String path = System.getProperty("user.dir") + File.separatorChar + "sources" + File.separatorChar + "001.jpg"; File file = new File(path); FileInputStream in = new FileInputStream(file); byte[] data = new byte[in.available()]; in.read(data); in.close(); req.setAttachment(GzipUtils.gzip(data)); //壓縮 cf.channel().writeAndFlush(req); } cf.channel().closeFuture().sync(); group.shutdownGracefully(); } }
public class ClientHandler extends ChannelHandlerAdapter{ @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { } @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { try { Response resp = (Response) msg; System.out.println("Client : " + resp.getId() + ", " + resp.getName() + ", " + resp.getResponseMessage()); } finally { ReferenceCountUtil.release(msg); } } @Override public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { ctx.close(); } }
1.長鏈接, 一致保持着鏈接不主動中斷, 實時性強
2.短鏈接. 數據放在緩存, 一次性批量提交全部數據, 服務端接收後即關閉鏈接
以上兩種根據是否給ChannelHandlerContext添加ChannelFutureListener.ClOSE監聽器實現
3.長鏈接, 必定時間不活躍則關閉鏈接. 給SocketChannel添加ReadTimeoutHandler實現. 實例以下:
public final class MarshallingCodeCFactory { public static MarshallingDecoder buildMarshallingDecoder() { final MarshallerFactory marshallerFactory = Marshalling.getProvidedMarshallerFactory("serial"); final MarshallingConfiguration configuration = new MarshallingConfiguration(); configuration.setVersion(5); UnmarshallerProvider provider = new DefaultUnmarshallerProvider(marshallerFactory, configuration); MarshallingDecoder decoder = new MarshallingDecoder(provider, 1024); return decoder; } public static MarshallingEncoder buildMarshallingEncoder() { final MarshallerFactory marshallerFactory = Marshalling.getProvidedMarshallerFactory("serial"); final MarshallingConfiguration configuration = new MarshallingConfiguration(); configuration.setVersion(5); MarshallerProvider provider = new DefaultMarshallerProvider(marshallerFactory, configuration); MarshallingEncoder encoder = new MarshallingEncoder(provider); return encoder; } }
public class Request implements Serializable{ private String id ; private String name ; private String requestMessage ; public String getId() { return id; } public void setId(String id) { this.id = id; } public String getName() { return name; } public void setName(String name) { this.name = name; } public String getRequestMessage() { return requestMessage; } public void setRequestMessage(String requestMessage) { this.requestMessage = requestMessage; } }
public class Response implements Serializable{ private String id; private String name; private String responseMessage; public String getId() { return id; } public void setId(String id) { this.id = id; } public String getName() { return name; } public void setName(String name) { this.name = name; } public String getResponseMessage() { return responseMessage; } public void setResponseMessage(String responseMessage) { this.responseMessage = responseMessage; } }
public class Server { public static void main(String[] args) throws Exception{ EventLoopGroup pGroup = new NioEventLoopGroup(); EventLoopGroup cGroup = new NioEventLoopGroup(); ServerBootstrap b = new ServerBootstrap(); b.group(pGroup, cGroup) .channel(NioServerSocketChannel.class) .option(ChannelOption.SO_BACKLOG, 1024) //設置日誌 .handler(new LoggingHandler(LogLevel.INFO)) .childHandler(new ChannelInitializer<SocketChannel>() { protected void initChannel(SocketChannel sc) throws Exception { sc.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingDecoder()); sc.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingEncoder()); sc.pipeline().addLast(new ReadTimeoutHandler(5)); // 時限, 讀客戶端超時沒數據則斷開 sc.pipeline().addLast(new ServerHandler()); } }); ChannelFuture cf = b.bind(8765).sync(); cf.channel().closeFuture().sync(); pGroup.shutdownGracefully(); cGroup.shutdownGracefully(); } }
public class ServerHandler extends ChannelHandlerAdapter{ @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { } @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { Request request = (Request) msg; System.out.println("Server : " + request.getId() + ", " + request.getName() + ", " + request.getRequestMessage()); Response response = new Response(); response.setId(request.getId()); response.setName("response" + request.getId()); response.setResponseMessage("響應內容" + request.getId()); ctx.writeAndFlush(response); } @Override public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { ctx.close(); } }
public class Client { private EventLoopGroup group; private Bootstrap b; private ChannelFuture cf ; // 單例 private static class SingletonHolder { static final Client instance = new Client(); } public static Client getInstance(){ return SingletonHolder.instance; } private Client(){ group = new NioEventLoopGroup(); b = new Bootstrap(); b.group(group) .channel(NioSocketChannel.class) .handler(new LoggingHandler(LogLevel.INFO)) .handler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel sc) throws Exception { sc.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingDecoder()); sc.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingEncoder()); //超時handler(當服務器端與客戶端在指定時間以上沒有任何進行通訊,則會關閉通道) sc.pipeline().addLast(new ReadTimeoutHandler(5)); // 時限5s, 讀服務端超時沒數據則斷開 sc.pipeline().addLast(new ClientHandler()); } }); } public void connect(){ try { this.cf = b.connect("127.0.0.1", 8765).sync(); System.out.println("遠程服務器已經鏈接, 能夠進行數據交換"); } catch (Exception e) { e.printStackTrace(); } } public ChannelFuture getChannelFuture(){ if(this.cf == null) { //初次鏈接 this.connect(); } if(!this.cf.channel().isActive()){ //重連 this.connect(); } return this.cf; } public static void main(String[] args) throws Exception{ final Client c = Client.getInstance(); ChannelFuture cf = c.getChannelFuture(); for(int i = 1; i <= 3; i++ ){ Request request = new Request(); request.setId("" + i); request.setName("request" + i); request.setRequestMessage("數據信息" + i); cf.channel().writeAndFlush(request); TimeUnit.SECONDS.sleep(4); //間隔4s發送一次數據 } cf.channel().closeFuture().sync(); //阻塞至超時關閉 // 這裏用子線程重連併發送數據一次 new Thread(new Runnable() { @Override public void run() { try { System.out.println("進入子線程重連一次"); ChannelFuture cf = c.getChannelFuture(); assert true == cf.channel().isActive(); //斷言 //再次發送數據 Request request = new Request(); request.setId("" + 4); request.setName("request" + 4); request.setRequestMessage("數據信息" + 4); cf.channel().writeAndFlush(request); cf.channel().closeFuture().sync(); System.out.println("子線程完成"); } catch (InterruptedException e) { e.printStackTrace(); } } }).start(); System.out.println("斷開鏈接,主線程結束.."); } }
public class ClientHandler extends ChannelHandlerAdapter{ @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { } @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { try { Response resp = (Response) msg; System.out.println("Client : " + resp.getId() + ", " + resp.getName() + ", " + resp.getResponseMessage()); } finally { ReferenceCountUtil.release(msg); } } @Override public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { ctx.close(); } }
public class Server { public void run(int port) throws Exception { EventLoopGroup group = new NioEventLoopGroup(); try { Bootstrap b = new Bootstrap(); b.group(group).channel(NioDatagramChannel.class) // UDP: NioDatagramChannel .option(ChannelOption.SO_BROADCAST, true) // 廣播 .handler(new ServerHandler()); b.bind(port).sync().channel().closeFuture().await(); } finally { group.shutdownGracefully(); } } public static void main(String[] args) throws Exception { new Server().run(8765); } }
public class ServerHandler extends SimpleChannelInboundHandler<DatagramPacket> { // 諺語列表 private static final String[] DICTIONARY = { "只要功夫深,鐵棒磨成針。", "舊時王謝堂前燕,飛入尋常百姓家。", "洛陽親友如相問,一片冰心在玉壺。", "一寸光陰一寸金,寸金難買寸光陰。", "老驥伏櫪,志在千里。烈士暮年,壯心不已!" }; private String nextQuote() { int quoteId = ThreadLocalRandom.current().nextInt(DICTIONARY.length); return DICTIONARY[quoteId]; } @Override public void messageReceived(ChannelHandlerContext ctx, DatagramPacket packet) throws Exception { String req = packet.content().toString(CharsetUtil.UTF_8); System.out.println(req); if ("諺語字典查詢?".equals(req)) { ctx.writeAndFlush(new DatagramPacket(Unpooled.copiedBuffer("諺語查詢結果: " + nextQuote(), CharsetUtil.UTF_8), packet.sender())); } } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { ctx.close(); cause.printStackTrace(); } }
public class Client { public void run(int port) throws Exception { EventLoopGroup group = new NioEventLoopGroup(); try { Bootstrap b = new Bootstrap(); b.group(group).channel(NioDatagramChannel.class) .option(ChannelOption.SO_BROADCAST, true) .handler(new ClientHandler()); Channel ch = b.bind(0).sync().channel(); // 向網段內的全部機器廣播UDP消息 ch.writeAndFlush(new DatagramPacket(Unpooled.copiedBuffer("諺語字典查詢?", CharsetUtil.UTF_8), new InetSocketAddress("255.255.255.255", port))).sync(); if (!ch.closeFuture().await(15000)) { System.out.println("查詢超時!"); } } finally { group.shutdownGracefully(); } } public static void main(String[] args) throws Exception { new Client().run(8765); } }
public class ClientHandler extends SimpleChannelInboundHandler<DatagramPacket> { @Override public void messageReceived(ChannelHandlerContext ctx, DatagramPacket msg) throws Exception { String response = msg.content().toString(CharsetUtil.UTF_8); if (response.startsWith("諺語查詢結果: ")) { System.out.println(response); ctx.close(); } } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { cause.printStackTrace(); ctx.close(); } }
集羣中主服務器須要知道從服務器的狀態
所以client每隔5~10秒給server發送心跳包
可經過netty與定時任務來實現
public final class MarshallingCodeCFactory { public static MarshallingDecoder buildMarshallingDecoder() { final MarshallerFactory marshallerFactory = Marshalling.getProvidedMarshallerFactory("serial"); final MarshallingConfiguration configuration = new MarshallingConfiguration(); configuration.setVersion(5); UnmarshallerProvider provider = new DefaultUnmarshallerProvider(marshallerFactory, configuration); MarshallingDecoder decoder = new MarshallingDecoder(provider, 1024 * 1024 * 1); return decoder; } public static MarshallingEncoder buildMarshallingEncoder() { final MarshallerFactory marshallerFactory = Marshalling.getProvidedMarshallerFactory("serial"); final MarshallingConfiguration configuration = new MarshallingConfiguration(); configuration.setVersion(5); MarshallerProvider provider = new DefaultMarshallerProvider(marshallerFactory, configuration); MarshallingEncoder encoder = new MarshallingEncoder(provider); return encoder; } }
public class RequestInfo implements Serializable { private String ip ; private HashMap<String, Object> cpuPercMap ; private HashMap<String, Object> memoryMap; public String getIp() { return ip; } public void setIp(String ip) { this.ip = ip; } public HashMap<String, Object> getCpuPercMap() { return cpuPercMap; } public void setCpuPercMap(HashMap<String, Object> cpuPercMap) { this.cpuPercMap = cpuPercMap; } public HashMap<String, Object> getMemoryMap() { return memoryMap; } public void setMemoryMap(HashMap<String, Object> memoryMap) { this.memoryMap = memoryMap; } }
public class Server { public static void main(String[] args) throws Exception{ EventLoopGroup pGroup = new NioEventLoopGroup(); EventLoopGroup cGroup = new NioEventLoopGroup(); ServerBootstrap b = new ServerBootstrap(); b.group(pGroup, cGroup) .channel(NioServerSocketChannel.class) .option(ChannelOption.SO_BACKLOG, 1024) //設置日誌 .handler(new LoggingHandler(LogLevel.INFO)) .childHandler(new ChannelInitializer<SocketChannel>() { protected void initChannel(SocketChannel sc) throws Exception { sc.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingDecoder()); sc.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingEncoder()); sc.pipeline().addLast(new ServerHeartBeatHandler()); } }); ChannelFuture cf = b.bind(8765).sync(); cf.channel().closeFuture().sync(); pGroup.shutdownGracefully(); cGroup.shutdownGracefully(); } }
public class ServerHeartBeatHandler extends ChannelHandlerAdapter { private static HashMap<String, String> AUTH_IP_MAP = new HashMap<String, String>(); private static final String SUCCESS_KEY = "auth_success_key"; static { AUTH_IP_MAP.put("127.0.0.1", "1234"); } private boolean auth(ChannelHandlerContext ctx, Object msg){ String [] ret = ((String) msg).split(","); String auth = AUTH_IP_MAP.get(ret[0]); if(auth != null && auth.equals(ret[1])){ // 認證成功, 返回確認信息 ctx.writeAndFlush(SUCCESS_KEY); return true; } else { ctx.writeAndFlush("auth failure !").addListener(ChannelFutureListener.CLOSE); return false; } } @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { if(msg instanceof String){ auth(ctx, msg); } else if (msg instanceof RequestInfo) { RequestInfo info = (RequestInfo) msg; System.out.println("--------------------------------------------"); System.out.println("當前主機ip爲: " + info.getIp()); System.out.println("當前主機cpu狀況: "); HashMap<String, Object> cpu = info.getCpuPercMap(); System.out.println("總使用率: " + cpu.get("combined")); System.out.println("用戶使用率: " + cpu.get("user")); System.out.println("系統使用率: " + cpu.get("sys")); System.out.println("等待率: " + cpu.get("wait")); System.out.println("空閒率: " + cpu.get("idle")); System.out.println("當前主機memory狀況: "); HashMap<String, Object> memory = info.getMemoryMap(); System.out.println("內存總量: " + memory.get("total")); System.out.println("當前內存使用量: " + memory.get("used")); System.out.println("當前內存剩餘量: " + memory.get("free")); System.out.println("--------------------------------------------"); ctx.writeAndFlush("info received!"); } else { ctx.writeAndFlush("connect failure!").addListener(ChannelFutureListener.CLOSE); } } }
public class Client { public static void main(String[] args) throws Exception { EventLoopGroup group = new NioEventLoopGroup(); Bootstrap b = new Bootstrap(); b.group(group) .channel(NioSocketChannel.class) .handler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel sc) throws Exception { sc.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingDecoder()); sc.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingEncoder()); sc.pipeline().addLast(new ClienHeartBeatHandler()); } }); ChannelFuture cf = b.connect("127.0.0.1", 8765).sync(); cf.channel().closeFuture().sync(); group.shutdownGracefully(); } }
public class ClienHeartBeatHandler extends ChannelHandlerAdapter { private ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1); private ScheduledFuture<?> heartBeat; //定時任務 //主動向服務器發送認證信息 private InetAddress addr ; private static final String SUCCESS_KEY = "auth_success_key"; @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { addr = InetAddress.getLocalHost(); //String ip = addr.getHostAddress(); String ip = "127.0.0.1"; String key = "1234"; //證書 String auth = ip + "," + key; // 發送認證 ctx.writeAndFlush(auth); } @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { try { if(msg instanceof String){ String ret = (String) msg; if(SUCCESS_KEY.equals(ret)){ // 收到認證 確認信息,設置每隔5秒發送心跳消息 this.heartBeat = this.scheduler.scheduleWithFixedDelay(new HeartBeatTask(ctx), 0, 5, TimeUnit.SECONDS); System.out.println(msg); } else { // 收到心跳包 確認信息 System.out.println(msg); } } } finally { // 只讀, 須要手動釋放引用計數 ReferenceCountUtil.release(msg); } } private class HeartBeatTask implements Runnable { private final ChannelHandlerContext ctx; public HeartBeatTask(final ChannelHandlerContext ctx) { this.ctx = ctx; } @Override public void run() { try { RequestInfo info = new RequestInfo(); //ip info.setIp(addr.getHostAddress()); Sigar sigar = new Sigar(); //cpu prec CpuPerc cpuPerc = sigar.getCpuPerc(); HashMap<String, Object> cpuPercMap = new HashMap<String, Object>(); cpuPercMap.put("combined", cpuPerc.getCombined()); cpuPercMap.put("user", cpuPerc.getUser()); cpuPercMap.put("sys", cpuPerc.getSys()); cpuPercMap.put("wait", cpuPerc.getWait()); cpuPercMap.put("idle", cpuPerc.getIdle()); // memory Mem mem = sigar.getMem(); HashMap<String, Object> memoryMap = new HashMap<String, Object>(); memoryMap.put("total", mem.getTotal() / 1024L); memoryMap.put("used", mem.getUsed() / 1024L); memoryMap.put("free", mem.getFree() / 1024L); info.setCpuPercMap(cpuPercMap); info.setMemoryMap(memoryMap); ctx.writeAndFlush(info); } catch (Exception e) { e.printStackTrace(); } } public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { cause.printStackTrace(); // 取消定時發送心跳包的任務 if (heartBeat != null) { heartBeat.cancel(true); heartBeat = null; } ctx.fireExceptionCaught(cause); } } }
public final class HttpHelloWorldServer { static final boolean SSL = System.getProperty("ssl") != null; static final int PORT = Integer.parseInt(System.getProperty("port", SSL? "8443" : "8080")); public static void main(String[] args) throws Exception { final SslContext sslCtx; if (SSL) { SelfSignedCertificate ssc = new SelfSignedCertificate(); sslCtx = SslContext.newServerContext(ssc.certificate(), ssc.privateKey()); } else { sslCtx = null; } EventLoopGroup bossGroup = new NioEventLoopGroup(); EventLoopGroup workerGroup = new NioEventLoopGroup(); try { ServerBootstrap b = new ServerBootstrap(); b.option(ChannelOption.SO_BACKLOG, 1024); b.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) .handler(new LoggingHandler(LogLevel.INFO)) .childHandler(new HttpHelloWorldServerInitializer(sslCtx)); Channel ch = b.bind(PORT).sync().channel(); System.err.println("Open your web browser and navigate to " + (SSL? "https" : "http") + "://127.0.0.1:" + PORT + '/'); ch.closeFuture().sync(); } finally { bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); } } }
public class HttpHelloWorldServerInitializer extends ChannelInitializer<SocketChannel> { private final SslContext sslCtx; public HttpHelloWorldServerInitializer(SslContext sslCtx) { this.sslCtx = sslCtx; } @Override public void initChannel(SocketChannel ch) { ChannelPipeline p = ch.pipeline(); if (sslCtx != null) { p.addLast(sslCtx.newHandler(ch.alloc())); } p.addLast(new HttpServerCodec()); // !使用http通訊, HttpRequest和HttpResponse p.addLast(new HttpHelloWorldServerHandler()); } }
public class HttpHelloWorldServerHandler extends ChannelHandlerAdapter { private static final byte[] CONTENT = "HELLO WORLD".getBytes(); @Override public void channelReadComplete(ChannelHandlerContext ctx) { ctx.flush(); } @Override public void channelRead(ChannelHandlerContext ctx, Object msg) { if (msg instanceof HttpRequest) { HttpRequest req = (HttpRequest) msg; if (HttpHeaderUtil.is100ContinueExpected(req)) { ctx.write(new DefaultFullHttpResponse(HTTP_1_1, CONTINUE)); } boolean keepAlive = HttpHeaderUtil.isKeepAlive(req); // 構造響應 FullHttpResponse response = new DefaultFullHttpResponse(HTTP_1_1, OK, Unpooled.wrappedBuffer(CONTENT)); response.headers().set(CONTENT_TYPE, "text/plain;charset=UTF-8"); response.headers().setInt(CONTENT_LENGTH, response.content().readableBytes()); if (!keepAlive) { // Request短鏈接, 寫完後直接關閉 ctx.write(response).addListener(ChannelFutureListener.CLOSE); } else { // 長鏈接, response也設置爲KEEP_ALIVE response.headers().set(CONNECTION, HttpHeaderValues.KEEP_ALIVE); ctx.write(response); } } } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { cause.printStackTrace(); ctx.close(); } }
public class HttpDownloadServer { private static final String DEFAULT_URL = "/sources/"; public void run(final int port, final String url) throws Exception { EventLoopGroup bossGroup = new NioEventLoopGroup(); EventLoopGroup workerGroup = new NioEventLoopGroup(); try { ServerBootstrap b = new ServerBootstrap(); b.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) .childHandler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) throws Exception { // addLast的第一項爲key, 自定義的 // request解碼器 ch.pipeline().addLast("http-decoder", new HttpRequestDecoder()); // response的編碼器 ch.pipeline().addLast("http-encoder", new HttpResponseEncoder()); // chunked, 傳輸文件時分多個response分解地傳輸文件 ch.pipeline().addLast("http-chunked", new ChunkedWriteHandler()); // ObjectAggregator, 將多個response合併爲一個FullHttpResponse ch.pipeline().addLast("http-aggregator", new HttpObjectAggregator(65536)); // 自定義業務邏輯handler ch.pipeline().addLast("fileServerHandler", new HttpDownoadServerHandler(url)); } }); ChannelFuture future = b.bind("127.0.0.1", port).sync(); System.out.println("HTTP文件目錄服務器啓動,網址是 : " + "http://localhost:" + port + url); future.channel().closeFuture().sync(); } finally { bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); } } public static void main(String[] args) throws Exception { int port = 8765; new HttpDownloadServer().run(port, DEFAULT_URL); } }
// 注意這裏繼承了SimpleChannelInboundHandler<T>, 含泛型, 即指定了傳入參數msg的類型 public class HttpDownoadServerHandler extends SimpleChannelInboundHandler<FullHttpRequest> { private final String url; public HttpDownoadServerHandler(String url) { this.url = url; } @Override public void messageReceived(ChannelHandlerContext ctx, FullHttpRequest request) throws Exception { //是否能理解(解碼)請求 if (!request.decoderResult().isSuccess()) { // 400 sendError(ctx, BAD_REQUEST); return; } //對請求的方法進行判斷:若是不是GET方法則返回異常 if (request.method() != GET) { // 405 sendError(ctx, METHOD_NOT_ALLOWED); return; } //獲取請求uri路徑 final String uri = request.uri(); //對url進行分析,返回本地路徑 final String path = parseURI(uri); //若是 路徑構造不合法,則path爲null if (path == null) { //403 sendError(ctx, FORBIDDEN); return; } // 建立file對象 File file = new File(path); // 文件隱藏或不存在 if (file.isHidden() || !file.exists()) { // 404 sendError(ctx, NOT_FOUND); return; } // 是文件夾 if (file.isDirectory()) { if (uri.endsWith("/")) { //若是以正常"/"結束 說明是訪問的一個文件目錄:則進行展現文件列表 sendListing(ctx, file); } else { //若是非"/"結束 則重定向,讓客戶端補全"/"並再次請求 sendRedirect(ctx, uri + '/'); } return; } // 若是所建立的file對象不是文件類型 if (!file.isFile()) { // 403 sendError(ctx, FORBIDDEN); return; } //隨機文件讀寫對象 RandomAccessFile randomAccessFile = null; try { randomAccessFile = new RandomAccessFile(file, "r");// 以只讀的方式打開文件 } catch (FileNotFoundException fnfe) { // 404 sendError(ctx, NOT_FOUND); return; } //獲取文件長度 long fileLength = randomAccessFile.length(); //創建響應對象 HttpResponse response = new DefaultHttpResponse(HTTP_1_1, OK); //設置響應信息 HttpHeaderUtil.setContentLength(response, fileLength); //設置Content-Type setContentTypeHeader(response, file); //設置爲KeepAlive if (HttpHeaderUtil.isKeepAlive(request)) { response.headers().set(CONNECTION, HttpHeaderValues.KEEP_ALIVE); } //輸出response header, HttpObjectAggregator能將其與下面輸出整合合併 ctx.write(response); //寫出ChunkedFile. 建立ChunkedFile須要使用RandomAccessFile並設置分段. 這裏每次傳輸8192個字節 ChannelFuture sendFileFuture = ctx.write(new ChunkedFile(randomAccessFile, 0, fileLength, 8192), ctx.newProgressivePromise()); //添加傳輸監聽 sendFileFuture.addListener(new ChannelProgressiveFutureListener() { @Override public void operationProgressed(ChannelProgressiveFuture future, long progress, long total) { if (total < 0) { System.err.println("Transfer progress: " + progress); } else { System.err.println("Transfer progress: " + progress + " / " + total); } } @Override public void operationComplete(ChannelProgressiveFuture future) throws Exception { System.out.println("Transfer complete."); } }); //使用Chunked, 完成時須要發送標記結束的空消息體! ChannelFuture lastContentFuture = ctx.writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT); //若是當前鏈接請求非Keep-Alive, 最後一包消息發送完後, 服務器主動關閉鏈接 if (!HttpHeaderUtil.isKeepAlive(request)) { lastContentFuture.addListener(ChannelFutureListener.CLOSE); } } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { if (ctx.channel().isActive()) { // 500 sendError(ctx, INTERNAL_SERVER_ERROR); ctx.close(); } } //判斷非法URI的正則 private static final Pattern INSECURE_URI = Pattern.compile(".*[<>&\"].*"); private String parseURI(String uri) { try { //使用UTF-8字符集 uri = URLDecoder.decode(uri, "UTF-8"); } catch (UnsupportedEncodingException e) { try { //嘗試ISO-8859-1 uri = URLDecoder.decode(uri, "ISO-8859-1"); } catch (UnsupportedEncodingException e1) { //拋出預想外異常信息 throw new Error(); } } // 對uri進行細粒度判斷:4步驗證操做 // step 1 基礎驗證 if (!uri.startsWith(url)) { return null; } // step 2 基礎驗證 if (!uri.startsWith("/")) { return null; } // step 3 將文件分隔符替換爲本地操做系統的文件路徑分隔符 uri = uri.replace('/', File.separatorChar); // step 4 驗證路徑合法性 if (uri.contains(File.separator + '.') || uri.contains('.' + File.separator) || uri.startsWith(".") || uri.endsWith(".") || INSECURE_URI.matcher(uri).matches()) { return null; } //利用當前工程所在目錄 + URI相對路徑 構造絕對路徑 return System.getProperty("user.dir") + File.separator + uri; } //用正則表達式過濾文件名 private static final Pattern ALLOWED_FILE_NAME = Pattern.compile("[A-Za-z0-9][-_A-Za-z0-9\\.]*"); //文件列表, 拼html文件 private static void sendListing(ChannelHandlerContext ctx, File dir) { // 設置響應對象 FullHttpResponse response = new DefaultFullHttpResponse(HTTP_1_1, OK); // 響應頭 response.headers().set(CONTENT_TYPE, "text/html; charset=UTF-8"); // 構造文本內容 StringBuilder ret = new StringBuilder(); String dirPath = dir.getPath(); ret.append("<!DOCTYPE html>\r\n"); ret.append("<html><head><title>"); ret.append(dirPath); ret.append(" 目錄:"); ret.append("</title></head><body>\r\n"); ret.append("<h3>"); ret.append(dirPath).append(" 目錄:"); ret.append("</h3>\r\n"); ret.append("<ul>"); ret.append("<li>連接:<a href=\"../\">..</a></li>\r\n"); // 遍歷文件, 生成超連接 for (File f : dir.listFiles()) { //step 1: 跳過隱藏文件和不可讀文件 if (f.isHidden() || !f.canRead()) { continue; } String name = f.getName(); //step 2: 跳過正則過濾的文件名 if (!ALLOWED_FILE_NAME.matcher(name).matches()) { continue; } ret.append("<li>連接:<a href=\""); ret.append(name); ret.append("\">"); ret.append(name); ret.append("</a></li>\r\n"); } ret.append("</ul></body></html>\r\n"); //構造ByteBuf,寫入緩衝區 ByteBuf buffer = Unpooled.copiedBuffer(ret, CharsetUtil.UTF_8); //進行寫出操做 response.content().writeBytes(buffer); //重置ByteBuf buffer.release(); //發送完成並主動關閉鏈接 ctx.writeAndFlush(response).addListener(ChannelFutureListener.CLOSE); } //重定向操做 private static void sendRedirect(ChannelHandlerContext ctx, String newUri) { FullHttpResponse response = new DefaultFullHttpResponse(HTTP_1_1, FOUND); response.headers().set(LOCATION, newUri); ctx.writeAndFlush(response).addListener(ChannelFutureListener.CLOSE); } //錯誤信息 private static void sendError(ChannelHandlerContext ctx, HttpResponseStatus status) { FullHttpResponse response = new DefaultFullHttpResponse(HTTP_1_1, status, Unpooled.copiedBuffer("Failure: " + status.toString()+ "\r\n", CharsetUtil.UTF_8)); response.headers().set(CONTENT_TYPE, "text/plain; charset=UTF-8"); ctx.writeAndFlush(response).addListener(ChannelFutureListener.CLOSE); } private static void setContentTypeHeader(HttpResponse response, File file) { //使用mime對象獲取文件對應的Content-Type MimetypesFileTypeMap mimeTypesMap = new MimetypesFileTypeMap(); response.headers().set(CONTENT_TYPE, mimeTypesMap.getContentType(file.getPath())); } }
實際應用中文件上傳服務端有成熟的框架fastDFS(小文件)和HDFS(大文件)
如要實現斷點續傳, 須要記錄上傳進度. 參考HTTP頭的Range和Content-Range
public final class HttpUploadServer { static final boolean SSL = System.getProperty("ssl") != null; static final int PORT = Integer.parseInt(System.getProperty("port", SSL? "8443" : "8080")); public static void main(String[] args) throws Exception { // Configure SSL. final SslContext sslCtx; if (SSL) { SelfSignedCertificate ssc = new SelfSignedCertificate(); sslCtx = SslContext.newServerContext(ssc.certificate(), ssc.privateKey()); } else { sslCtx = null; } EventLoopGroup bossGroup = new NioEventLoopGroup(); EventLoopGroup workerGroup = new NioEventLoopGroup(); try { ServerBootstrap b = new ServerBootstrap(); b.group(bossGroup, workerGroup); b.channel(NioServerSocketChannel.class); b.handler(new LoggingHandler(LogLevel.INFO)); b.childHandler(new HttpUploadServerInitializer(sslCtx)); Channel ch = b.bind(PORT).sync().channel(); System.err.println("Open your web browser and navigate to " + (SSL ? "https" : "http") + "://127.0.0.1:" + PORT + '/'); ch.closeFuture().sync(); } finally { bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); } } }
public class HttpUploadServerInitializer extends ChannelInitializer<SocketChannel> { private final SslContext sslCtx; public HttpUploadServerInitializer(SslContext sslCtx) { this.sslCtx = sslCtx; } @Override public void initChannel(SocketChannel ch) { ChannelPipeline pipeline = ch.pipeline(); if (sslCtx != null) { pipeline.addLast(sslCtx.newHandler(ch.alloc())); } pipeline.addLast(new HttpRequestDecoder()); pipeline.addLast(new HttpResponseEncoder()); // 壓縮 pipeline.addLast(new HttpContentCompressor()); pipeline.addLast(new HttpUploadServerHandler()); } }
public class HttpUploadServerHandler extends SimpleChannelInboundHandler<HttpObject> { private static final Logger logger = Logger.getLogger(HttpUploadServerHandler.class.getName()); private HttpRequest request; private boolean readingChunks; private final StringBuilder responseContent = new StringBuilder(); private static final HttpDataFactory factory = new DefaultHttpDataFactory(DefaultHttpDataFactory.MINSIZE); // 大小超過minsize放磁盤上 private HttpPostRequestDecoder decoder; static { DiskFileUpload.deleteOnExitTemporaryFile = true; //退出時是否刪除臨時文件 DiskFileUpload.baseDirectory = "D:" + File.separatorChar + "aa"; //文件存儲路徑 DiskAttribute.deleteOnExitTemporaryFile = true; //退出時是否刪除臨時文件 DiskAttribute.baseDirectory = "D:" + File.separatorChar + "aa"; //文件存儲路徑 } @Override public void channelInactive(ChannelHandlerContext ctx) throws Exception { if (decoder != null) { decoder.cleanFiles(); } } @Override public void messageReceived(ChannelHandlerContext ctx, HttpObject msg) throws Exception { if (msg instanceof HttpRequest) { // HttpRequest傳輸頭 HttpRequest request = this.request = (HttpRequest) msg; URI uri = new URI(request.uri()); if (!uri.getPath().startsWith("/form")) { // 返回上傳菜單 writeMenu(ctx); return; } // 拼接反饋內容 responseContent.setLength(0); responseContent.append("WELCOME TO THE WILD WILD WEB SERVER\r\n"); responseContent.append("===================================\r\n"); responseContent.append("VERSION: " + request.protocolVersion().text() + "\r\n"); responseContent.append("REQUEST_URI: " + request.uri() + "\r\n\r\n"); responseContent.append("\r\n\r\n"); for (Entry<CharSequence, CharSequence> entry : request.headers()) { responseContent.append("HEADER: " + entry.getKey() + '=' + entry.getValue() + "\r\n"); } responseContent.append("\r\n\r\n"); Set<Cookie> cookies = null; String value = request.headers().getAndConvert(HttpHeaderNames.COOKIE); if (value == null) { cookies = Collections.emptySet(); } else { cookies = ServerCookieDecoder.decode(value); } for (Cookie cookie : cookies) { responseContent.append("COOKIE: " + cookie + "\r\n"); } responseContent.append("\r\n\r\n"); QueryStringDecoder decoderQuery = new QueryStringDecoder(request.uri()); Map<String, List<String>> uriAttributes = decoderQuery.parameters(); for (Entry<String, List<String>> attr: uriAttributes.entrySet()) { for (String attrVal: attr.getValue()) { responseContent.append("URI: " + attr.getKey() + '=' + attrVal + "\r\n"); } } responseContent.append("\r\n\r\n"); // GET方法, 就此return if (request.method().equals(HttpMethod.GET)) { responseContent.append("\r\n\r\nEND OF GET CONTENT\r\n"); return; } // POST方法 try { decoder = new HttpPostRequestDecoder(factory, request); } catch (ErrorDataDecoderException e1) { e1.printStackTrace(); responseContent.append(e1.getMessage()); writeResponse(ctx.channel()); ctx.channel().close(); return; } readingChunks = HttpHeaderUtil.isTransferEncodingChunked(request); responseContent.append("Is Chunked: " + readingChunks + "\r\n"); responseContent.append("IsMultipart: " + decoder.isMultipart() + "\r\n"); if (readingChunks) { responseContent.append("Chunks: "); } } if (decoder != null) { if (msg instanceof HttpContent) { //HttpContent具體傳輸的內容 // 讀取到一個chunk HttpContent chunk = (HttpContent) msg; try { decoder.offer(chunk); } catch (ErrorDataDecoderException e1) { e1.printStackTrace(); responseContent.append(e1.getMessage()); writeResponse(ctx.channel()); ctx.channel().close(); return; } responseContent.append('o'); //每讀一個chunk標記一個'o' readHttpDataChunkByChunk(); // 最後一塊chunk if (chunk instanceof LastHttpContent) { writeResponse(ctx.channel()); readingChunks = false; reset(); } } } else { writeResponse(ctx.channel()); } } private void reset() { request = null; decoder.destroy(); //釋放資源 decoder = null; } private void readHttpDataChunkByChunk() throws Exception { try { while (decoder.hasNext()) { InterfaceHttpData data = decoder.next(); if (data != null) { try { writeHttpData(data); } finally { data.release(); } } } } catch (EndOfDataDecoderException e1) { responseContent.append("\r\n\r\nEND OF CONTENT CHUNK BY CHUNK\r\n\r\n"); } } private void writeHttpData(InterfaceHttpData data) throws Exception { if (data.getHttpDataType() == HttpDataType.Attribute) { Attribute attribute = (Attribute) data; String value = null; try { value = attribute.getValue(); } catch (IOException e1) { e1.printStackTrace(); responseContent.append("\r\nBODY Attribute: " + attribute.getHttpDataType().name() + ": " + attribute.getName() + " Error while reading value: " + e1.getMessage() + "\r\n"); return; } if (value.length() > 100) { responseContent.append("\r\nBODY Attribute: " + attribute.getHttpDataType().name() + ": " + attribute.getName() + " data too long\r\n"); } else { responseContent.append("\r\nBODY Attribute: " + attribute.getHttpDataType().name() + ": " + attribute + "\r\n"); } } else { responseContent.append("\r\n -----------start-------------" + "\r\n"); responseContent.append("\r\nBODY FileUpload: " + data.getHttpDataType().name() + ": " + data + "\r\n"); responseContent.append("\r\n ------------end------------" + "\r\n"); if (data.getHttpDataType() == HttpDataType.FileUpload) { FileUpload fileUpload = (FileUpload) data; if (fileUpload.isCompleted()) { System.out.println("file name : " + fileUpload.getFilename()); System.out.println("file length: " + fileUpload.length()); System.out.println("file maxSize : " + fileUpload.getMaxSize()); System.out.println("file path :" + fileUpload.getFile().getPath()); System.out.println("file absolutepath :" + fileUpload.getFile().getAbsolutePath()); System.out.println("parent path :" + fileUpload.getFile().getParentFile()); if (fileUpload.length() < 1024 * 1024 * 10) { responseContent.append("\tContent of file\r\n"); try { responseContent.append(fileUpload.getString(fileUpload.getCharset())); } catch (Exception e1) { e1.printStackTrace(); } responseContent.append("\r\n"); } else { responseContent.append("\tFile too long to be printed out:" + fileUpload.length() + "\r\n"); } fileUpload.renameTo(new File(fileUpload.getFile().getPath())); // 核心操做, 寫文件 decoder.removeHttpDataFromClean(fileUpload); } else { responseContent.append("\tFile to be continued but should not!\r\n"); } } } } private void writeResponse(Channel channel) { ByteBuf buf = copiedBuffer(responseContent.toString(), CharsetUtil.UTF_8); responseContent.setLength(0); // 是不是短鏈接 boolean close = request.headers().contains(HttpHeaderNames.CONNECTION, HttpHeaderValues.CLOSE, true) || request.protocolVersion().equals(HttpVersion.HTTP_1_0) && !request.headers().contains(HttpHeaderNames.CONNECTION, HttpHeaderValues.KEEP_ALIVE, true); FullHttpResponse response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK, buf); response.headers().set(HttpHeaderNames.CONTENT_TYPE, "text/plain; charset=UTF-8"); // 最後一次鏈接不須要Content-Length if (!close) { response.headers().setInt(HttpHeaderNames.CONTENT_LENGTH, buf.readableBytes()); } Set<Cookie> cookies = null; String value = request.headers().getAndConvert(HttpHeaderNames.COOKIE); if (value == null) { cookies = Collections.emptySet(); } else { cookies = ServerCookieDecoder.decode(value); } if (!cookies.isEmpty()) { for (Cookie cookie : cookies) { response.headers().add(HttpHeaderNames.SET_COOKIE, ServerCookieEncoder.encode(cookie)); } } ChannelFuture future = channel.writeAndFlush(response); if (close) { future.addListener(ChannelFutureListener.CLOSE); } } //拼接上傳頁html菜單 private void writeMenu(ChannelHandlerContext ctx) { responseContent.setLength(0); // create Pseudo Menu responseContent.append("<html>"); responseContent.append("<head>"); responseContent.append("<title>Netty Test Form</title>\r\n"); responseContent.append("</head>\r\n"); responseContent.append("<body bgcolor=white><style>td{font-size: 12pt;}</style>"); responseContent.append("<table border=\"0\">"); responseContent.append("<tr>"); responseContent.append("<td>"); responseContent.append("<h1>Netty Test Form</h1>"); responseContent.append("Choose one FORM"); responseContent.append("</td>"); responseContent.append("</tr>"); responseContent.append("</table>\r\n"); // GET responseContent.append("<CENTER>GET FORM<HR WIDTH=\"75%\" NOSHADE color=\"blue\"></CENTER>"); responseContent.append("<FORM ACTION=\"/formget\" METHOD=\"GET\">"); responseContent.append("<input type=hidden name=getform value=\"GET\">"); responseContent.append("<table border=\"0\">"); responseContent.append("<tr><td>Fill with value: <br> <input type=text name=\"info\" size=10></td></tr>"); responseContent.append("<tr><td>Fill with value: <br> <input type=text name=\"secondinfo\" size=20>"); responseContent.append("<tr><td>Fill with value: <br> <textarea name=\"thirdinfo\" cols=40 rows=10></textarea>"); responseContent.append("</td></tr>"); responseContent.append("<tr><td><INPUT TYPE=\"submit\" NAME=\"Send\" VALUE=\"Send\"></INPUT></td>"); responseContent.append("<td><INPUT TYPE=\"reset\" NAME=\"Clear\" VALUE=\"Clear\" ></INPUT></td></tr>"); responseContent.append("</table></FORM>\r\n"); responseContent.append("<CENTER><HR WIDTH=\"75%\" NOSHADE color=\"blue\"></CENTER>"); // POST responseContent.append("<CENTER>POST FORM<HR WIDTH=\"75%\" NOSHADE color=\"blue\"></CENTER>"); responseContent.append("<FORM ACTION=\"/formpost\" METHOD=\"POST\">"); responseContent.append("<input type=hidden name=getform value=\"POST\">"); responseContent.append("<table border=\"0\">"); responseContent.append("<tr><td>Fill with value: <br> <input type=text name=\"info\" size=10></td></tr>"); responseContent.append("<tr><td>Fill with value: <br> <input type=text name=\"secondinfo\" size=20>"); responseContent.append("<tr><td>Fill with value: <br> <textarea name=\"thirdinfo\" cols=40 rows=10></textarea>"); responseContent.append("<tr><td>Fill with file (only file name will be transmitted): <br> <input type=file name=\"myfile\">"); responseContent.append("</td></tr>"); responseContent.append("<tr><td><INPUT TYPE=\"submit\" NAME=\"Send\" VALUE=\"Send\"></INPUT></td>"); responseContent.append("<td><INPUT TYPE=\"reset\" NAME=\"Clear\" VALUE=\"Clear\" ></INPUT></td></tr>"); responseContent.append("</table></FORM>\r\n"); responseContent.append("<CENTER><HR WIDTH=\"75%\" NOSHADE color=\"blue\"></CENTER>"); // POST with enctype="multipart/form-data" responseContent.append("<CENTER>POST MULTIPART FORM<HR WIDTH=\"75%\" NOSHADE color=\"blue\"></CENTER>"); responseContent.append("<FORM ACTION=\"/formpostmultipart\" ENCTYPE=\"multipart/form-data\" METHOD=\"POST\">"); responseContent.append("<input type=hidden name=getform value=\"POST\">"); responseContent.append("<table border=\"0\">"); responseContent.append("<tr><td>Fill with value: <br> <input type=text name=\"info\" size=10></td></tr>"); responseContent.append("<tr><td>Fill with value: <br> <input type=text name=\"secondinfo\" size=20>"); responseContent.append("<tr><td>Fill with value: <br> <textarea name=\"thirdinfo\" cols=40 rows=10></textarea>"); responseContent.append("<tr><td>Fill with file: <br> <input type=file name=\"myfile\">"); responseContent.append("</td></tr>"); responseContent.append("<tr><td><INPUT TYPE=\"submit\" NAME=\"Send\" VALUE=\"Send\"></INPUT></td>"); responseContent.append("<td><INPUT TYPE=\"reset\" NAME=\"Clear\" VALUE=\"Clear\" ></INPUT></td></tr>"); responseContent.append("</table></FORM>\r\n"); responseContent.append("<CENTER><HR WIDTH=\"75%\" NOSHADE color=\"blue\"></CENTER>"); responseContent.append("</body>"); responseContent.append("</html>"); ByteBuf buf = copiedBuffer(responseContent.toString(), CharsetUtil.UTF_8); FullHttpResponse response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK, buf); response.headers().set(HttpHeaderNames.CONTENT_TYPE, "text/html; charset=UTF-8"); response.headers().setInt(HttpHeaderNames.CONTENT_LENGTH, buf.readableBytes()); ctx.channel().writeAndFlush(response); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { logger.log(Level.WARNING, responseContent.toString(), cause); ctx.channel().close(); } }
public class WebSocketServer { public void run(int port) throws Exception { EventLoopGroup bossGroup = new NioEventLoopGroup(); EventLoopGroup workerGroup = new NioEventLoopGroup(); ServerBootstrap b = new ServerBootstrap(); b.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) .childHandler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) throws Exception { ChannelPipeline pipeline = ch.pipeline(); pipeline.addLast("http-codec", new HttpServerCodec()); pipeline.addLast("aggregator", new HttpObjectAggregator(65536)); ch.pipeline().addLast("http-chunked", new ChunkedWriteHandler()); pipeline.addLast("handler", new WebSocketServerHandler()); } }); Channel ch = b.bind(port).sync().channel(); System.out.println("Web socket server started at port " + port + '.'); System.out.println("Open your browser and navigate to http://localhost:" + port + '/'); ch.closeFuture().sync(); bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); } public static void main(String[] args) throws Exception { new WebSocketServer().run(8765); } }
public class WebSocketServerHandler extends SimpleChannelInboundHandler<Object> { private static final Logger logger = Logger.getLogger(WebSocketServerHandler.class.getName()); private WebSocketServerHandshaker handshaker; @Override public void messageReceived(ChannelHandlerContext ctx, Object msg) throws Exception { // 傳統的HTTP接入 if (msg instanceof FullHttpRequest) { handleHttpRequest(ctx, (FullHttpRequest) msg); } // WebSocket接入 else if (msg instanceof WebSocketFrame) { handleWebSocketFrame(ctx, (WebSocketFrame) msg); } } @Override public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { ctx.flush(); } private void handleHttpRequest(ChannelHandlerContext ctx, FullHttpRequest req) throws Exception { // 若是HTTP解碼失敗,返回HTTP異常 if (!req.decoderResult().isSuccess() || (!"websocket".equals(req.headers().get("Upgrade")))) { sendHttpResponse(ctx, req, new DefaultFullHttpResponse(HTTP_1_1, BAD_REQUEST)); return; } // 構造握手響應返回,本機測試 WebSocketServerHandshakerFactory wsFactory = new WebSocketServerHandshakerFactory("ws://localhost:8080/websocket", null, false); handshaker = wsFactory.newHandshaker(req); if (handshaker == null) { WebSocketServerHandshakerFactory.sendUnsupportedVersionResponse(ctx.channel()); } else { handshaker.handshake(ctx.channel(), req); } } private void handleWebSocketFrame(ChannelHandlerContext ctx, WebSocketFrame frame) { // 判斷是不是關閉鏈路的指令 if (frame instanceof CloseWebSocketFrame) { handshaker.close(ctx.channel(), (CloseWebSocketFrame) frame.retain()); return; } // 判斷是不是Ping消息 if (frame instanceof PingWebSocketFrame) { ctx.channel().write(new PongWebSocketFrame(frame.content().retain())); return; } // 本例程僅支持文本消息,不支持二進制消息 if (!(frame instanceof TextWebSocketFrame)) { throw new UnsupportedOperationException(String.format("%s frame types not supported", frame.getClass().getName())); } // 返回應答消息 String request = ((TextWebSocketFrame) frame).text(); if (logger.isLoggable(Level.FINE)) { logger.fine(String.format("%s received %s", ctx.channel(), request)); } ctx.channel().write( new TextWebSocketFrame(request + " , 歡迎使用Netty WebSocket服務,如今時刻:" + new java.util.Date().toString())); } private static void sendHttpResponse(ChannelHandlerContext ctx, FullHttpRequest req, FullHttpResponse res) { // 返回應答給客戶端 if (res.status().code() != 200) { ByteBuf buf = Unpooled.copiedBuffer(res.status().toString(), CharsetUtil.UTF_8); res.content().writeBytes(buf); buf.release(); HttpHeaderUtil.setContentLength(res, res.content().readableBytes()); } // 若是是非Keep-Alive,關閉鏈接 ChannelFuture f = ctx.channel().writeAndFlush(res); if (!HttpHeaderUtil.isKeepAlive(req) || res.status().code() != 200) { f.addListener(ChannelFutureListener.CLOSE); } } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { cause.printStackTrace(); ctx.close(); } }
客戶端是網頁
<!DOCTYPE html> <html> <head> <meta charset="UTF-8"> Netty WebSocket 時間服務器 </head> <br> <body> <br> <script type="text/javascript"> var socket; if (!window.WebSocket) { window.WebSocket = window.MozWebSocket; } if (window.WebSocket) { socket = new WebSocket("ws://localhost:8765/websocket"); socket.onmessage = function(event) { var ta = document.getElementById('responseText'); ta.value = ""; ta.value = event.data }; socket.onopen = function(event) { var ta = document.getElementById('responseText'); ta.value = "打開WebSocket服務正常,瀏覽器支持WebSocket!"; }; socket.onclose = function(event) { var ta = document.getElementById('responseText'); ta.value = ""; ta.value = "WebSocket 關閉!"; }; } else { alert("抱歉,您的瀏覽器不支持WebSocket協議!"); } function send(message) { if (!window.WebSocket) { return; } if (socket.readyState == WebSocket.OPEN) { socket.send(message); } else { alert("WebSocket鏈接沒有創建成功!"); } } </script> <form onsubmit="return false;"> <input type="text" name="message" value="Netty最佳實踐" /> <br> <br> <input type="button" value="發送WebSocket請求消息" onclick="send(this.form.message.value)" /> <hr color="blue" /> <h3>服務端返回的應答消息</h3> <textarea id="responseText" style="width: 500px; height: 300px;"></textarea> </form> </body> </html>