Hadoop源碼學習筆記(4) ——Socket到RPC調用

Hadoop源碼學習筆記(4) java

——Socket到RPC調用 apache

Hadoop是一個分佈式程序,分佈在多臺機器上運行,事必會涉及到網絡編程。那這裏如何讓網絡編程變得簡單、透明的呢? 編程

網絡編程中,首先咱們要學的就是Socket編程,這是網絡編程中最底層的程序接口,分爲服務器端和客戶端,服務器負責監聽某個端口,客戶端負責鏈接服務器上的某個端口,一旦鏈接經過後,服務器和客戶端就能夠雙向通信了,咱們看下示例代碼: 服務器

  1. ServerSocket server = new ServerSocket(8111);
  2. Socket socket = server.accept();
  3.  
  4. //由Socket對象獲得輸入流,並構造相應的BufferedReader對象
  5. BufferedReader is = new BufferedReader(new InputStreamReader(socket.getInputStream()));
  6. //由Socket對象獲得輸出流,並構造PrintWriter對象
  7. PrintWriter os = new PrintWriter(socket.getOutputStream());
  8.  
  9. while(true){
  10.    String inline = is.readLine();
  11.    System.out.println(" 收到信息:" + inline);
  12.    //服務器反回
  13.    os.println("serverSend:" + inline);
  14.    os.flush();
  15.    if (inline == "bye")
  16. break;
  17. }
  18. os.close();
  19. is.close();
  20. socket.close();
  21. server.close();
  22. System.out.println("服務器退出");

 

  1. Socket socket = new Socket("127.0.0.1",8111);
  2.  
  3. //由Socket對象獲得輸入流,並構造相應的BufferedReader對象
  4. BufferedReader is = new BufferedReader(new InputStreamReader(socket.getInputStream()));
  5. //由Socket對象獲得輸出流,並構造PrintWriter對象
  6. PrintWriter os = new PrintWriter(socket.getOutputStream());
  7. BufferedReader sin=new BufferedReader(new InputStreamReader(System.in));
  8. while(true){
  9.    System.out.println("請輸入:");
  10.    String line = sin.readLine();
  11.    os.println(line);
  12.    os.flush();
  13.    String inline = is.readLine();
  14.    System.out.println("服務器獲取值:" + inline);
  15.    if (line=="bye")
  16.       break;
  17. }
  18. os.close();
  19. is.close();
  20. socket.close();
  21. System.out.println("客戶端退出");

 

這兩段代碼分別帖入兩個類中,分開執行,先執行服務器端,再執行客戶端,就能夠互發消息了。 網絡

觀察下代碼,發現代碼中下面4~20行邏輯是一至的,都是經過流來通信,因此Socket中不一樣的是開始地方,服務器是經過server.accept()來獲取Socket,而客戶端是經過直接建立Socket對象的。 多線程

這段代碼,其本運行是沒問題的,但存在一個問題,就是當客戶端接入時服務器端的accept函數才走下去,否則的話,會一直處於卡死等待狀態。包括getInputStream函數,也會等待雙方接通後,才往下走。除非等到客戶端接入,或中斷。固然有人會說,能夠引入多線程啊,沒錯,是能夠,可是想一下,是否是每一個客戶接入都得有一個線程? 不然少一個線程,就會有一堆的卡着。因此這種方式不適合在大最客戶端接入的狀況。 併發

 

在JDK1.4引入了非阻塞的通訊方式,這樣使得服務器端只須要一個線程就能處理全部客戶端socket的請求。 socket

下面是幾個須要用到的核心類: 分佈式

  • ServerSocketChannel: ServerSocket 的替代類, 支持阻塞通訊與非阻塞通訊.
  • SocketChannel: Socket 的替代類, 支持阻塞通訊與非阻塞通訊.
  • Selector: 爲ServerSocketChannel 監控接收客戶端鏈接就緒事件, 爲 SocketChannel 監控鏈接服務器就緒, 讀就緒和寫就緒事件.
  • SelectionKey: 表明 ServerSocketChannel 及 SocketChannel 向 Selector 註冊事件的句柄. 當一個 SelectionKey 對象位於Selector 對象的 selected-keys 集合中時, 就表示與這個 SelectionKey 對象相關的事件發生了.在SelectionKey 類中有幾個靜態常量
    • SelectionKey.OP_ACCEPT->客戶端鏈接就緒事件 等於監聽serversocket.accept()返回一個socket
    • SelectionKey.OP_CONNECT->準備鏈接服務器就緒跟上面相似,只不過是對於socket的至關於監聽了socket.connect()
    • SelectionKey.OP_READ->讀就緒事件, 表示輸入流中已經有了可讀數據, 能夠執行讀操做了
    • SelectionKey.OP_WRITE->寫就緒事件

 

