基於Netty實現服務端與客戶端通訊

我的博客html

www.milovetingting.cnjava

基於Netty實現服務端與客戶端通訊

前言

本文介紹基於Netty實現的服務端與客戶端通訊的簡單使用方法,並在此基礎上實現一個簡單的服務端-客戶端指令通訊的Demo。git

Netty是什麼

Netty是一個NIO客戶端-服務器框架,能夠快速輕鬆地開發網絡應用程序,例如協議服務器和客戶端。它極大地簡化了網絡編程,例如TCP和UDP套接字服務器的開發。提供一個異步事件驅動的網絡應用程序框架和工具,以快速開發可維護的高性能和高可擴展性協議服務器和客戶端。github

以上內容摘選自netty.io/wiki/user-g…編程

Netty具備如下特色:json

  • 適用於各類傳輸類型的統一API-阻塞和非阻塞套接字
  • 更高的吞吐量,更低的延遲
  • 減小資源消耗
  • 減小沒必要要的內存複製
  • 完整的SSL / TLS和StartTLS支持

以上內容摘選自netty.io/bash

使用入門

Netty的使用,能夠參照Netty的官方文檔,這裏以4.x爲例來演示Netty在服務端和客戶端上使用。文檔地址:netty.io/wiki/user-g…服務器

這裏用Eclipse來進行開發,服務端和客戶端都放在一個工程裏。網絡

新建Java工程app

服務端

首先須要導入netty的jar包。這裏使用netty-all-4.1.48.Final.jar。

NettyServer

新建NettyServer類

public class NettyServer {

	private int mPort;

	public NettyServer(int port) {
		this.mPort = port;
	}

	public void run() {
		EventLoopGroup bossGroup = new NioEventLoopGroup();
		EventLoopGroup workerGroup = new NioEventLoopGroup();
		try {
			ServerBootstrap b = new ServerBootstrap();
			b.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class)
					// 指定鏈接隊列大小
					.option(ChannelOption.SO_BACKLOG, 128)
					//KeepAlive
					.childOption(ChannelOption.SO_KEEPALIVE, true)
					//Handler
					.childHandler(new ChannelInitializer<SocketChannel>() {

						@Override
						protected void initChannel(SocketChannel channel) throws Exception {
							channel.pipeline().addLast(new NettyServerHandler());
						}
					});
			ChannelFuture f = b.bind(mPort).sync();
			if (f.isSuccess()) {
				LogUtil.log("Server,啓動Netty服務端成功,端口號:" + mPort);
			}
			// f.channel().closeFuture().sync();
		} catch (Exception e) {
			e.printStackTrace();
		} finally {
			// workerGroup.shutdownGracefully();
			// bossGroup.shutdownGracefully();
		}
	}

}
複製代碼

NettyServerHandler

在初始化時,須要指定Handle,用來處理Channel相關業務。

public class NettyServerHandler extends ChannelInboundHandlerAdapter {

	@Override
	public void channelActive(ChannelHandlerContext ctx) throws Exception {
		LogUtil.log("Server,channelActive");
	}

	@Override
	public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
		LogUtil.log("Server,接收到客戶端發來的消息:" + msg);
	}

	@Override
	public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
		LogUtil.log("Server,exceptionCaught");
		cause.printStackTrace();
	}

	@Override
	public void channelInactive(ChannelHandlerContext ctx) throws Exception {
		LogUtil.log("Server,channelInactive");
	}

}
複製代碼

通過上面這些步驟後,服務端最基本的設置就完成了。

客戶端

客戶端和服務端在初始化時大致是相似的,不過相比服務端要簡單一些。

NettyClient

public class NettyClient {

	private String mHost;

	private int mPort;

	private NettyClientHandler mClientHandler;

	private ChannelFuture mChannelFuture;

	public NettyClient(String host, int port) {
		this.mHost = host;
		this.mPort = port;
	}

