Dubbo線程模型和調度策略

1、服務調用

首先服務消費者經過代理對象 Proxy 發起遠程調用,接着經過網絡客戶端 Client 將編碼後的請求發送給服務提供方的網絡層上,也就是 Server。Server 在收到請求後,首先要作的事情是對數據包進行解碼。而後將解碼後的請求發送至分發器 Dispatcher,再由分發器將請求派發到指定的線程池上,最後由線程池調用具體的服務。這就是一個遠程調用請求的發送與接收過程。html

那麼在dubbo中請求是如何派發的?以及線程模型是什麼樣的那?算法

2、I/O線程和業務線程分離

  • 若是事件處理的邏輯能迅速完成,而且不會發起新的 IO請求,好比只是在內存中記個標識,則直接在 IO線程上處理更快,由於減小了線程池調度。數據庫

  • 但若是事件處理邏輯較慢,或者須要發起新的 IO 請求,好比須要查詢數據庫,則必須派發到線程池,不然 IO 線程阻塞,將致使不能接收其它請求。apache

  • 若是用 IO 線程處理事件,又在事件處理過程當中發起新的 IO 請求,好比在鏈接事件中發起登陸請求,會報「可能引起死鎖」異常,但不會真死鎖。bootstrap

因此在真實的業務場景中是須要將業務線程和I/O線程進行分離處理的。dubbo做爲一個服務治理框架,底層的採用Netty做爲網絡通訊的組件,在請求派發的時候支持不一樣的派發策略。緩存

參考文章:www.cnblogs.com/my_life/art…bash

3、請求派發策略

鏈接創建

派發策略

從官方描述來看,duboo支持五種派發策略,下面看下是如何實現的。以Ntty4.x爲例:網絡

  1. NettyServer
    public NettyServer(URL url, ChannelHandler handler) throws RemotingException {
         super(url, ChannelHandlers.wrap(handler, ExecutorUtil.setThreadName(url, SERVER_THREAD_POOL_NAME)));
     }
    複製代碼
  2. ChannelHandlers#wrapInternal
    protected ChannelHandler wrapInternal(ChannelHandler handler, URL url) {
         // 選擇調度策略 默認是all
         return new MultiMessageHandler(new HeartbeatHandler(ExtensionLoader.getExtensionLoader(Dispatcher.class)
                 .getAdaptiveExtension().dispatch(handler, url))); 
     }
    複製代碼
    在NettyServer的構造方法中經過ChannelHandlers#wrap方法設置MultiMessageHandlerHeartbeatHandler並經過SPI擴展選擇調度策略。
  3. NettyServer#doOpen
protected void doOpen() throws Throwable {
      bootstrap = new ServerBootstrap();
      // 多線程模型
      // boss線程池,負責和消費者創建新的鏈接
      bossGroup = new NioEventLoopGroup(1, new DefaultThreadFactory("NettyServerBoss", true));
      // worker線程池,負責鏈接的數據交換
      workerGroup = new NioEventLoopGroup(getUrl().getPositiveParameter(Constants.IO_THREADS_KEY, Constants.DEFAULT_IO_THREADS),
              new DefaultThreadFactory("NettyServerWorker", true));

      final NettyServerHandler nettyServerHandler = new NettyServerHandler(getUrl(), this);
      channels = nettyServerHandler.getChannels();

      bootstrap.group(bossGroup, workerGroup)
              .channel(NioServerSocketChannel.class)
              .childOption(ChannelOption.TCP_NODELAY, Boolean.TRUE) // nagele 算法
              .childOption(ChannelOption.SO_REUSEADDR, Boolean.TRUE)// TIME_WAIT
              .childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT) //內存池
              .childHandler(new ChannelInitializer<NioSocketChannel>() {
                  @Override
                  protected void initChannel(NioSocketChannel ch) throws Exception {
                      NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyServer.this);
                      ch.pipeline()//.addLast("logging",new LoggingHandler(LogLevel.INFO))//for debug
                              .addLast("decoder", adapter.getDecoder()) //設置編解碼器
                              .addLast("encoder", adapter.getEncoder())
                              .addLast("handler", nettyServerHandler);
                  }
              });
      // bind 端口
      ChannelFuture channelFuture = bootstrap.bind(getBindAddress());
      channelFuture.syncUninterruptibly();
      channel = channelFuture.channel();

  }
複製代碼

設置Netty的boss線程池數量爲1,worker線程池(也就是I/O線程)爲cpu核心數+1和向Netty中注測Handler用於消息的編解碼和處理。多線程