因此服務器端代碼就能夠升一下級了,變成以下: ide

  1. public class SocketChannelTest implements Runnable {
  2.  
  3.    @Override
  4.    public void run() {
  5.       while (true) {
  6.          try {
  7.             selector.select();
  8.             Set<SelectionKey> keys = selector.selectedKeys();
  9.             Iterator<SelectionKey> iter = keys.iterator();
  10.             SocketChannel sc;
  11.             while (iter.hasNext()) {
  12.                SelectionKey key = iter.next();
  13.                if (key.isAcceptable())
  14.                   ; // 新的鏈接
  15.                else if (key.isReadable())
  16.                   ;// 可讀
  17.                iter.remove(); // 處理完事件的要從keys中刪去
  18.             }
  19.          } catch (Exception e) {
  20.             e.printStackTrace();
  21.          }
  22.       }
  23.    }
  24.    static Selector selector;
  25.  
  26.    public static void main(String[] args) throws IOException,
  27.          InterruptedException {
  28.       selector = Selector.open(); // 靜態方法 實例化selector
  29.       ServerSocketChannel serverChannel = ServerSocketChannel.open();
  30.       serverChannel.configureBlocking(false); // 設置爲非阻塞方式,若是爲true 那麼就爲傳統的阻塞方式
  31.       serverChannel.socket().bind(new InetSocketAddress(8001)); // 綁定IP 及 端口
  32.       serverChannel.register(selector, SelectionKey.OP_ACCEPT); // 註冊
  33.                                                    // OP_ACCEPT事件
  34.       Thread thd = new Thread(new SocketChannelTest());
  35.       thd.start();// 開啓線程 處理請求
  36.       thd.join();
  37.    }
  38. }

 

好,這樣通信代碼簡化了。但繼續想,咱們通信的目的是什麼?客戶端發一個指令,服務器執行一些內容,而後把結果返回給客戶端。這不就像調用一下函數麼,調用函數名、傳入參數、返回值。

這個就稱之爲遠程方法調用(RPC Remote Procedure Call Protocol),毫無疑問,這個RPC實現確定是基於上面的這個Socket的。至於具體如何實現呢,咱們看下面的分解。

在看實現以前,咱們先看一下,這個RPC是如何用的,如何作到調用透明的:

咱們在src下新建一個RPCTest的包,定義一個功能接口IRPCTestEntity.java:

  1. package RPCTest;
  2. import org.apache.hadoop.ipc.VersionedProtocol;
  3. public interface IRPCTestEntity  extends VersionedProtocol {
  4.     int Calc(int x,int y);
  5. }

該接口中有一個Calc的函數。

定義一個實現類RPCTestEntity.java:

  1. package RPCTest;
  2. import java.io.IOException;
  3. public class RPCTestEntity implements IRPCTestEntity{
  4.    @Override
  5.    public long getProtocolVersion(String protocol, long clientVersion)  throws IOException {
  6.       return 0;
  7.    }
  8.  
  9.    public int Calc(int x,int y){
  10.       int z =0 ;
  11.       z = x + y;
  12.       return z;
  13.    }
  14.  
  15. }

這個類中實現了Calc函數,執行內容爲將x,y相加,將和返回。

咱們再定義一個服務器類(RPCTestSvr.java),將該實現類註冊成RPC服務:

  1. package RPCTest;
  2. import java.io.IOException;
  3.  
  4. public class RPCTestSvr {
  5.    public static void main(String[] args) throws IOException, InterruptedException {
  6.       RPCTestEntity obj = new RPCTestEntity();
  7.       Configuration conf = new Configuration();
  8.       Server server = RPC.getServer(obj, "", 9001, conf);
  9.       server.start();
  10.       server.join();
  11.    }
  12. }

代碼比較簡單,定義了一個RPCTestEntity的實體,而後RPC建立一個Server,傳入實體對象,而後這個服務就調用join卡住,用於不斷接收請求。 建立完後,就可把這個"服務器"啓動起來了。