	public void connect() {
		EventLoopGroup workerGroup = new NioEventLoopGroup();
		try {
			Bootstrap b = new Bootstrap();
			mClientHandler = new NettyClientHandler();
			b.group(workerGroup).channel(NioSocketChannel.class)
					// KeepAlive
					.option(ChannelOption.SO_KEEPALIVE, true)
					// Handler
					.handler(new ChannelInitializer<SocketChannel>() {

						@Override
						protected void initChannel(SocketChannel channel) throws Exception {
							channel.pipeline().addLast(mClientHandler);
						}
					});
			mChannelFuture = b.connect(mHost, mPort).sync();
			if (mChannelFuture.isSuccess()) {
				LogUtil.log("Client,鏈接服務端成功");
			}
			mChannelFuture.channel().closeFuture().sync();
		} catch (Exception e) {
			e.printStackTrace();
		} finally {
			workerGroup.shutdownGracefully();
		}
	}
}
複製代碼

NettyClientHandler

public class NettyClientHandler extends ChannelInboundHandlerAdapter {

	@Override
	public void channelActive(ChannelHandlerContext ctx) throws Exception {
		LogUtil.log("Client,channelActive");
	}

	@Override
	public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
		LogUtil.log("Client,接收到服務端發來的消息:" + msg);
	}

	@Override
	public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
		LogUtil.log("Client,exceptionCaught");
		cause.printStackTrace();
	}

	@Override
	public void channelInactive(ChannelHandlerContext ctx) throws Exception {
		LogUtil.log("Client,channelInactive");
	}

}
複製代碼

到這裏,客戶端最基本設置就完成了。

鏈接服務端

新建一個Main類,用於測試服務端和客戶端是否能正常鏈接。

public class Main {

	public static void main(String[] args) {
		try {
			String host = "127.0.0.1";
			int port = 12345;
			NettyServer server = new NettyServer(port);
			server.run();
			Thread.sleep(1000);
			NettyClient client = new NettyClient(host, port);
			client.connect();
		} catch (Exception e) {
			e.printStackTrace();
		}
	}

}
複製代碼

運行main方法,輸出日誌以下:

2020-4-13 0:11:02--Server,啓動Netty服務端成功,端口號:12345
2020-4-13 0:11:03--Client,channelActive
2020-4-13 0:11:03--Client,鏈接服務端成功
2020-4-13 0:11:03--Server,channelActive
複製代碼

能夠看到,客戶端成功鏈接上了服務端,服務端和客戶端裏設置的Handler的channelActive方法都會回調。

服務端與客戶端通訊

在服務端與客戶端鏈接成功後,咱們每每須要在雙方間進行通訊。這裏假定,在鏈接成功後,服務端給客戶端發送一個歡迎信息"你好,客戶端",而客戶端在收到服務端的消息後,也給服務端回覆一個消息"你好,服務端"。下面來實現具體的功能。

修改服務端NettyServerHandler中的channelActive方法和channelRead方法,在channelActive方法中給客戶端發送消息,在channelRead方法中解析客戶端發來的消息

public class NettyServerHandler extends ChannelInboundHandlerAdapter {

	@Override
	public void channelActive(ChannelHandlerContext ctx) throws Exception {
		LogUtil.log("Server,channelActive");
		ByteBuf byteBuf = Unpooled.copiedBuffer("你好,客戶端", Charset.forName("utf-8"));
		ctx.writeAndFlush(byteBuf);
	}

	@Override
	public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
		ByteBuf buf = (ByteBuf) msg;
		byte[] buffer = new byte[buf.readableBytes()];
		buf.readBytes(buffer);
		String message = new String(buffer, "utf-8");
		LogUtil.log("Server,接收到客戶端發來的消息:" + message);
	}

}
複製代碼

修改客戶端NettyClientHandler中的channelRead方法,當收到服務端的消息時,回覆服務端

public class NettyClientHandler extends ChannelInboundHandlerAdapter {

	@Override
	public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
		ByteBuf buf = (ByteBuf) msg;
		byte[] buffer = new byte[buf.readableBytes()];
		buf.readBytes(buffer);
		String message = new String(buffer,"utf-8");
		LogUtil.log("Client,接收到服務端發來的消息:" + message);
		
