若是你對Netty有所瞭解,咱們利用Netty寫Tcp服務時,一般會繼承SimpleChannelUpstreamHandler類,重寫messageReceived函數進行數據的接收,以下就是個簡單的tcp短鏈接服務樣例:併發
public class TelnetServerHandler extends SimpleChannelUpstreamHandler { private static final Logger logger = Logger.getLogger( TelnetServerHandler.class.getName()); @Override public void handleUpstream( ChannelHandlerContext ctx, ChannelEvent e) throws Exception { if (e instanceof ChannelStateEvent) { logger.info(e.toString()); } super.handleUpstream(ctx, e); } @Override public void channelConnected( ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception { // Send greeting for a new connection. e.getChannel().write( "Welcome to " + InetAddress.getLocalHost().getHostName() + "!\r\n"); e.getChannel().write("It is " + new Date() + " now.\r\n"); } @Override public void messageReceived( ChannelHandlerContext ctx, MessageEvent e) { String request = (String) e.getMessage(); // Generate and write a response. String response; boolean close = false; if (request.length() == 0) { response = "Please type something.\r\n"; } else if (request.toLowerCase().equals("bye")) { response = "Have a good day!\r\n"; close = true; } else { response = "Did you say '" + request + "'?\r\n"; } // We do not need to write a ChannelBuffer here. // We know the encoder inserted at TelnetPipelineFactory will do the conversion. ChannelFuture future = e.getChannel().write(response); if (close) { future.addListener(ChannelFutureListener.CLOSE); } } }
可是,若是有高併發用戶,數據發送出現亂序。例若有A用戶發送1--2--3--4--5--6,B用戶發送a--b--c--d--e--f。而服務端接收的數據順序多是:1--2--a--b--3--4--c--d---e---f---5---6。
因此爲了支持高併發的用戶,不合適採用單線程接收用戶數據的機制,應該實現支持異步的數據接收;而且爲每一個鏈接通道創建狀態來進行緩衝數據的維護,直至客戶端關閉時,將接收的數據給業務層去處理,以下面 receiver.receive(receivedBytes)函數。異步
下面是個簡單的實現:socket
public class ChannelHandler extends SimpleChannelHandler { private ConcurrentHashMap<SocketAddress,ByteArrayOutputStream> socket2ByteArrayMap = new ConcurrentHashMap<>(); public ChannelHandler() { } public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) { channels.add(e.getChannel()); ChannelBuffer buffer = (ChannelBuffer) e.getMessage(); logger.info(e.getRemoteAddress().toString()); try { SocketAddress curSocketAddress = e.getRemoteAddress(); ByteArrayOutputStream baos = socket2ByteArrayMap.get(curSocketAddress); if(baos == null){ baos = new ByteArrayOutputStream(2000); socket2ByteArrayMap.put(curSocketAddress, baos); } baos.write(buffer.array()); } catch (IOException ie) { Thread.currentThread().interrupt(); } } public void exceptionCaught(ChannelHandlerContext context, ExceptionEvent event) { logger.error("Error", event.getCause()); Channel c = context.getChannel(); c.close().addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture future) throws Exception { if (future.isSuccess()) channels.remove(future.getChannel()); else logger.error("FAILED to close channel"); } }); } @Override public void channelClosed(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception { SocketAddress curSocketAddress = e.getChannel().getRemoteAddress(); ByteArrayOutputStream baos =socket2ByteArrayMap.remove(curSocketAddress); if(baos != null && baos.size() != 0){ byte[] receivedBytes = baos.toByteArray(); receiver.receive(receivedBytes); } super.channelClosed(ctx, e); } }
以上用socket2ByteArrayMap容器維護每一個客戶端接收到的數據,直至對方關閉爲止。 tcp
很簡單吧。。ide