再建立一個客戶端(RPCTestClient.java):

  1. package RPCTest;
  2.  
  3. import java.io.IOException;
  4. import java.net.InetSocketAddress;
  5.  
  6. import org.apache.hadoop.conf.Configuration;
  7. import org.apache.hadoop.ipc.RPC;
  8. import org.apache.hadoop.ipc.VersionedProtocol;
  9.  
  10. public class RPCTestClient {
  11.    public static void main(String[] args) throws IOException {
  12.       InetSocketAddress addr = new InetSocketAddress("127.0.0.1",9001);
  13.       Configuration conf = new Configuration();
  14.       VersionedProtocol obj = RPC.getProxy(IRPCTestEntity.class, 0, addr, conf);
  15.       IRPCTestEntity ent = (IRPCTestEntity)obj;
  16.       int x = ent.Calc(5, 6);
  17.       System.out.println(x);
  18.    }
  19. }

這裏,咱們經過RPC.getProxy函數獲了一個IRPCTestEntity的接口實例,而後就能夠直接調用了。

運行後,發現這個值立刻返回了過來,同時在"服務器"端也會收到必定的請求信息。說明二者之間通了。

仔細看,這個客戶端中,整個過程就沒有涉及到RPCTestEntity這個實現的實體,換句話說,客戶端產生的是一個虛擬的實現類,而後調用起來了。

 

OK,示例程序跑起來了,也帶給咱們幾個問題,一、這個客戶端中的obj是什麼對象?二、爲何咱們調用obj對象中的函數(Calc)會跑到服務器上運行,如何實現的?

 

底層的通信,咱們是知道的,確定用socket,用它可以傳遞各類數據。如何與函數關聯呢? 咱們進入getProxy函數,

咱們看到這個getProxy函數中,返回了VersionedProtocol接口的對象,從字面意思,這個Proxy意爲代理, 因此咱們獲得的obj就是一個代理類。同時也看出,要做爲RPC處理對象,這個接口必實現VersionedProtocol(簡單地看下里面,只有一個函數,返回版本號,是用於判斷雙方版本所用,只有版本匹配,才能調用)。

其建立能夠看到,用到了:

Proxy.newProxyInstance(

protocol.getClassLoader(), new Class[] { protocol },

new Invoker(addr, ticket, conf, factory));

而後這個代理類,就自動實現了偉放的protocol這個接口類型。而後當咱們調用代理類中的函數時,這個傳入的Invoker類,就會收到通知,通知裏包含了調用信息,咱們進入Invoker中看一下:

private static class Invoker implements InvocationHandler

這是一個寫在RPC類中的內部類,且是私有的,意思就是隻爲這個RPC調用,其實現的規定接口InvocationHandler,那麼就要實現規定的函數Invoke咯:

  1. public Object invoke(Object proxy, Method method, Object[] args)
  2.      throws Throwable {
  3.      final boolean logDebug = LOG.isDebugEnabled();
  4.      long startTime = 0;
  5.      if (logDebug) {
  6.        startTime = System.currentTimeMillis();
  7.      }
  8.  
  9.      ObjectWritable value = (ObjectWritable)
  10.        client.call(new Invocation(method, args), address,
  11.                    method.getDeclaringClass(), ticket);
  12.      if (logDebug) {
  13.        long callTime = System.currentTimeMillis() - startTime;
  14.        LOG.debug("Call: " + method.getName() + "" + callTime);
  15.      }
  16.      return value.get();
  17.    }

這個invoke函數,就是當咱們調用代理類中的函數(obj.Calc)時,會收到的請求,看下參數,傳入的有,Method(函數),args(參數),包羅萬象,有了這些內容後,就能夠調用底層的Socket,將這些信息打包起來(放入的Invocation類)中,一併發向服務器中。

同時,服務器端中,就比較容易了,在收到請求後,就能夠解析出要調用的函數和參數,而後經過反射來調用在服務器一開始註冊上的對象中的函數,再將返回值經過Socket傳回客戶端,再由這個invoke函數將值返回。

 

OK,這個幾個點想通了,整個過程就容易理解了。總之:

服務器端——註冊服務:RPC.getServer(obj, "", 9001, conf);

客戶端——取得代理類:obj = RPC.getProxy()

經過這樣的包裝後,網絡訪問就很是透明瞭。

 

但這裏,仍是有不少深層次的問題,好比服務端對象生命狀態如何管理,多個客戶端併發怎麼處理,傳送數據中流怎麼辦? 這一系列的問題,就得深刻看一下這個Server Client RPC這幾個類才能找到答案了。 深刻問題先留着,後面再來看, 目前先把它的架子搞清楚。

相關文章
相關標籤/搜索