		ByteBuf byteBuf = Unpooled.copiedBuffer("你好,服務端", Charset.forName("utf-8"));
		ctx.writeAndFlush(byteBuf);
	}

}
複製代碼

運行後,輸出日誌以下:

2020-4-13 0:29:16--Server,啓動Netty服務端成功,端口號:12345
2020-4-13 0:29:17--Client,channelActive
2020-4-13 0:29:17--Client,鏈接服務端成功
2020-4-13 0:29:17--Server,channelActive
2020-4-13 0:29:17--Client,接收到服務端發來的消息:你好,客戶端
2020-4-13 0:29:17--Server,接收到客戶端發來的消息:你好,服務端
複製代碼

能夠看到,服務端與客戶端已經能夠正常通訊。

粘包與拆包

在實際的使用場景中,可能會存在短期內大量數據發送的問題。咱們模擬這個場景。在客戶端鏈接上服務端後,服務端給客戶端發送100個消息,而爲便於分析,客戶端在收到服務端消息後,不做回覆。

修改服務端中NettyServerHandler的channelActive方法

@Override
	public void channelActive(ChannelHandlerContext ctx) throws Exception {
		LogUtil.log("Server,channelActive");
		for (int i = 0; i < 100; i++) {
			ByteBuf byteBuf = Unpooled.copiedBuffer("你好,客戶端", Charset.forName("utf-8"));
			ctx.writeAndFlush(byteBuf);
		}
	}
複製代碼

修改客戶端中NettyClientHandler的channelRead方法

@Override
	public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
		ByteBuf buf = (ByteBuf) msg;
		byte[] buffer = new byte[buf.readableBytes()];
		buf.readBytes(buffer);
		String message = new String(buffer, "utf-8");
		LogUtil.log("Client,接收到服務端發來的消息:" + message);

        //ByteBuf byteBuf = Unpooled.copiedBuffer("你好,服務端", Charset.forName("utf-8"));
        //ctx.writeAndFlush(byteBuf);
	}
複製代碼

運行後,輸出的部分結果以下:

2020-4-13 0:35:28--Server,啓動Netty服務端成功,端口號:12345
2020-4-13 0:35:29--Client,channelActive
2020-4-13 0:35:29--Client,鏈接服務端成功
2020-4-13 0:35:29--Server,channelActive
2020-4-13 0:35:29--Client,接收到服務端發來的消息:你好,客戶端
2020-4-13 0:35:29--Client,接收到服務端發來的消息:你好,客戶端你好,客戶端你好,客戶端你好,客戶端你好,客戶端你好,客戶端你好,客戶端你好,客戶端你好,客戶端你好,客戶端你好,客戶端你好,客戶端你好,客戶端你好,客戶端你好,客戶端你好,客戶端你好,客戶端
2020-4-13 0:35:29--Client,接收到服務端發來的消息:你好,客戶端

複製代碼

能夠看到,出現了多條消息"粘"在一塊兒的狀況。

什麼是粘包與拆包

TCP是個"流"協議,所謂流,就是沒有界限的一串數據。TCP底層並不瞭解上層業務數據的具體含義,它會根據TCP緩衝區的實際狀況進行包的劃分,因此在業務上認爲,一個完整的包可能會被TCP拆分紅多個包進行發送,也有可能把多個小的包封裝成一個大的數據包發送,這就是所謂的TCP粘包和拆包問題。

以上內容摘選自TCP粘包/拆包與Netty解決方案

解決方案

在沒有 Netty 的狀況下,用戶若是本身須要拆包,基本原理就是不斷從 TCP 緩衝區中讀取數據,每次讀取完都須要判斷是不是一個完整的數據包 若是當前讀取的數據不足以拼接成一個完整的業務數據包,那就保留該數據,繼續從 TCP 緩衝區中讀取,直到獲得一個完整的數據包。 若是當前讀到的數據加上已經讀取的數據足夠拼接成一個數據包,那就將已經讀取的數據拼接上本次讀取的數據,構成一個完整的業務數據包傳遞到業務邏輯,多餘的數據仍然保留,以便和下次讀到的數據嘗試拼接。

