RPC即遠程過程調用,它的提出旨在消除通訊細節、屏蔽繁雜且易錯的底層網絡通訊操做,像調用本地服務通常地調用遠程服務,讓業務開發者更多關注業務開發而沒必要考慮網絡、硬件、系統的異構複雜環境。node
先看看集羣中RPC的整個通訊過程,假設從節點node1開始一個RPC調用,數組
上面整個過程是在只有一條線程的狀況下,一切看起來沒什麼問題,但若是有多條線程併發調用則會致使一個問題:線程與響應的對應關係將被打亂,沒法肯定哪一個線程對應哪幾個響應。bash
由於NIO通訊框架不會每一個線程都獨自使用一個socket通道,爲提升性能通常都是使用長鏈接,全部線程共用一個socket通道,這時就算線程一比線程二先放入通訊框架也不能保證響應一比響應二先接收到,因此接收到響應一後不知道該通知線程一仍是線程二。只有解決了這個問題才能保證RPC調用的正確性。網絡
要解決線程與響應對應的問題就須要維護一個線程響應關係列表,響應從關係列表中就能查找對應的線程,如圖,在發送以前生成一個UUID標識,此標識要保證同socket中惟一,再把UUID與線程對象關係對應起來,可以使用Map數據結構實現,UUID的值做爲key,線程對應的鎖對象爲value。數據結構
接着制定一個協議報文,UUID做爲報文的其中一部分,報文發往另外一個節點node2後將響應信息message放入報文中並返回,node1對接收到的報文進行解包根據UUID去查找並喚起對應的線程,告訴它「你要的消息已經收到,往下處理吧」。但在集羣環境下,咱們更但願是集羣中全部節點的消息都接收到了才往下處理,如圖下半部分,一個UUID1的請求報文會發往node二、node3和node4三個節點,這時假如只接收到一個響應則不喚起線程,直到node二、node3對應UUID1的響應報文都接收到後才喚起對應線程往下執行。一樣地,UUID二、UUID3的報文消息都是如此處理,最後集羣中對應的響應都能正確回到各自線程上。多線程
用簡單代碼實現一個RPC例子,自行選擇一個集羣通訊框架負責底層通訊,接着往下:併發
public interface RpcCallback {
public Serializable replyRequest(Serializable msg, Member sender);
public void leftOver(Serializable msg, Member sender);
}
複製代碼
public class RpcMessage implements Externalizable {
protected Serializable message;
protected byte[] uuid;
protected byte[] rpcId;
protected boolean reply = false;
public RpcMessage() {}
public RpcMessage(byte[] rpcId, byte[] uuid, Serializable message) {
this.rpcId = rpcId;
this.uuid = uuid;
this.message = message;
}
@Override
public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
reply = in.readBoolean();
int length = in.readInt();
uuid = new byte[length];
in.readFully(uuid);
length = in.readInt();
rpcId = new byte[length];
in.readFully(rpcId);
message = (Serializable) in.readObject();
}
@Override
public void writeExternal(ObjectOutput out) throws IOException {
out.writeBoolean(reply);
out.writeInt(uuid.length);
out.write(uuid, 0, uuid.length);
out.writeInt(rpcId.length);
out.write(rpcId, 0, rpcId.length);
out.writeObject(message);
}
}
複製代碼
public class RpcResponseType {
public static final int FIRST_REPLY = 1;
public static final int MAJORITY_REPLY = 2;
public static final int ALL_REPLY = 3;
public static final int NO_REPLY = 4;
}
複製代碼
public class RpcResponse {
private Member source;
private Serializable message;
public RpcResponse() {}
public RpcResponse(Member source, Serializable message) {
this.source = source;
this.message = message;
}
public void setSource(Member source) {
this.source = source;
}
public void setMessage(Serializable message) {
this.message = message;
}
public Member getSource() {
return source;
}
public Serializable getMessage() {
return message;
}
}
複製代碼
public class RpcCollector {
public ArrayList<RpcResponse> responses = new ArrayList<RpcResponse>();
public byte[] key;
public int options;
public int destcnt;
public RpcCollector(byte[] key, int options, int destcnt) {
this.key = key;
this.options = options;
this.destcnt = destcnt;
}
public void addResponse(Serializable message, Member sender){
RpcResponse resp = new RpcResponse(sender,message);
responses.add(resp);
}
public boolean isComplete() {
if ( destcnt <= 0 ) return true;
switch (options) {
case RpcResponseType.ALL_REPLY:
return destcnt == responses.size();
case RpcResponseType.MAJORITY_REPLY:
{
float perc = ((float)responses.size()) / ((float)destcnt);
return perc >= 0.50f;
}
case RpcResponseType.FIRST_REPLY:
return responses.size()>0;
default:
return false;
}
}
public RpcResponse[] getResponses() {
return responses.toArray(new RpcResponse[responses.size()]);
}
}
複製代碼
public class RpcChannel implements ChannelListener {
private Channel channel;
private RpcCallback callback;
private byte[] rpcId;
private int replyMessageOptions = 0;
private HashMap<byte[], RpcCollector> responseMap = new HashMap<byte[], RpcCollector>();
public RpcChannel(byte[] rpcId, Channel channel, RpcCallback callback) {
this.rpcId = rpcId;
this.channel = channel;
this.callback = callback;
channel.addChannelListener(this);
}
public RpcResponse[] send(Member[] destination, Serializable message, int rpcOptions,
int channelOptions, long timeout) throws ChannelException {
int sendOptions = channelOptions & ~Channel.SEND_OPTIONS_SYNCHRONIZED_ACK;
byte[] key = UUIDGenerator.randomUUID(false);
RpcCollector collector = new RpcCollector(key, rpcOptions, destination.length);
try {
synchronized (collector) {
if (rpcOptions != RpcResponseType.NO_REPLY) responseMap.put(key, collector);
RpcMessage rmsg = new RpcMessage(rpcId, key, message);
channel.send(destination, rmsg, sendOptions);
if (rpcOptions != RpcResponseType.NO_REPLY) collector.wait(timeout);
}
} catch (InterruptedException ix) {
Thread.currentThread().interrupt();
} finally {
responseMap.remove(key);
}
return collector.getResponses();
}
@Override
public void messageReceived(Serializable msg, Member sender) {
RpcMessage rmsg = (RpcMessage) msg;
byte[] key = rmsg.uuid;
if (rmsg.reply) {
RpcCollector collector = responseMap.get(key);
if (collector == null) {
callback.leftOver(rmsg.message, sender);
} else {
synchronized (collector) {
if (responseMap.containsKey(key)) {
collector.addResponse(rmsg.message, sender);
if (collector.isComplete()) collector.notifyAll();
} else {
callback.leftOver(rmsg.message, sender);
}
}
}
} else {
Serializable reply = callback.replyRequest(rmsg.message, sender);
rmsg.reply = true;
rmsg.message = reply;
try {
channel.send(new Member[] {sender}, rmsg,
replyMessageOptions & ~Channel.SEND_OPTIONS_SYNCHRONIZED_ACK);
} catch (Exception x) {}
}
}
@Override
public boolean accept(Serializable msg, Member sender) {
if (msg instanceof RpcMessage) {
RpcMessage rmsg = (RpcMessage) msg;
return Arrays.equals(rmsg.rpcId, rpcId);
} else
return false;
}
}
複製代碼
public class MyRPC implements RpcCallback {
@Override
public Serializable replyRequest(Serializable msg, Member sender) {
RpcMessage mapmsg = (RpcMessage) msg;
mapmsg.message = "hello,response for you!";
return mapmsg;
}
@Override
public void leftOver(Serializable msg, Member sender) {
System.out.println("receive a leftover message!");
}
public static void main(String[] args) {
MyRPC myRPC = new MyRPC();
byte[] rpcId = new byte[] {1, 1, 1, 1};
byte[] key = new byte[] {0, 0, 0, 0};
String message = "hello";
int sendOptions = Channel.SEND_OPTIONS_SYNCHRONIZED_ACK | Channel.SEND_OPTIONS_USE_ACK;
RpcMessage msg = new RpcMessage(rpcId, key, (Serializable) message);
RpcChannel rpcChannel = new RpcChannel(rpcId, channel, myRPC);
RpcResponse[] resp =
rpcChannel.send(channel.getMembers(), msg, RpcResponseType.FIRST_REPLY, sendOptions, 3000);
while (true)
Thread.currentThread().sleep(1000);
}
}
複製代碼
能夠看到經過上面的RPC封裝後,上層能夠把更多的精力關注到消息邏輯處理上面了,而沒必要關注具體的網絡IO如何實現,屏蔽了繁雜重複的網絡傳輸操做,爲上層提供了很大的方便。框架
=============廣告時間===============dom
公衆號的菜單已分爲「分佈式」、「機器學習」、「深度學習」、「NLP」、「Java深度」、「Java併發核心」、「JDK源碼」、「Tomcat內核」等,可能有一款適合你的胃口。機器學習
鄙人的新書《Tomcat內核設計剖析》已經在京東銷售了,有須要的朋友能夠購買。感謝各位朋友。
=========================
歡迎關注: