從源碼中分析Hadoop的RPC機制

RPC是Remote Procedure Call(遠程過程調用)的簡稱,這一機制都要面對兩個問題
java

  • 對象調用方式;node

  • 序列/反序列化機制web

在此以前,咱們有必要了解什麼是架構層次的協議。通俗一點說,就是我把某些接口和接口中的方法稱爲協議,客戶端和服務端只要實現這些接口中的方法就能夠進行通訊了,從這個角度來講,架構層次協議的說法就能夠成立了。Hadoop的RPC機制正是採用了這種「架構層次的協議」,有一整套做爲協議的接口,以下圖
sql




Hadoop的RPC組件,依賴於Hadoop Writable接口類型的支持,要求每一個實現類都要確保將本類的對象正確序列化與反序列化。所以RPC使用Java動態代理與反射實現對象調用方式,客戶端到服務器數據的序列化與反序列化由Hadoop框架或用戶本身來實現,也就是數據組裝時定製的。RPC架構圖以下



動態代理

主要用來作方法的加強,讓你能夠在不修改源碼的狀況下,加強一些方法,在方法執行先後作任何你想作的事情(甚至根本不去執行這個方法),由於在InvocationHandler的invoke方法中,你能夠直接獲取正在調用方法對應的Method對象,具體應用的話,好比能夠添加調用日誌,作事務控制等。apache

這個接口的實現部署在其它服務器上,在編寫客戶端代碼的時候,沒辦法直接調用接口方法,由於接口是不能直接生成對象的,這個時候就能夠考慮代理模式(動態代理)了,經過Proxy.newProxyInstance代理一個該接口對應的InvocationHandler對象,而後在InvocationHandler的invoke方法內封裝通信細節就能夠了。具體的應用,最經典的固然是Java標準庫的RMI,其它好比hessian,各類webservice框架中的遠程調用,大體都是這麼實現的。緩存

VersionedProtocol接口

VersionedProtocol是全部RPC協議接口的父接口,其中只有一個方法:getProtocolVersion()服務器

HDFS相關

  • ClientDatanodeProtocol:一個客戶端和datanode之間的協議接口,用於數據塊恢復。markdown

  • ClientProtocol:client與Namenode交互的接口,全部控制流的請求均在這裏,如:建立文件、刪除文件等;網絡

  • DatanodeProtocol : Datanode與Namenode交互的接口,如心跳、blockreport等;
    NamenodeProtocol:SecondaryNode與Namenode交互的接口。架構

Mapreduce相關

  • InterDatanodeProtocol:Datanode內部交互的接口,用來更新block的元數據;

  • InnerTrackerProtocol:TaskTracker與JobTracker交互的接口,功能與DatanodeProtocol類似;

  • JobSubmissionProtocol:JobClient與JobTracker交互的接口,用來提交Job、得到Job等與Job相關的操做;

  • TaskUmbilicalProtocol:Task中子進程與母進程交互的接口,子進程即map、reduce等操做,母進程即TaskTracker,該接口能夠回報子進程的運行狀態(詞彙掃盲: umbilical 臍帶的, 關係親密的) 。

RPC實現流程

簡單來講,Hadoop RPC=動態代理+定製的二進制流。分佈式對象通常都會要求根據接口生成存根和框架。如 CORBA,能夠經過 IDL,生成存根和框架。在ipc.RPC類中有一些內部類,下邊簡單介紹下

  • Invocation:用於封裝方法名和參數,做爲數據傳輸層,至關於VO吧。

  • ClientCache:用於存儲client對象,用socket factory做爲hash key,存儲結構爲hashMap <SocketFactory, Client>

  • Invoker:是動態代理中的調用實現類,繼承了InvocationHandler.

  • Server:是ipc.Server的實現類。咱們就須要這樣的步驟了。

上類圖




從以上的分析能夠知道,Invocation類僅做爲VO,ClientCache類只是做爲緩存,而Server類用於服務端的處理,他們都和客戶端的數據流和業務邏輯沒有關係。爲了分析 Invoker,咱們須要介紹一些 Java 反射實現 Dynamic Proxy 的背景。

