Dubbo系列之 (七)網絡層那些事(1)

輔助連接

Dubbo系列之 (一)SPI擴展

Dubbo系列之 (二)Registry註冊中心-註冊(1)

Dubbo系列之 (三)Registry註冊中心-註冊(2)

Dubbo系列之 (四)服務訂閱(1)

Dubbo系列之 (五)服務訂閱(2)

Dubbo系列之 (六)服務訂閱(3)

Dubbo系列之 (七)鏈路層那些事(1)

在講解dubboTCP端的設計時,先了解下一些類的關係圖。它們是如何組織在一塊兒的,每一個功能又是什麼,接着在進一步深刻了解其內涵。html

類簡介

一、Exchangers(交換器工具類) 用來建立TCP服務(bind)和創建客戶端鏈接(connect)輔助類bootstrap

二、Transporters(數據流傳輸工具類)用來建立TCP服務(bind)和創建客戶端鏈接(connect)輔助類,Exchangers的底層內容依賴於Transporters,而且Transporters會根據SPI擴展,來適配合適的tcp通信框架,好比netty,mina等。服務器

三、Exchanger(交換器) 用來建立TCP連接,經過工具類Exchangers完成,該接口是一個SPI擴展,目前惟一僅有就是HeaderExchanger。從名字的含義能夠獲得,該協議是具備自定義協議頭的交換器,因此取名HeaderExchanger。多線程

四、Transporter(數據傳輸層) 用來建立TCP鏈接,經過工具類Transporters完成。它也是一個SPI擴展,好比NettyTransporter,MinaTransporter。app

五、ExchangeClient (交換器客戶端),Exchanger的connect()方法返回,即創建了TCP鏈接後,返回的客戶端,接着就是經過該客戶端與服務端通訊,實例有HeaderExchangeClient、LazyConnectExchangeClient、ReferenceCountExchangeClient。以後分別講解這3個,Exchangers工具類創建的鏈接客戶端是HeaderExchangeClient。框架

六、ExchangeServer (交換器服務端端) Exchanger的bind()方法返回,即服務端監聽的服務端實例,它監聽這某個具體的tcp端口。默認實現是HeaderExchangeServer。dom

七、RemotingServer(遠程的TCP服務端),ExchangeServer類也實現了該接口,表明其也是一個遠程服務器,具體的實現有NettyServer,由Transporter的bind()方法返回,具體的Transporter返回相應的遠程服務端。好比NettyTransporter#bind()返回NettyServer。socket

八、Client(TCP客戶端),ExchangeClient類也實現了該接口,表明其也是一個TCP客戶端,具體實現有NettyClient,由Transporter的connect()方法返回,具體的Transporter返回相應的TCP客戶端。好比NettyTransporter#connect()返回NettyClient。tcp

九、Channel (通訊通道) ,每創建一個TCP連接就相應建立一個Channel。好比Netty創建鏈接後,就有一個Channel。這裏的Channel指的是dubbo本身定義的一個channel。它與netty的channel創建關聯,經過NettyChannel類,框架操做的是NettyChannel,而NettyChannel內部持有一個netty的channel對象。ide

十、HeaderExchangeChannel(交換器Channel,ExchangeChannel屬於交換器Channel),它被HeaderExchangeClient客戶端所持有,客戶端就是經過HeaderExchangeChannel進行通訊的,HeaderExchangeChannel內部持有一個具體的Channel。

十一、ChannelHandler (通道處理器) 用來處理創建鏈接、發送請求、結束請求等操做的具體抽象。

十二、ChannelHandlers(通道處理器工具類) 主要用來包裹封裝具體的Channel,它的做用是經過消息類型,根據Dispatcher返回不一樣的

1三、Dispatcher(消息派發器)

類型 Dispatcher Channelhandler 做用
All AllDispatcher AllChannelHandler 全部的消息類型所有經過業務線程池處理
Connection ConnectionOrderedDispatcher ConnectionOrderedChannelHandler 鏈接、斷開消息單獨經過一個線程池池來處理,其餘的讀寫等消息經過業務線程池處理
Direct DirectDispatcher DirectChannelHandler 全部的消息都經過IO線程池處理,不放到業務線程池中
Execution ExecutionDispatcher ExecutionChannelHandler 請求消息在業務線程池處理,其餘消息在IO線程池。
Message MessageOnlyDispatcher MessageOnlyChannelHandler 請求和響應消息在業務線程池處理,其餘心跳,鏈接等消息在IO線程池處理

類關係圖

試一把,Netty操做--客戶端多線程,單鏈路(TCP)

一、定義傳輸消息

@Data
@ToString
public class SampleMessage {

    private String threadName;
    
    private String id;

    private String desc;
}

二、編寫編碼器

public class SampleEncoder extends MessageToByteEncoder<SampleMessage> {

    protected void encode(ChannelHandlerContext channelHandlerContext, SampleMessage sampleMessage, ByteBuf byteBuf) throws Exception {

        String threadName = sampleMessage.getThreadName();
        String id = sampleMessage.getId();
        String desc = sampleMessage.getDesc();

        byteBuf.writeInt(threadName.getBytes().length);
        byteBuf.writeBytes(threadName.getBytes());

        byteBuf.writeInt(id.getBytes().length);
        byteBuf.writeBytes(id.getBytes());


        byteBuf.writeInt(desc.getBytes().length);
        byteBuf.writeBytes(desc.getBytes());
        String str = sampleMessage.getThreadName() + ":" + sampleMessage.getDesc() + ":" + sampleMessage.getId();

        System.out.println(str);
    }
}

