集羣RPC通訊怎麼作

RPC

RPC即遠程過程調用,它的提出旨在消除通訊細節、屏蔽繁雜且易錯的底層網絡通訊操做,像調用本地服務通常地調用遠程服務,讓業務開發者更多關注業務開發而沒必要考慮網絡、硬件、系統的異構複雜環境。node

RPC過程

先看看集羣中RPC的整個通訊過程,假設從節點node1開始一個RPC調用,數組

  1. 先將待傳遞的數據放到NIO集羣通訊框架中;
  2. 因爲使用的是NIO模式,線程無需阻塞直接返回;
  3. 因爲與集羣其餘節點通訊須要花銷若干時間,爲了提升CPU使用率當前線程應該放棄CPU的使用權進行等待操做;
  4. NIO集羣通訊框架接收到node2節點的響應消息,並將消息封裝成Response對象保存至響應數組;
  5. 通訊框架接收到node4節點的響應消息,因爲是使用了並行通訊,因此node4可能比node3先返回消息,並將消息封裝成Response對象保存至響應數組;
  6. 通訊框架最後接收到node3節點的響應消息,並將消息封裝成Response對象保存至響應數組;
  7. 如今全部節點的響應都已經收集完畢,是時候通知剛剛被阻塞的那條線程了,原來的線程被notify醒後拿到全部節點的響應Response[]進行處理,至此完成了整個集羣RPC過程。

多線程

上面整個過程是在只有一條線程的狀況下,一切看起來沒什麼問題,但若是有多條線程併發調用則會致使一個問題:線程與響應的對應關係將被打亂,沒法肯定哪一個線程對應哪幾個響應。bash

由於NIO通訊框架不會每一個線程都獨自使用一個socket通道,爲提升性能通常都是使用長鏈接,全部線程共用一個socket通道,這時就算線程一比線程二先放入通訊框架也不能保證響應一比響應二先接收到,因此接收到響應一後不知道該通知線程一仍是線程二。只有解決了這個問題才能保證RPC調用的正確性。網絡

怎麼解決多線程

要解決線程與響應對應的問題就須要維護一個線程響應關係列表,響應從關係列表中就能查找對應的線程,如圖,在發送以前生成一個UUID標識,此標識要保證同socket中惟一,再把UUID與線程對象關係對應起來,可以使用Map數據結構實現,UUID的值做爲key,線程對應的鎖對象爲value。數據結構

接着制定一個協議報文,UUID做爲報文的其中一部分,報文發往另外一個節點node2後將響應信息message放入報文中並返回,node1對接收到的報文進行解包根據UUID去查找並喚起對應的線程,告訴它「你要的消息已經收到,往下處理吧」。但在集羣環境下,咱們更但願是集羣中全部節點的消息都接收到了才往下處理,如圖下半部分,一個UUID1的請求報文會發往node二、node3和node4三個節點,這時假如只接收到一個響應則不喚起線程,直到node二、node3對應UUID1的響應報文都接收到後才喚起對應線程往下執行。一樣地,UUID二、UUID3的報文消息都是如此處理,最後集羣中對應的響應都能正確回到各自線程上。多線程

來個例子

用簡單代碼實現一個RPC例子,自行選擇一個集羣通訊框架負責底層通訊,接着往下:併發

  1. 定義一個RPC接口,這些方法是預留提供給上層具體邏輯處理的入口,replyRequest方法用於處理響應邏輯,leftOver方法用於殘留請求的邏輯處理。
public interface RpcCallback {  
    public Serializable replyRequest(Serializable msg, Member sender);  
    public void leftOver(Serializable msg, Member sender);  
}  
複製代碼
  1. 定義通訊消息協議,實現Externalizable接口自定義序列化和反序列化,message用於存放響應消息,uuid標識用於關聯線程,rpcId用於標識RPC實例,reply表示是否回覆。
public class RpcMessage implements Externalizable {
  protected Serializable message;
  protected byte[] uuid;
  protected byte[] rpcId;
  protected boolean reply = false;

  public RpcMessage() {}

  public RpcMessage(byte[] rpcId, byte[] uuid, Serializable message) {
    this.rpcId = rpcId;
    this.uuid = uuid;
    this.message = message;
  }