若是咱們在一個JVM進程只暴露一個Dubbo服務端口,那麼一個JVM進程只會有一個NettyServer實例,也會只有一個NettyHandler實例。而且設置了三個handler,用來處理編解碼、鏈接的建立、消息讀寫等。在dubbo內部定義了一個ChannelHandler用來和Netty的Channel關聯起來,經過上述的代碼會發現NettyServer自己也是一個ChannelHandler。經過NettyServer#doOpen暴露服務端口後,客戶端就能和服務端創建鏈接了,而提供者在初始化鏈接後會調用NettyHandler#channelActive方法來建立一個NettyChannel併發

  1. NettyChannel
public void channelActive(ChannelHandlerContext ctx) throws Exception {
      logger.debug("channelActive <" + NetUtils.toAddressString((InetSocketAddress) ctx.channel().remoteAddress()) + ">" + " channle <" + ctx.channel());
      //獲取或者建立一個NettyChannel
      NettyChannel channel = NettyChannel.getOrAddChannel(ctx.channel(), url, handler);
      try {
          if (channel != null) {
              // <ip:port, channel>
              channels.put(NetUtils.toAddressString((InetSocketAddress) ctx.channel().remoteAddress()), channel);
          }
          // 這裏的 handler就是NettyServer
          handler.connected(channel);
      } finally {
          NettyChannel.removeChannelIfDisconnected(ctx.channel());
      }
  }
複製代碼

與Netty和Dubbo都有本身的ChannelHandler同樣,Netty和Dubbo也有着本身的Channel。該方法最後會調用NettyServer#connected方法來檢查新添加channel後是否會超出提供者配置的accepts配置,若是超出,則直接打印錯誤日誌並關閉該Channel,這樣的話消費者端天然會收到鏈接中斷的異常信息,詳細能夠見AbstractServer#connected方法。

  1. AbstractServer#connected
public void connected(Channel ch) throws RemotingException {
     // If the server has entered the shutdown process, reject any new connection
     if (this.isClosing() || this.isClosed()) {
         logger.warn("Close new channel " + ch + ", cause: server is closing or has been closed. For example, receive a new connect request while in shutdown process.");
         ch.close();
         return;
     }

     Collection<Channel> channels = getChannels();
     //大於accepts的tcp鏈接直接關閉
     if (accepts > 0 && channels.size() > accepts) { 
         logger.error("Close channel " + ch + ", cause: The server " + ch.getLocalAddress() + " connections greater than max config " + accepts);
         ch.close();
         return;
     }
     super.connected(ch);
 }
複製代碼
  • 在dubbo中消費者和提供者默認只創建一個TCP長鏈接(詳細代碼請參考官網源碼導讀,服務引用一節),爲了增長消費者調用服務提供者的吞吐量,能夠在消費方增長以下配置:
<dubbo:reference id="demoService" check="false" interface="org.apache.dubbo.demo.DemoService" connections="20"/>
複製代碼
  • 提供者可使用accepts控制長鏈接的數量防止鏈接數量過多,配置以下:
<dubbo:protocol name="dubbo" port="20880" accepts="10"/>
複製代碼

請求接收

當鏈接創建完成後,消費者就能夠請求提供者的服務了,當請求到來,提供者這邊會依次通過以下Handler的處理:

--->NettyServerHandler#channelRead:接收請求消息。

--->AbstractPeer#received:若是服務已經關閉,則返回,不然調用下一個Handler來處理。

--->MultiMessageHandler#received:若是是批量請求,則依次對請求調用下一個Handler來處理。

--->HeartbeatHandler#received: 處理心跳消息。

--->AllChannelHandler#received:該Dubbo的Handler很是重要,由於從這裏是IO線程池和業務線程池的隔離。

--->DecodeHandler#received: 消息解碼。

--->HeaderExchangeHandler#received:消息處理。

--->DubboProtocol : 調用服務。

  1. AllChannelHandler#received
public void received(Channel channel, Object message) throws RemotingException {
       // 獲取業務線程池
       ExecutorService cexecutor = getExecutorService();
       try {
           // 使用線程池處理消息
           cexecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message));
       } catch (Throwable t) {
        throw new ExecutionException(message, channel, getClass() + " error when process received event .", t);
       }
   }
複製代碼

這裏對execute進行了異常捕獲,這是由於I/O線程池是無界的,但業務線程池多是有界的,因此進行execute提交可能會遇到RejectedExecutionException異常 。

那麼這裏是如何獲取到業務線程池的那?實際上WrappedChannelHandlerxxxChannelHandlerd的裝飾類,根據dubbo spi能夠知道,獲取AllChannelHandler會首先實例化WrappedChannelHandler

  1. WrappedChannelHandler
