[HBase] 服務端RPC機制及代碼梳理

基於版本:CDH5.4.2安全

上述版本較老,可是目前生產上是使用這個版本,因此以此爲例。app

 

 

1. 概要


 

 

說明:socket

  1. 客戶端API發送的請求將會被RPCServer的Listener線程監聽到。ide

  2. Listener線程將分配Reader給到此Channel用戶後續請求的相應。oop

  3. Reader線程將請求包裝成CallRunner實例,並將經過RpcScheduler線程根據請求屬性分類dispatch到不一樣的Executor線程。this

  4. Executor線程將會保存這個CallRunner實例到隊列。atom

  5. 每個Executor隊列都被綁定了指定個數的Handler線程進行消費,消費很簡單,即拿出隊列的CallRunner實例,執行器run()方法。spa

  6. run()方法將會組裝response到Responder線程中。線程

  7. Responder線程將會不斷地將不一樣Channel的結果返回到客戶端。debug

 

2. 代碼梳理


整體來講服務端RPC處理機制是一個生產者消費者模型。

 

2.1 組件初始化

 

  • RpcServer是在master或者regionserver啓動時候進行初始化的,關鍵代碼以下:

public HRegionServer(Configuration conf, CoordinatedStateManager csm)
     throws IOException, InterruptedException {
   this.fsOk = true;
   this.conf = conf;
   checkCodecs(this.conf);
  .....
   rpcServices.start();
  .....
  }
  • rpcServeice聲明RSRpcServices類型,爲RpcServer類的實現接口。start()方法將會啓動三個主要生產和消費 線程

      /** Starts the service.  Must be called before any calls will be handled. */
    @Override
    public synchronized void start() {
      if (started) return;
    ......
      responder.start();
      listener.start();
      scheduler.start();
      started = true;
    }

 

2.2 客戶端API請求接收和包裝