Dynamic Proxy 是由兩個 class 實現的:java.lang.reflect.Proxyjava.lang.reflect.InvocationHandler,後者是一個接口。

所謂 Dynamic Proxy 是這樣一種 class:它是在運行時生成的 class,在生成它時你必須提供一組 interface 給它,而後該 class就宣稱它實現了這些 interface。

這個 Dynamic Proxy 其實就是一個典型的 Proxy 模式,它丌會替你做實質性的工做,在生成它的實例時你必須提供一個handler,由它接管實際的工做。

這個 handler,在 Hadoop 的 RPC 中,就是 Invoker 對象。
咱們能夠簡單地理解:就是你能夠經過一個接口來生成一個類,這個類上的全部方法調用,都會傳遞到你生成類時傳遞的
InvocationHandler 實現中。

在 Hadoop 的 RPC 中,Invoker 實現了 InvocationHandler 的 invoke 方法(invoke 方法也是 InvocationHandler 的惟一方法)。 Invoker 會把全部跟此次調用相關的調用方法名,參數類型列表,參數列表打包,而後利用前面咱們分析過的 Client,經過 socket 傳遞到服務器端。就是說,你在 proxy 類上的任何調用,都經過 Client 發送到遠方的服務器上。

Invoker 使用 Invocation。 Invocation 封裝了一個過程調用的全部相關信息,它的主要屬性有: methodName,調用方法名,parameterClasses,調用方法參數的類型列表和 parameters,調用方法參數。注意,它實現了 Writable 接口,能夠串行化。

RPC.Server 實現了 org.apache.hadoop.ipc.Server,你能夠把一個對象,經過 RPC,升級成爲一個服務器。服務器接收到的請求(經過 Invocation),解串行化之後,就發成了方法名,方法參數列表和參數列表。調用 Java 反射,咱們就能夠調用對應的對象的方法。調用的結果再經過 socket,迒回給客戶端,客戶端把結果解包後,就能夠返回給Dynamic Proxy 的使用者了。

咱們接下來去研究的就是RPC.Invoker類中的invoke()方法了,代碼以下

public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { …… ObjectWritable value = (ObjectWritable) client.call(new Invocation(method, args), remoteId); …… return value.get(); }

通常咱們看到的動態代理的invoke()方法中總會有 method.invoke(ac, arg); 這句代碼。而上面代碼中卻沒有。其實使用 method.invoke(ac, arg); 是在本地JVM中調用;而在hadoop中,是將數據發送給服務端,服務端將處理的結果再返回給客戶端,因此這裏的invoke()方法必然須要進行網絡通訊。而網絡通訊就是下面的這段代碼實現的:

ObjectWritable value = (ObjectWritable)
client.call(new Invocation(method, args), remoteId);

Invocation類在這裏封裝了方法名和參數,充當VO。其實這裏網絡通訊只是調用了Client類的call()方法。

ipc.Client源碼

接下來分析一下ipc.Client源碼,在此以前咱們得明確下咱們的目標,總結出瞭如下幾個問題

  • 客戶端和服務端的鏈接是怎樣創建的?

  • 客戶端是怎樣給服務端發送數據的?

  • 客戶端是怎樣獲取服務端的返回數據的?

基於這三個問題,咱們開始分析ipc.Client源碼,主要包含如下幾個類

  • Call:用於封裝Invocation對象,做爲VO,寫到服務端,同時也用於存儲從服務端返回的數據。

  • Connection:用以處理遠程鏈接對象。繼承了Thread

  • ConnectionId:惟一肯定一個鏈接

Question1:客戶端和服務端的鏈接是怎樣創建的?

Client類中的cal()方法以下

