netty實現消息轉發服務

一、結構圖java

  

二、消息服務器web

消息服務器(SNS)由Http Netty Server(HNS)WebSocket Netty Server(WNS)組成。HNS採用Netty Http+XML協議棧開發實現,WNS採用Netty WebSocket+JSON實現。bootstrap

HNS只接收預約義的HttpXmlRequest類型的數據,這由編解碼器控制,編解碼器是繼承了MessageToMessageDecoder<T>MessageToMessageEncoder<T>這兩個編解碼基礎類、並用於解析處理預約義HttpXmlRequest數據的類。HNS根據接收結果向客戶端發送預約義的HttpXmlResponse類型數據。數組

HNS能夠經過HttpXmlClient建立與業務服務器的連接,並經過HttpXmlClientHandler轉發業務請求。HttpXmlClientHandler繼承自SimpleChannelInboundHandler,經過它能夠實現HNS與業務服務器的異步通訊。服務器

目前,WNS主要用於與Web客戶端端進行websocket通訊,WNS經過全局變量Global.WSCG維護通道信息,經過Global.appUsers維護客戶端鏈接。WNS定義了一個消息基類BaseMsg,該類描述了客戶端發起請求所須要的數據信息。一樣,WNS也定義了一個AppUser類用於存儲客戶端信息,必須說明的一點是,同一個AppUser可能存在多個通道,所以,在AppUser中定義了一個ChannelId數組,該數組維護了當前用戶的全部通道ID。客戶端發起鏈接請求時,必要的數據包括appiduserIdcmdappid是一個業務服務器的惟一標識。 websocket

三、業務服務器網絡

業務服務器是各個應用端創建的與SNS交互的Http Netty Server,換句話說,每個應用都須要啓動一個HNS用於與SNS交互。一樣的,業務服務器也是經過HttpXmlClientSNSHNS發起鏈接請求,再也不贅述。app

四、HttpXmlServer異步

 

 

package com.sns.protocol.http.xml.server;

import java.net.InetSocketAddress;

import com.zehin.sns.protocol.http.xml.codec.HttpXmlRequest;
import com.zehin.sns.protocol.http.xml.codec.HttpXmlRequestDecoder;
import com.zehin.sns.protocol.http.xml.codec.HttpXmlResponseEncoder;
import com.zehin.sns.protocol.http.xml.pojo.HttpRequestMessage;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.AdaptiveRecvByteBufAllocator;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpRequestDecoder;
import io.netty.handler.codec.http.HttpResponseEncoder;

public class HttpXmlServer implements Runnable {

	private EventLoopGroup bossGroup = null;
	private EventLoopGroup workerGroup = null;
	private SimpleChannelInboundHandler<HttpXmlRequest> handler = null;

	private int port = 9999;

	@SuppressWarnings("unused")
	private HttpXmlServer() {
	}

	public HttpXmlServer(int _port, SimpleChannelInboundHandler<HttpXmlRequest> _handler) {
		this.port = _port;
		this.handler = _handler;
	}

	@Override
	public void run() {
		// 處理網絡鏈接---接受請求
		bossGroup = new NioEventLoopGroup();
		// 進行socketChannel的網絡讀寫
		workerGroup = new NioEventLoopGroup();
		try {
			ServerBootstrap b = new ServerBootstrap();
			b.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class)
					// boss線程接收參數設置,BACKLOG用於構造服務端套接字ServerSocket對象,
					// 標識當服務器請求處理線程全滿時,用於臨時存放已完成三次握手的請求的隊列的最大長度。
					// 若是未設置或所設置的值小於1,Java將使用默認值50。
					.option(ChannelOption.SO_BACKLOG, 1024)
					// 發送緩衝器
					.option(ChannelOption.SO_SNDBUF, 1024)
					// 接收緩衝器
					.option(ChannelOption.SO_RCVBUF, 1024)
					// 接收緩衝分配器
					.option(ChannelOption.RCVBUF_ALLOCATOR, new AdaptiveRecvByteBufAllocator(256, 2048, 65536))
					// work線程參數設置
					.childOption(ChannelOption.RCVBUF_ALLOCATOR, new AdaptiveRecvByteBufAllocator(256, 2048, 65536))
					.childHandler(new ChannelInitializer<SocketChannel>() {
						@Override
						protected void initChannel(SocketChannel ch) throws Exception {
							ch.pipeline().addLast("http-decoder", new HttpRequestDecoder());
							ch.pipeline().addLast("http-aggregator", new HttpObjectAggregator(65536));
							ch.pipeline().addLast("xml-decoder",
									new HttpXmlRequestDecoder(HttpRequestMessage.class, true));
							ch.pipeline().addLast("http-encoder", new HttpResponseEncoder());
							ch.pipeline().addLast("xml-encoder", new HttpXmlResponseEncoder());
							ch.pipeline().addLast("xmlServerHandler", handler);
						}
					});
			ChannelFuture future = b.bind(new InetSocketAddress(port)).sync();
			System.out.println("HTTP netty server started. the port is " + port);
			future.channel().closeFuture().sync();
		} catch (Exception e) {
			e.printStackTrace();
		} finally {
			bossGroup.shutdownGracefully();
			workerGroup.shutdownGracefully();
		}
	}

	public void shutdown() {
		if (bossGroup != null)
			bossGroup.shutdownGracefully();
		if (workerGroup != null)
			workerGroup.shutdownGracefully();
	}
}

 