以上內容摘選自完全理解Netty,這一篇文章就夠了

而使用Netty,則解決這個問題的方法就簡單多了。Netty已經提供了四個拆包器:

  • FixedLengthFrameDecoder:固定長度的拆包器,Netty會把固定長度的數據包發送給下一個channelHandler
  • LineBasedFrameDecoder:行拆包器,每一個數據包以換行符分隔發送
  • DelimiterBasedFrameDecoder:分隔符拆包器,能夠自定義分隔符,行拆包器是分隔符拆包器的一種特例
  • LengthFieldBasedFrameDecoder:基於長度域的拆包器,若是自定義協議中包含長度域的字段,就可使用這個拆包器

在這裏,咱們選用分隔符拆包器

首先定義分隔符

public class Config {
	public static final String DATA_PACK_SEPARATOR = "#$&*";
}
複製代碼

在服務端的channelHandler配置中,須要增長

@Override
protected void initChannel(SocketChannel channel) throws Exception {
    //這個配置須要在添加Handler前設置
	channel.pipeline().addLast(new DelimiterBasedFrameDecoder(1024,Unpooled.copiedBuffer(Config.DATA_PACK_SEPARATOR.getBytes())));
	channel.pipeline().addLast(new NettyServerHandler());
	}
複製代碼

在客戶端的channelHandler的配置中,一樣也須要增長

@Override
protected void initChannel(SocketChannel channel) throws Exception {
    //這個配置須要在添加Handler前設置
	channel.pipeline().addLast(new DelimiterBasedFrameDecoder(1024,Unpooled.copiedBuffer(Config.DATA_PACK_SEPARATOR.getBytes())));
	channel.pipeline().addLast(new NettyServerHandler());
	}
複製代碼

發送數據時,在數據的末尾增長分隔符:

@Override
	public void channelActive(ChannelHandlerContext ctx) throws Exception {
		LogUtil.log("Server,channelActive");
		for (int i = 0; i < 100; i++) {
			ByteBuf byteBuf = Unpooled.copiedBuffer("你好,客戶端"+Config.DATA_PACK_SEPARATOR, Charset.forName("utf-8"));
			ctx.writeAndFlush(byteBuf);
		}
	}
複製代碼

運行後,能夠發現,已經解決"粘包"與"拆包"的問題。

心跳

在網絡應用中,爲了判斷鏈接是否還存在,通常會經過發送心跳包來檢測。在Netty中,配置心跳包的步驟以下

在客戶端的channelHandler的配置中,須要增長

@Override
protected void initChannel(SocketChannel channel) throws Exception {
			channel.pipeline().addLast(new IdleStateHandler(5, 5, 10));
            //...
						}
複製代碼

在NettyClientHandler中,重寫userEventTriggered方法

@Override
	public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
		IdleStateEvent event = (IdleStateEvent) evt;
		LogUtil.log("Client,Idle:" + event.state());
		switch (event.state()) {
		case READER_IDLE:

			break;
		case WRITER_IDLE:
			ByteBuf byteBuf = Unpooled.copiedBuffer("心跳^v^v", Charset.forName("utf-8"));
			break;
		case ALL_IDLE:
			break;
		default:
			super.userEventTriggered(ctx, evt);
			break;
		}
	}
複製代碼

當寫空閒達到配置的時間時,往服務端發送一個心跳消息

運行後,日誌輸出以下:

2020-4-13 1:22:50--Server,啓動Netty服務端成功,端口號:12345
2020-4-13 1:22:51--Client,channelActive
2020-4-13 1:22:51--Client,鏈接服務端成功
2020-4-13 1:22:51--Server,channelActive
2020-4-13 1:22:51--Client,接收到服務端發來的消息:你好,客戶端
2020-4-13 1:22:56--Client,Idle:WRITER_IDLE
2020-4-13 1:22:56--Server,接收到客戶端發來的消息:心跳^v^
2020-4-13 1:22:56--Client,Idle:READER_IDLE
2020-4-13 1:23:01--Client,Idle:WRITER_IDLE
2020-4-13 1:23:01--Server,接收到客戶端發來的消息:心跳^v^
2020-4-13 1:23:01--Client,Idle:READER_IDLE
複製代碼