  @Override
  public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
    reply = in.readBoolean();
    int length = in.readInt();
    uuid = new byte[length];
    in.readFully(uuid);
    length = in.readInt();
    rpcId = new byte[length];
    in.readFully(rpcId);
    message = (Serializable) in.readObject();
  }

  @Override
  public void writeExternal(ObjectOutput out) throws IOException {
    out.writeBoolean(reply);
    out.writeInt(uuid.length);
    out.write(uuid, 0, uuid.length);
    out.writeInt(rpcId.length);
    out.write(rpcId, 0, rpcId.length);
    out.writeObject(message);
  }
}

複製代碼
  1. 響應類型,提供多種喚起線程的條件,一共四種類型,分別表示接收到第一個響應就喚起線程、接收到集羣中大多數節點的響應就喚起線程、接收到集羣中全部節點的響應才喚起線程、無需等待響應的無響應模式。
public class RpcResponseType {  
  public static final int FIRST_REPLY = 1;  
  public static final int MAJORITY_REPLY = 2;  
  public static final int ALL_REPLY = 3;  
  public static final int NO_REPLY = 4;  
}  
複製代碼
  1. 響應對象,用於封裝接收到的消息,Member在通訊框架是節點的抽象,這裏用來表示來源節點。
public class RpcResponse {
  private Member source;
  private Serializable message;

  public RpcResponse() {}

  public RpcResponse(Member source, Serializable message) {
    this.source = source;
    this.message = message;
  }

  public void setSource(Member source) {
    this.source = source;
  }

  public void setMessage(Serializable message) {
    this.message = message;
  }

  public Member getSource() {
    return source;
  }

  public Serializable getMessage() {
    return message;
  }
}
 
複製代碼
  1. RPC響應集,用於存放同個UUID的全部響應。
public class RpcCollector {  
    public ArrayList<RpcResponse> responses = new ArrayList<RpcResponse>();   
    public byte[] key;  
    public int options;  
    public int destcnt;  
    public RpcCollector(byte[] key, int options, int destcnt) {  
        this.key = key;  
        this.options = options;  
        this.destcnt = destcnt;  
    }  
    public void addResponse(Serializable message, Member sender){  
        RpcResponse resp = new RpcResponse(sender,message);  
        responses.add(resp);  
    }  
    public boolean isComplete() {  
        if ( destcnt <= 0 ) return true;  
        switch (options) {  
            case RpcResponseType.ALL_REPLY:  
                return destcnt == responses.size();  
            case RpcResponseType.MAJORITY_REPLY:  
            {  
                float perc = ((float)responses.size()) / ((float)destcnt);  
                return perc >= 0.50f;  
            }  
            case RpcResponseType.FIRST_REPLY:  
                return responses.size()>0;  
            default:  
                return false;  
        }  
    }  
    public RpcResponse[] getResponses() {  
        return responses.toArray(new RpcResponse[responses.size()]);  
    }  
}  
複製代碼
  1. RPC核心類,是整個RPC的抽象,它實現通訊框架的ChannelListener接口,實現了該接口就能在messageReceived方法中處理接收到的消息。由於全部的消息都會經過此方法,因此它必需要根據key去處理對應的線程,同時它也要負責調用RpcCallback接口定義的相關的方法,例如響應請求的replyRequest方法和處理殘留的響應leftOver方法,殘留響應是指有時咱們在接收到第一個響應後就喚起線程。
public class RpcChannel implements ChannelListener {
  private Channel channel;
  private RpcCallback callback;
  private byte[] rpcId;
  private int replyMessageOptions = 0;
  private HashMap<byte[], RpcCollector> responseMap = new HashMap<byte[], RpcCollector>();

  public RpcChannel(byte[] rpcId, Channel channel, RpcCallback callback) {
    this.rpcId = rpcId;
    this.channel = channel;
    this.callback = callback;
    channel.addChannelListener(this);
  }

