dubbo源碼分析系列(4)dubbo通訊設計

#1 系列目錄java

#2 NIO通訊層的抽象apache

目前dubbo已經集成的有netty、mina、grizzly。先來經過案例簡單瞭解下netty、mina編程(grizzly沒有了解過)編程

##2.1 netty和mina的簡單案例服務器

netty本來是jboss開發的,後來單獨出來了,因此會有兩種版本就是org.jboss.netty和io.netty兩種包類型的,而dubbo內置的是前者。目前還不是很熟悉,可能稍有差異,可是總體大概都是同樣的。微信

咱們先來看下io.netty的案例:session

public static void main(String[] args){
	EventLoopGroup bossGroup=new NioEventLoopGroup();
	EventLoopGroup workerGroup = new NioEventLoopGroup();
	try {
		ServerBootstrap serverBootstrap=new ServerBootstrap();
		serverBootstrap.group(bossGroup,workerGroup)
			.channel(NioServerSocketChannel.class)
			.childHandler(new ChannelInitializer<SocketChannel>() {
				@Override
				protected void initChannel(SocketChannel ch) throws Exception {
					ch.pipeline().addLast(new TcpServerHandler());
				}
			});
		ChannelFuture f=serverBootstrap.bind(8080).sync();
		f.channel().closeFuture().sync();
	} catch (InterruptedException e) {
		e.printStackTrace();
	}finally {  
        workerGroup.shutdownGracefully();  
        bossGroup.shutdownGracefully();  
    }  
}

mina的案例:多線程

public static void main(String[] args) throws IOException{
	IoAcceptor acceptor = new NioSocketAcceptor();
	acceptor.getFilterChain().addLast("codec",new ProtocolCodecFilter(
			new TextLineCodecFactory(Charset.forName("UTF-8"),"\r\n", "\r\n")));
	acceptor.setHandler(new TcpServerHandler());  
    acceptor.bind(new InetSocketAddress(8080));  
}

二者都是使用Reactor模型結構。而最原始BIO模型以下:併發

原始BIO模型

每來一個Socket鏈接都爲該Socket建立一個線程來處理。因爲總線程數有限制,致使Socket鏈接受阻,因此BIO模型併發量並不大app

Rector多線程模型以下,更多信息見Netty系列之Netty線程模型框架

Rector多線程模型

用一個boss線程,建立Selector,用於不斷監聽Socket鏈接、客戶端的讀寫操做等

用一個線程池即workers,負責處理Selector派發的讀寫操做。

因爲boss線程能夠接收更多的Socket鏈接,同時能夠充分利用線程池中的每一個線程,減小了BIO模型下每一個線程爲單獨的socket的等待時間。

##2.2 服務器端如何集成netty和mina

先來簡單總結下上述netty和mina的類似之處,而後進行抽象歸納成接口

  • 1 各自有各自的編程啓動方式

  • 2 都須要各自的ChannelHandler實現,用於處理各自的Channel或者IoSession的鏈接、讀寫等事件。

    對於netty來講: 須要繼承org.jboss.netty.channel.SimpleChannelHandler(或者其餘方式),來處理org.jboss.netty.channel.Channel的鏈接讀寫事件

    對於mina來講:須要繼承org.apache.mina.common.IoHandlerAdapter(或者其餘方式),來處理org.apache.mina.common.IoSession的鏈接讀寫事件