public Writable call(Writable param, ConnectionId remoteId) throws InterruptedException, IOException { Call call = new Call(param);       //將傳入的數據封裝成call對象 Connection connection = getConnection(remoteId, call);   //得到一個鏈接
    connection.sendParam(call);     // 向服務端發送call對象 boolean interrupted = false;
    synchronized (call) { while (!call.done) { try { call.wait(); // 等待結果的返回,在Call類的callComplete()方法裏有notify()方法用於喚醒線程 } catch (InterruptedException ie) { // 因中斷異常而終止,設置標誌interrupted爲true interrupted = true;
            }
        }
        if (interrupted)
        {
            Thread.currentThread().interrupt();
        }

        if (call.error != null) { if (call.error instanceof RemoteException) { call.error.fillInStackTrace();
                throw call.error;
            }
            else     // 本地異常
            {
                throw wrapException(remoteId.getAddress(), call.error);
            }
        }
        else
        {
            return call.value; //返回結果數據
        }
    }
}

具體代碼的做用我已作了註釋,因此這裏再也不贅述。分析代碼後,咱們會發現和網絡通訊有關的代碼只會是下面的兩句了:

Connection connection = getConnection(remoteId, call);   //得到一個鏈接
  connection.sendParam(call);      // 向服務端發送call對象 

先看看是怎麼得到一個到服務端的鏈接吧,下面貼出ipc.Client類中的getConnection()方法。

private Connection getConnection(ConnectionId remoteId,
                                 Call call)
throws IOException, InterruptedException
{
    if (!running.get())
    {
        // 若是client關閉了
        throw new IOException("The client is stopped");
    }
    Connection connection;
    //若是connections鏈接池中有對應的鏈接對象,就不需從新建立了;若是沒有就需從新建立一個鏈接對象。
    //但請注意,該//鏈接對象只是存儲了remoteId的信息,其實還並無和服務端創建鏈接。
    do
    {
        synchronized (connections)
        {
            connection = connections.get(remoteId);
            if (connection == null)
            {
                connection = new Connection(remoteId);
                connections.put(remoteId, connection);
            }
        }
    }
    while (!connection.addCall(call));   //將call對象放入對應鏈接中的calls池,就不貼出源碼了
    //這句代碼纔是真正的完成了和服務端創建鏈接哦~
    connection.setupIOstreams();
    return connection;
}

Client.Connection類中的setupIOstreams()方法以下:

private synchronized void setupIOstreams() throws InterruptedException
{
    ……
    try
    {
        ……
        while (true)
        {
            setupConnection();  //創建鏈接
            InputStream inStream = NetUtils.getInputStream(socket);     //得到輸入流
            OutputStream outStream = NetUtils.getOutputStream(socket);  //得到輸出流
            writeRpcHeader(outStream);
            ……
            this.in = new DataInputStream(new BufferedInputStream
                                          (new PingInputStream(inStream)));   //將輸入流裝飾成DataInputStream
            this.out = new DataOutputStream
            (new BufferedOutputStream(outStream));   //將輸出流裝飾成DataOutputStream
            writeHeader();
            // 跟新活動時間
            touch();
            //當鏈接創建時,啓動接受線程等待服務端傳回數據,注意:Connection繼承了Tread
            start();
            return;
        }
    }
    catch (IOException e)
    {
        markClosed(e);
        close();
    }
}

再有一步咱們就知道客戶端的鏈接是怎麼創建的啦,下面貼出Client.Connection類中的setupConnection()方法

private synchronized void setupConnection() throws IOException {
      short ioFailures = 0;
      short timeoutFailures = 0;
      while (true) {
        try {
          this.socket = socketFactory.createSocket(); //終於看到建立socket的方法了
          this.socket.setTcpNoDelay(tcpNoDelay);
         ……
          // 設置鏈接超時爲20s
          NetUtils.connect(this.socket, remoteId.getAddress(), 20000);
          this.socket.setSoTimeout(pingInterval);
          return;
        } catch (SocketTimeoutException toe) {
          /* 設置最多鏈接重試爲45次。 * 總共有20s*45 = 15 分鐘的重試時間。 */
          handleConnectionFailure(timeoutFailures++, 45, toe);
        } catch (IOException ie) {
          handleConnectionFailure(ioFailures++, maxRetries, ie);
        }
      }
    }

不難看出客戶端的鏈接是建立一個普通的socket進行通訊的。

Question2:客戶端是怎樣給服務端發送數據的?

Client.Connection類的sendParam()方法以下

