必備技術點:
1. 動態代理(參考 :http://weixiaolu.iteye.com/blog/1477774 )
2. Java NIO(參考 :http://weixiaolu.iteye.com/blog/1479656 )
3. Java網絡編程
目錄:
一.RPC協議
二.ipc.RPC源碼分析
三.ipc.Client源碼分析
四.ipc.Server源碼分析
分析:
一.RPC協議
在分析協議以前,我以爲咱們頗有必要先搞清楚協議是什麼。下面我就談一點本身的認識吧。若是你學過java的網絡編程,你必定知道:當客戶端發送一個字節給服務端時,服務端必須也要有一個讀字節的方法在阻塞等待;反之亦然。 這種我把它稱爲底層的通訊協議。但是對於一個大型的網絡通訊系統來講,很顯然這種說法的協議粒度過小,不方便咱們理解整個網絡通訊的流程及架構,因此我造了個說法:架構層次的協議。通俗一點說,就是我把某些接口和接口中的方法稱爲協議,客戶端和服務端只要實現這些接口中的方法就能夠進行通訊了,從這個角度來講,架構層次協議的說法就能夠成立了(注:若是從架構層次的協議來分析系統,咱們就先不要太在乎方法的具體實現,呵呵,我相信你懂得~)。
Hadoop的RPC機制正是採用了這種「架構層次的協議」,有一整套做爲協議的接口。如圖:java
下面就幾個重點的協議介紹一下吧:node
VersionedProtocol :它是全部RPC協議接口的父接口,其中只有一個方法:getProtocolVersion()
(1)HDFS相關
ClientDatanodeProtocol :一個客戶端和datanode之間的協議接口,用於數據塊恢復
ClientProtocol :client與Namenode交互的接口,全部控制流的請求均在這裏,如:建立文件、刪除文件等;
DatanodeProtocol : Datanode與Namenode交互的接口,如心跳、blockreport等;
NamenodeProtocol :SecondaryNode與Namenode交互的接口。
(2)Mapreduce相關
InterDatanodeProtocol :Datanode內部交互的接口,用來更新block的元數據;
InnerTrackerProtocol :TaskTracker與JobTracker交互的接口,功能與DatanodeProtocol類似;
JobSubmissionProtocol :JobClient與JobTracker交互的接口,用來提交Job、得到Job等與Job相關的操做;
TaskUmbilicalProtocol :Task中子進程與母進程交互的接口,子進程即map、reduce等操做,母進程即TaskTracker,該接口能夠回報子進程的運行狀態(詞彙掃盲: umbilical 臍帶的, 關係親密的) 。編程
一會兒羅列了這麼多的協議,有些人可能要問了,hadoop是怎麼使用它們的呢?呵呵,不要着急哦,其實本篇博客所分析的是hadoop的RPC機制底層的具體實現,而這些協議倒是應用層上的東西,好比hadoop是怎麼樣保持「心跳」的啊。因此在個人下一篇博客:源碼級分析hadoop的心跳機制中會詳細說明以上協議是怎樣被使用的。盡請期待哦~。如今就開始咱們的RPC源碼之旅吧•••
二.ipc.RPC源碼分析
ipc.RPC類中有一些內部類,爲了你們對RPC類有個初步的印象,就先羅列幾個咱們感興趣的分析一下吧:緩存
Invocation :用於封裝方法名和參數,做爲數據傳輸層,至關於VO吧。
ClientCache :用於存儲client對象,用socket factory做爲hash key,存儲結構爲hashMap <SocketFactory, Client>。
Invoker :是動態代理中的調用實現類,繼承了InvocationHandler.
Server :是ipc.Server的實現類。網絡
從以上的分析能夠知道,Invocation類僅做爲VO,ClientCache類只是做爲緩存,而Server類用於服務端的處理,他們都和客戶端的數據流和業務邏輯沒有關係。如今就只剩下Invoker類了。若是你對動態代理(參考:http://weixiaolu.iteye.com/blog/1477774 )比較瞭解的話,你一下就會想到,咱們接下來去研究的就是RPC.Invoker類中的invoke()方法了。代碼以下:
代碼一:架構
public Object invoke(Object proxy, Method method, Object[] args) tcp
throws Throwable { 函數
••• oop
ObjectWritable value = (ObjectWritable)
client.call(new Invocation(method, args), remoteId);
•••
return value.get();
}
呵呵,若是你發現這個invoke()方法實現的有些奇怪的話,那你就對了。通常咱們看到的動態代理的invoke()方法中總會有 method.invoke(ac, arg); 這句代碼。而上面代碼中卻沒有,這是爲何呢?其實使用 method.invoke(ac, arg); 是在本地JVM中調用;而在hadoop中,是將數據發送給服務端,服務端將處理的結果再返回給客戶端,因此這裏的invoke()方法必然須要進行網絡通訊。而網絡通訊就是下面的這段代碼實現的:
代碼二:
ObjectWritable value = (ObjectWritable)
client.call(new Invocation(method, args), remoteId);
Invocation類在這裏封裝了方法名和參數,充當VO。其實這裏網絡通訊只是調用了Client類的call()方法。那咱們接下來分析一下ipc.Client源碼吧。不過在分析ipc.Client源碼以前,爲了避免讓咱們像盲目的蒼蠅同樣亂撞,我想先肯定一下咱們分析的目的是什麼,我總結出了三點須要解決的問題:
1. 客戶端和服務端的鏈接是怎樣創建的?
2. 客戶端是怎樣給服務端發送數據的?
3. 客戶端是怎樣獲取服務端的返回數據的?
基於以上三個問題,咱們開始吧!!!
三.ipc.Client源碼分析
一樣,爲了對Client類有個初步的瞭解,咱們也先羅列幾個咱們感興趣的內部類:
Call :用於封裝Invocation對象,做爲VO,寫到服務端,同時也用於存儲從服務端返回的數據
Connection :用以處理遠程鏈接對象。繼承了Thread
ConnectionId :惟一肯定一個鏈接
問題1:客戶端和服務端的鏈接是怎樣創建的?
下面咱們來看看Client類中的cal()方法吧:
代碼三:
public Writable call(Writable param, ConnectionId remoteId)
throws InterruptedException, IOException {
Call call = new Call(param); //將傳入的數據封裝成call對象
Connection connection = getConnection(remoteId, call); //得到一個鏈接
connection.sendParam(call); // 向服務端發送call對象
boolean interrupted = false;
synchronized (call) {
while (!call.done) {
try {
call.wait(); // 等待結果的返回,在Call類的callComplete()方法裏有notify()方法用於喚醒線程
} catch (InterruptedException ie) {
// 因中斷異常而終止,設置標誌interrupted爲true
interrupted = true;
}
}
if (interrupted) {
Thread.currentThread().interrupt();
}
if (call.error != null) {
if (call.error instanceof RemoteException) {
call.error.fillInStackTrace();
throw call.error;
} else { // 本地異常
throw wrapException(remoteId.getAddress(), call.error);
}
} else {
return call.value; //返回結果數據
}
}
}
具體代碼的做用我已作了註釋,因此這裏再也不贅述。但到目前爲止,你依然不知道RPC機制底層的網絡鏈接是怎麼創建的。呵呵,那咱們只好再去深究了,分析代碼後,咱們會發現和網絡通訊有關的代碼只會是下面的兩句了:
代碼四:
Connection connection = getConnection(remoteId, call); //得到一個鏈接
connection.sendParam(call); // 向服務端發送call對象
先看看是怎麼得到一個到服務端的鏈接吧,下面貼出ipc.Client類中的getConnection()方法。
代碼五:
private Connection getConnection(ConnectionId remoteId,
Call call)
throws IOException, InterruptedException {
if (!running.get()) {
// 若是client關閉了
throw new IOException("The client is stopped");
}
Connection connection;
//若是connections鏈接池中有對應的鏈接對象,就不需從新建立了;若是沒有就需從新建立一個鏈接對象。
//但請注意,該//鏈接對象只是存儲了remoteId的信息,其實還並無和服務端創建鏈接。
do {
synchronized (connections) {
connection = connections.get(remoteId);
if (connection == null) {
connection = new Connection(remoteId);
connections.put(remoteId, connection);
}
}
} while (!connection.addCall(call)); //將call對象放入對應鏈接中的calls池,就不貼出源碼了
//這句代碼纔是真正的完成了和服務端創建鏈接哦~
connection.setupIOstreams();
return connection;
}
若是你還有興趣繼續分析下去,那咱們就一探創建鏈接的過程吧,下面貼出Client.Connection類中的setupIOstreams()方法:
代碼六:
private synchronized void setupIOstreams() throws InterruptedException {
•••
try {
•••
while (true) {
setupConnection(); //創建鏈接
InputStream inStream = NetUtils.getInputStream(socket); //得到輸入流
OutputStream outStream = NetUtils.getOutputStream(socket); //得到輸出流
writeRpcHeader(outStream);
•••
this.in = new DataInputStream(new BufferedInputStream
(new PingInputStream(inStream))); //將輸入流裝飾成DataInputStream
this.out = new DataOutputStream
(new BufferedOutputStream(outStream)); //將輸出流裝飾成DataOutputStream
writeHeader();
// 跟新活動時間
touch();
//當鏈接創建時,啓動接受線程等待服務端傳回數據,注意:Connection繼承了Tread
start();
return;
}
} catch (IOException e) {
markClosed(e);
close();
}
}
再有一步咱們就知道客戶端的鏈接是怎麼創建的啦,下面貼出Client.Connection類中的setupConnection()方法:
代碼七:
private synchronized void setupConnection() throws IOException {
short ioFailures = 0;
short timeoutFailures = 0;
while (true) {
try {
this.socket = socketFactory.createSocket(); //終於看到建立socket的方法了
this.socket.setTcpNoDelay(tcpNoDelay);
•••
// 設置鏈接超時爲20s
NetUtils.connect(this.socket, remoteId.getAddress(), 20000);
this.socket.setSoTimeout(pingInterval);
return;
} catch (SocketTimeoutException toe) {
/* 設置最多鏈接重試爲45次。
* 總共有20s*45 = 15 分鐘的重試時間。
*/
handleConnectionFailure(timeoutFailures++, 45, toe);
} catch (IOException ie) {
handleConnectionFailure(ioFailures++, maxRetries, ie);
}
}
}
終於,咱們知道了客戶端的鏈接是怎樣創建的了,其實就是建立一個普通的socket進行通訊。呵呵,那服務端是否是也是建立一個ServerSocket進行通訊的呢?呵呵,先不要急,到這裏咱們只解決了客戶端的第一個問題,下面還有兩個問題沒有解決呢,咱們一個一個地來解決吧。
問題2:客戶端是怎樣給服務端發送數據的?
咱們回顧一下代碼四吧。第一句爲了完成鏈接的創建,咱們已經分析完畢;而第二句是爲了發送數據,呵呵,分析下去,看能不能解決咱們的問題呢。下面貼出Client.Connection類的sendParam()方法吧:
代碼八:
public void sendParam(Call call) {
if (shouldCloseConnection.get()) {
return;
}
DataOutputBuffer d=null;
try {
synchronized (this.out) {
if (LOG.isDebugEnabled())
LOG.debug(getName() + " sending #" + call.id);
//建立一個緩衝區
d = new DataOutputBuffer();
d.writeInt(call.id);
call.param.write(d);
byte[] data = d.getData();
int dataLength = d.getLength();
out.writeInt(dataLength); //首先寫出數據的長度
out.write(data, 0, dataLength); //向服務端寫數據
out.flush();
}
} catch(IOException e) {
markClosed(e);
} finally {
IOUtils.closeStream(d);
}
}
其實這就是java io的socket發送數據的通常過程哦,沒有什麼特別之處。到這裏問題二也解決了,來看看問題三吧。
問題3:客戶端是怎樣獲取服務端的返回數據的?
咱們再回顧一下代碼六吧。代碼六中,當鏈接創建時會啓動一個線程用於處理服務端返回的數據,咱們看看這個處理線程是怎麼實現的吧,下面貼出Client.Connection類和Client.Call類中的相關方法吧:
代碼九:
方法一:
public void run() {
•••
while (waitForWork()) {
receiveResponse(); //具體的處理方法
}
close();
•••
}
方法二:
private void receiveResponse() {
if (shouldCloseConnection.get()) {
return;
}
touch();
try {
int id = in.readInt(); // 阻塞讀取id
if (LOG.isDebugEnabled())
LOG.debug(getName() + " got value #" + id);
Call call = calls.get(id); //在calls池中找到發送時的那個對象
int state = in.readInt(); // 阻塞讀取call對象的狀態
if (state == Status.SUCCESS.state) {
Writable value = ReflectionUtils.newInstance(valueClass, conf);
value.readFields(in); // 讀取數據
//將讀取到的值賦給call對象,同時喚醒Client等待線程,貼出setValue()代碼方法三
call.setValue(value);
calls.remove(id); //刪除已處理的call
} else if (state == Status.ERROR.state) {
•••
} else if (state == Status.FATAL.state) {
•••
}
} catch (IOException e) {
markClosed(e);
}
}
方法三:
public synchronized void setValue(Writable value) {
this.value = value;
callComplete(); //具體實現
}
protected synchronized void callComplete() {
this.done = true;
notify(); // 喚醒client等待線程
}
代碼九完成的功能主要是:啓動一個處理線程,讀取從服務端傳來的call對象,將call對象讀取完畢後,喚醒client處理線程。就這麼簡單,客戶端就獲取了服務端返回的數據了哦~。客戶端的源碼分析就到這裏了哦,下面咱們來分析Server端的源碼吧。
四.ipc.Server源碼分析
一樣,爲了讓你們對ipc.Server有個初步的瞭解,咱們先分析一下它的幾個內部類吧:
Call :用於存儲客戶端發來的請求
Listener : 監聽類,用於監聽客戶端發來的請求,同時Listener內部還有一個靜態類,Listener.Reader,當監聽器監聽到用戶請求,便讓Reader讀取用戶請求。
Responder :響應RPC請求類,請求處理完畢,由Responder發送給請求客戶端。
Connection :鏈接類,真正的客戶端請求讀取邏輯在這個類中。
Handler :請求處理類,會循環阻塞讀取callQueue中的call對象,並對其進行操做。
若是你看過ipc.Server的源碼,你會發現其實ipc.Server是一個abstract修飾的抽象類。那隨之而來的問題就是:hadoop是怎樣初始化RPC的Server端的呢?這個問題着實也讓我想了好長時間。不事後來我想到Namenode初始化時必定初始化了RPC的Sever端,那咱們去看看Namenode的初始化源碼吧:
1. 初始化Server
代碼十:
private void initialize(Configuration conf) throws IOException {
•••
// 建立 rpc server
InetSocketAddress dnSocketAddr = getServiceRpcServerAddress(conf);
if (dnSocketAddr != null) {
int serviceHandlerCount =
conf.getInt(DFSConfigKeys.DFS_NAMENODE_SERVICE_HANDLER_COUNT_KEY,
DFSConfigKeys.DFS_NAMENODE_SERVICE_HANDLER_COUNT_DEFAULT);
//得到serviceRpcServer
this.serviceRpcServer = RPC.getServer(this, dnSocketAddr.getHostName(),
dnSocketAddr.getPort(), serviceHandlerCount,
false, conf, namesystem.getDelegationTokenSecretManager());
this.serviceRPCAddress = this.serviceRpcServer.getListenerAddress();
setRpcServiceServerAddress(conf);
}
//得到server
this.server = RPC.getServer(this, socAddr.getHostName(),
socAddr.getPort(), handlerCount, false, conf, namesystem
.getDelegationTokenSecretManager());
•••
this.server.start(); //啓動 RPC server Clients只容許鏈接該server
if (serviceRpcServer != null) {
serviceRpcServer.start(); //啓動 RPC serviceRpcServer 爲HDFS服務的server
}
startTrashEmptier(conf);
}
查看Namenode初始化源碼得知:RPC的server對象是經過ipc.RPC類的getServer()方法得到的。下面我們去看看ipc.RPC類中的getServer()源碼吧:
代碼十一:
public static Server getServer(final Object instance, final String bindAddress, final int port,
final int numHandlers,
final boolean verbose, Configuration conf,
SecretManager<? extends TokenIdentifier> secretManager)
throws IOException {
return new Server(instance, conf, bindAddress, port, numHandlers, verbose, secretManager);
}
這時咱們發現getServer()是一個建立Server對象的工廠方法,但建立的倒是RPC.Server類的對象。哈哈,如今你明白了我前面說的「RPC.Server是ipc.Server的實現類」了吧。不過RPC.Server的構造函數仍是調用了ipc.Server類的構造函數的,因篇幅所限,就不貼出相關源碼了。
2. 運行Server
如代碼十所示,初始化Server後,Server端就運行起來了,看看ipc.Server的start()源碼吧:
代碼十二:
/** 啓動服務 */
public synchronized void start() {
responder.start(); //啓動responder
listener.start(); //啓動listener
handlers = new Handler[handlerCount];
for (int i = 0; i < handlerCount; i++) {
handlers[i] = new Handler(i);
handlers[i].start(); //逐個啓動Handler
}
}
3. Server處理請求
1)創建鏈接
分析過ipc.Client源碼後,咱們知道Client端的底層通訊直接採用了阻塞式IO編程,當時咱們曾作出猜想:Server端是否是也採用了阻塞式IO。如今咱們仔細地分析一下吧,若是Server端也採用阻塞式IO,當鏈接進來的Client端不少時,勢必會影響Server端的性能。hadoop的實現者們考慮到了這點,因此他們採用了java NIO來實現Server端,java NIO可參考博客:http://weixiaolu.iteye.com/blog/1479656 。那Server端採用java NIO是怎麼創建鏈接的呢?分析源碼得知,Server端採用Listener監聽客戶端的鏈接,下面先分析一下Listener的構造函數吧:
代碼十三:
public Listener() throws IOException {
address = new InetSocketAddress(bindAddress, port);
// 建立ServerSocketChannel,並設置成非阻塞式
acceptChannel = ServerSocketChannel.open();
acceptChannel.configureBlocking(false);
// 將server socket綁定到本地端口
bind(acceptChannel.socket(), address, backlogLength);
port = acceptChannel.socket().getLocalPort();
// 得到一個selector
selector= Selector.open();
readers = new Reader[readThreads];
readPool = Executors.newFixedThreadPool(readThreads);
//啓動多個reader線程,爲了防止請求多時服務端響應延時的問題
for (int i = 0; i < readThreads; i++) {
Selector readSelector = Selector.open();
Reader reader = new Reader(readSelector);
readers[i] = reader;
readPool.execute(reader);
}
// 註冊鏈接事件
acceptChannel.register(selector, SelectionKey.OP_ACCEPT);
this.setName("IPC Server listener on " + port);
this.setDaemon(true);
}
在啓動Listener線程時,服務端會一直等待客戶端的鏈接,下面貼出Server.Listener類的run()方法:
代碼十四:
public void run() {
•••
while (running) {
SelectionKey key = null;
try {
selector.select();
Iterator<SelectionKey> iter = selector.selectedKeys().iterator();
while (iter.hasNext()) {
key = iter.next();
iter.remove();
try {
if (key.isValid()) {
if (key.isAcceptable())
doAccept(key); //具體的鏈接方法
}
} catch (IOException e) {
}
key = null;
}
} catch (OutOfMemoryError e) {
•••
}
下面貼出Server.Listener類中doAccept ()方法中的關鍵源碼吧:
代碼十五:
void doAccept(SelectionKey key) throws IOException, OutOfMemoryError {
Connection c = null;
ServerSocketChannel server = (ServerSocketChannel) key.channel();
SocketChannel channel;
while ((channel = server.accept()) != null) { //創建鏈接
channel.configureBlocking(false);
channel.socket().setTcpNoDelay(tcpNoDelay);
Reader reader = getReader(); //從readers池中得到一個reader
try {
reader.startAdd(); // 激活readSelector,設置adding爲true
SelectionKey readKey = reader.registerChannel(channel);//將讀事件設置成興趣事件
c = new Connection(readKey, channel, System.currentTimeMillis());//建立一個鏈接對象
readKey.attach(c); //將connection對象注入readKey
synchronized (connectionList) {
connectionList.add(numConnections, c);
numConnections++;
}
•••
} finally {
//設置adding爲false,採用notify()喚醒一個reader,其實代碼十三中啓動的每一個reader都使
//用了wait()方法等待。因篇幅有限,就不貼出源碼了。
reader.finishAdd();
}
}
}
當reader被喚醒,reader接着執行doRead()方法。
2)接收請求
下面貼出Server.Listener.Reader類中的doRead()方法和Server.Connection類中的readAndProcess()方法源碼:
代碼十六:
方法一:
void doRead(SelectionKey key) throws InterruptedException {
int count = 0;
Connection c = (Connection)key.attachment(); //得到connection對象
if (c == null) {
return;
}
c.setLastContact(System.currentTimeMillis());
try {
count = c.readAndProcess(); // 接受並處理請求
} catch (InterruptedException ieo) {
•••
}
•••
}
方法二:
public int readAndProcess() throws IOException, InterruptedException {
while (true) {
•••
if (!rpcHeaderRead) {
if (rpcHeaderBuffer == null) {
rpcHeaderBuffer = ByteBuffer.allocate(2);
}
//讀取請求頭
count = channelRead(channel, rpcHeaderBuffer);
if (count < 0 || rpcHeaderBuffer.remaining() > 0) {
return count;
}
// 讀取請求版本號
int version = rpcHeaderBuffer.get(0);
byte[] method = new byte[] {rpcHeaderBuffer.get(1)};
•••
data = ByteBuffer.allocate(dataLength);
}
// 讀取請求
count = channelRead(channel, data);
if (data.remaining() == 0) {
•••
if (useSasl) {
•••
} else {
processOneRpc(data.array());//處理請求
}
•••
}
}
return count;
}
}
3)得到call對象
下面貼出Server.Connection類中的processOneRpc()方法和processData()方法的源碼。
代碼十七:
方法一:
private void processOneRpc(byte[] buf) throws IOException,
InterruptedException {
if (headerRead) {
processData(buf);
} else {
processHeader(buf);
headerRead = true;
if (!authorizeConnection()) {
throw new AccessControlException("Connection from " + this
+ " for protocol " + header.getProtocol()
+ " is unauthorized for user " + user);
}
}
}
方法二:
private void processData(byte[] buf) throws IOException, InterruptedException {
DataInputStream dis =
new DataInputStream(new ByteArrayInputStream(buf));
int id = dis.readInt(); // 嘗試讀取id
Writable param = ReflectionUtils.newInstance(paramClass, conf);//讀取參數
param.readFields(dis);
Call call = new Call(id, param, this); //封裝成call
callQueue.put(call); // 將call存入callQueue
incRpcCount(); // 增長rpc請求的計數
}
4)處理call對象
你還記得Server類中還有個Handler內部類嗎?呵呵,對call對象的處理就是它乾的。下面貼出Server.Handler類中run()方法中的關鍵代碼:
代碼十八:
while (running) {
try {
final Call call = callQueue.take(); //彈出call,可能會阻塞
•••
//調用ipc.Server類中的call()方法,但該call()方法是抽象方法,具體實如今RPC.Server類中
value = call(call.connection.protocol, call.param, call.timestamp);
synchronized (call.connection.responseQueue) {
setupResponse(buf, call,
(error == null) ? Status.SUCCESS : Status.ERROR,
value, errorClass, error);
•••
//給客戶端響應請求
responder.doRespond(call);
}
}
5)返回請求
下面貼出Server.Responder類中的doRespond()方法源碼:
代碼十九:
方法一:
void doRespond(Call call) throws IOException {
synchronized (call.connection.responseQueue) {
call.connection.responseQueue.addLast(call);
if (call.connection.responseQueue.size() == 1) {
// 返回響應結果,並激活writeSelector
processResponse(call.connection.responseQueue, true);
}
}
}
小結:
到這裏,hadoop RPC機制的源碼分析就結束了,請繼續關注個人後續博客:hadoop心跳機制的源碼分析。在這裏須要感謝我所參考的iteye上相關博主的文章,因太多了,就不一一列舉了,不過最感謝的是wikieno的博客,博客地址爲:http://www.wikieno.com/2012/02/hadoop-ipc-server/ 。