爲了統一上述問題,dubbo須要作以下事情:

  • 1 定義dubbo的com.alibaba.dubbo.remoting.Channel接口

    • 1.1 針對netty,上述Channel的實現爲NettyChannel,內部含有一個netty本身的org.jboss.netty.channel.Channel channel對象,即該com.alibaba.dubbo.remoting.Channel接口的功能實現所有委託爲底層的org.jboss.netty.channel.Channel channel對象來實現

    • 1.2 針對mina,上述Channel實現爲MinaChannel,內部包含一個mina本身的org.apache.mina.common.IoSession session對象,即該com.alibaba.dubbo.remoting.Channel接口的功能實現所有委託爲底層的org.apache.mina.common.IoSession session對象來實現

  • 2 定義本身的com.alibaba.dubbo.remoting.ChannelHandler接口,用於處理com.alibaba.dubbo.remoting.Channel接口的鏈接讀寫事件,以下所示

    public interface ChannelHandler {
    
        void connected(Channel channel) throws RemotingException;
    
        void disconnected(Channel channel) throws RemotingException;
    
        void sent(Channel channel, Object message) throws RemotingException;
    
        void received(Channel channel, Object message) throws RemotingException;
    
        void caught(Channel channel, Throwable exception) throws RemotingException;
    
    }
    • 2.1 先定義用於處理netty的NettyHandler,須要按照netty的方式繼承netty的org.jboss.netty.channel.SimpleChannelHandler,此時NettyHandler就能夠委託dubbo的com.alibaba.dubbo.remoting.ChannelHandler接口實現來完成具體的功能,在交給com.alibaba.dubbo.remoting.ChannelHandler接口實現以前,須要先將netty本身的org.jboss.netty.channel.Channel channel轉化成上述的NettyChannel,見NettyHandler

      public void channelConnected(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
          NettyChannel channel = NettyChannel.getOrAddChannel(ctx.getChannel(), url, handler);
          try {
              if (channel != null) {
                  channels.put(NetUtils.toAddressString((InetSocketAddress) ctx.getChannel().getRemoteAddress()), channel);
              }
              handler.connected(channel);
          } finally {
              NettyChannel.removeChannelIfDisconnected(ctx.getChannel());
          }
      }
      
      @Override
      public void channelDisconnected(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
          NettyChannel channel = NettyChannel.getOrAddChannel(ctx.getChannel(), url, handler);
          try {
              channels.remove(NetUtils.toAddressString((InetSocketAddress) ctx.getChannel().getRemoteAddress()));
              handler.disconnected(channel);
          } finally {
              NettyChannel.removeChannelIfDisconnected(ctx.getChannel());
          }
      }
    • 2.2 先定義用於處理mina的MinaHandler,須要按照mina的方式繼承mina的org.apache.mina.common.IoHandlerAdapter,此時MinaHandler就能夠委託dubbo的com.alibaba.dubbo.remoting.ChannelHandler接口實現來完成具體的功能,在交給com.alibaba.dubbo.remoting.ChannelHandler接口實現以前,須要先將mina本身的org.apache.mina.common.IoSession轉化成上述的MinaChannel,見MinaHandler

      public void sessionOpened(IoSession session) throws Exception {
          MinaChannel channel = MinaChannel.getOrAddChannel(session, url, handler);
          try {
              handler.connected(channel);
          } finally {
              MinaChannel.removeChannelIfDisconnectd(session);
          }
      }
      
      @Override
      public void sessionClosed(IoSession session) throws Exception {
          MinaChannel channel = MinaChannel.getOrAddChannel(session, url, handler);
          try {
              handler.disconnected(channel);
          } finally {
              MinaChannel.removeChannelIfDisconnectd(session);
          }
      }

作了上述事情以後,所有邏輯就統一到dubbo本身的com.alibaba.dubbo.remoting.ChannelHandler接口如何來處理本身的com.alibaba.dubbo.remoting.Channel接口。

這就須要看下com.alibaba.dubbo.remoting.ChannelHandler接口的實現有哪些:

ChannelHandler接口實現

  • 3 定義Server接口用於統一你們的啓動流程

    先來看下總體的Server接口實現狀況

    Server接口實現狀況

    如NettyServer的啓動流程: 按照netty本身的API啓動方式,而後依據外界傳遞進來的com.alibaba.dubbo.remoting.ChannelHandler接口實現,建立出NettyHandler,最終對用戶的鏈接請求的處理所有交給NettyHandler來處理,NettyHandler又交給了外界傳遞進來的com.alibaba.dubbo.remoting.ChannelHandler接口實現。

    至此就將全部底層不一樣的通訊實現所有轉化到了外界傳遞進來的com.alibaba.dubbo.remoting.ChannelHandler接口的實現上了。

    而上述Server接口的另外一個分支實現HeaderExchangeServer則充當一個裝飾器的角色,爲全部的Server實現增添了以下功能:

    向該Server全部的Channel依次進行心跳檢測:

    • 若是當前時間減去最後的讀取時間大於heartbeat時間或者當前時間減去最後的寫時間大於heartbeat時間,則向該Channel發送一次心跳檢測
    • 若是當前時間減去最後的讀取時間大於heartbeatTimeout,則服務器端要關閉該Channel,若是是客戶端的話則進行從新鏈接(客戶端也會使用這個心跳檢測任務)

##2.3 客戶端如何集成netty和mina

服務器端了解了以後,客戶端就也很是清楚了,總體類圖以下:

Client接口實現狀況

如NettyClient在使用netty的API開啓客戶端以後,仍然使用NettyHandler來處理。仍是最終轉化成com.alibaba.dubbo.remoting.ChannelHandler接口實現上了。

HeaderExchangeClient和上面的HeaderExchangeServer很是相似,就再也不提了。

