Using Netty to implement a TCP short-connection receiving service that supports high concurrency

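If you are familiar with Netty, you will know that a TCP service written with it usually extends the SimpleChannelUpstreamHandler class and overrides messageReceived to receive data. The following is a simple example of a TCP short-connection service: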

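import java.net.InetAddress;
import java.util.Date;
import java.util.logging.Logger;

import org.jboss.netty.channel.ChannelEvent;
import org.jboss.netty.channel.ChannelFuture;
import org.jboss.netty.channel.ChannelFutureListener;
import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.channel.ChannelStateEvent;
import org.jboss.netty.channel.MessageEvent;
import org.jboss.netty.channel.SimpleChannelUpstreamHandler;

public class TelnetServerHandler extends SimpleChannelUpstreamHandler {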

    private static final Logger logger = Logger.getLogger(
            TelnetServerHandler.class.getName());

    @Override
    public void handleUpstream(
            ChannelHandlerContext ctx, ChannelEvent e) throws Exception {
        if (e instanceof ChannelStateEvent) {
            logger.info(e.toString());
        }
        super.handleUpstream(ctx, e);
    }

    @Override
    public void channelConnected(
            ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
        // Send greeting for a new connection.
        e.getChannel().write(
                "Welcome to " + InetAddress.getLocalHost().getHostName() + "!\r\n");
        e.getChannel().write("It is " + new Date() + " now.\r\n");
    }

    @Override
    public void messageReceived(
            ChannelHandlerContext ctx, MessageEvent e) {

        String request = (String) e.getMessage();

        // Generate and write a response.
        String response;
        boolean close = false;
        if (request.length() == 0) {
            response = "Please type something.\r\n";
        } else if (request.toLowerCase().equals("bye")) {
            response = "Have a good day!\r\n";
            close = true;
        } else {
            response = "Did you say '" + request + "'?\r\n";
        }

        // We do not need to write a ChannelBuffer here.
        // We know the encoder inserted at TelnetPipelineFactory will do the conversion.
        ChannelFuture future = e.getChannel().write(response);
        if (close) {
            future.addListener(ChannelFutureListener.CLOSE);
        }
    }
}

   

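However, with many concurrent users the data arrives interleaved. For example, user A sends 1--2--3--4--5--6 and user B sends a--b--c--d--e--f, yet the server may receive the data in the order 1--2--a--b--3--4--c--d--e--f--5--6. Each client's own order is preserved, but the two streams are mixed together.
So, to support many concurrent users, a mechanism that receives user data on a single thread is not suitable. Data reception should be asynchronous, and each connection's channel should keep its own state for buffering the data received so far. Only when the client closes the connection is the accumulated data handed to the business layer, as in the receiver.receive(receivedBytes) call below.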
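The buffering idea can be seen without Netty in a minimal sketch. It assumes a hypothetical string connection id as the key and a plain list standing in for the business layer; neither name comes from the original code. The chunks arrive in the interleaved order from the example above and are reassembled per connection.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Framework-free sketch: buffer interleaved chunks per connection, deliver on close. */
class PerConnectionBuffers {
    // Hypothetical key: a connection id string (an assumption made for this sketch).
    private final Map<String, ByteArrayOutputStream> buffers = new ConcurrentHashMap<>();
    // Stands in for the business layer that receives the complete payload.
    private final List<String> delivered = Collections.synchronizedList(new ArrayList<>());

    /** Called whenever a chunk arrives on some connection. */
    void onData(String connectionId, byte[] chunk) {
        // computeIfAbsent creates the buffer atomically on first use.
        ByteArrayOutputStream baos =
                buffers.computeIfAbsent(connectionId, id -> new ByteArrayOutputStream());
        baos.write(chunk, 0, chunk.length);
    }

    /** Called when the peer closes: hand over the complete payload exactly once. */
    void onClose(String connectionId) {
        ByteArrayOutputStream baos = buffers.remove(connectionId);
        if (baos != null && baos.size() > 0) {
            delivered.add(connectionId + "="
                    + new String(baos.toByteArray(), StandardCharsets.UTF_8));
        }
    }

    List<String> delivered() {
        return delivered;
    }

    /** Replays the interleaved arrival order 1-2-a-b-3-4-c-d-e-f-5-6. */
    static List<String> demo() {
        PerConnectionBuffers b = new PerConnectionBuffers();
        String[][] arrivals = {
            {"A", "12"}, {"B", "ab"}, {"A", "34"}, {"B", "cdef"}, {"A", "56"}
        };
        for (String[] a : arrivals) {
            b.onData(a[0], a[1].getBytes(StandardCharsets.UTF_8));
        }
        b.onClose("B");
        b.onClose("A");
        return b.delivered();
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints [B=abcdef, A=123456]
    }
}
```

Although the chunks from A and B arrive mixed together, each connection's payload comes out whole because nothing is delivered until that connection closes.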

Below is a simple Netty-based implementation:

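    public class ChannelHandler extends SimpleChannelHandler {
        // logger, channels and receiver are used below but are not declared in this
        // listing; they are assumed to be fields defined elsewhere in the class.

        // Buffers the bytes received so far for each client, keyed by remote address.
        private final ConcurrentHashMap<SocketAddress, ByteArrayOutputStream> socket2ByteArrayMap = new ConcurrentHashMap<>();

        public ChannelHandler() {
        }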

        public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) {
            channels.add(e.getChannel());
            ChannelBuffer buffer = (ChannelBuffer) e.getMessage();
            logger.info(e.getRemoteAddress().toString());
            
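            try {
                SocketAddress curSocketAddress = e.getRemoteAddress();
                ByteArrayOutputStream baos = socket2ByteArrayMap.get(curSocketAddress);
                if (baos == null) {
                    baos = new ByteArrayOutputStream(2000);
                    socket2ByteArrayMap.put(curSocketAddress, baos);
                }
                // Copy only the readable bytes. buffer.array() can include bytes outside
                // the readable region and fails for buffers without a backing array.
                buffer.readBytes(baos, buffer.readableBytes());
            } catch (IOException ie) {
                // Log the failure instead of interrupting the I/O thread.
                logger.error("Failed to buffer received data", ie);
            }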
        }

        public void exceptionCaught(ChannelHandlerContext context, ExceptionEvent event) {
            logger.error("Error", event.getCause());
            Channel c = context.getChannel();
            c.close().addListener(new ChannelFutureListener() {
                @Override
                public void operationComplete(ChannelFuture future) throws Exception {
                    if (future.isSuccess())
                        channels.remove(future.getChannel());
                    else
                        logger.error("FAILED to close channel");
                }
            });
        }

        @Override
        public void channelClosed(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
            SocketAddress curSocketAddress = e.getChannel().getRemoteAddress();
            ByteArrayOutputStream  baos =socket2ByteArrayMap.remove(curSocketAddress);
            if(baos != null && baos.size() != 0){
                byte[] receivedBytes = baos.toByteArray();
                receiver.receive(receivedBytes);
            }
            super.channelClosed(ctx, e);
        }

    }

The code above uses the socket2ByteArrayMap container to hold the data received from each client until that client closes the connection.
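To illustrate why the close event can serve as the end-of-message marker in a short connection, here is a minimal sketch using plain java.net sockets rather than Netty. The class and method names are hypothetical, and it assumes the loopback interface is available. The client writes its payload and closes; the server keeps reading until read returns -1.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.net.InetAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

/** Sketch of a short connection: the peer's close marks the end of the message. */
class ShortConnectionDemo {

    /** Accumulates everything the peer sends until it closes (read returns -1). */
    static byte[] readUntilClosed(InputStream in) throws IOException {
        ByteArrayOutputStream baos = new ByteArrayOutputStream(2000);
        byte[] chunk = new byte[4]; // deliberately tiny so the payload takes several reads
        int n;
        while ((n = in.read(chunk)) != -1) {
            baos.write(chunk, 0, n);
        }
        return baos.toByteArray();
    }

    /** Sends payload over a loopback connection and returns what the server received. */
    static String roundTrip(String payload) {
        try (ServerSocket server = new ServerSocket(0, 1, InetAddress.getLoopbackAddress())) {
            // Server side: accept one connection and read until the client closes it.
            CompletableFuture<byte[]> received = CompletableFuture.supplyAsync(() -> {
                try (Socket peer = server.accept()) {
                    return readUntilClosed(peer.getInputStream());
                } catch (IOException ex) {
                    throw new UncheckedIOException(ex);
                }
            });
            // Client side: connect, send, close. Closing is what ends the message.
            try (Socket client = new Socket(server.getInetAddress(), server.getLocalPort());
                 OutputStream out = client.getOutputStream()) {
                out.write(payload.getBytes(StandardCharsets.UTF_8));
            }
            return new String(received.get(5, TimeUnit.SECONDS), StandardCharsets.UTF_8);
        } catch (Exception ex) {
            throw new IllegalStateException("round trip failed", ex);
        }
    }

    public static void main(String[] args) {
        System.out.println(roundTrip("1-2-3-4-5-6"));
    }
}
```

The server never needs a length prefix or delimiter here, which is the same property the channelClosed callback relies on. The cost is that the full payload sits in memory until the client disconnects.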

Pretty simple, right?