能夠看到,心跳包按咱們配置的時間正常輸出了。

配置編碼器與解碼器

咱們上面在發送數據時,須要經過ByteBuf來轉換String,而經過配置編碼,解碼器,咱們就能夠直接發送字符串。配置以下:

在服務端與客戶端的channelHandler分別增長如下配置:

@Override
protected void initChannel(SocketChannel channel) throws Exception {
	//...
	//這個配置須要在添加Handler前設置
	channel.pipeline().addLast("encoder", new StringEncoder());
	channel.pipeline().addLast("decoder", new StringDecoder());
    //...
}
複製代碼

在發送消息時,則能夠直接經過ctx.writeAndFlush("心跳^v^" + Config.DATA_PACK_SEPARATOR)的形式來發送。

源碼

到此,最簡單的服務端與客戶端通訊的Demo已經完成。源碼地址:github.com/milovetingt…

使用進階

在上面的基礎上,咱們來實現一個下面的需求:

  • 客戶端須要登陸到服務端

  • 客戶端登陸成功後,服務端能夠給客戶端發送指令消息,客戶端在收到消息及處理完消息後,都須要上報給服務端

封裝鏈接

爲便於程序擴展,咱們將客戶端鏈接服務端的部分抽取出來。經過一個接口來定義鏈接的方法,而鏈接的具體實現由子類來實現。

定義接口

public interface IConnection {

	/** * 鏈接服務器 * * @param host 服務器地址 * @param port 端口 * @param callback 鏈接回調 */
	public void connect(String host, int port, IConnectionCallback callback);

}
複製代碼

在這裏還須要定義鏈接的回調接口

public interface IConnectionCallback {

	/** * 鏈接成功 */
	public void onConnected();

}
複製代碼

具體的鏈接實現類

public class NettyConnection implements IConnection {

	private NettyClient mClient;

	@Override
	public void connect(String host, int port, IConnectionCallback callback) {
		if (mClient == null) {
			mClient = new NettyClient(host, port);
			mClient.setConnectionCallBack(callback);
			mClient.connect();
		}
	}

}
複製代碼

爲便於管理鏈接,定義一個鏈接的管理類

public class ConnectionManager implements IConnection {

	private static IConnection mConnection;

	private ConnectionManager() {

	}

	static class ConnectionManagerInner {
		private static ConnectionManager INSTANCE = new ConnectionManager();
	}

	public static ConnectionManager getInstance() {
		return ConnectionManagerInner.INSTANCE;
	}

	public static void initConnection(IConnection connection) {
		mConnection = connection;
	}

	private void checkInit() {
		if (mConnection == null) {
			throw new IllegalAccessError("please invoke initConnection first!");
		}
	}

	@Override
	public void connect(String host, int port, IConnectionCallback callback) {
		checkInit();
		mConnection.connect(host, port, callback);
	}

}
複製代碼

調用鏈接:

public class Main {

	public static void main(String[] args) {
		try {
			String host = "127.0.0.1";
			int port = 12345;
			NettyServer server = new NettyServer(port);
			server.run();
			Thread.sleep(1000);
			ConnectionManager.initConnection(new NettyConnection());
			ConnectionManager.getInstance().connect(host, port, new IConnectionCallback() {

				@Override
				public void onConnected() {
					LogUtil.log("Main,onConnected"););
				}
			});
		} catch (Exception e) {
			e.printStackTrace();
		}

	}

}
複製代碼

在調用connect方法前,須要先調用initConnection來指定具體的鏈接類

消息Bean的定義

在鏈接成功後,服務端會給客戶端發送一個歡迎的消息。爲便於管理,咱們定義一個消息Bean

public class Msg {

	/** * 歡迎 */
	public static final int TYPE_WELCOME = 0;

	public int type;

	public String msg;

}
複製代碼

