手寫MQ框架(四)-使用netty改造梳理

1、背景

書接上文手寫MQ框架(三)-客戶端實現 ,前面經過web的形式實現了mq的服務端和客戶端,如今計劃使用netty來改造一下。前段時間學習了一下netty的使用(https://www.w3cschool.cn/netty4userguide/52ki1iey.html)。大概有一些想法。html

netty封裝了socket的使用,咱們經過簡單的調用便可構建高性能的網絡應用。我計劃採用如下例子來對gmq進行改造。java

本文主要參考:https://www.w3cschool.cn/netty4userguide/、https://www.w3cschool.cn/essential_netty_in_action/web

2、netty是什麼

Netty是由JBOSS提供的一個java開源框架。Netty提供異步的、事件驅動的網絡應用程序框架和工具,用以快速開發高性能、高可靠性的網絡服務器和客戶端程序。編程

--來自https://www.w3cschool.cn/netty4userguide/52ki1iey.htmlbootstrap

netty是一個java框架,是網絡編程框架,支持異步、事件驅動的特性,因此性能表現很好。服務器

 

3、netty的簡單實現

一、服務端

1)SimpleServerHandler

Handler是處理器,handler 是由 Netty 生成用來處理 I/O 事件的。網絡

package me.lovegao.netty.learnw3c.mqdemo;

import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.group.ChannelGroup;
import io.netty.channel.group.DefaultChannelGroup;
import io.netty.util.concurrent.GlobalEventExecutor;

public class SimpleServerHandler extends SimpleChannelInboundHandler<String> {
    public static ChannelGroup channels = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);

    @Override
    public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
        Channel incoming = ctx.channel();
        System.out.println("[SERVER] - " + incoming.remoteAddress() + " 加入\n");
        channels.add(ctx.channel());
    }

    @Override
    public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
        Channel incoming = ctx.channel();
        System.out.println("[SERVER] - " + incoming.remoteAddress() + " 離開\n");
        channels.remove(ctx.channel());
    }
    
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, String s) throws Exception {
        Channel incoming = ctx.channel();
        System.out.println("[" + incoming.remoteAddress() + "]" + s);
        if(s == null || s.length() == 0) {
            incoming.writeAndFlush("消息是空的呀!\n");
        } else {
//            MqRouter<?> mqRouter = JSONObject.parseObject(s, MqRouter.class);
//            System.out.println(mqRouter.getUri());
            String responseMsg = "收到了," + s + "\n";
            incoming.writeAndFlush(responseMsg);
        }
    }

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        Channel incoming = ctx.channel();
        System.out.println("SimpleChatClient:"+incoming.remoteAddress()+"在線");
    }

    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        Channel incoming = ctx.channel();
        System.out.println("SimpleChatClient:"+incoming.remoteAddress()+"掉線");
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        Channel incoming = ctx.channel();
        System.out.println("SimpleChatClient:"+incoming.remoteAddress()+"異常");
        
        cause.printStackTrace();
        ctx.close();
    }

}

2)SimpleServerInitializer

SimpleServerInitializer 用來增長多個的處理類到 ChannelPipeline 上,包括編碼、解碼、SimpleServerHandler 等。mvc

package me.lovegao.netty.learnw3c.mqdemo;

import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.DelimiterBasedFrameDecoder;
import io.netty.handler.codec.Delimiters;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;

public class SimpleServerInitializer extends ChannelInitializer<SocketChannel> {

    @Override
    protected void initChannel(SocketChannel ch) throws Exception {
        ChannelPipeline pipeline = ch.pipeline();
        pipeline.addLast("framer", new DelimiterBasedFrameDecoder(8192, Delimiters.lineDelimiter()));
        pipeline.addLast("decoder", new StringDecoder());
        pipeline.addLast("encoder", new StringEncoder());
        pipeline.addLast("handler", new SimpleServerHandler());
        
        System.out.println("SimpleChatClient:" + ch.remoteAddress() + "鏈接上");
    }

}

 

3)SimpleServer

package me.lovegao.netty.learnw3c.mqdemo;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;

public class SimpleServer {
    private int port;

    public SimpleServer(int port) {
        this.port = port;
    }

    public void run() throws Exception {
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap b = new ServerBootstrap();
            b.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class)
                    .childHandler(new SimpleServerInitializer()).option(ChannelOption.SO_BACKLOG, 128)
                    .childOption(ChannelOption.SO_KEEPALIVE, true);