五、HttpXmlServerHandlersocket

package com.sns.protocol.http.xml.server;

import static io.netty.handler.codec.http.HttpHeaders.isKeepAlive;
import static io.netty.handler.codec.http.HttpHeaders.Names.CONTENT_TYPE;
import static io.netty.handler.codec.http.HttpResponseStatus.INTERNAL_SERVER_ERROR;
import static io.netty.handler.codec.http.HttpVersion.HTTP_1_1;

import com.zehin.sns.protocol.http.xml.codec.HttpXmlRequest;
import com.zehin.sns.protocol.http.xml.codec.HttpXmlResponse;
import com.zehin.sns.protocol.http.xml.pojo.HttpRequestMessage;
import com.zehin.sns.protocol.http.xml.pojo.HttpResponseMessage;

import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandler.Sharable;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.http.DefaultFullHttpResponse;
import io.netty.handler.codec.http.FullHttpResponse;
import io.netty.handler.codec.http.HttpRequest;
import io.netty.handler.codec.http.HttpResponseStatus;
import io.netty.util.CharsetUtil;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.GenericFutureListener;

@Sharable
public final class HttpXmlServerHandler extends SimpleChannelInboundHandler<HttpXmlRequest> {

	@Override
	public void messageReceived(final ChannelHandlerContext ctx, HttpXmlRequest xmlRequest) throws Exception {
		HttpRequest request = xmlRequest.getRequest();
		HttpRequestMessage reqMessage = (HttpRequestMessage) xmlRequest.getBody();
		System.out.println("Http server receive request : " + reqMessage);
		HttpResponseMessage resMessage = dobusiness(reqMessage);
		ChannelFuture future = ctx.writeAndFlush(new HttpXmlResponse(null, resMessage));
		if (!isKeepAlive(request)) {
			future.addListener(new GenericFutureListener<Future<? super Void>>() {
				public void operationComplete(Future future) throws Exception {
					ctx.close();
				}
			});
		}
	}

	private HttpResponseMessage dobusiness(HttpRequestMessage req) {
		HttpResponseMessage resMessage = new HttpResponseMessage();
		if (req.getCmd() == 0) {
			resMessage.setResult(true);
		} else {
			// other verify code here...
		}
		return resMessage;
	}

	@Override
	public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
		cause.printStackTrace();
		if (ctx.channel().isActive()) {
			sendError(ctx, INTERNAL_SERVER_ERROR);
		}
	}

	private static void sendError(ChannelHandlerContext ctx, HttpResponseStatus status) {
		FullHttpResponse response = new DefaultFullHttpResponse(HTTP_1_1, status,
				Unpooled.copiedBuffer("失敗: " + status.toString() + "\r\n", CharsetUtil.UTF_8));
		response.headers().set(CONTENT_TYPE, "text/plain; charset=UTF-8");
		ctx.writeAndFlush(response).addListener(ChannelFutureListener.CLOSE);
	}
}

六、備註

  主要參考《Netty權威指南》而寫了個簡單的消息轉發。

相關文章
相關標籤/搜索