服務端發送歡迎消息

服務端發送消息

public class NettyServerHandler extends ChannelInboundHandlerAdapter {

	private ChannelHandlerContextWrapper mChannelHandlerContextWrapper;

	@Override
	public void channelActive(ChannelHandlerContext ctx) throws Exception {
		LogUtil.log("Server,channelActive");
		mChannelHandlerContextWrapper = new ChannelHandlerContextWrapper(ctx);
		MsgUtil.sendWelcomeMsg(mChannelHandlerContextWrapper);
	}
}
複製代碼

在這裏,經過定義一個ChannelHandlerContextWrapper類來統一管理消息分隔符

public class ChannelHandlerContextWrapper {

	private ChannelHandlerContext mContext;

	public ChannelHandlerContextWrapper(ChannelHandlerContext context) {
		this.mContext = context;
	}

	/** * 包裝writeAndFlush方法 * * @param object */
	public void writeAndFlush(Object object) {
		mContext.writeAndFlush(object + Config.DATA_PACK_SEPARATOR);
	}

}
複製代碼

再進一步,經過定義MsgUtil類來封裝發送歡迎消息

public class MsgUtil {

	/** * 發送歡迎消息 * * @param wrapper */
	public static void sendWelcomeMsg(ChannelHandlerContextWrapper wrapper) {
		Msg msg = new Msg();
		msg.type = Msg.TYPE_WELCOME;
		msg.msg = "你好,客戶端";
		wrapper.writeAndFlush(Global.sGson.toJson(msg));
	}

}
複製代碼

客戶端消息接收

對於客戶端而言,爲方便處理消息,咱們須要定義一個方法來接收消息。經過在IConnection接口中新增一個registerMsgCallback方法來實現

public interface IConnection {

	/** * 鏈接服務器 * * @param host 服務器地址 * @param port 端口 * @param callback 鏈接回調 */
	public void connect(String host, int port, IConnectionCallback callback);

	/** * 註冊消息回調 * * @param callback */
	public void registerMsgCallback(IMsgCallback callback);

}
複製代碼

在這裏,還須要新增IMsgCallback接口

public interface IMsgCallback {

	/** * 接收到消息時的回調 * * @param msg */
	public void onMsgReceived(Msg msg);

}
複製代碼

對應到實現類

public class NettyConnection implements IConnection {

	private NettyClient mClient;

	@Override
	public void connect(String host, int port, IConnectionCallback callback) {
		if (mClient == null) {
			mClient = new NettyClient(host, port);
			mClient.setConnectionCallBack(callback);
			mClient.connect();
		}
	}

	@Override
	public void registerMsgCallback(IMsgCallback callback) {
		if (mClient == null) {
			throw new IllegalAccessError("please invoke connect first!");
		}
		mClient.registerMsgCallback(callback);
	}

}
複製代碼

消息的分發

在客戶端,爲便於處理消息,咱們對消息類型進行劃分

修改消息Bean

public class Msg {

	/** * 歡迎 */
	public static final int TYPE_WELCOME = 0;

	/** * 心跳 */
	public static final int TYPE_HEART_BEAT = 1;

	/** * 登陸 */
	public static final int TYPE_LOGIN = 2;

	public static final int TYPE_COMMAND_A = 3;

	public static final int TYPE_COMMAND_B = 4;

	public static final int TYPE_COMMAND_C = 5;

	public int type;

	public String msg;
}
複製代碼

假定消息是串行的,須要一個一個地處理。爲便於管理消息,增長MsgQueue類

public class MsgQueue {

	private PriorityBlockingQueue<Msg> mQueue;

	private boolean using;

	private MsgQueue() {
		mQueue = new PriorityBlockingQueue<>(128, new Comparator<Msg>() {

			@Override
			public int compare(Msg msg1, Msg msg2) {
				int res = msg2.priority - msg1.priority;
				if (res == 0 && msg1.time != msg2.time) {
					return (int) (msg2.time - msg1.time);
				}
				return res;
			}
		});
	}

