Hadoop源碼學習筆記(4) java
——Socket到RPC調用 apache
Hadoop是一個分佈式程序,分佈在多臺機器上運行,事必會涉及到網絡編程。那這裏如何讓網絡編程變得簡單、透明的呢? 編程
網絡編程中,首先咱們要學的就是Socket編程,這是網絡編程中最底層的程序接口,分爲服務器端和客戶端,服務器負責監聽某個端口,客戶端負責鏈接服務器上的某個端口,一旦鏈接經過後,服務器和客戶端就能夠雙向通信了,咱們看下示例代碼: 服務器
-
ServerSocket server = new ServerSocket(8111);
-
Socket socket = server.accept();
-
-
//由Socket對象獲得輸入流,並構造相應的BufferedReader對象
-
BufferedReader is = new BufferedReader(new InputStreamReader(socket.getInputStream()));
-
//由Socket對象獲得輸出流,並構造PrintWriter對象
-
PrintWriter os = new PrintWriter(socket.getOutputStream());
-
-
while(true){
-
String inline = is.readLine();
-
System.out.println(" 收到信息:" + inline);
-
//服務器反回
-
os.println("serverSend:" + inline);
-
os.flush();
-
if (inline == "bye")
-
break;
-
}
-
os.close();
-
is.close();
-
socket.close();
-
server.close();
-
System.out.println("服務器退出");
-
Socket socket = new Socket("127.0.0.1",8111);
-
-
//由Socket對象獲得輸入流,並構造相應的BufferedReader對象
-
BufferedReader is = new BufferedReader(new InputStreamReader(socket.getInputStream()));
-
//由Socket對象獲得輸出流,並構造PrintWriter對象
-
PrintWriter os = new PrintWriter(socket.getOutputStream());
-
BufferedReader sin=new BufferedReader(new InputStreamReader(System.in));
-
while(true){
-
System.out.println("請輸入:");
-
String line = sin.readLine();
-
os.println(line);
-
os.flush();
-
String inline = is.readLine();
-
System.out.println("服務器獲取值:" + inline);
-
if (line=="bye")
-
break;
-
}
-
os.close();
-
is.close();
-
socket.close();
-
System.out.println("客戶端退出");
這兩段代碼分別帖入兩個類中,分開執行,先執行服務器端,再執行客戶端,就能夠互發消息了。 網絡
觀察下代碼,發現代碼中下面4~20行邏輯是一至的,都是經過流來通信,因此Socket中不一樣的是開始地方,服務器是經過server.accept()來獲取Socket,而客戶端是經過直接建立Socket對象的。 多線程
這段代碼,其本運行是沒問題的,但存在一個問題,就是當客戶端接入時服務器端的accept函數才走下去,否則的話,會一直處於卡死等待狀態。包括getInputStream函數,也會等待雙方接通後,才往下走。除非等到客戶端接入,或中斷。固然有人會說,能夠引入多線程啊,沒錯,是能夠,可是想一下,是否是每一個客戶接入都得有一個線程? 不然少一個線程,就會有一堆的卡着。因此這種方式不適合在大最客戶端接入的狀況。 併發
在JDK1.4引入了非阻塞的通訊方式,這樣使得服務器端只須要一個線程就能處理全部客戶端socket的請求。 socket
下面是幾個須要用到的核心類: 分佈式
-
ServerSocketChannel: ServerSocket 的替代類, 支持阻塞通訊與非阻塞通訊.
-
SocketChannel: Socket 的替代類, 支持阻塞通訊與非阻塞通訊.
-
Selector: 爲ServerSocketChannel 監控接收客戶端鏈接就緒事件, 爲 SocketChannel 監控鏈接服務器就緒, 讀就緒和寫就緒事件.
-
SelectionKey: 表明 ServerSocketChannel 及 SocketChannel 向 Selector 註冊事件的句柄. 當一個 SelectionKey 對象位於Selector 對象的 selected-keys 集合中時, 就表示與這個 SelectionKey 對象相關的事件發生了.在SelectionKey 類中有幾個靜態常量
-
SelectionKey.OP_ACCEPT->客戶端鏈接就緒事件 等於監聽serversocket.accept()返回一個socket
-
SelectionKey.OP_CONNECT->準備鏈接服務器就緒跟上面相似,只不過是對於socket的至關於監聽了socket.connect()
-
SelectionKey.OP_READ->讀就緒事件, 表示輸入流中已經有了可讀數據, 能夠執行讀操做了
-
SelectionKey.OP_WRITE->寫就緒事件
因此服務器端代碼就能夠升一下級了,變成以下: ide
-
public class SocketChannelTest implements Runnable {
-
-
@Override
-
public void run() {
-
while (true) {
-
try {
-
selector.select();
-
Set<SelectionKey> keys = selector.selectedKeys();
-
Iterator<SelectionKey> iter = keys.iterator();
-
SocketChannel sc;
-
while (iter.hasNext()) {
-
SelectionKey key = iter.next();
-
if (key.isAcceptable())
-
; // 新的鏈接
-
else if (key.isReadable())
-
;// 可讀
-
iter.remove(); // 處理完事件的要從keys中刪去
-
}
-
} catch (Exception e) {
-
e.printStackTrace();
-
}
-
}
-
}
-
static Selector selector;
-
-
public static void main(String[] args) throws IOException,
-
InterruptedException {
-
selector = Selector.open(); // 靜態方法 實例化selector
-
ServerSocketChannel serverChannel = ServerSocketChannel.open();
-
serverChannel.configureBlocking(false); // 設置爲非阻塞方式,若是爲true 那麼就爲傳統的阻塞方式
-
serverChannel.socket().bind(new InetSocketAddress(8001)); // 綁定IP 及 端口
-
serverChannel.register(selector, SelectionKey.OP_ACCEPT); // 註冊
-
// OP_ACCEPT事件
-
Thread thd = new Thread(new SocketChannelTest());
-
thd.start();// 開啓線程 處理請求
-
thd.join();
-
}
-
}
好,這樣通信代碼簡化了。但繼續想,咱們通信的目的是什麼?客戶端發一個指令,服務器執行一些內容,而後把結果返回給客戶端。這不就像調用一下函數麼,調用函數名、傳入參數、返回值。
這個就稱之爲遠程方法調用(RPC Remote Procedure Call Protocol),毫無疑問,這個RPC實現確定是基於上面的這個Socket的。至於具體如何實現呢,咱們看下面的分解。
在看實現以前,咱們先看一下,這個RPC是如何用的,如何作到調用透明的:
咱們在src下新建一個RPCTest的包,定義一個功能接口IRPCTestEntity.java:
-
package RPCTest;
-
import org.apache.hadoop.ipc.VersionedProtocol;
-
public interface IRPCTestEntity extends VersionedProtocol {
-
int Calc(int x,int y);
-
}
該接口中有一個Calc的函數。
定義一個實現類RPCTestEntity.java:
-
package RPCTest;
-
import java.io.IOException;
-
public class RPCTestEntity implements IRPCTestEntity{
-
@Override
-
public long getProtocolVersion(String protocol, long clientVersion) throws IOException {
-
return 0;
-
}
-
-
public int Calc(int x,int y){
-
int z =0 ;
-
z = x + y;
-
return z;
-
}
-
-
}
這個類中實現了Calc函數,執行內容爲將x,y相加,將和返回。
咱們再定義一個服務器類(RPCTestSvr.java),將該實現類註冊成RPC服務:
-
package RPCTest;
-
import java.io.IOException;
-
-
public class RPCTestSvr {
-
public static void main(String[] args) throws IOException, InterruptedException {
-
RPCTestEntity obj = new RPCTestEntity();
-
Configuration conf = new Configuration();
-
Server server = RPC.getServer(obj, "", 9001, conf);
-
server.start();
-
server.join();
-
}
-
}
代碼比較簡單,定義了一個RPCTestEntity的實體,而後RPC建立一個Server,傳入實體對象,而後這個服務就調用join卡住,用於不斷接收請求。 建立完後,就可把這個"服務器"啓動起來了。
再建立一個客戶端(RPCTestClient.java):
-
package RPCTest;
-
-
import java.io.IOException;
-
import java.net.InetSocketAddress;
-
-
import org.apache.hadoop.conf.Configuration;
-
import org.apache.hadoop.ipc.RPC;
-
import org.apache.hadoop.ipc.VersionedProtocol;
-
-
public class RPCTestClient {
-
public static void main(String[] args) throws IOException {
-
InetSocketAddress addr = new InetSocketAddress("127.0.0.1",9001);
-
Configuration conf = new Configuration();
-
VersionedProtocol obj = RPC.getProxy(IRPCTestEntity.class, 0, addr, conf);
-
IRPCTestEntity ent = (IRPCTestEntity)obj;
-
int x = ent.Calc(5, 6);
-
System.out.println(x);
-
}
-
}
這裏,咱們經過RPC.getProxy函數獲了一個IRPCTestEntity的接口實例,而後就能夠直接調用了。
運行後,發現這個值立刻返回了過來,同時在"服務器"端也會收到必定的請求信息。說明二者之間通了。
仔細看,這個客戶端中,整個過程就沒有涉及到RPCTestEntity這個實現的實體,換句話說,客戶端產生的是一個虛擬的實現類,而後調用起來了。
OK,示例程序跑起來了,也帶給咱們幾個問題,一、這個客戶端中的obj是什麼對象?二、爲何咱們調用obj對象中的函數(Calc)會跑到服務器上運行,如何實現的?
底層的通信,咱們是知道的,確定用socket,用它可以傳遞各類數據。如何與函數關聯呢? 咱們進入getProxy函數,
咱們看到這個getProxy函數中,返回了VersionedProtocol接口的對象,從字面意思,這個Proxy意爲代理, 因此咱們獲得的obj就是一個代理類。同時也看出,要做爲RPC處理對象,這個接口必實現VersionedProtocol(簡單地看下里面,只有一個函數,返回版本號,是用於判斷雙方版本所用,只有版本匹配,才能調用)。
其建立能夠看到,用到了:
Proxy.newProxyInstance(
protocol.getClassLoader(), new Class[] { protocol },
new Invoker(addr, ticket, conf, factory));
而後這個代理類,就自動實現了偉放的protocol這個接口類型。而後當咱們調用代理類中的函數時,這個傳入的Invoker類,就會收到通知,通知裏包含了調用信息,咱們進入Invoker中看一下:
private static class Invoker implements InvocationHandler
這是一個寫在RPC類中的內部類,且是私有的,意思就是隻爲這個RPC調用,其實現的規定接口InvocationHandler,那麼就要實現規定的函數Invoke咯:
-
public Object invoke(Object proxy, Method method, Object[] args)
-
throws Throwable {
-
final boolean logDebug = LOG.isDebugEnabled();
-
long startTime = 0;
-
if (logDebug) {
-
startTime = System.currentTimeMillis();
-
}
-
-
ObjectWritable value = (ObjectWritable)
-
client.call(new Invocation(method, args), address,
-
method.getDeclaringClass(), ticket);
-
if (logDebug) {
-
long callTime = System.currentTimeMillis() - startTime;
-
LOG.debug("Call: " + method.getName() + "" + callTime);
-
}
-
return value.get();
-
}
這個invoke函數,就是當咱們調用代理類中的函數(obj.Calc)時,會收到的請求,看下參數,傳入的有,Method(函數),args(參數),包羅萬象,有了這些內容後,就能夠調用底層的Socket,將這些信息打包起來(放入的Invocation類)中,一併發向服務器中。
同時,服務器端中,就比較容易了,在收到請求後,就能夠解析出要調用的函數和參數,而後經過反射來調用在服務器一開始註冊上的對象中的函數,再將返回值經過Socket傳回客戶端,再由這個invoke函數將值返回。
OK,這個幾個點想通了,整個過程就容易理解了。總之:
服務器端——註冊服務:RPC.getServer(obj, "", 9001, conf);
客戶端——取得代理類:obj = RPC.getProxy()
經過這樣的包裝後,網絡訪問就很是透明瞭。
但這裏,仍是有不少深層次的問題,好比服務端對象生命狀態如何管理,多個客戶端併發怎麼處理,傳送數據中流怎麼辦? 這一系列的問題,就得深刻看一下這個Server Client RPC這幾個類才能找到答案了。 深刻問題先留着,後面再來看, 目前先把它的架子搞清楚。