咱們能夠看到這樣集成完成以後,就徹底屏蔽了底層通訊細節,將邏輯所有交給了com.alibaba.dubbo.remoting.ChannelHandler接口的實現上了。從上面咱們也能夠看到,該接口實現也會通過層層裝飾類的包裝,纔會最終交給底層通訊。

如HeartbeatHandler裝飾類:

public void sent(Channel channel, Object message) throws RemotingException {
    setWriteTimestamp(channel);
    handler.sent(channel, message);
}

public void received(Channel channel, Object message) throws RemotingException {
    setReadTimestamp(channel);
    if (isHeartbeatRequest(message)) {
        Request req = (Request) message;
        if (req.isTwoWay()) {
            Response res = new Response(req.getId(), req.getVersion());
            res.setEvent(Response.HEARTBEAT_EVENT);
            channel.send(res);
            if (logger.isInfoEnabled()) {
                int heartbeat = channel.getUrl().getParameter(Constants.HEARTBEAT_KEY, 0);
                if(logger.isDebugEnabled()) {
                    logger.debug("Received heartbeat from remote channel " + channel.getRemoteAddress()
                                    + ", cause: The channel has no data-transmission exceeds a heartbeat period"
                                    + (heartbeat > 0 ? ": " + heartbeat + "ms" : ""));
                }
            }
        }
        return;
    }
    if (isHeartbeatResponse(message)) {
        if (logger.isDebugEnabled()) {
        	logger.debug(
                new StringBuilder(32)
                    .append("Receive heartbeat response in thread ")
                    .append(Thread.currentThread().getName())
                    .toString());
        }
        return;
    }
    handler.received(channel, message);
}

就會攔截那些上述提到的心跳檢測請求。更新該Channel的最後讀寫時間。

##2.4 同步調用和異步調用的實現

首先設想一下咱們目前的通訊方式,使用netty mina等異步事件驅動的通訊框架,將Channel中信息都分發到Handler中去處理了,Handler中的send方法只負責不斷的發送消息,receive方法只負責不斷接收消息,這時候就產生一個問題:

客戶端如何對應同一個Channel的接收的消息和發送的消息之間的匹配呢?

這也很簡單,就須要在發送消息的時候,必需要產生一個請求id,將調用的信息連同id一塊兒發給服務器端,服務器端處理完畢後,再將響應信息和上述請求id一塊兒發給客戶端,這樣的話客戶端在接收到響應以後就能夠根據id來判斷是針對哪次請求的響應結果了。

來看下DubboInvoker中的具體實現

boolean isAsync = RpcUtils.isAsync(getUrl(), invocation);
boolean isOneway = RpcUtils.isOneway(getUrl(), invocation);
int timeout = getUrl().getMethodParameter(methodName, Constants.TIMEOUT_KEY,Constants.DEFAULT_TIMEOUT);
if (isOneway) {
	boolean isSent = getUrl().getMethodParameter(methodName, Constants.SENT_KEY, false);
    currentClient.send(inv, isSent);
    RpcContext.getContext().setFuture(null);
    return new RpcResult();
} else if (isAsync) {
	ResponseFuture future = currentClient.request(inv, timeout) ;
    RpcContext.getContext().setFuture(new FutureAdapter<Object>(future));
    return new RpcResult();
} else {
	RpcContext.getContext().setFuture(null);
    return (Result) currentClient.request(inv, timeout).get();
}
  • 若是不須要返回值,直接使用send方法,發送出去,設置當期和線程綁定RpcContext的future爲null
  • 若是須要異步通訊,使用request方法構建一個ResponseFuture,而後設置到和線程綁定RpcContext中
  • 若是須要同步通訊,使用request方法構建一個ResponseFuture,阻塞等待請求完成

能夠看到的是它把ResponseFuture設置到與當前線程綁定的RpcContext中了,若是咱們要獲取異步結果,則須要經過RpcContext來獲取當前線程綁定的RpcContext,而後就能夠獲取Future對象。以下所示:

String result1 = helloService.hello("World");
System.out.println("result :"+result1);
System.out.println("result : "+RpcContext.getContext().getFuture().get());

當設置成異步請求的時候,result1則爲null,而後經過RpcContext來獲取相應的值。

而後咱們來看下異步請求的整個實現過程,即上述currentClient.request方法的具體內容:

public ResponseFuture request(Object request, int timeout) throws RemotingException {
    // create request.
    Request req = new Request();
    req.setVersion("2.0.0");
    req.setTwoWay(true);
    req.setData(request);
    DefaultFuture future = new DefaultFuture(channel, req, timeout);
    try{
        channel.send(req);
    }catch (RemotingException e) {
        future.cancel();
        throw e;
    }
    return future;
}
  • 第一步:建立出一個request對象,建立過程當中就自動產生了requestId,以下

    public class Request {
    	private final long    mId;
    	private static final AtomicLong INVOKE_ID = new AtomicLong(0);
    
    	public Request() {
            mId = newId();
        }
    
    	private static long newId() {
            // getAndIncrement()增加到MAX_VALUE時,再增加會變爲MIN_VALUE,負數也能夠作爲ID
            return INVOKE_ID.getAndIncrement();
        }
    }
  • 第二步:根據request請求封裝成一個DefaultFuture對象,經過該對象的get方法就能夠獲取到請求結果。該方法會阻塞一直到請求結果產生。同時DefaultFuture對象會被存至DefaultFuture類以下結構中:

    private static final Map<Long, DefaultFuture> FUTURES   = new ConcurrentHashMap<Long, DefaultFuture>();

    key就是請求id

  • 第三步:將上述請求對象發送給服務器端,同時將DefaultFuture對象返給上一層函數,即DubboInvoker中,而後設置到當前線程中

  • 第四步:用戶經過RpcContext來獲取上述DefaultFuture對象來獲取請求結果,會阻塞至服務器端返產生結果給客戶端

  • 第五步:服務器端產生結果,返回給客戶端會在客戶端的handler的receive方法中接收到,接收到以後判別接收的信息是Response後,

    static void handleResponse(Channel channel, Response response) throws RemotingException {
        if (response != null && !response.isHeartbeat()) {
            DefaultFuture.received(channel, response);
        }
    }

    就會根據response的id從上述FUTURES結構中查出對應的DefaultFuture對象,並把結果設置進去。此時DefaultFuture的get方法則再也不阻塞,返回剛剛設置好的結果。

至此異步通訊大體就瞭解了,可是咱們會發現一個問題:

當某個線程屢次發送異步請求時,都會將返回的DefaultFuture對象設置到當前線程綁定的RpcContext中,就會形成了覆蓋問題,以下調用方式:

String result1 = helloService.hello("World");
String result2 = helloService.hello("java");
System.out.println("result :"+result1);
System.out.println("result :"+result2);
System.out.println("result : "+RpcContext.getContext().getFuture().get());
System.out.println("result : "+RpcContext.getContext().getFuture().get());

即異步調用了hello方法,再次異步調用,則前一次的結果就被沖掉了,則就沒法獲取前一次的結果了。必需要調用一次就立馬將DefaultFuture對象獲取走,以避免被沖掉。即這樣寫:

String result1 = helloService.hello("World");
Future<String> result1Future=RpcContext.getContext().getFuture();
String result2 = helloService.hello("java");
Future<String> result2Future=RpcContext.getContext().getFuture();
System.out.println("result :"+result1);
System.out.println("result :"+result2);
System.out.println("result : "+result1Future.get());
System.out.println("result : "+result2Future.get());

最後來張dubbo的解釋圖片:

同步轉異步

#3 通訊層與dubbo的結合

從上面能夠了解到如何對不一樣的通訊框架進行抽象,屏蔽底層細節,統一將邏輯交給ChannelHandler接口實現來處理。而後咱們就來了解下如何與dubbo的業務進行對接,也就是在什麼時機來使用上述通訊功能:

##3.1 服務的發佈過程使用通訊功能