	public static MsgQueue getInstance() {
		return MsgQueueInner.INSTANCE;
	}

	private static class MsgQueueInner {
		private static final MsgQueue INSTANCE = new MsgQueue();
	}

	/** * 將消息加入消息隊列 * * @param msg */
	public void enqueueMsg(Msg msg) {
		mQueue.add(msg);
	}

	/** * 從消息隊列獲取消息 * * @return */
	public synchronized Msg next() {
		if (using) {
			return null;
		}
		Msg msg = mQueue.poll();
		if (msg != null) {
			makeUse(true);
		}
		return msg;
	}

	/** * 標記使用狀態 * * @param use */
	public synchronized void makeUse(boolean use) {
		using = use;
	}

	/** * 是否可以使用 * * @return */
	public synchronized boolean canUse() {
		return !using;
	}

}
複製代碼

增長消息的分發類MsgDispatcher

public class MsgDispatcher {

	private static Map<Integer, Class<? extends IMsgHandler>> mHandlerMap;

	static {
		mHandlerMap = new HashMap<>();
		mHandlerMap.put(Msg.TYPE_WELCOME, WelcomeMsgHandler.class);
		mHandlerMap.put(Msg.TYPE_HEART_BEAT, HeartBeatMsgHandler.class);
		mHandlerMap.put(Msg.TYPE_LOGIN, HeartBeatMsgHandler.class);
		mHandlerMap.put(Msg.TYPE_COMMAND_A, CommandAMsgHandler.class);
		mHandlerMap.put(Msg.TYPE_COMMAND_B, CommandBMsgHandler.class);
		mHandlerMap.put(Msg.TYPE_COMMAND_C, CommandCMsgHandler.class);
	}

	public static void dispatch() {
		if (MsgQueue.getInstance().canUse()) {
			Msg msg = MsgQueue.getInstance().next();
			if (msg == null) {
				return;
			}
			dispatch(msg);
		}
	}

	public static void dispatch(Msg msg) {
		try {
			IMsgHandler handler = (IMsgHandler) Class.forName(mHandlerMap.get(msg.type).getName()).newInstance();
			handler.handle(msg);
		} catch (InstantiationException e) {
			e.printStackTrace();
		} catch (IllegalAccessException e) {
			e.printStackTrace();
		} catch (ClassNotFoundException e) {
			e.printStackTrace();
		}
	}

}
複製代碼

消息的處理

定義IMsgHandler,在這裏定義了處理的方法,具體實現由子類實現

public interface IMsgHandler {

	/** * 處理消息 * * @param msg */
	public void handle(Msg msg);

}
複製代碼

爲統一管理,定義Base類BaseCommandHandler

public abstract class BaseCommandHandler implements IMsgHandler {

	@Override
	public void handle(Msg msg) {
		execute(msg);
	}

	public final void execute(Msg msg) {
		LogUtil.log("Client,received command:" + msg);
		doHandle(msg);
		MsgQueue.getInstance().makeUse(false);
		LogUtil.log("Client,report command:" + msg);
		MsgDispatcher.dispatch();
	}

	public abstract void doHandle(Msg msg);

}
複製代碼

在BaseCommandHandler中,定義execute方法,順序調用:上報消息已接收成功、處理消息、上報消息已處理完成。這裏的消息上報部分,都只是輸出一個日誌來代替,在實際的業務中,能夠抽取出一個抽象方法,讓子類來實現。

定義子類,繼承自BaseCommandHandler

public class LoginMsgHandler extends BaseCommandHandler {

	@Override
	public void doHandle(Msg msg) {
		LogUtil.log("Client,handle msg:" + msg);
	}

}
複製代碼

對應的心跳類型消息、歡迎類型消息等,均可以新增對應的處理類來實現,這裏再也不展開。

接收到消息時的處理

public class Main {