  public RpcResponse[] send(Member[] destination, Serializable message, int rpcOptions,
      int channelOptions, long timeout) throws ChannelException {
    int sendOptions = channelOptions & ~Channel.SEND_OPTIONS_SYNCHRONIZED_ACK;
    byte[] key = UUIDGenerator.randomUUID(false);
    RpcCollector collector = new RpcCollector(key, rpcOptions, destination.length);
    try {
      synchronized (collector) {
        if (rpcOptions != RpcResponseType.NO_REPLY) responseMap.put(key, collector);
        RpcMessage rmsg = new RpcMessage(rpcId, key, message);
        channel.send(destination, rmsg, sendOptions);
        if (rpcOptions != RpcResponseType.NO_REPLY) collector.wait(timeout);
      }
    } catch (InterruptedException ix) {
      Thread.currentThread().interrupt();
    } finally {
      responseMap.remove(key);
    }
    return collector.getResponses();
  }

  @Override
  public void messageReceived(Serializable msg, Member sender) {
    RpcMessage rmsg = (RpcMessage) msg;
    byte[] key = rmsg.uuid;
    if (rmsg.reply) {
      RpcCollector collector = responseMap.get(key);
      if (collector == null) {
        callback.leftOver(rmsg.message, sender);
      } else {

        synchronized (collector) {
          if (responseMap.containsKey(key)) {
            collector.addResponse(rmsg.message, sender);
            if (collector.isComplete()) collector.notifyAll();
          } else {
            callback.leftOver(rmsg.message, sender);
          }
        }
      }
    } else {
      Serializable reply = callback.replyRequest(rmsg.message, sender);
      rmsg.reply = true;
      rmsg.message = reply;
      try {
        channel.send(new Member[] {sender}, rmsg,
            replyMessageOptions & ~Channel.SEND_OPTIONS_SYNCHRONIZED_ACK);
      } catch (Exception x) {}
    }
  }

  @Override
  public boolean accept(Serializable msg, Member sender) {
    if (msg instanceof RpcMessage) {
      RpcMessage rmsg = (RpcMessage) msg;
      return Arrays.equals(rmsg.rpcId, rpcId);
    } else
      return false;
  }
}

複製代碼
  1. 自定義一個RPC,它要實現RpcCallback接口,分別對請求處理和殘留響應處理,這裏請求處理僅僅是簡單返回「hello,response for you!」做爲響應消息,殘留響應處理則是簡單輸出「receive a leftover message!」。假如整個集羣有五個節點,因爲接收模式設置成了FIRST_REPLY,因此每一個只會接受一個響應消息,其餘的響應都被當作殘留響應處理。
public class MyRPC implements RpcCallback {
  @Override
  public Serializable replyRequest(Serializable msg, Member sender) {
    RpcMessage mapmsg = (RpcMessage) msg;
    mapmsg.message = "hello,response for you!";
    return mapmsg;
  }

  @Override
  public void leftOver(Serializable msg, Member sender) {
    System.out.println("receive a leftover message!");
  }

  public static void main(String[] args) {
    MyRPC myRPC = new MyRPC();
    byte[] rpcId = new byte[] {1, 1, 1, 1};
    byte[] key = new byte[] {0, 0, 0, 0};
    String message = "hello";
    int sendOptions = Channel.SEND_OPTIONS_SYNCHRONIZED_ACK | Channel.SEND_OPTIONS_USE_ACK;
    RpcMessage msg = new RpcMessage(rpcId, key, (Serializable) message);
    RpcChannel rpcChannel = new RpcChannel(rpcId, channel, myRPC);
    RpcResponse[] resp =
        rpcChannel.send(channel.getMembers(), msg, RpcResponseType.FIRST_REPLY, sendOptions, 3000);
    while (true)
      Thread.currentThread().sleep(1000);
  }
}

複製代碼

能夠看到經過上面的RPC封裝後,上層能夠把更多的精力關注到消息邏輯處理上面了,而沒必要關注具體的網絡IO如何實現,屏蔽了繁雜重複的網絡傳輸操做,爲上層提供了很大的方便。框架

=============廣告時間===============dom

公衆號的菜單已分爲「分佈式」、「機器學習」、「深度學習」、「NLP」、「Java深度」、「Java併發核心」、「JDK源碼」、「Tomcat內核」等,可能有一款適合你的胃口。機器學習

鄙人的新書《Tomcat內核設計剖析》已經在京東銷售了,有須要的朋友能夠購買。感謝各位朋友。

爲何寫《Tomcat內核設計剖析》

=========================

歡迎關注:

這裏寫圖片描述
相關文章
相關標籤/搜索