public WrappedChannelHandler(ChannelHandler handler, URL url) {
        this.handler = handler;
        this.url = url;
        // 獲取業務線程池
        executor = (ExecutorService) ExtensionLoader.getExtensionLoader(ThreadPool.class).getAdaptiveExtension().getExecutor(url);

        String componentKey = Constants.EXECUTOR_SERVICE_COMPONENT_KEY;
        if (Constants.CONSUMER_SIDE.equalsIgnoreCase(url.getParameter(Constants.SIDE_KEY))) {
            componentKey = Constants.CONSUMER_SIDE;
        }
        DataStore dataStore = ExtensionLoader.getExtensionLoader(DataStore.class).getDefaultExtension();
        dataStore.put(componentKey, Integer.toString(url.getPort()), executor);
    }

複製代碼

線程模型

  1. FixedThreadPool
public class FixedThreadPool implements ThreadPool {

  @Override
  public Executor getExecutor(URL url) {
      // 線程池名稱DubboServerHanler-server:port
      String name = url.getParameter(Constants.THREAD_NAME_KEY, Constants.DEFAULT_THREAD_NAME);
      // 缺省線程數量200
      int threads = url.getParameter(Constants.THREADS_KEY, Constants.DEFAULT_THREADS);
      // 任務隊列類型
      int queues = url.getParameter(Constants.QUEUES_KEY, Constants.DEFAULT_QUEUES);

      return new ThreadPoolExecutor(threads, threads, 0, TimeUnit.MILLISECONDS,
              queues == 0 ? new SynchronousQueue<Runnable>() :
                      (queues < 0 ? new LinkedBlockingQueue<Runnable>()
                              : new LinkedBlockingQueue<Runnable>(queues)),
              new NamedInternalThreadFactory(name, true), new AbortPolicyWithReport(name, url));
  }

}
複製代碼

缺省狀況下使用200個線程和SynchronousQueue這意味着若是若是線程池全部線程都在工做再有新任務會直接拒絕。

  1. CachedThreadPool
public class CachedThreadPool implements ThreadPool {

    @Override
    public Executor getExecutor(URL url) {
        String name = url.getParameter(Constants.THREAD_NAME_KEY, Constants.DEFAULT_THREAD_NAME);
        // 核心線程數量 缺省爲0
        int cores = url.getParameter(Constants.CORE_THREADS_KEY, Constants.DEFAULT_CORE_THREADS);
        // 最大線程數量 缺省爲Integer.MAX_VALUE
        int threads = url.getParameter(Constants.THREADS_KEY, Integer.MAX_VALUE);
        // queue 缺省爲0
        int queues = url.getParameter(Constants.QUEUES_KEY, Constants.DEFAULT_QUEUES);
        // 空閒線程存活時間
        int alive = url.getParameter(Constants.ALIVE_KEY, Constants.DEFAULT_ALIVE);
        return new ThreadPoolExecutor(cores, threads, alive, TimeUnit.MILLISECONDS,
                queues == 0 ? new SynchronousQueue<Runnable>() :
                        (queues < 0 ? new LinkedBlockingQueue<Runnable>()
                                : new LinkedBlockingQueue<Runnable>(queues)),
                new NamedInternalThreadFactory(name, true), new AbortPolicyWithReport(name, url));
    }
}
複製代碼

緩存線程池,能夠看出若是提交任務的速度大於maxThreads將會不斷建立線程,極端條件下將會耗盡CPU和內存資源。在突發大流量進入時不適合使用。

  1. LimitedThreadPool
public class  LimitedThreadPool implements ThreadPool {

    @Override
    public Executor getExecutor(URL url) {
        String name = url.getParameter(Constants.THREAD_NAME_KEY, Constants.DEFAULT_THREAD_NAME);
        // 缺省核心線程數量爲0
        int cores = url.getParameter(Constants.CORE_THREADS_KEY, Constants.DEFAULT_CORE_THREADS);
        // 缺省最大線程數量200
        int threads = url.getParameter(Constants.THREADS_KEY, Constants.DEFAULT_THREADS);
        // 任務隊列缺省0
        int queues = url.getParameter(Constants.QUEUES_KEY, Constants.DEFAULT_QUEUES);
        return new ThreadPoolExecutor(cores, threads, Long.MAX_VALUE, TimeUnit.MILLISECONDS,
                queues == 0 ? new SynchronousQueue<Runnable>() :
                        (queues < 0 ? new LinkedBlockingQueue<Runnable>()
                                : new LinkedBlockingQueue<Runnable>(queues)),
                new NamedInternalThreadFactory(name, true), new AbortPolicyWithReport(name, url));
    }

}
複製代碼

不配置的話和FixedThreadPool沒有區別。

  1. EagerThreadPool
public class EagerThreadPool implements ThreadPool {