public void sendParam(Call call) {
      if (shouldCloseConnection.get()) {
        return;
      }
      DataOutputBuffer d=null;
      try {
        synchronized (this.out) {
          if (LOG.isDebugEnabled())
            LOG.debug(getName() + " sending #" + call.id);
          //建立一個緩衝區
          d = new DataOutputBuffer();
          d.writeInt(call.id);
          call.param.write(d);
          byte[] data = d.getData();
          int dataLength = d.getLength();
          out.writeInt(dataLength);        //首先寫出數據的長度
          out.write(data, 0, dataLength); //向服務端寫數據
          out.flush();
        }
      } catch(IOException e) {
        markClosed(e);
      } finally {
        IOUtils.closeStream(d);
      }
    }

Question3:客戶端是怎樣獲取服務端的返回數據的?

Client.Connection類和Client.Call類中的相關方法以下

Method1:

public void run() {
      ……
      while (waitForWork()) {
        receiveResponse();  //具體的處理方法
      }
      close();
     ……
}

Method2:

private void receiveResponse() {
      if (shouldCloseConnection.get()) {
        return;
      }
      touch();
      try {
        int id = in.readInt();                    // 阻塞讀取id
        if (LOG.isDebugEnabled())
          LOG.debug(getName() + " got value #" + id);
          Call call = calls.get(id);    //在calls池中找到發送時的那個對象
        int state = in.readInt();     // 阻塞讀取call對象的狀態
        if (state == Status.SUCCESS.state) {
          Writable value = ReflectionUtils.newInstance(valueClass, conf);
          value.readFields(in);           // 讀取數據
        //將讀取到的值賦給call對象,同時喚醒Client等待線程,貼出setValue()代碼Method3
          call.setValue(value);              
          calls.remove(id);               //刪除已處理的call 
        } else if (state == Status.ERROR.state) {
        ……
        } else if (state == Status.FATAL.state) {
        ……
        }
      } catch (IOException e) {
        markClosed(e);
      }
}

Method3:

public synchronized void setValue(Writable value) {
      this.value = value;
      callComplete();   //具體實現
}
protected synchronized void callComplete() {
      this.done = true;
      notify();         // 喚醒client等待線程
    }

啓動一個處理線程,讀取從服務端傳來的call對象,將call對象讀取完畢後,喚醒client處理線程。就這麼簡單,客戶端就獲取了服務端返回的數據。客戶端的源碼分析暫時到這,下面咱們來分析Server端的源碼

ipc.Server源碼分析

內部類以下

  • Call :用於存儲客戶端發來的請求

  • Listener : 監聽類,用於監聽客戶端發來的請求,同時Listener內部還有一個靜態類,Listener.Reader,當監聽器監聽到用戶請求,便讓Reader讀取用戶請求。

  • Responder :響應RPC請求類,請求處理完畢,由Responder發送給請求客戶端。

  • Connection :鏈接類,真正的客戶端請求讀取邏輯在這個類中。
    Handler :請求處理類,會循環阻塞讀取callQueue中的call對象,並對其進行操做。

初始化Server

hadoop是怎樣初始化RPC的Server端的呢?

Namenode初始化時必定初始化了RPC的Sever端,那咱們去看看Namenode的初始化源碼

private void initialize(Configuration conf) throws IOException {
   ……
    // 建立 rpc server
    InetSocketAddress dnSocketAddr = getServiceRpcServerAddress(conf);
    if (dnSocketAddr != null) {
      int serviceHandlerCount =
        conf.getInt(DFSConfigKeys.DFS_NAMENODE_SERVICE_HANDLER_COUNT_KEY,
                    DFSConfigKeys.DFS_NAMENODE_SERVICE_HANDLER_COUNT_DEFAULT);
      //得到serviceRpcServer
      this.serviceRpcServer = RPC.getServer(this, dnSocketAddr.getHostName(), 
          dnSocketAddr.getPort(), serviceHandlerCount,
          false, conf, namesystem.getDelegationTokenSecretManager());
      this.serviceRPCAddress = this.serviceRpcServer.getListenerAddress();
      setRpcServiceServerAddress(conf);
}
//得到server
    this.server = RPC.getServer(this, socAddr.getHostName(),
        socAddr.getPort(), handlerCount, false, conf, namesystem
        .getDelegationTokenSecretManager());

   ……
    this.server.start();  //啓動 RPC server Clients只容許鏈接該server
    if (serviceRpcServer != null) {
      serviceRpcServer.start();  //啓動 RPC serviceRpcServer 爲HDFS服務的server
    }
    startTrashEmptier(conf);
  }

