一、結構圖
二、消息服務器
消息服務器(SNS)由Http Netty Server(HNS)和WebSocket Netty Server(WNS)組成。HNS採用Netty Http+XML協議棧開發實現,WNS採用Netty WebSocket+JSON實現。
HNS只接收預定義的HttpXmlRequest類型的數據,這由編解碼器控制。編解碼器繼承自MessageToMessageDecoder<T>和MessageToMessageEncoder<T>這兩個編解碼基礎類,用於解析處理預定義的HttpXmlRequest數據。HNS根據接收結果向客戶端發送預定義的HttpXmlResponse類型數據。
HNS可以通過HttpXmlClient建立與業務服務器的連接,並通過HttpXmlClientHandler轉發業務請求。HttpXmlClientHandler繼承自SimpleChannelInboundHandler,通過它可以實現HNS與業務服務器的異步通訊。
目前,WNS主要用於與Web客戶端進行WebSocket通訊。WNS通過全局變量Global.WSCG維護通道信息,通過Global.appUsers維護客戶端連接。WNS定義了一個消息基類BaseMsg,該類描述了客戶端發起請求所需要的數據信息。同樣,WNS也定義了一個AppUser類用於存儲客戶端信息。必須說明的一點是,同一個AppUser可能存在多個通道,因此在AppUser中定義了一個ChannelId數組,該數組維護了當前用戶的所有通道ID。客戶端發起連接請求時,必要的數據包括appid、userId、cmd,其中appid是業務服務器的唯一標識。
三、業務服務器
業務服務器是各個應用端建立的、用於與SNS交互的Http Netty Server。換句話說,每個應用都需要啟動一個HNS用於與SNS交互。同樣地,業務服務器也是通過HttpXmlClient向SNS的HNS發起連接請求,不再贅述。
四、HttpXmlServer
package com.sns.protocol.http.xml.server;

import java.net.InetSocketAddress;
import java.util.Objects;

import com.zehin.sns.protocol.http.xml.codec.HttpXmlRequest;
import com.zehin.sns.protocol.http.xml.codec.HttpXmlRequestDecoder;
import com.zehin.sns.protocol.http.xml.codec.HttpXmlResponseEncoder;
import com.zehin.sns.protocol.http.xml.pojo.HttpRequestMessage;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.AdaptiveRecvByteBufAllocator;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpRequestDecoder;
import io.netty.handler.codec.http.HttpResponseEncoder;

/**
 * HTTP + XML Netty server that decodes incoming requests into {@link HttpXmlRequest}
 * objects and hands them to a caller-supplied handler.
 *
 * <p>The server is a {@link Runnable}: {@link #run()} binds the port and then blocks
 * until the server channel is closed, so it is normally started on its own thread.
 * {@link #shutdown()} may be called from another thread to stop it.
 *
 * <p>The same handler instance is added to the pipeline of every accepted channel, so
 * the handler passed to the constructor is expected to be safe to share across
 * channels (in Netty terms, annotated {@code @Sharable}).
 */
public class HttpXmlServer implements Runnable {

    // volatile: assigned on the thread executing run(), read by shutdown() on another thread.
    private volatile EventLoopGroup bossGroup = null;
    private volatile EventLoopGroup workerGroup = null;

    private SimpleChannelInboundHandler<HttpXmlRequest> handler = null;
    private int port = 9999;

    @SuppressWarnings("unused")
    private HttpXmlServer() {
    }

    /**
     * Creates a server that will listen on the given port.
     *
     * @param _port    TCP port to bind
     * @param _handler business handler appended as the last pipeline stage; must not be null
     * @throws NullPointerException if {@code _handler} is null
     */
    public HttpXmlServer(int _port, SimpleChannelInboundHandler<HttpXmlRequest> _handler) {
        this.port = _port;
        // Fail fast: a null handler would otherwise only surface when the first client connects.
        this.handler = Objects.requireNonNull(_handler, "_handler");
    }