            System.out.println("SimpleChatServer 啓動了");

            ChannelFuture f = b.bind(port).sync();

            f.channel().closeFuture().sync();
        } finally {
            workerGroup.shutdownGracefully();
            bossGroup.shutdownGracefully();

            System.out.println("SimpleChatServer 關閉了");
        }
    }

    public static void main(String[] args) throws Exception {
        int port;
        if (args.length > 0) {
            port = Integer.parseInt(args[0]);
        } else {
            port = 8080;
        }
        new SimpleServer(port).run();
    }
}

 

 二、客戶端

1)SimpleClientHandler

package me.lovegao.netty.learnw3c.mqdemo;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;

public class SimpleClientHandler extends SimpleChannelInboundHandler<String> {

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, String s) throws Exception {
        System.out.println("收到的信息:" + s);
    }

}

 

2)SimpleClientInitializer

package me.lovegao.netty.learnw3c.mqdemo;

import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.DelimiterBasedFrameDecoder;
import io.netty.handler.codec.Delimiters;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;

public class SimpleClientInitializer extends ChannelInitializer<SocketChannel> {

    @Override
    protected void initChannel(SocketChannel ch) throws Exception {
        ChannelPipeline pipeline = ch.pipeline();
        pipeline.addLast("framer", new DelimiterBasedFrameDecoder(8192, Delimiters.lineDelimiter()));
        pipeline.addLast("decoder", new StringDecoder());
        pipeline.addLast("encoder", new StringEncoder());
        pipeline.addLast("handler", new SimpleClientHandler());
    }

}

 

3)SimpleClient

package me.lovegao.netty.learnw3c.mqdemo;

import java.io.BufferedReader;
import java.io.InputStreamReader;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;

public class SimpleClient {
    private final String host;
    private final int port;
    
    public SimpleClient(String host, int port) {
        this.host = host;
        this.port = port;
    }

    public static void main(String[] args) throws Exception {
        new SimpleClient("localhost", 8080).run();
    }
    
    public void run() throws Exception {
        EventLoopGroup group = new NioEventLoopGroup();
        try {
            Bootstrap bootstrap = new Bootstrap()
                    .group(group)
                    .channel(NioSocketChannel.class)
                    .handler(new SimpleClientInitializer());
            Channel channel = bootstrap.connect(host, port).sync().channel();
            BufferedReader in = new BufferedReader(new InputStreamReader(System.in));
            while(true) {
                String line = in.readLine();
                if(line.equals("exit!")) {
                    break;
                }
                channel.writeAndFlush(line + "\r\n");
            }
        } catch(Exception e) {
            e.printStackTrace();
        } finally {
            group.shutdownGracefully();
        }
    }



}

 

三、學習中的一些事

在我把教程上的代碼略微改了一下,測試時發現客戶端可以發出消息,服務端可以收到消息,服務端也走到了回覆客戶端的流程,可是客戶端卻收不到消息。還原代碼後是正常的,想了半天,最後才發現是改代碼的的時候漏掉了「\n」這個標識,以此致使客戶端始終不打印消息。框架

4、netty如何運用到gmq中

一、運用有什麼問題

netty只封裝了網絡交互,gmq總體使用了gmvc框架,而gmvc框架目前還沒法脫離servlet。而我又不太想把以前寫的代碼所有改成本身new的方式。異步

二、解決方式

1)改造gmvc框架

對gmvc框架進行重構,使得可以脫離servlet使用。也就是將IOC功能剝離開。

優勢:一步到位,符合總體的規劃。

缺點:gmq的迭代會延遲一段時間。

2)暫時拋棄gmvc框架

暫時將目前依賴的gmvc框架給去除掉,優先完成gmq的迭代。待後期gmvc框架改造完成後再進行改造。

優勢:可以儘早的完成gmq的功能。

缺點:先移除框架,後期再套上框架,至關於作了兩次多餘的功。費時費力。

三、結論

寫框架就是爲了學習,寫GMVC、寫GMQ目的都同樣。時間寶貴,減小多餘功,先對GMVC框架進行改造。

四、一些其餘事

運用netty還有一個事,就是路由的問題。使用netty代替servlet,須要解決路由的問題。

5、準備改造GMVC

敬請期待……

相關文章
相關標籤/搜索