RPC的server對象是經過ipc.RPC類的getServer()方法得到的。ipc.RPC類中的getServer()源碼以下

public static Server getServer(final Object instance, final String bindAddress, final int port,
                               final int numHandlers,
                               final boolean verbose, Configuration conf,
                               SecretManager <? extends TokenIdentifier > secretManager)
throws IOException
{
    return new Server(instance, conf, bindAddress, port, numHandlers, verbose, secretManager);
}

getServer()是一個建立Server對象的工廠方法,但建立的倒是RPC.Server類的對象。

運行Server

初始化Server後,Server端就運行起來了,看看ipc.Server的start()源碼

/** 啓動服務 */
public synchronized void start()
{
    responder.start();  //啓動responder
    listener.start();   //啓動listener
    handlers = new Handler[handlerCount];

    for (int i = 0; i < handlerCount; i++)
    {
        handlers[i] = new Handler(i);
        handlers[i].start();   //逐個啓動Handler
    }
}

Server處理請求

  • 分析源碼得知,Server端採用Listener監聽客戶端的鏈接,下面先分析一下Listener的構造函數
public Listener() throws IOException {
      address = new InetSocketAddress(bindAddress, port);
      // 建立ServerSocketChannel,並設置成非阻塞式
      acceptChannel = ServerSocketChannel.open();
      acceptChannel.configureBlocking(false);

      // 將server socket綁定到本地端口
      bind(acceptChannel.socket(), address, backlogLength);
      port = acceptChannel.socket().getLocalPort(); 
      // 得到一個selector
      selector= Selector.open();
      readers = new Reader[readThreads];
      readPool = Executors.newFixedThreadPool(readThreads);
      //啓動多個reader線程,爲了防止請求多時服務端響應延時的問題
      for (int i = 0; i < readThreads; i++) {       
        Selector readSelector = Selector.open();
        Reader reader = new Reader(readSelector);
        readers[i] = reader;
        readPool.execute(reader);
      }
      // 註冊鏈接事件
      acceptChannel.register(selector, SelectionKey.OP_ACCEPT);
      this.setName("IPC Server listener on " + port);
      this.setDaemon(true);
    }

在啓動Listener線程時,服務端會一直等待客戶端的鏈接,下面貼出Server.Listener類的run()方法