	public static void main(String[] args) {
		try {
			String host = "127.0.0.1";
			int port = 12345;
			NettyServer server = new NettyServer(port);
			server.run();
			Thread.sleep(1000);
			ConnectionManager.initConnection(new NettyConnection());
			ConnectionManager.getInstance().connect(host, port, new IConnectionCallback() {

				@Override
				public void onConnected() {
					LogUtil.log("Main,onConnected");

					ConnectionManager.getInstance().registerMsgCallback(new IMsgCallback() {

						@Override
						public void onMsgReceived(Msg msg) {
							MsgQueue.getInstance().enqueueMsg(msg);
							MsgDispatcher.dispatch();
						}
					});
				}
			});
		} catch (Exception e) {
			e.printStackTrace();
		}

	}

}
複製代碼

客戶端登陸

修改消息Bean,增長登陸的請求和響應

public class Msg {

	/** * 歡迎 */
	public static final int TYPE_WELCOME = 0;

	/** * 心跳 */
	public static final int TYPE_HEART_BEAT = 1;

	/** * 登陸 */
	public static final int TYPE_LOGIN = 2;

	public static final int TYPE_COMMAND_A = 3;

	public static final int TYPE_COMMAND_B = 4;

	public static final int TYPE_COMMAND_C = 5;

	public int type;

	public String msg;

	public int priority;

	public long time;

	/** * 登陸請求信息 * * @author Administrator * */
	public static class LoginRuquestInfo {
		/** * 用戶名 */
		public String user;

		/** * 密碼 */
		public String pwd;

		@Override
		public String toString() {
			return "LoginRuquestInfo [user=" + user + ", pwd=" + pwd + "]";
		}
	}

	/** * 登陸響應信息 * * @author Administrator * */
	public static class LoginResponseInfo {

		/** * 登陸成功 */
		public static final int CODE_SUCCESS = 0;

		/** * 登陸失敗 */
		public static final int CODE_FAILED = 100;

		/** * 響應碼 */
		public int code;

		/** * 響應數據 */
		public String data;

		public static class ResponseData {
			public String token;
		}

		@Override
		public String toString() {
			return "LoginResponseInfo [code=" + code + ", data=" + data + "]";
		}

	}
}
複製代碼

發送登陸請求

public class Main {

	public static void main(String[] args) {
		try {
			String host = "127.0.0.1";
			int port = 12345;
			NettyServer server = new NettyServer(port);
			server.run();
			Thread.sleep(1000);
			ConnectionManager.initConnection(new NettyConnection());
			ConnectionManager.getInstance().connect(host, port, new IConnectionCallback() {

				@Override
				public void onConnected() {
					LogUtil.log("Main,onConnected");

					ConnectionManager.getInstance().registerMsgCallback(new IMsgCallback() {

						@Override
						public void onMsgReceived(Msg msg) {
							MsgQueue.getInstance().enqueueMsg(msg);
							MsgDispatcher.dispatch();
						}
					});

					Msg msg = new Msg();
					msg.type = Msg.TYPE_LOGIN;

					Msg.LoginRuquestInfo request = new LoginRuquestInfo();
					request.user = "wangyz";
					request.pwd = "wangyz";

					Gson gson = new Gson();
					msg.msg = gson.toJson(request);

					ConnectionManager.getInstance().sendMsg(msg);
				}
			});
		} catch (Exception e) {
			e.printStackTrace();
		}

	}

}
複製代碼

這裏,引入Gson,將消息Bean轉成json字符串後發送。

對應到服務端,爲便於解析出消息,也須要對應的修改消息的Bean。服務端對消息的具體分發與處理,和客戶端相似,這裏再也不展開。

源碼

因爲篇幅限制,Demo中指令的優先級處理,模擬服務端指令下發等,這裏沒有再進一步詳細介紹,具體能夠參考源碼:github.com/milovetingt…

後記

本文介紹了基於Netty實現服務端與客戶端通訊的基本用法,以及在此基礎上,實現處理服務端指令並上報。Demo中通訊的數據格式,用到了json,而優化的作法,能夠用protobuf來實現,這裏只展現通訊的流程及簡單的封裝,於是未使用protobuf。Demo中只實現大致的流程,可能存在未測試到的Bug,權當一個參考的思路吧。

End~

相關文章
相關標籤/搜索