Listener經過NIO機制進行端口監聽,客戶端API鏈接服務端指定端口將會被監聽。

 

  • Listener對於API請求的接收:

    void doAccept(SelectionKey key) throws IOException, OutOfMemoryError {
     Connection c;
     ServerSocketChannel server = (ServerSocketChannel) key.channel();

     SocketChannel channel;
     while ((channel = server.accept()) != null) {
       try {
......
// 當一個API請求過來時候將會打開一個Channel,Listener將會分配一個Reader註冊。
       // reader實例個數有限,採起順序分配和複用,即一個reader可能爲多個Channel服務。
       Reader reader = getReader();
       try {
         reader.startAdd();
         SelectionKey readKey = reader.registerChannel(channel);
         // 同時也將保存這個Channel,用於後續的結果返回等
         c = getConnection(channel, System.currentTimeMillis());
         readKey.attach(c);
         synchronized (connectionList) {
           connectionList.add(numConnections, c);
           numConnections++;
......
    }
  }

上述中Reader個數是有限的而且能夠順序複用的,個數能夠經過以下參數進行設定,默認爲10個。

this.readThreads = conf.getInt("hbase.ipc.server.read.threadpool.size", 10);

當生產能力不足時,能夠考慮增長此配置值。

 

  • Reader讀取請求幷包裝請求

    當Reader實例被分配到一個Channel後,它將讀取此通道過來的請求,幷包裝成CallRunner用於調度。

        void doRead(SelectionKey key) throws InterruptedException {
    ......
         try {
           // 此時將調用connection的讀取和處理方法
           count = c.readAndProcess();
          ......
        }
      }
        public int readAndProcess() throws IOException, InterruptedException {
    ......
         // 經過connectionPreambleRead標記爲判斷此連接是否爲新鏈接,若是是新的那麼須要讀取
         // 頭部報文信息,用於判斷當前連接屬性,好比是當前採起的是哪一種安全模式?
         if (!connectionPreambleRead) {
           count = readPreamble();
           if (!connectionPreambleRead) {
             return count;
          }
          ......

         count = channelRead(channel, data);
         if (count >= 0 && data.remaining() == 0) { // count==0 if dataLength == 0
           // 實際處理請求,裏面也會根據連接的頭報文讀取時候判斷出的兩種模式進行不一樣的處理。
           process();
        }

         return count;
      }
        private void process() throws IOException, InterruptedException {
    ......
           if (useSasl) {
              // Kerberos安全模式
             saslReadAndProcess(data.array());
          } else {
              // AuthMethod.SIMPLE模式
             processOneRpc(data.array());
          }
          .......
      }

    以下以AuthMethod.SIMPLE模式爲例進行分析:

        private void processOneRpc(byte[] buf) throws IOException, InterruptedException {
         if (connectionHeaderRead) {
           // 處理具體請求
           processRequest(buf);
        } else {
           // 再次判斷連接Header是否讀取,未讀取則取出頭報文用以肯定請求的服務和方法等。
           processConnectionHeader(buf);
           this.connectionHeaderRead = true;
           if (!authorizeConnection()) {
             throw new AccessDeniedException("Connection from " + this + " for service "
               connectionHeader.getServiceName() + " is unauthorized for user: " + user);
          }
        }
      }
      protected void processRequest(byte[] buf) throws IOException, InterruptedException {
         long totalRequestSize = buf.length;
    ......
         // 這裏將會判斷RpcServer作接收到的請求是否超過了maxQueueSize,注意這個值爲
         // RpcServer級別的變量
         if ((totalRequestSize + callQueueSize.get()) > maxQueueSize) {
           final Call callTooBig =
             new Call(id, this.service, null, null, null, null, this,
               responder, totalRequestSize, null);
           ByteArrayOutputStream responseBuffer = new ByteArrayOutputStream();
           setupResponse(responseBuffer, callTooBig, new CallQueueTooBigException(),
             "Call queue is full on " + getListenerAddress() +
             ", is hbase.ipc.server.max.callqueue.size too small?");
           responder.doRespond(callTooBig);
           return;
        }
        ......
         Call call = new Call(id, this.service, md, header, param, cellScanner, this, responder,
                 totalRequestSize,
                 traceInfo);
         // 此時請求段處理結束,將請求包裝成CallRunner後發送到不一樣的Executer的隊列中去。
         scheduler.dispatch(new CallRunner(RpcServer.this, call, userProvider));
      }

    注意這個值爲 RpcServer級別的變量,默認值爲1G,超過此閾值將會出現Call queue is full錯誤。

    callQueueSize的大小會在請求接收的時候增長,在請求處理結束(調用完畢CallRunner的run方法後)減去相應值。

    this.maxQueueSize =this.conf.getInt("hbase.ipc.server.max.callqueue.size",DEFAULT_MAX_CALLQUEUE_SIZE);

 

2.3 請求轉發與調度

客戶端請求在通過接收和包裝爲CallRunner後將會被具體的Scheduler進行dispatch,master和regionserver

調度器並不相同,這裏以regionserver的調度器進行講解。具體爲:SimpleRpcScheduler。

  public RSRpcServices(HRegionServer rs) throws IOException {
    ......
   RpcSchedulerFactory rpcSchedulerFactory;
   try {
     Class<?> rpcSchedulerFactoryClass = rs.conf.getClass(
         REGION_SERVER_RPC_SCHEDULER_FACTORY_CLASS,
         SimpleRpcSchedulerFactory.class);
     rpcSchedulerFactory = ((RpcSchedulerFactory) rpcSchedulerFactoryClass.newInstance());

 

  • 請求轉發

    前面已經提到請求包裝完CallRunner後由具體的RpcScheduler實現類的dispacth方法進行轉發。

    具體代碼爲:

      @Override
     public void dispatch(CallRunner callTask) throws InterruptedException {
       RpcServer.Call call = callTask.getCall();
        // 取得優先級,通常也是根據請求的內容事先定義好的一些操做做爲高優先級
       int level = priority.getPriority(call.getHeader(), call.param);
       if (priorityExecutor != null && level > highPriorityLevel) {
         // 高優先級則進入高優先級執行器內
         priorityExecutor.dispatch(callTask);
      } else if (replicationExecutor != null && level == HConstants.REPLICATION_QOS) {
         // replication級別的進入相應的replication執行器內
         replicationExecutor.dispatch(callTask);
      } else {
         // 其餘的通常請求爲通常執行器內,大部分的請求都將落入此執行器
         callExecutor.dispatch(callTask);
      }
    }
  • 執行器介紹-隊列初始化

    在此調度器中共分爲三個級別的調度執行器:

    1. 高優先請求級執行器

    2. 通常請求執行器

    3. replication請求執行器

        private final RpcExecutor callExecutor;
       private final RpcExecutor priorityExecutor;
       private final RpcExecutor replicationExecutor;

    上述中callExecutor爲最主要通常請求執行器,在當前版本中此執行器中能夠將讀取和寫入初始化爲不一樣比例的隊列,並將handler也分紅不一樣比例進行隊列的綁定。即一個隊列上面只有被綁定的handler具體處理權限。默認的不劃分讀寫分離的場景下就只有一個隊列,全部請求都進入其中,全部的handler也將去處理這個隊列。

    具體咱們以讀寫分離隊列爲例進行代碼分析:

    float callQueuesHandlersFactor = conf.getFloat(CALL_QUEUE_HANDLER_FACTOR_CONF_KEY, 0);
    int numCallQueues = Math.max(1, (int)Math.round(handlerCount * callQueuesHandlersFactor));

    LOG.info("Using " + callQueueType + " as user call queue, count=" + numCallQueues);

    if (numCallQueues > 1 && callqReadShare > 0) {
    // multiple read/write queues
    if (callQueueType.equals(CALL_QUEUE_TYPE_DEADLINE_CONF_VALUE)) {
      CallPriorityComparator callPriority = new CallPriorityComparator(conf, this.priority);
        // 實例化RW讀取執行器,構造參數中的爲讀寫比例,其中讀取又分爲通常讀取和scan讀取比例
        // 後續將會調用重載的其餘構造方法,最終將會計算出各個讀取隊列的個數和handler的比例數
      callExecutor = new RWQueueRpcExecutor("RW.default", handlerCount, numCallQueues,
          callqReadShare, callqScanShare, maxQueueLength, conf, abortable,
          BoundedPriorityBlockingQueue.class, callPriority);
    } else {

以下爲最終調用的重載構造方法:

    public RWQueueRpcExecutor(final String name, int writeHandlers, int readHandlers,
       int numWriteQueues, int numReadQueues, float scanShare,
       final Class<? extends BlockingQueue> writeQueueClass, Object[] writeQueueInitArgs,
       final Class<? extends BlockingQueue> readQueueClass, Object[] readQueueInitArgs) {
     super(name, Math.max(writeHandlers, numWriteQueues) + Math.max(readHandlers, numReadQueues));
 
     int numScanQueues = Math.max(0, (int)Math.floor(numReadQueues * scanShare));
     int scanHandlers = Math.max(0, (int)Math.floor(readHandlers * scanShare));
     if ((numReadQueues - numScanQueues) > 0) {
       numReadQueues -= numScanQueues;
       readHandlers -= scanHandlers;
    } else {
       numScanQueues = 0;
       scanHandlers = 0;
    }
// 肯定各個主要隊列參數
     this.writeHandlersCount = Math.max(writeHandlers, numWriteQueues);
     this.readHandlersCount = Math.max(readHandlers, numReadQueues);
     this.scanHandlersCount = Math.max(scanHandlers, numScanQueues);
     this.numWriteQueues = numWriteQueues;
     this.numReadQueues = numReadQueues;
     this.numScanQueues = numScanQueues;
     this.writeBalancer = getBalancer(numWriteQueues);
     this.readBalancer = getBalancer(numReadQueues);
     this.scanBalancer = getBalancer(numScanQueues);
 
     queues = new ArrayList<BlockingQueue<CallRunner>>(writeHandlersCount + readHandlersCount);
     LOG.debug(name + " writeQueues=" + numWriteQueues + " writeHandlers=" + writeHandlersCount +
               " readQueues=" + numReadQueues + " readHandlers=" + readHandlersCount +
              ((numScanQueues == 0) ? "" : " scanQueues=" + numScanQueues +
                 " scanHandlers=" + scanHandlersCount));
// 初始化隊列列表,注意queues爲有序列表,以下隊列位置初始化後不會變更,在後續按照具體的請求
     // 經過具體的getBalancer方法進行查找
     for (int i = 0; i < numWriteQueues; ++i) {
       queues.add((BlockingQueue<CallRunner>)
         ReflectionUtils.newInstance(writeQueueClass, writeQueueInitArgs));
    }
 
     for (int i = 0; i < (numReadQueues + numScanQueues); ++i) {
       queues.add((BlockingQueue<CallRunner>)
         ReflectionUtils.newInstance(readQueueClass, readQueueInitArgs));
    }
  }
  • 執行器介紹--handler綁定

    當請求被分類放入不一樣的執行器隊列後,將有此隊列上被綁定的handler進行處理,handler是請求的消費者。

    以下爲RWQueueRpcExecutor類中handler綁定邏輯:

      @Override
     protected void startHandlers(final int port) {
       startHandlers(".write", writeHandlersCount, queues, 0, numWriteQueues, port);
       startHandlers(".read", readHandlersCount, queues, numWriteQueues, numReadQueues, port);
       startHandlers(".scan", scanHandlersCount, queues,
                     numWriteQueues + numReadQueues, numScanQueues, port);
    }

    具體startHandlers方法,此方法中將根據參數指定的index和size進行綁定:

      protected void startHandlers(final String nameSuffix, final int numHandlers,
         final List<BlockingQueue<CallRunner>> callQueues,
         final int qindex, final int qsize, final int port) {
       final String threadPrefix = name + Strings.nullToEmpty(nameSuffix);
       for (int i = 0; i < numHandlers; i++) {
         final int index = qindex + (i % qsize);
         Thread t = new Thread(new Runnable() {
           @Override
           public void run() {
             // 值處理指定隊列的請求
             consumerLoop(callQueues.get(index));
          }
        });
         t.setDaemon(true);
         t.setName(threadPrefix + "RpcServer.handler=" + handlers.size() +
           ",queue=" + index + ",port=" + port);
         t.start();
         LOG.debug(threadPrefix + " Start Handler index=" + handlers.size() + " queue=" + index);
         handlers.add(t);
      }
    }
  • 執行器介紹--handler消費

    handler的消費很簡單,不斷的讀取指定隊列的CallRunner實例,並執行CallRunner實例的run方法。

      protected void consumerLoop(final BlockingQueue<CallRunner> myQueue) {
        .......
         while (running) {
           try {
             // 請求取得
             CallRunner task = myQueue.take();
             try {
               activeHandlerCount.incrementAndGet();
               // 指定callrunner的run方法
               task.run();
            .......
    }

    接着看一下CallRunner的run方法:

      public void run() {
        .......
           // 執行具體操做
           // make the call
           resultPair = this.rpcServer.call(call.service, call.md, call.param, call.cellScanner,
      .......
         // Set the response for undelayed calls and delayed calls with
         // undelayed responses.
         // 將response放入實例中
         if (!call.isDelayed() || !call.isReturnValueDelayed()) {
           Message param = resultPair != null ? resultPair.getFirst() : null;
           CellScanner cells = resultPair != null ? resultPair.getSecond() : null;
           call.setResponse(param, cells, errorThrowable, error);
        }
        ........
         // call中有connection的句柄,將response放入具體connection的返回隊列中
         call.sendResponseIfReady();
    .....

 

call中有connection的句柄,將response放入具體connection的返回隊列中

  // If there is already a write in progress, we don't wait. This allows to free the handlers
 // immediately for other tasks.
 if (call.connection.responseQueue.isEmpty() && call.connection.responseWriteLock.tryLock()) {
   try {
     if (call.connection.responseQueue.isEmpty()) {
       // If we're alone, we can try to do a direct call to the socket. It's
       // an optimisation to save on context switches and data transfer between cores..
       if (processResponse(call)) {
         return; // we're done.
      }
       // Too big to fit, putting ahead.
       call.connection.responseQueue.addFirst(call);
       added = true; // We will register to the selector later, outside of the lock.
    }
  } finally {
     call.connection.responseWriteLock.unlock();
  }
}

 if (!added) {
   call.connection.responseQueue.addLast(call);
}
 call.responder.registerForWrite(call.connection);

 // set the serve time when the response has to be sent later
 call.timestamp = System.currentTimeMillis();

 

2.4 Response返回

CallRunner的run方法將會具體執行請求操做,並將response放入Responder實例的對應的connection的返回隊列中用於後續返回

具體爲Responder實例也是一個線程實例,它的run方法最終執行以下代碼:

 private void doAsyncWrite(SelectionKey key) throws IOException {
     Connection connection = (Connection) key.attachment();
     if (connection == null) {
       throw new IOException("doAsyncWrite: no connection");
    }
     if (key.channel() != connection.channel) {
       throw new IOException("doAsyncWrite: bad channel");
    }

     if (processAllResponses(connection)) {
       try {
         // We wrote everything, so we don't need to be told when the socket is ready for
         // write anymore.
        key.interestOps(0);
      } catch (CancelledKeyException e) {
         /* The Listener/reader might have closed the socket.
          * We don't explicitly cancel the key, so not sure if this will
          * ever fire.
          * This warning could be removed.
          */
         LOG.warn("Exception while changing ops : " + e);
      }
    }
  }

   /**

 

3. 結束語


上述介紹服務端HRegionserver端的RPC接受與處理的過程,粗粒度的介紹了代碼的結構,但願後續遇到這方面的問題時可以幫助進行代碼級別的問題定位和解決。

相關文章
相關標籤/搜索