   @Override
   public Executor getExecutor(URL url) {
       String name = url.getParameter(Constants.THREAD_NAME_KEY, Constants.DEFAULT_THREAD_NAME);
       // 0
       int cores = url.getParameter(Constants.CORE_THREADS_KEY, Constants.DEFAULT_CORE_THREADS);
       // Integer.MAX_VALUE
       int threads = url.getParameter(Constants.THREADS_KEY, Integer.MAX_VALUE);
       // 0
       int queues = url.getParameter(Constants.QUEUES_KEY, Constants.DEFAULT_QUEUES);
       // 60s
       int alive = url.getParameter(Constants.ALIVE_KEY, Constants.DEFAULT_ALIVE);

       // init queue and executor
       // 初始任務隊列爲1
       TaskQueue<Runnable> taskQueue = new TaskQueue<Runnable>(queues <= 0 ? 1 : queues);
       EagerThreadPoolExecutor executor = new EagerThreadPoolExecutor(cores,
               threads,
               alive,
               TimeUnit.MILLISECONDS,
               taskQueue,
               new NamedInternalThreadFactory(name, true),
               new AbortPolicyWithReport(name, url));
       taskQueue.setExecutor(executor);
       return executor;
   }
}
複製代碼

EagerThreadPoolExecutor

public void execute(Runnable command) {
       if (command == null) {
           throw new NullPointerException();
       }
       // do not increment in method beforeExecute!
       //已提交任務數量
       submittedTaskCount.incrementAndGet();
       try {
           super.execute(command);
       } catch (RejectedExecutionException rx) { //大於最大線程數被拒絕任務 從新添加到任務隊列
           // retry to offer the task into queue.
           final TaskQueue queue = (TaskQueue) super.getQueue();
           try {
               if (!queue.retryOffer(command, 0, TimeUnit.MILLISECONDS)) {
                   submittedTaskCount.decrementAndGet();
                   throw new RejectedExecutionException("Queue capacity is full.", rx);
               }
           } catch (InterruptedException x) {
               submittedTaskCount.decrementAndGet();
               throw new RejectedExecutionException(x);
           }
       } catch (Throwable t) {
           // decrease any way
           submittedTaskCount.decrementAndGet();
           throw t;
       }
   }
複製代碼

TaskQueue

public boolean offer(Runnable runnable) {
       if (executor == null) {
           throw new RejectedExecutionException("The task queue does not have executor!");
       }
       // 獲取當前線程池中的線程數量
       int currentPoolThreadSize = executor.getPoolSize();
       // have free worker. put task into queue to let the worker deal with task.
       // 若是已經提交的任務數量小於當前線程池中線程數量(不是很理解這裏的操做)
       if (executor.getSubmittedTaskCount() < currentPoolThreadSize) {
           return super.offer(runnable);
       }

       // return false to let executor create new worker.
       //當前線程數小於最大線程程數直接建立新worker
       if (currentPoolThreadSize < executor.getMaximumPoolSize()) {
           return false;
       }

       // currentPoolThreadSize >= max
       return super.offer(runnable);
   }
複製代碼

優先建立Worker線程池。在任務數量大於corePoolSize可是小於maximumPoolSize時,優先建立Worker來處理任務。當任務數量大於maximumPoolSize時,將任務放入阻塞隊列中。阻塞隊列充滿時拋出RejectedExecutionException。(相比於cached:cached在任務數量超過maximumPoolSize時直接拋出異常而不是將任務放入阻塞隊列)。

根據以上的代碼分析,若是消費者的請求過快頗有可能致使服務提供者業務線程池拋出RejectedExecutionException異常。這個異常是duboo的採用的線程拒絕策略AbortPolicyWithReport#rejectedExecution拋出的,而且會被反饋到消費端,此時簡單的解決辦法就是將提供者的服務調用線程池數目調大點,例如以下配置:

<dubbo:provider threads="500"/>
或
<dubbo:protocol name="dubbo" port="20882" accepts="10" threads="500"/>
複製代碼

爲了保證模塊內的主要服務有線程可用(防止次要服務搶佔過多服務調用線程),能夠對次要服務進行併發限制,例如:

<dubbo:service interface="org.apache.dubbo.demo.DemoService" ref="demoService" executes="100"/>
複製代碼

dubbo的dispatcher 策略默認是all,實際上比較好的處理方式是I/O線程和業務線程分離,因此採起message是比較好得配置。而且若是採用all若是使用的dubo版本比較低頗有可能會觸發dubbo的bug。一旦業務線程池滿了,將拋出執行拒絕異常,將進入caught方法來處理,而該方法使用的仍然是業務線程池,因此頗有可能這時業務線程池仍是滿的,致使下游的一個HeaderExchangeHandler沒機會調用,而異常處理後的應答消息正是HeaderExchangeHandler#caught來完成的,因此最後NettyHandler#writeRequested沒有被調用,Consumer只能死等到超時,沒法收到Provider的線程池打滿異常(2.6.x已經修復該問題)。

  • 推薦配置
<dubbo:protocol name="dubbo" port="8888" threads="500" dispatcher="message" />
複製代碼

參考文章:manzhizhen.iteye.com/blog/239117…

相關文章
相關標籤/搜索