如DubboProtocol在發佈服務的過程當中:

  • 1 DubboProtocol中有一個以下結構

    Map<String, ExchangeServer> serverMap

    在發佈一個服務的時候會先根據服務的url獲取要發佈的服務所在的host和port,以此做爲key來從上述結構中尋找是否已經有對應的ExchangeServer(上面已經說明)。

  • 2 若是沒有的話,則會建立一個,建立過程以下:

    ExchangeServer server = Exchangers.bind(url, requestHandler);

    其中requestHandler就是DubboProtocol自身實現的ChannelHandler。

    獲取一個ExchangeServer,它的實現主要是Server的裝飾類,依託外部傳遞的Server來實現Server功能,而本身加入一些額外的功能,如ExchangeServer的實現HeaderExchangeServer,就是加入了心跳檢測的功能。

    因此此時咱們能夠自定義擴展功能來實現Exchanger。接口定義以下:

    @SPI(HeaderExchanger.NAME)
    public interface Exchanger {
    
        @Adaptive({Constants.EXCHANGER_KEY})
        ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException;
    
        @Adaptive({Constants.EXCHANGER_KEY})
        ExchangeClient connect(URL url, ExchangeHandler handler) throws RemotingException;
    
    }

    默認使用的就是HeaderExchanger,它建立的ExchangeServer是HeaderExchangeServer以下所示:

    public class HeaderExchanger implements Exchanger {
    
        public static final String NAME = "header";
    
        public ExchangeClient connect(URL url, ExchangeHandler handler) throws RemotingException {
            return new HeaderExchangeClient(Transporters.connect(url, new DecodeHandler(new HeaderExchangeHandler(handler))));
        }
    
        public ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException {
            return new HeaderExchangeServer(Transporters.bind(url, new DecodeHandler(new HeaderExchangeHandler(handler))));
        }
    
    }

    HeaderExchangeServer僅僅是一個Server接口的裝飾類,須要依託外部傳遞Server實現來完成具體的功能。此Server實現能夠是netty也能夠是mina等。因此咱們能夠自定義Transporter實現來選擇不一樣底層通訊框架,接口定義以下:

    @SPI("netty")
    public interface Transporter {
    
        @Adaptive({Constants.SERVER_KEY, Constants.TRANSPORTER_KEY})
        Server bind(URL url, ChannelHandler handler) throws RemotingException;
    
        @Adaptive({Constants.CLIENT_KEY, Constants.TRANSPORTER_KEY})
        Client connect(URL url, ChannelHandler handler) throws RemotingException;
    
    }

    默認採用netty實現,以下:

    public class NettyTransporter implements Transporter {
    
        public static final String NAME = "netty";
    
        public Server bind(URL url, ChannelHandler listener) throws RemotingException {
            return new NettyServer(url, listener);
        }
    
        public Client connect(URL url, ChannelHandler listener) throws RemotingException {
            return new NettyClient(url, listener);
        }
    
    }

    至此就到了咱們上文介紹的內容了。同時DubboProtocol的ChannelHandler實現通過層層裝飾器包裝,最終傳給底層通訊Server。

    客戶端發送請求給服務器端時,底層通訊Server會將請求通過層層處理最終傳遞給DubboProtocol的ChannelHandler實現,在該實現中,會根據請求參數找到對應的服務器端本地Invoker,而後執行,再將返回結果經過底層通訊Server發送給客戶端。

##3.2 客戶端的引用服務使用通訊功能

在DubboProtocol引用服務的過程當中:

  • 1 使用以下方式建立client

    ExchangeClient client=Exchangers.connect(url ,requestHandler);

    requestHandler仍是DubboProtocol中ChannelHandler實現。

    和Server相似,咱們能夠經過自定義Exchanger實現來建立出不一樣功能的ExchangeClient。默認的Exchanger實現是HeaderExchanger

    public class HeaderExchanger implements Exchanger {
    
        public static final String NAME = "header";
    
        public ExchangeClient connect(URL url, ExchangeHandler handler) throws RemotingException {
            return new HeaderExchangeClient(Transporters.connect(url, new DecodeHandler(new HeaderExchangeHandler(handler))));
        }
    
        public ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException {
            return new HeaderExchangeServer(Transporters.bind(url, new DecodeHandler(new HeaderExchangeHandler(handler))));
        }
    
    }

    建立出來的ExchangeClient是HeaderExchangeClient,它也是Client的包裝類,僅僅在Client外層加上心跳檢測的功能,向它所鏈接的服務器端發送心跳檢測。

    HeaderExchangeClient須要外界給它傳一個Client實現,這是由Transporter接口實現來定的,默認是NettyTransporter

    public class NettyTransporter implements Transporter {
    
        public static final String NAME = "netty";
    
        public Server bind(URL url, ChannelHandler listener) throws RemotingException {
            return new NettyServer(url, listener);
        }
    
        public Client connect(URL url, ChannelHandler listener) throws RemotingException {
            return new NettyClient(url, listener);
        }
    
    }

    建立出來的的Client實現是NettyClient。

    同時DubboProtocol的ChannelHandler實現通過層層裝飾器包裝,最終傳給底層通訊Client。

    客戶端的DubboInvoker調用遠程服務的時候,會將調用信息經過ExchangeClient發送給服務器端,而後返回一個ResponseFuture,根據客戶端選擇的同步仍是異步方式,來決定阻塞仍是直接返回,這一部分在上文同步調用和異步調用的實現中已經詳細說過了。

#4 結束語

本篇文章主要介紹了集成netty和mina的那一塊的通訊接口及實現的設計,下篇主要介紹編解碼的過程

歡迎關注微信公衆號:乒乓狂魔

微信公衆號

相關文章
相關標籤/搜索