public void run()
{
    ……
    while (running)
    {
        SelectionKey key = null;
        try
        {
            selector.select();
            Iterator<SelectionKey> iter = selector.selectedKeys().iterator();
            while (iter.hasNext())
            {
                key = iter.next();
                iter.remove();
                try
                {
                    if (key.isValid())
                    {
                        if (key.isAcceptable())
                            doAccept(key);     //具體的鏈接方法
                    }
                }
                catch (IOException e)
                {
                }
                key = null;
            }
        }
        catch (OutOfMemoryError e)
        {
            ……
        }

Server.Listener類中doAccept ()方法中的關鍵源碼以下:

void doAccept(SelectionKey key) throws IOException,  OutOfMemoryError {
      Connection c = null;
      ServerSocketChannel server = (ServerSocketChannel) key.channel();
      SocketChannel channel;
      while ((channel = server.accept()) != null) { //創建鏈接
        channel.configureBlocking(false);
        channel.socket().setTcpNoDelay(tcpNoDelay);
        Reader reader = getReader();  //從readers池中得到一個reader
        try {
          reader.startAdd(); // 激活readSelector,設置adding爲true
          SelectionKey readKey = reader.registerChannel(channel);//將讀事件設置成興趣事件
          c = new Connection(readKey, channel, System.currentTimeMillis());//建立一個鏈接對象
          readKey.attach(c);   //將connection對象注入readKey
          synchronized (connectionList) {
            connectionList.add(numConnections, c);
            numConnections++;
          }
        …… 
        } finally {
//設置adding爲false,採用notify()喚醒一個reader,其實代碼十三中啓動的每一個reader都使
//用了wait()方法等待。因篇幅有限,就不貼出源碼了。
          reader.finishAdd();
        }
      }
    }

當reader被喚醒,reader接着執行doRead()方法。

  • 接收請求
    Server.Listener.Reader類中的doRead()方法和Server.Connection類中的readAndProcess()方法源碼以下:

Method1:

void doRead(SelectionKey key) throws InterruptedException {
      int count = 0;
      Connection c = (Connection)key.attachment();  //得到connection對象
      if (c == null) {
        return;  
      }
      c.setLastContact(System.currentTimeMillis());
      try {
        count = c.readAndProcess();    // 接受並處理請求 
      } catch (InterruptedException ieo) {
       ……
      }
     ……    
}

Method2:

public int readAndProcess() throws IOException, InterruptedException {
      while (true) {
        ……
        if (!rpcHeaderRead) {
          if (rpcHeaderBuffer == null) {
            rpcHeaderBuffer = ByteBuffer.allocate(2);
          }
         //讀取請求頭
          count = channelRead(channel, rpcHeaderBuffer);
          if (count < 0 || rpcHeaderBuffer.remaining() > 0) {
            return count;
          }
        // 讀取請求版本號 
          int version = rpcHeaderBuffer.get(0);
          byte[] method = new byte[] {rpcHeaderBuffer.get(1)};
        ……  

          data = ByteBuffer.allocate(dataLength);
        }
        // 讀取請求 
        count = channelRead(channel, data);

        if (data.remaining() == 0) {
         ……
          if (useSasl) {
         ……
          } else {
            processOneRpc(data.array());//處理請求
          }
        ……
          }
        } 
        return count;
      }
    }
  • 得到call對象

Method1:

private void processOneRpc(byte[] buf) throws IOException,
        InterruptedException {
      if (headerRead) {
        processData(buf);
      } else {
        processHeader(buf);
        headerRead = true;
        if (!authorizeConnection()) {
          throw new AccessControlException("Connection from " + this
              + " for protocol " + header.getProtocol()
              + " is unauthorized for user " + user);
        }
      }
}

Method2:

private void processData(byte[] buf) throws  IOException, InterruptedException {
      DataInputStream dis =
        new DataInputStream(new ByteArrayInputStream(buf));
      int id = dis.readInt();      // 嘗試讀取id
      Writable param = ReflectionUtils.newInstance(paramClass, conf);//讀取參數
      param.readFields(dis);        

      Call call = new Call(id, param, this);  //封裝成call
      callQueue.put(call);   // 將call存入callQueue
      incRpcCount();  // 增長rpc請求的計數
    }
  • 處理call對象
    對call對象的處理是Server類中的Handler內部類來處理的。Server.Handler類中run()方法中的關鍵代碼以下:
while (running) {
        try {
          final Call call = callQueue.take(); //彈出call,可能會阻塞
          ……
          //調用ipc.Server類中的call()方法,但該call()方法是抽象方法,具體實如今RPC.Server類中
          value = call(call.connection.protocol, call.param, call.timestamp);
          synchronized (call.connection.responseQueue) {
            setupResponse(buf, call, 
                        (error == null) ? Status.SUCCESS : Status.ERROR, 
                        value, errorClass, error);
             ……
            //給客戶端響應請求
            responder.doRespond(call);
          }
  }
  • 返回請求
    Server.Responder類中的doRespond()方法源碼以下:
void doRespond(Call call) throws IOException
{
    synchronized (call.connection.responseQueue)
    {
        call.connection.responseQueue.addLast(call);
        if (call.connection.responseQueue.size() == 1)
        {
            // 返回響應結果,並激活writeSelector
            processResponse(call.connection.responseQueue, true);
        }
    }
}
相關文章
相關標籤/搜索