三、編寫解碼器

public class SampleDecoder extends ByteToMessageDecoder {

    protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf, List<Object> list) throws Exception {

        byteBuf.markReaderIndex();

        String threadName = read(byteBuf);
        if (threadName == null) {
            byteBuf.resetReaderIndex();
            return;
        }

        String id = read(byteBuf);
        if (id == null) {
            byteBuf.resetReaderIndex();
            return;
        }

        String desc = read(byteBuf);
        if (desc == null) {
            byteBuf.resetReaderIndex();
            return;
        }

        SampleMessage sampleMessage = new SampleMessage();
        sampleMessage.setId(id);
        sampleMessage.setThreadName(threadName);
        sampleMessage.setDesc(desc);
        list.add(sampleMessage);
    }

    private String read(ByteBuf byteBuf) {
        if (canReadInt(byteBuf)) {
            int readInt = byteBuf.readInt();
            if (canReadN(byteBuf, readInt)) {
                byte[] bytes = new byte[readInt];
                byteBuf.readBytes(bytes);
                return new String(bytes);
            } 
        }
        return null;
    }


    private boolean canReadInt(ByteBuf byteBuf) {
        return canReadN(byteBuf, 4);
    }

    private boolean canReadN(ByteBuf byteBuf, int n) {
        if (!byteBuf.isReadable()) {
            return false;
        }
        return byteBuf.readableBytes() >= n;
    }
}

四、編寫消息處理器

public class PrintChannelHandlers extends ChannelInboundHandlerAdapter {

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        if (msg instanceof SampleMessage) {
            SampleMessage sampleMessage = (SampleMessage) msg;
            System.out.println(sampleMessage.getThreadName() + ":" + sampleMessage.getId() + ":" + sampleMessage.getDesc());
        }
    }
    
}

五、編寫服務端

public class NettyServerMain {

    public static void main(String[] args) {
        ServerBootstrap serverBootstrap = new ServerBootstrap();
        serverBootstrap.group(new NioEventLoopGroup(1), new NioEventLoopGroup(12))
                .channel(NioServerSocketChannel.class)
                .childOption(ChannelOption.TCP_NODELAY, Boolean.TRUE)
                .childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
                .childHandler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel ch) throws Exception {
                        ch.pipeline()
                               // .addLast("log",new LoggingHandler(LogLevel.INFO))
                                .addLast("decoder", new SampleDecoder())
                                .addLast("encoder", new SampleEncoder())
                                .addLast("handler", new PrintChannelHandlers());
                    }
                });

        ChannelFuture channelFuture = serverBootstrap.bind(8888);
        channelFuture.syncUninterruptibly();
        System.out.println("連接前");
        Channel channel = channelFuture.channel();
        System.out.println("連接後");
    }
}

六、編寫客戶端

public class NettyClientMain {
    public static void main(String[] args) {
        NettyClientMain nettyClientMain = new NettyClientMain();
        nettyClientMain.open();
    }

    public void open() {
        Bootstrap bootstrap = new Bootstrap();
        bootstrap = new Bootstrap();
        bootstrap.group(new NioEventLoopGroup(10))
                .option(ChannelOption.SO_KEEPALIVE, true)
                .option(ChannelOption.TCP_NODELAY, true)
                .option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
                .channel(NioSocketChannel.class);

        bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 3000);
        bootstrap.handler(new ChannelInitializer<SocketChannel>() {

            @Override
            protected void initChannel(SocketChannel ch) throws Exception {

                ch.pipeline()//.addLast("logging",new LoggingHandler(LogLevel.INFO))//for debug
                        .addLast("decoder", new SampleDecoder())
                        .addLast("encoder", new SampleEncoder());
                //.addLast("handler", new PrintChannelHandlers());

            }
        });

        SocketAddress socketAddress = new InetSocketAddress("127.0.0.1", 8888);

        ChannelFuture future = bootstrap.connect(socketAddress);
        boolean ret = future.awaitUninterruptibly(3000, MILLISECONDS);

        if (ret && future.isSuccess()) {
            Channel newChannel = future.channel();
            doProcess(newChannel);
        }
    }

    private void doProcess(Channel channel) {

        AtomicLong atomicLong = new AtomicLong();
        for (int i = 0; i < 15; i++) {
            final char ch = (char) (i + 65);
            final String id = "id" + i;
            Thread t = new Thread(new Runnable() {
                @Override
                public void run() {
                    while (true) {
                        SampleMessage sampleMessage = new SampleMessage();
                        sampleMessage.setThreadName(Thread.currentThread().getName());
                        sampleMessage.setDesc(getdes(ch));
                        sampleMessage.setId("id" + sampleMessage.getDesc().length() + "-" + atomicLong.getAndIncrement());
                        channel.writeAndFlush(sampleMessage);
                    }
                }
            });
            t.start();
        }
    }


    private String getdes(char a) {
        Random random = new Random();
        StringBuffer buffer = new StringBuffer();
        for (int i = 0; i < random.nextInt(500) + 1; i++) {
            buffer.append(a);
        }
        return buffer.toString();
    }
}

七、測試結果

結果符合預期,dubbo 也是經過服務底層公用一條TCP連接,多線程進行調用該鏈路channel。

相關文章
相關標籤/搜索