書接上文手寫MQ框架(三)-客戶端實現 ,前面經過web的形式實現了mq的服務端和客戶端,如今計劃使用netty來改造一下。前段時間學習了一下netty的使用(https://www.w3cschool.cn/netty4userguide/52ki1iey.html)。大概有一些想法。html
netty封裝了socket的使用,咱們經過簡單的調用便可構建高性能的網絡應用。我計劃採用如下例子來對gmq進行改造。java
本文主要參考:https://www.w3cschool.cn/netty4userguide/、https://www.w3cschool.cn/essential_netty_in_action/web
Netty是由JBOSS提供的一個java開源框架。Netty提供異步的、事件驅動的網絡應用程序框架和工具,用以快速開發高性能、高可靠性的網絡服務器和客戶端程序。編程
--來自https://www.w3cschool.cn/netty4userguide/52ki1iey.htmlbootstrap
netty是一個java框架,是網絡編程框架,支持異步、事件驅動的特性,因此性能表現很好。服務器
Handler是處理器,handler 是由 Netty 生成用來處理 I/O 事件的。網絡
package me.lovegao.netty.learnw3c.mqdemo; import io.netty.channel.Channel; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.SimpleChannelInboundHandler; import io.netty.channel.group.ChannelGroup; import io.netty.channel.group.DefaultChannelGroup; import io.netty.util.concurrent.GlobalEventExecutor; public class SimpleServerHandler extends SimpleChannelInboundHandler<String> { public static ChannelGroup channels = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE); @Override public void handlerAdded(ChannelHandlerContext ctx) throws Exception { Channel incoming = ctx.channel(); System.out.println("[SERVER] - " + incoming.remoteAddress() + " 加入\n"); channels.add(ctx.channel()); } @Override public void handlerRemoved(ChannelHandlerContext ctx) throws Exception { Channel incoming = ctx.channel(); System.out.println("[SERVER] - " + incoming.remoteAddress() + " 離開\n"); channels.remove(ctx.channel()); } @Override protected void channelRead0(ChannelHandlerContext ctx, String s) throws Exception { Channel incoming = ctx.channel(); System.out.println("[" + incoming.remoteAddress() + "]" + s); if(s == null || s.length() == 0) { incoming.writeAndFlush("消息是空的呀!\n"); } else { // MqRouter<?> mqRouter = JSONObject.parseObject(s, MqRouter.class); // System.out.println(mqRouter.getUri()); String responseMsg = "收到了," + s + "\n"; incoming.writeAndFlush(responseMsg); } } @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { Channel incoming = ctx.channel(); System.out.println("SimpleChatClient:"+incoming.remoteAddress()+"在線"); } @Override public void channelInactive(ChannelHandlerContext ctx) throws Exception { Channel incoming = ctx.channel(); System.out.println("SimpleChatClient:"+incoming.remoteAddress()+"掉線"); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { Channel incoming = ctx.channel(); System.out.println("SimpleChatClient:"+incoming.remoteAddress()+"異常"); cause.printStackTrace(); ctx.close(); } }
SimpleServerInitializer 用來增長多個的處理類到 ChannelPipeline 上,包括編碼、解碼、SimpleServerHandler 等。mvc
package me.lovegao.netty.learnw3c.mqdemo; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelPipeline; import io.netty.channel.socket.SocketChannel; import io.netty.handler.codec.DelimiterBasedFrameDecoder; import io.netty.handler.codec.Delimiters; import io.netty.handler.codec.string.StringDecoder; import io.netty.handler.codec.string.StringEncoder; public class SimpleServerInitializer extends ChannelInitializer<SocketChannel> { @Override protected void initChannel(SocketChannel ch) throws Exception { ChannelPipeline pipeline = ch.pipeline(); pipeline.addLast("framer", new DelimiterBasedFrameDecoder(8192, Delimiters.lineDelimiter())); pipeline.addLast("decoder", new StringDecoder()); pipeline.addLast("encoder", new StringEncoder()); pipeline.addLast("handler", new SimpleServerHandler()); System.out.println("SimpleChatClient:" + ch.remoteAddress() + "鏈接上"); } }
package me.lovegao.netty.learnw3c.mqdemo; import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelOption; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.nio.NioServerSocketChannel; public class SimpleServer { private int port; public SimpleServer(int port) { this.port = port; } public void run() throws Exception { EventLoopGroup bossGroup = new NioEventLoopGroup(); EventLoopGroup workerGroup = new NioEventLoopGroup(); try { ServerBootstrap b = new ServerBootstrap(); b.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class) .childHandler(new SimpleServerInitializer()).option(ChannelOption.SO_BACKLOG, 128) .childOption(ChannelOption.SO_KEEPALIVE, true); System.out.println("SimpleChatServer 啓動了"); ChannelFuture f = b.bind(port).sync(); f.channel().closeFuture().sync(); } finally { workerGroup.shutdownGracefully(); bossGroup.shutdownGracefully(); System.out.println("SimpleChatServer 關閉了"); } } public static void main(String[] args) throws Exception { int port; if (args.length > 0) { port = Integer.parseInt(args[0]); } else { port = 8080; } new SimpleServer(port).run(); } }
package me.lovegao.netty.learnw3c.mqdemo; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.SimpleChannelInboundHandler; public class SimpleClientHandler extends SimpleChannelInboundHandler<String> { @Override protected void channelRead0(ChannelHandlerContext ctx, String s) throws Exception { System.out.println("收到的信息:" + s); } }
package me.lovegao.netty.learnw3c.mqdemo; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelPipeline; import io.netty.channel.socket.SocketChannel; import io.netty.handler.codec.DelimiterBasedFrameDecoder; import io.netty.handler.codec.Delimiters; import io.netty.handler.codec.string.StringDecoder; import io.netty.handler.codec.string.StringEncoder; public class SimpleClientInitializer extends ChannelInitializer<SocketChannel> { @Override protected void initChannel(SocketChannel ch) throws Exception { ChannelPipeline pipeline = ch.pipeline(); pipeline.addLast("framer", new DelimiterBasedFrameDecoder(8192, Delimiters.lineDelimiter())); pipeline.addLast("decoder", new StringDecoder()); pipeline.addLast("encoder", new StringEncoder()); pipeline.addLast("handler", new SimpleClientHandler()); } }
package me.lovegao.netty.learnw3c.mqdemo; import java.io.BufferedReader; import java.io.InputStreamReader; import io.netty.bootstrap.Bootstrap; import io.netty.channel.Channel; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.nio.NioSocketChannel; public class SimpleClient { private final String host; private final int port; public SimpleClient(String host, int port) { this.host = host; this.port = port; } public static void main(String[] args) throws Exception { new SimpleClient("localhost", 8080).run(); } public void run() throws Exception { EventLoopGroup group = new NioEventLoopGroup(); try { Bootstrap bootstrap = new Bootstrap() .group(group) .channel(NioSocketChannel.class) .handler(new SimpleClientInitializer()); Channel channel = bootstrap.connect(host, port).sync().channel(); BufferedReader in = new BufferedReader(new InputStreamReader(System.in)); while(true) { String line = in.readLine(); if(line.equals("exit!")) { break; } channel.writeAndFlush(line + "\r\n"); } } catch(Exception e) { e.printStackTrace(); } finally { group.shutdownGracefully(); } } }
在我把教程上的代碼略微改了一下,測試時發現客戶端可以發出消息,服務端可以收到消息,服務端也走到了回覆客戶端的流程,可是客戶端卻收不到消息。還原代碼後是正常的,想了半天,最後才發現是改代碼的的時候漏掉了「\n」這個標識,以此致使客戶端始終不打印消息。框架
netty只封裝了網絡交互,gmq總體使用了gmvc框架,而gmvc框架目前還沒法脫離servlet。而我又不太想把以前寫的代碼所有改成本身new的方式。異步
1)改造gmvc框架
對gmvc框架進行重構,使得可以脫離servlet使用。也就是將IOC功能剝離開。
優勢:一步到位,符合總體的規劃。
缺點:gmq的迭代會延遲一段時間。
2)暫時拋棄gmvc框架
暫時將目前依賴的gmvc框架給去除掉,優先完成gmq的迭代。待後期gmvc框架改造完成後再進行改造。
優勢:可以儘早的完成gmq的功能。
缺點:先移除框架,後期再套上框架,至關於作了兩次多餘的功。費時費力。
寫框架就是爲了學習,寫GMVC、寫GMQ目的都同樣。時間寶貴,減小多餘功,先對GMVC框架進行改造。
運用netty還有一個事,就是路由的問題。使用netty代替servlet,須要解決路由的問題。
敬請期待……