揭開 Raft 的神祕面紗,和ApacheRatis 瞭解Raft 組件的使用

揭開 Raft 的神祕面紗,和ApacheRatis 瞭解Raft 組件的使用

相比 Paxos, Raft 一直以來就是以易於理解著稱。今天咱們以一年 Raft 使用者的角度,來看一下,別人根據 Raft 論文實現了以後,咱們通常要怎麼樣使用。java

俗話說,要想知道梨子的味道,就要親口嘗一嘗,沒吃過豬肉,也要見一見豬跑。不然別人再怎麼樣形容,你可能還覺得是像貓狗一類毛茸茸。git

在 Raft 官網裏長長的列表就能發現,實現 Raft 的框架目前很多。Java 裏我大概看了螞蟻的 SOFARaft 和 Apache 的 Ratis。此次咱們以 Ratis 爲例,揭開面紗,來看看到底要怎樣使用。github

固然,下面具體提到的例子,也是這些組件中自帶的 example。算法

1、編譯

github下載 Ratis 直接 mvn clean package 便可,若是編譯過程當中出錯,能夠先clean install ratis-protoshell

2、示例

Ratis 自帶的示例有三個:apache

  • arithmetic
  • counter
  • filestore

在 ratis-examples 模塊中,對於 arithmetic 和 filestore比較方便,能夠經過main/bin目錄下的 shell 腳本快速啓動 Server 和 Client 來進行測試。微信

對於Raft,我們都知道是須要多實例組成集羣才能測試,你啓動一個實例沒啥用,連選主都成問題。Bin 目錄下的 start-all 支持 example 的名稱以及對應的命令。好比 filestore server 表明是啓動 filestore 這個應用的server。對應的命令參數會在相應example裏的 cli 中解析。同時會一次性啓動三個server,組成一個集羣並在週期內完成選舉。架構

而對於 counter 這個示例,並無相應的腳原本快速啓動三個server,這個咱們能夠經過命令行或者在IDE裏以參數的形式啓動。app

3、分析

下面咱們來示例裏看下 Raft Server 是怎樣工做的。負載均衡

對於 counter 示例來講,咱們啓動的時候,須要傳入一個參數,表明當前的server是第幾個,目的在於,要從 peers 列表中得知該用哪一個IP + 端口去啓動它。這裏咱們能發現,這個 peers 列表,是在代碼內提早設置好的。固然你說動態配置啥的,也沒啥問題,另外兩個示例是經過shell 腳本里common 中的配置傳入的。

因此,第一步咱們看到, Raft Server 在啓動的時候,會經過「配置」的形式,來知道 peer 之間的存在,這樣才能彼此通訊,讓別人給本身投票或者給別人投票,完成 Term 內的選舉。另外,才能接收到 Leader 傳過來的 Log ,而且應用到本地。

第二步,咱們來看下 Client 和 集羣之間是如何通訊的。整個 Raft 集羣可能有多個實例,咱們知道必須經過 Leader 來完成寫操做。那怎樣知道誰是Leader?有什麼辦法?

通常常見的思路有:

  • 在寫以前,先去集羣內查一下,誰是 Leader,而後再寫
  • 隨機拿一個寫,不行再換一個,不停的試,總會有一個成功。

固然方式二這樣試下去效率不過高。因此會在這個隨機試一次以後,集羣會將當前的 Leader 信息返回給 Client,而後 Client 直接經過這個創建鏈接進行通訊便可。

在 Ratis 裏, Client 調用非 Leader 節點會收到 Server 拋出的一個異常,異常中會包含一個稱爲 suggestLeader 的信息,表示當前正確的 Leader,按這個連上去就行。固然,若是若是在此過程當中發生的 Leader 的變動,那就會有一個新的suggestLeader 返回來,再次重試。

咱們來看 Counter 這個示例中的實現。

Server 和 Client 的共用的Common 代碼中,包含 peers 的聲明

public final class CounterCommon { 
  public static final List<RaftPeer> PEERS = new ArrayList<>(3); 

  static { 
    PEERS.add(new RaftPeer(RaftPeerId.getRaftPeerId("n1"), "127.0.0.1:6000")); 
    PEERS.add(new RaftPeer(RaftPeerId.getRaftPeerId("n2"), "127.0.0.1:6001")); 
    PEERS.add(new RaftPeer(RaftPeerId.getRaftPeerId("n3"), "127.0.0.1:6002")); 
  }

這裏聲明瞭三個節點。

經過命令行啓動時,會直接把index 傳進來, index 取值1-3。

java -cp *.jar org.apache.ratis.examples.counter.server.CounterServer {serverIndex}

而後在Server 啓動的時候,拿到對應的配置信息。

//find current peer object based on application parameter 
    RaftPeer currentPeer = 
        CounterCommon.PEERS.get(Integer.parseInt(args[0]) - 1);

再設置存儲目錄

//set the storage directory (different for each peer) in RaftProperty object 
    File raftStorageDir = new File("./" + currentPeer.getId().toString()); 
    RaftServerConfigKeys.setStorageDir(properties, 
        Collections.singletonList(raftStorageDir))

重點看這裏,每一個 Server 都會有一個狀態機「CounterStateMachine」,平時咱們的「業務邏輯」都放到這裏

//create the counter state machine which hold the counter value 
    CounterStateMachine counterStateMachine = new CounterStateMachine();

客戶端發送的命令,會在這個狀態機中被執行,同時這些命令又以Log 的形式複製給其它節點,各個節點的Log 又會在它本身的狀態機裏執行,從而保證各個節點狀態的一致。

揭開 Raft 的神祕面紗,和ApacheRatis 瞭解Raft 組件的使用

最後根據這些配置,生成 Raft Server 實例並啓動。

//create and start the Raft server 
    RaftServer server = RaftServer.newBuilder() 
        .setGroup(CounterCommon.RAFT_GROUP) 
        .setProperties(properties) 
        .setServerId(currentPeer.getId()) 
        .setStateMachine(counterStateMachine) 
        .build(); 
    server.start();

CounterStateMachine 裏,應用計數的這一小段代碼,咱們看先檢查了命令是否合法,而後執行命令

//check if the command is valid 
    String logData = entry.getStateMachineLogEntry().getLogData() 
        .toString(Charset.defaultCharset()); 
    if (!logData.equals("INCREMENT")) { 
      return CompletableFuture.completedFuture( 
          Message.valueOf("Invalid Command")); 
    } 
    //update the last applied term and index 
    final long index = entry.getIndex(); 
    updateLastAppliedTermIndex(entry.getTerm(), index); 

    //actual execution of the command: increment the counter 
    counter.incrementAndGet(); 

    //return the new value of the counter to the client 
    final CompletableFuture<Message> f = 
        CompletableFuture.completedFuture(Message.valueOf(counter.toString())); 

    //if leader, log the incremented value and it's log index 
    if (trx.getServerRole() == RaftProtos.RaftPeerRole.LEADER) { 
      LOG.info("{}: Increment to {}", index, counter.toString()); 
    }

咱們再來看 Client 的實現。

和 Server 相似,經過配置屬性,建立一個實例

private static RaftClient buildClient() { 
    RaftProperties raftProperties = new RaftProperties(); 
    RaftClient.Builder builder = RaftClient.newBuilder() 
        .setProperties(raftProperties) 
        .setRaftGroup(CounterCommon.RAFT_GROUP) 
        .setClientRpc( 
            new GrpcFactory(new Parameters()) 
                .newRaftClientRpc(ClientId.randomId(), raftProperties)); 
    return builder.build(); 
  }

而後就能夠向Server發送命令開工了。

raftClient.send(Message.valueOf("INCREMENT"));
Counter 的狀態機支持INCREMENT 和 GET 兩個命令。因此example 最後執行了一個 GET 的命令來獲取最終的計數結果

RaftClientReply count = raftClient.sendReadOnly(Message.valueOf("GET"));

4、內部部分實現

RaftClientImpl 裏,初期會從peers列表中選一個,當成leader 去請求。

RaftClientImpl(ClientId clientId, RaftGroup group, RaftPeerId leaderId, 
      RaftClientRpc clientRpc, RaftProperties properties, RetryPolicy retryPolicy) { 
    this.clientId = clientId; 
    this.clientRpc = clientRpc; 
    this.peers = new ConcurrentLinkedQueue<>(group.getPeers()); 
    this.groupId = group.getGroupId(); 
    this.leaderId = leaderId != null? leaderId 
        : !peers.isEmpty()? peers.iterator().next().getId(): null; 
    ... 
  }

以後,會根據server 返回的不一樣異常分別處理。

private RaftClientReply sendRequest(RaftClientRequest request) throws IOException { 
    RaftClientReply reply; 
    try { 
      reply = clientRpc.sendRequest(request); 
    } catch (GroupMismatchException gme) { 
      throw gme; 
    } catch (IOException ioe) { 
      handleIOException(request, ioe); 
    } 
    reply = handleLeaderException(request, reply, null); 
    reply = handleRaftException(reply, Function.identity()); 
    return reply; 
  }

好比在 handleLeaderException 中,又分幾種狀況,由於經過Client 來和 Server 進行通信的時候,會隨機從peers裏選擇一個,作爲leader去請求,若是 Server 返回異常,說它不是leader,就用下面的代碼,隨機從另外的peer裏選擇一個再去請求。

final RaftPeerId oldLeader = request.getServerId(); 
    final RaftPeerId curLeader = leaderId; 
    final boolean stillLeader = oldLeader.equals(curLeader); 
    if (newLeader == null && stillLeader) { 
      newLeader = CollectionUtils.random(oldLeader, 
          CollectionUtils.as(peers, RaftPeer::getId)); 
    } 

 static <T> T random(final T given, Iterable<T> iteration) { 
    Objects.requireNonNull(given, "given == null"); 
    Objects.requireNonNull(iteration, "iteration == null"); 

    final List<T> list = StreamSupport.stream(iteration.spliterator(), false) 
        .filter(e -> !given.equals(e)) 
        .collect(Collectors.toList()); 
    final int size = list.size(); 
    return size == 0? null: list.get(ThreadLocalRandom.current().nextInt(size)); 
  }

是否是感受很低效。若是這個時候,server 返回的信息裏,告訴client 誰是 leader,那client 直接連上去就能夠了是吧。

/** 
   * @return null if the reply is null or it has 
   * {@link NotLeaderException} or {@link LeaderNotReadyException} 
   * otherwise return the same reply. 
   */ 
  RaftClientReply handleLeaderException(RaftClientRequest request, RaftClientReply reply, 
                                        Consumer<RaftClientRequest> handler) { 
    if (reply == null || reply.getException() instanceof LeaderNotReadyException) { 
      return null; 
    } 
    final NotLeaderException nle = reply.getNotLeaderException(); 
    if (nle == null) { 
      return reply; 
    } 
    return handleNotLeaderException(request, nle, handler); 
  }
RaftClientReply handleNotLeaderException(RaftClientRequest request, NotLeaderException nle, 
      Consumer<RaftClientRequest> handler) { 
    refreshPeers(nle.getPeers()); 
    final RaftPeerId newLeader = nle.getSuggestedLeader() == null ? null 
        : nle.getSuggestedLeader().getId(); 
    handleIOException(request, nle, newLeader, handler); 
    return null; 
  }

咱們會看到,在異常的信息中,若是可以提取出一個 suggestedLeader,這時候就會作爲新的leaderId來使用,下次直接鏈接了。

本文轉載自微信公衆號「Tomcat那些事兒」,能夠經過如下二維碼關注。轉載本文請聯繫Tomcat那些事兒公衆號。

揭開 Raft 的神祕面紗,和ApacheRatis 瞭解Raft 組件的使用

【編輯推薦】

  1. 全面的Java開源Apache Commons 工具類介紹
  2. Apache 基金會與 GitHub 都受美國法律約束 開源當自立
  3. 聊聊Apache Dubbo,概念、架構和負載均衡
  4. Paxos算法爲何說是Raft,Zab協議的鼻祖,及原理解析
  5. 使用Python掌握Apache Kafka應當瞭解的3個庫

【責任編輯:武曉燕 TEL:(010)68476606】

相關文章
相關標籤/搜索