在講解dubboTCP端的設計時,先了解下一些類的關係圖。它們是如何組織在一塊兒的,每一個功能又是什麼,接着在進一步深刻了解其內涵。html
一、Exchangers(交換器工具類) 用來建立TCP服務(bind)和創建客戶端鏈接(connect)輔助類bootstrap
二、Transporters(數據流傳輸工具類)用來建立TCP服務(bind)和創建客戶端鏈接(connect)輔助類,Exchangers的底層內容依賴於Transporters,而且Transporters會根據SPI擴展,來適配合適的tcp通信框架,好比netty,mina等。服務器
三、Exchanger(交換器) 用來建立TCP連接,經過工具類Exchangers完成,該接口是一個SPI擴展,目前惟一僅有就是HeaderExchanger。從名字的含義能夠獲得,該協議是具備自定義協議頭的交換器,因此取名HeaderExchanger。多線程
四、Transporter(數據傳輸層) 用來建立TCP鏈接,經過工具類Transporters完成。它也是一個SPI擴展,好比NettyTransporter,MinaTransporter。app
五、ExchangeClient (交換器客戶端),Exchanger的connect()方法返回,即創建了TCP鏈接後,返回的客戶端,接着就是經過該客戶端與服務端通訊,實例有HeaderExchangeClient、LazyConnectExchangeClient、ReferenceCountExchangeClient。以後分別講解這3個,Exchangers工具類創建的鏈接客戶端是HeaderExchangeClient。框架
六、ExchangeServer (交換器服務端端) Exchanger的bind()方法返回,即服務端監聽的服務端實例,它監聽這某個具體的tcp端口。默認實現是HeaderExchangeServer。dom
七、RemotingServer(遠程的TCP服務端),ExchangeServer類也實現了該接口,表明其也是一個遠程服務器,具體的實現有NettyServer,由Transporter的bind()方法返回,具體的Transporter返回相應的遠程服務端。好比NettyTransporter#bind()返回NettyServer。socket
八、Client(TCP客戶端),ExchangeClient類也實現了該接口,表明其也是一個TCP客戶端,具體實現有NettyClient,由Transporter的connect()方法返回,具體的Transporter返回相應的TCP客戶端。好比NettyTransporter#connect()返回NettyClient。tcp
九、Channel (通訊通道) ,每創建一個TCP連接就相應建立一個Channel。好比Netty創建鏈接後,就有一個Channel。這裏的Channel指的是dubbo本身定義的一個channel。它與netty的channel創建關聯,經過NettyChannel類,框架操做的是NettyChannel,而NettyChannel內部持有一個netty的channel對象。ide
十、HeaderExchangeChannel(交換器Channel,ExchangeChannel屬於交換器Channel),它被HeaderExchangeClient客戶端所持有,客戶端就是經過HeaderExchangeChannel進行通訊的,HeaderExchangeChannel內部持有一個具體的Channel。
十一、ChannelHandler (通道處理器) 用來處理創建鏈接、發送請求、結束請求等操做的具體抽象。
十二、ChannelHandlers(通道處理器工具類) 主要用來包裹封裝具體的Channel,它的做用是經過消息類型,根據Dispatcher返回不一樣的
1三、Dispatcher(消息派發器)
類型 | Dispatcher | Channelhandler | 做用 |
---|---|---|---|
All | AllDispatcher | AllChannelHandler | 全部的消息類型所有經過業務線程池處理 |
Connection | ConnectionOrderedDispatcher | ConnectionOrderedChannelHandler | 鏈接、斷開消息單獨經過一個線程池池來處理,其餘的讀寫等消息經過業務線程池處理 |
Direct | DirectDispatcher | DirectChannelHandler | 全部的消息都經過IO線程池處理,不放到業務線程池中 |
Execution | ExecutionDispatcher | ExecutionChannelHandler | 請求消息在業務線程池處理,其餘消息在IO線程池。 |
Message | MessageOnlyDispatcher | MessageOnlyChannelHandler | 請求和響應消息在業務線程池處理,其餘心跳,鏈接等消息在IO線程池處理 |
@Data @ToString public class SampleMessage { private String threadName; private String id; private String desc; }
public class SampleEncoder extends MessageToByteEncoder<SampleMessage> { protected void encode(ChannelHandlerContext channelHandlerContext, SampleMessage sampleMessage, ByteBuf byteBuf) throws Exception { String threadName = sampleMessage.getThreadName(); String id = sampleMessage.getId(); String desc = sampleMessage.getDesc(); byteBuf.writeInt(threadName.getBytes().length); byteBuf.writeBytes(threadName.getBytes()); byteBuf.writeInt(id.getBytes().length); byteBuf.writeBytes(id.getBytes()); byteBuf.writeInt(desc.getBytes().length); byteBuf.writeBytes(desc.getBytes()); String str = sampleMessage.getThreadName() + ":" + sampleMessage.getDesc() + ":" + sampleMessage.getId(); System.out.println(str); } }
public class SampleDecoder extends ByteToMessageDecoder { protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf, List<Object> list) throws Exception { byteBuf.markReaderIndex(); String threadName = read(byteBuf); if (threadName == null) { byteBuf.resetReaderIndex(); return; } String id = read(byteBuf); if (id == null) { byteBuf.resetReaderIndex(); return; } String desc = read(byteBuf); if (desc == null) { byteBuf.resetReaderIndex(); return; } SampleMessage sampleMessage = new SampleMessage(); sampleMessage.setId(id); sampleMessage.setThreadName(threadName); sampleMessage.setDesc(desc); list.add(sampleMessage); } private String read(ByteBuf byteBuf) { if (canReadInt(byteBuf)) { int readInt = byteBuf.readInt(); if (canReadN(byteBuf, readInt)) { byte[] bytes = new byte[readInt]; byteBuf.readBytes(bytes); return new String(bytes); } } return null; } private boolean canReadInt(ByteBuf byteBuf) { return canReadN(byteBuf, 4); } private boolean canReadN(ByteBuf byteBuf, int n) { if (!byteBuf.isReadable()) { return false; } return byteBuf.readableBytes() >= n; } }
public class PrintChannelHandlers extends ChannelInboundHandlerAdapter { @Override public void channelRead(ChannelHandlerContext ctx, Object msg) { if (msg instanceof SampleMessage) { SampleMessage sampleMessage = (SampleMessage) msg; System.out.println(sampleMessage.getThreadName() + ":" + sampleMessage.getId() + ":" + sampleMessage.getDesc()); } } }
public class NettyServerMain { public static void main(String[] args) { ServerBootstrap serverBootstrap = new ServerBootstrap(); serverBootstrap.group(new NioEventLoopGroup(1), new NioEventLoopGroup(12)) .channel(NioServerSocketChannel.class) .childOption(ChannelOption.TCP_NODELAY, Boolean.TRUE) .childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT) .childHandler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) throws Exception { ch.pipeline() // .addLast("log",new LoggingHandler(LogLevel.INFO)) .addLast("decoder", new SampleDecoder()) .addLast("encoder", new SampleEncoder()) .addLast("handler", new PrintChannelHandlers()); } }); ChannelFuture channelFuture = serverBootstrap.bind(8888); channelFuture.syncUninterruptibly(); System.out.println("連接前"); Channel channel = channelFuture.channel(); System.out.println("連接後"); } }
public class NettyClientMain { public static void main(String[] args) { NettyClientMain nettyClientMain = new NettyClientMain(); nettyClientMain.open(); } public void open() { Bootstrap bootstrap = new Bootstrap(); bootstrap = new Bootstrap(); bootstrap.group(new NioEventLoopGroup(10)) .option(ChannelOption.SO_KEEPALIVE, true) .option(ChannelOption.TCP_NODELAY, true) .option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT) .channel(NioSocketChannel.class); bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 3000); bootstrap.handler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) throws Exception { ch.pipeline()//.addLast("logging",new LoggingHandler(LogLevel.INFO))//for debug .addLast("decoder", new SampleDecoder()) .addLast("encoder", new SampleEncoder()); //.addLast("handler", new PrintChannelHandlers()); } }); SocketAddress socketAddress = new InetSocketAddress("127.0.0.1", 8888); ChannelFuture future = bootstrap.connect(socketAddress); boolean ret = future.awaitUninterruptibly(3000, MILLISECONDS); if (ret && future.isSuccess()) { Channel newChannel = future.channel(); doProcess(newChannel); } } private void doProcess(Channel channel) { AtomicLong atomicLong = new AtomicLong(); for (int i = 0; i < 15; i++) { final char ch = (char) (i + 65); final String id = "id" + i; Thread t = new Thread(new Runnable() { @Override public void run() { while (true) { SampleMessage sampleMessage = new SampleMessage(); sampleMessage.setThreadName(Thread.currentThread().getName()); sampleMessage.setDesc(getdes(ch)); sampleMessage.setId("id" + sampleMessage.getDesc().length() + "-" + atomicLong.getAndIncrement()); channel.writeAndFlush(sampleMessage); } } }); t.start(); } } private String getdes(char a) { Random random = new Random(); StringBuffer buffer = new StringBuffer(); for (int i = 0; i < random.nextInt(500) + 1; i++) { buffer.append(a); } return buffer.toString(); } }
結果符合預期,dubbo 也是經過服務底層公用一條TCP連接,多線程進行調用該鏈路channel。