    /**
     * Binds the configured port and blocks until the server channel closes.
     * Both event loop groups are shut down before this method returns.
     */
    @Override
    public void run() {
        try {
            // Accepts incoming connections.
            bossGroup = new NioEventLoopGroup();
            // Performs the network reads/writes of the accepted SocketChannels.
            workerGroup = new NioEventLoopGroup();

            ServerBootstrap b = new ServerBootstrap();
            b.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    // Listening-socket options. SO_BACKLOG is the maximum length of the
                    // queue that holds connections waiting to be accepted.
                    .option(ChannelOption.SO_BACKLOG, 1024)
                    // Receive buffer size of the listening socket.
                    .option(ChannelOption.SO_RCVBUF, 1024)
                    // Receive buffer allocator of the listening channel.
                    .option(ChannelOption.RCVBUF_ALLOCATOR,
                            new AdaptiveRecvByteBufAllocator(256, 2048, 65536))
                    // Accepted-channel (worker) options.
                    // SO_SNDBUF is not a server-socket option, so it has to be applied
                    // to the accepted channels through childOption.
                    .childOption(ChannelOption.SO_SNDBUF, 1024)
                    .childOption(ChannelOption.RCVBUF_ALLOCATOR,
                            new AdaptiveRecvByteBufAllocator(256, 2048, 65536))
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            ch.pipeline().addLast("http-decoder", new HttpRequestDecoder());
                            ch.pipeline().addLast("http-aggregator", new HttpObjectAggregator(65536));
                            ch.pipeline().addLast("xml-decoder",
                                    new HttpXmlRequestDecoder(HttpRequestMessage.class, true));
                            ch.pipeline().addLast("http-encoder", new HttpResponseEncoder());
                            ch.pipeline().addLast("xml-encoder", new HttpXmlResponseEncoder());
                            ch.pipeline().addLast("xmlServerHandler", handler);
                        }
                    });

            ChannelFuture future = b.bind(new InetSocketAddress(port)).sync();
            System.out.println("HTTP netty server started. the port is " + port);
            // Block here until the server channel is closed.
            future.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            // Restore the interrupt flag so the owner of this thread can observe it.
            Thread.currentThread().interrupt();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            shutdown();
        }
    }

    /**
     * Requests a graceful shutdown of both event loop groups.
     * Does nothing for a group that has not been created yet.
     */
    public void shutdown() {
        EventLoopGroup boss = bossGroup;
        EventLoopGroup worker = workerGroup;
        if (boss != null) {
            boss.shutdownGracefully();
        }
        if (worker != null) {
            worker.shutdownGracefully();
        }
    }
}
五、HttpXmlServerHandler
package com.sns.protocol.http.xml.server; import static io.netty.handler.codec.http.HttpHeaders.isKeepAlive; import static io.netty.handler.codec.http.HttpHeaders.Names.CONTENT_TYPE; import static io.netty.handler.codec.http.HttpResponseStatus.INTERNAL_SERVER_ERROR; import static io.netty.handler.codec.http.HttpVersion.HTTP_1_1; import com.zehin.sns.protocol.http.xml.codec.HttpXmlRequest; import com.zehin.sns.protocol.http.xml.codec.HttpXmlResponse; import com.zehin.sns.protocol.http.xml.pojo.HttpRequestMessage; import com.zehin.sns.protocol.http.xml.pojo.HttpResponseMessage; import io.netty.buffer.Unpooled; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFutureListener; import io.netty.channel.ChannelHandler.Sharable; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.SimpleChannelInboundHandler; import io.netty.handler.codec.http.DefaultFullHttpResponse; import io.netty.handler.codec.http.FullHttpResponse; import io.netty.handler.codec.http.HttpRequest; import io.netty.handler.codec.http.HttpResponseStatus; import io.netty.util.CharsetUtil; import io.netty.util.concurrent.Future; import io.netty.util.concurrent.GenericFutureListener; @Sharable public final class HttpXmlServerHandler extends SimpleChannelInboundHandler<HttpXmlRequest> { @Override public void messageReceived(final ChannelHandlerContext ctx, HttpXmlRequest xmlRequest) throws Exception { HttpRequest request = xmlRequest.getRequest(); HttpRequestMessage reqMessage = (HttpRequestMessage) xmlRequest.getBody(); System.out.println("Http server receive request : " + reqMessage); HttpResponseMessage resMessage = dobusiness(reqMessage); ChannelFuture future = ctx.writeAndFlush(new HttpXmlResponse(null, resMessage)); if (!isKeepAlive(request)) { future.addListener(new GenericFutureListener<Future<? 
super Void>>() { public void operationComplete(Future future) throws Exception { ctx.close(); } }); } } private HttpResponseMessage dobusiness(HttpRequestMessage req) { HttpResponseMessage resMessage = new HttpResponseMessage(); if (req.getCmd() == 0) { resMessage.setResult(true); } else { // other verify code here... } return resMessage; } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { cause.printStackTrace(); if (ctx.channel().isActive()) { sendError(ctx, INTERNAL_SERVER_ERROR); } } private static void sendError(ChannelHandlerContext ctx, HttpResponseStatus status) { FullHttpResponse response = new DefaultFullHttpResponse(HTTP_1_1, status, Unpooled.copiedBuffer("失敗: " + status.toString() + "\r\n", CharsetUtil.UTF_8)); response.headers().set(CONTENT_TYPE, "text/plain; charset=UTF-8"); ctx.writeAndFlush(response).addListener(ChannelFutureListener.CLOSE); } }
六、備註
本文主要參考《Netty權威指南》,實現了一個簡單的消息轉發服務。