Remote Procedure Call (RPC). With this protocol, a program can request a service from a program on another machine on the network without needing to understand the underlying network details. It is a Client/Server structure: the side that provides the service is called the Server, and the side that consumes the service is called the Client.
All of Hadoop's low-level interaction goes through RPC. For example, the communication between datanode and namenode, between tasktracker and jobtracker, and between secondary namenode and namenode is all implemented over RPC.
TODO: This article is not finished. It clearly needs four diagrams: an RPC schematic, a Hadoop RPC sequence diagram, a client flowchart, and a server flowchart. Ideally a few package diagrams plus class diagrams (component diagrams) as well. To be completed.
Implementing a remote procedure call requires three elements: 1. the server must publish the service; 2. both the client and the server need modules that handle the protocol and the connection; 3. the interface of the service published by the server must be made available to the client.
TODO: an RPC diagram is missing here.
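To make the three elements concrete, here is a minimal sketch against the Hadoop 1.x ipc API. MyProtocol, the bind address, and port 9000 are made-up placeholders; depending on the exact Hadoop version, VersionedProtocol may also require a getProtocolSignature() implementation.

```java
import java.net.InetSocketAddress;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.ipc.VersionedProtocol;

// Element 3: the service interface that is shared with the client.
interface MyProtocol extends VersionedProtocol {
  long versionID = 1L;
  String echo(String msg);
}

public class MyServer implements MyProtocol {
  public String echo(String msg) { return msg; }

  public long getProtocolVersion(String protocol, long clientVersion) {
    return versionID;
  }

  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // Element 1: the server publishes the service.
    RPC.Server server = RPC.getServer(new MyServer(), "0.0.0.0", 9000, conf);
    server.start();
    // Element 2: client and server talk through the ipc machinery; the
    // client's end is hidden behind a dynamically generated proxy.
    MyProtocol proxy = (MyProtocol) RPC.getProxy(MyProtocol.class,
        MyProtocol.versionID, new InetSocketAddress("localhost", 9000), conf);
    System.out.println(proxy.echo("hello rpc"));
    RPC.stopProxy(proxy);
    server.stop();
  }
}
```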
The Hadoop RPC source code lives mainly in the org.apache.hadoop.ipc package. org.apache.hadoop.ipc.RPC contains 5 inner classes (in the 1.x source: Invocation, ClientCache, Invoker, VersionMismatch, and Server).
org.apache.hadoop.ipc.Client has 5 inner classes (in the 1.x source: Call, ParallelCall, ParallelResults, Connection, and ConnectionId).
The rough sequence for establishing a connection between client and server is:
Inside Object org.apache.hadoop.ipc.RPC.Invoker.invoke(Object proxy, Method method, Object[] args), the client issues
client.call(new Invocation(method, args), remoteId);
Invocation here is an inner class of org.apache.hadoop.ipc.RPC that holds the name of the invoked method and its arguments; this step mainly packages up the method and parameters. client is an instance of org.apache.hadoop.ipc.Client.
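For context, the proxy on which invoke() fires is created roughly like this in RPC.getProxy() (a condensed sketch; the exact Invoker constructor arguments vary across versions):

```java
// Invoker implements java.lang.reflect.InvocationHandler, so every method
// invoked on the returned proxy funnels into Invoker.invoke(), which wraps
// the Method and its args into an Invocation and hands it to Client.call().
VersionedProtocol proxy = (VersionedProtocol) Proxy.newProxyInstance(
    protocol.getClassLoader(), new Class[] { protocol },
    new Invoker(addr, ticket, conf, factory));
```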
The source of the org.apache.hadoop.ipc.Client.call() method follows. Within call(), getConnection() obtains an org.apache.hadoop.ipc.Client.Connection object and starts its I/O streams via setupIOstreams().
```java
Writable org.apache.hadoop.ipc.Client.call(Writable param, ConnectionId remoteId)
    throws InterruptedException, IOException {
  Call call = new Call(param);  // A call waiting for a value.
  // Get a connection from the pool, or create a new one and add it to the
  // pool. Connections to a given ConnectionId are reused.
  Connection connection = getConnection(remoteId, call);  // mostly lives under the org.apache.hadoop.net package
  connection.sendParam(call);   // the client sends the request data here
  boolean interrupted = false;
  synchronized (call) {
    while (!call.done) {
      try {
        call.wait();  // wait for the result
      } catch (InterruptedException ie) {
        // save the fact that we were interrupted
        interrupted = true;
      }
    }
    … …
  }
}

// Get a connection from the pool, or create a new one and add it to the
// pool. Connections to a given ConnectionId are reused.
private Connection getConnection(ConnectionId remoteId, Call call)
    throws IOException, InterruptedException {
  if (!running.get()) {
    // the client is stopped
    throw new IOException("The client is stopped");
  }
  Connection connection;
  // we could avoid this allocation for each RPC by having a
  // connectionsId object and with set() method. We need to manage the
  // refs for keys in HashMap properly. For now its ok.
  do {
    synchronized (connections) {
      connection = connections.get(remoteId);
      if (connection == null) {
        connection = new Connection(remoteId);
        connections.put(remoteId, connection);
      }
    }
  } while (!connection.addCall(call));
  // we don't invoke the method below inside "synchronized (connections)"
  // block above. The reason for that is if the server happens to be slow,
  // it will take longer to establish a connection and that will slow the
  // entire system down.
  connection.setupIOstreams();  // sends a header to the server and waits for the result
  return connection;
}
```
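The wait()/notify() rendezvous above is easier to follow next to a condensed view of the Client.Call inner class (field names follow the Hadoop 1.x source; bodies abbreviated):

```java
private class Call {
  int id;          // call id, echoed back by the server to match the response
  Writable param;  // the serialized Invocation (method name + arguments)
  Writable value;  // the result, filled in by the connection's receiver thread
  boolean done;    // true once the response has arrived

  // Called by the receiver thread; wakes the caller blocked in call.wait().
  public synchronized void setValue(Writable value) {
    this.value = value;
    this.done = true;
    notify();
  }
}
```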
The setupIOstreams() method:
```java
void org.apache.hadoop.ipc.Client.Connection.setupIOstreams() throws InterruptedException {
  // Connect to the server and set up the I/O streams. It then sends
  // a header to the server and starts
  // the connection thread that waits for responses.
  while (true) {
    setupConnection();  // establish the connection
    InputStream inStream = NetUtils.getInputStream(socket);    // input stream
    OutputStream outStream = NetUtils.getOutputStream(socket); // output stream
    writeRpcHeader(outStream);
  }
  … …
  // update last activity time
  touch();
  // start the receiver thread after the socket connection has been set up
  start();
}
```
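writeRpcHeader() is what the server-side readAndProcess() (shown later) expects first on the wire. Condensed from the Hadoop 1.x source, it writes the 4-byte magic, the RPC version, and the authentication method byte:

```java
private void writeRpcHeader(OutputStream outStream) throws IOException {
  DataOutputStream out = new DataOutputStream(new BufferedOutputStream(outStream));
  out.write(Server.HEADER.array());   // the 4-byte magic "hrpc"
  out.write(Server.CURRENT_VERSION);  // RPC protocol version
  authMethod.write(out);              // authentication method byte
  out.flush();
}
```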
Starting org.apache.hadoop.ipc.Client.Connection: how the client receives the data returned by the server.
```java
void org.apache.hadoop.ipc.Client.Connection.run() {
  while (waitForWork()) {  // wait here for work - read or close connection
    receiveResponse();
  }
}
```
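What receiveResponse() does, condensed from the Hadoop 1.x source (error and cleanup handling omitted): read back the call id the server echoes, find the pending Call, deserialize the value, and wake the waiter.

```java
private void receiveResponse() throws IOException {
  int id = in.readInt();      // the call id echoed by the server
  Call call = calls.get(id);  // look up the matching pending call
  int state = in.readInt();   // call status (success / error / fatal)
  if (state == Status.SUCCESS.state) {
    Writable value = ReflectionUtils.newInstance(valueClass, conf);
    value.readFields(in);     // deserialize the result
    call.setValue(value);     // marks the call done and notify()s the caller
    calls.remove(id);
  }
}
```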
ipc.Server has 6 inner classes.
The rough flow is as follows.
When the Namenode is initialized, the RPC server object is obtained through the getServer() method of the ipc.RPC class.
```java
void org.apache.hadoop.hdfs.server.namenode.NameNode.initialize(Configuration conf)
    throws IOException
// create rpc server
InetSocketAddress dnSocketAddr = getServiceRpcServerAddress(conf);
if (dnSocketAddr != null) {
  int serviceHandlerCount =
      conf.getInt(DFSConfigKeys.DFS_NAMENODE_SERVICE_HANDLER_COUNT_KEY,
                  DFSConfigKeys.DFS_NAMENODE_SERVICE_HANDLER_COUNT_DEFAULT);
  this.serviceRpcServer = RPC.getServer(this, dnSocketAddr.getHostName(),
      dnSocketAddr.getPort(), serviceHandlerCount,
      false, conf, namesystem.getDelegationTokenSecretManager());
  this.serviceRPCAddress = this.serviceRpcServer.getListenerAddress();
  setRpcServiceServerAddress(conf);
}
… …
this.server.start();  // start RPC server
```
Starting the server:
```java
void org.apache.hadoop.ipc.Server.start()
// Starts the service. Must be called before any calls will be handled.
public synchronized void start() {
  responder.start();
  listener.start();
  handlers = new Handler[handlerCount];
  for (int i = 0; i < handlerCount; i++) {
    handlers[i] = new Handler(i);
    handlers[i].start();  // handles queued calls
  }
}
```
The Server handles requests; like the client, the server uses non-blocking NIO to improve throughput.
```java
org.apache.hadoop.ipc.Server.Listener.Listener(Server) throws IOException
public Listener() throws IOException {
  address = new InetSocketAddress(bindAddress, port);
  // Create a new server socket and set to non blocking mode
  acceptChannel = ServerSocketChannel.open();
  acceptChannel.configureBlocking(false);
  … …
}
```
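The constructor above only prepares the channel; the accept loop itself runs in Listener.run(). A condensed sketch of that loop (error handling and the Reader hand-off vary by version):

```java
// Bind, register for OP_ACCEPT, then multiplex accept/read events.
acceptChannel.socket().bind(address);
selector = Selector.open();
acceptChannel.register(selector, SelectionKey.OP_ACCEPT);
while (running) {
  selector.select();
  Iterator<SelectionKey> iter = selector.selectedKeys().iterator();
  while (iter.hasNext()) {
    SelectionKey key = iter.next();
    iter.remove();
    if (key.isValid()) {
      if (key.isAcceptable()) doAccept(key);   // new connection
      else if (key.isReadable()) doRead(key);  // data -> readAndProcess()
    }
  }
}
```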
Actually establishing the connection:
```java
void org.apache.hadoop.ipc.Server.Listener.doAccept(SelectionKey key)
    throws IOException, OutOfMemoryError
```
The Reader reads the data and receives the request:
```java
void org.apache.hadoop.ipc.Server.Listener.doRead(SelectionKey key) throws InterruptedException
try {
  count = c.readAndProcess();
} catch (InterruptedException ieo) {
  LOG.info(getName() + ": readAndProcess caught InterruptedException", ieo);
  throw ieo;
}
```
```java
int org.apache.hadoop.ipc.Server.Connection.readAndProcess()
    throws IOException, InterruptedException
if (!rpcHeaderRead) {
  // Every connection is expected to send the header.
  if (rpcHeaderBuffer == null) {
    rpcHeaderBuffer = ByteBuffer.allocate(2);
  }
  count = channelRead(channel, rpcHeaderBuffer);
  if (count < 0 || rpcHeaderBuffer.remaining() > 0) {
    return count;
  }
  int version = rpcHeaderBuffer.get(0);
  … …
processOneRpc(data.array());  // process the request data
```
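Note the two-byte rpcHeaderBuffer: it corresponds to the version and auth-method bytes the client wrote in writeRpcHeader() above; in the Hadoop 1.x source, the preceding 4-byte "hrpc" magic is consumed separately (into dataLengthBuffer) before this code runs.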
Below is the source of the processOneRpc() method in the Server.Connection class (processData() is called from it; see the sketch after the block).
```java
void org.apache.hadoop.ipc.Server.Connection.processOneRpc(byte[] buf)
    throws IOException, InterruptedException
private void processOneRpc(byte[] buf) throws IOException, InterruptedException {
  if (headerRead) {
    processData(buf);
  } else {
    processHeader(buf);
    headerRead = true;
    if (!authorizeConnection()) {
      throw new AccessControlException("Connection from " + this
          + " for protocol " + header.getProtocol()
          + " is unauthorized for user " + user);
    }
  }
}
```
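The processData() branch is not shown above; a condensed sketch of what it does in the Hadoop 1.x source — deserialize the Invocation and queue a Call for the Handler threads:

```java
private void processData(byte[] buf) throws IOException, InterruptedException {
  DataInputStream dis =
      new DataInputStream(new ByteArrayInputStream(buf));
  int id = dis.readInt();  // the client's call id
  Writable param = ReflectionUtils.newInstance(paramClass, conf);  // an Invocation
  param.readFields(dis);   // read the method name + arguments
  Call call = new Call(id, param, this);
  callQueue.put(call);     // hand off to the Handlers; blocks if the queue is full
}
```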
Handling the call:
```java
void org.apache.hadoop.ipc.Server.Handler.run()
while (running) {
  try {
    final Call call = callQueue.take();  // pop the queue; maybe blocked here
    … …
    CurCall.set(call);
    try {
      // Make the call as the user via Subject.doAs, thus associating
      // the call with the Subject
      if (call.connection.user == null) {
        value = call(call.connection.protocol, call.param, call.timestamp);
      } else {
        … …
      }
```
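The value = call(...) line ends up in RPC.Server.call(), which turns the Invocation back into a reflective method call on the published instance (condensed; exception handling omitted):

```java
Invocation call = (Invocation) param;
Method method = protocol.getMethod(call.getMethodName(),
                                   call.getParameterClasses());
Object value = method.invoke(instance, call.getParameters());  // the real work
return new ObjectWritable(method.getReturnType(), value);      // wrapped result
```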
Returning the response.
Below is the source of the doRespond() method in the Server.Responder class:
```java
void org.apache.hadoop.ipc.Server.Responder.doRespond(Call call) throws IOException
//
// Enqueue a response from the application.
//
void doRespond(Call call) throws IOException {
  synchronized (call.connection.responseQueue) {
    call.connection.responseQueue.addLast(call);
    if (call.connection.responseQueue.size() == 1) {
      processResponse(call.connection.responseQueue, true);
    }
  }
}
```
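Note the size() == 1 check: in the 1.x source, if the queue was empty before this call, the Handler thread attempts to write the response inline right away (processResponse with inline = true); only when the channel cannot take the whole response immediately does the Responder thread finish the write asynchronously via an OP_WRITE selector. This keeps the common fast path free of thread hand-offs.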
Supplement: notify() makes a thread that is blocked in wait() eligible to run again (RUNNABLE); the thread that issued the notify() first runs on to completion, and at a later scheduling decision the thread that called wait() may be scheduled again and enter the RUNNING state.