SOFAJRaft—初次使用

SOFAJRaft 是基於 Raft 算法的生產級高性能 Java 實現,支持 MULTI-RAFT-GROUP。應用場景有 Leader 選舉、分佈式鎖服務、高可靠的元信息管理、分佈式存儲系統。html

若是不瞭解Raft算法的朋友能夠去看看這篇文章:Raft 爲何是更易理解的分佈式一致性算法,寫的很詳細了。java

這張圖是SOFAJRaft的設計圖,其中Node 表明了一個 SOFAJRaft Server 節點。node

因爲SOFAJRaft的Node節點是一個分佈式的結構,因此Node節點須要將信息傳遞給其餘Node,因此Replicator的做用就是用來複制信息給其餘的Node。多個Replicator共同組成一個ReplicatorGroup。git

Snapshot是表示一個快照,就是對數據當前值的一個記錄,會存盤保存,提供冷備數據功能。 Leader 生成快照有這麼幾個做用:github

  • 當有新的 Node 加入集羣的時候,不用只靠日誌複製、回放去和 Leader 保持數據一致,而是經過安裝 Leader 的快照來跳過早期大量日誌的回放;
  • Leader 用快照替代 Log 複製能夠減小網絡上的數據量;
  • 用快照替代早期的 Log 能夠節省存儲空間;

StateMachine 接口是用來給用戶去實現的部分。經過用戶實現具體的業務邏輯從而在分佈式系統中達成共識。 在 StateMachine 上,咱們要去實現狀態機暴露給咱們待實現的幾個接口,最重要的是 onApply 接口,要在這個接口裏將 Cilent 的請求指令進行運算,轉換成具體的計數器值。而 onSnapshotSave 和 onSnapshotLoad 接口則是負責快照的生成和加載。算法

Client也是須要用戶去實現的部分,用戶須要去定義不一樣的消息類型和客戶端的處理邏輯。bash

實現Counter分佈式計數器

下面咱們給出個需求: 提供一個 Counter,Client 每次計數時能夠指定步幅,也能夠隨時發起查詢。 將它翻譯成具體的功能點,主要有三部分:網絡

  1. 實現:Counter server,具有計數功能,具體運算公式爲:Cn = Cn-1 + delta;
  2. 提供寫服務,寫入 delta 觸發計數器運算;
  3. 提供讀服務,讀取當前 Cn 值;

具體代碼:Counterapp

在這個demo中,咱們啓動三個server做爲一個group,傳入下面的參數:框架

/tmp/server1 counter 127.0.0.1:8081 127.0.0.1:8081,127.0.0.1:8082,127.0.0.1:8083
/tmp/server2 counter 127.0.0.1:8082 127.0.0.1:8081,127.0.0.1:8082,127.0.0.1:8083
/tmp/server3 counter 127.0.0.1:8083 127.0.0.1:8081,127.0.0.1:8082,127.0.0.1:8083
複製代碼

表示使用/tmp/server1 ,/tmp/server2,/tmp/server3三個目錄用來存儲數據,raft group名稱爲 counter,節點ip也分別爲

127.0.0.1:8081,127.0.0.1:8082,127.0.0.1:8083
複製代碼

而後啓動客戶端,並傳入下面參數:

counter 127.0.0.1:8081,127.0.0.1:8082,127.0.0.1:8083
複製代碼

表示綁定的raft group名稱爲 counter,集羣爲:

127.0.0.1:8081,127.0.0.1:8082,127.0.0.1:8083
複製代碼

服務端

CounterServer

public CounterServer(final String dataPath, final String groupId, final PeerId serverId,
                     final NodeOptions nodeOptions) throws IOException {
    // 初始化路徑
    FileUtils.forceMkdir(new File(dataPath));

    // 這裏讓 raft RPC 和業務 RPC 使用同一個 RPC server, 一般也能夠分開
    final RpcServer rpcServer = new RpcServer(serverId.getPort());
    RaftRpcServerFactory.addRaftRequestProcessors(rpcServer);
    // 註冊業務處理器
    rpcServer.registerUserProcessor(new GetValueRequestProcessor(this));
    rpcServer.registerUserProcessor(new IncrementAndGetRequestProcessor(this));
    // 初始化狀態機
    this.fsm = new CounterStateMachine();
    // 設置狀態機到啓動參數
    nodeOptions.setFsm(this.fsm);
    // 設置存儲路徑
    // 日誌, 必須
    nodeOptions.setLogUri(dataPath + File.separator + "log");
    // 元信息, 必須
    nodeOptions.setRaftMetaUri(dataPath + File.separator + "raft_meta");
    // snapshot, 可選, 通常都推薦
    nodeOptions.setSnapshotUri(dataPath + File.separator + "snapshot");
    // 初始化 raft group 服務框架
    this.raftGroupService = new RaftGroupService(groupId, serverId, nodeOptions, rpcServer);
    // 啓動
    this.node = this.raftGroupService.start();
}
複製代碼

服務端CounterServer在實例化的時候會設置相應的處理器,這裏設置了GetValueRequestProcessor和 IncrementAndGetRequestProcessor。 GetValueRequestProcessor用來提供讀服務,讀取當前 Cn 值; IncrementAndGetRequestProcessor提供寫服務,寫入 delta 觸發計數器運算; GetValueRequestProcessor

@Override
public Object handleRequest(final BizContext bizCtx, final GetValueRequest request) throws Exception {
    if (!this.counterServer.getFsm().isLeader()) {
        return this.counterServer.redirect();
    }

    final ValueResponse response = new ValueResponse();
    response.setSuccess(true);
    response.setValue(this.counterServer.getFsm().getValue());
    return response;
}
複製代碼

GetValueRequestProcessor的處理很是的簡單,直接獲取狀態機的值而後返回。

IncrementAndGetRequestProcessor

public void handleRequest(final BizContext bizCtx, final AsyncContext asyncCtx, final IncrementAndGetRequest request) {
    //判斷當前節點是不是leader
    if (!this.counterServer.getFsm().isLeader()) {
        asyncCtx.sendResponse(this.counterServer.redirect());
        return;
    }
    //設置響應數據
    final ValueResponse response = new ValueResponse();
    //封裝請求數據,並回調響應結果
    final IncrementAndAddClosure closure = new IncrementAndAddClosure(counterServer, request, response,
            status -> {
                //響應成功
                if (!status.isOk()) {
                    response.setErrorMsg(status.getErrorMsg());
                    response.setSuccess(false);
                }
                //發送響應請求
                asyncCtx.sendResponse(response);
            });

    try {
        final Task task = new Task();
        task.setDone(closure);
        //序列化請求
        task.setData(ByteBuffer
                .wrap(SerializerManager.getSerializer(SerializerManager.Hessian2).serialize(request)));
        //調用node處理請求
        // apply task to raft group.
        counterServer.getNode().apply(task);
    } catch (final CodecException e) {
        LOG.error("Fail to encode IncrementAndGetRequest", e);
        //請求失敗,則當即響應
        response.setSuccess(false);
        response.setErrorMsg(e.getMessage());
        asyncCtx.sendResponse(response);
    }
}
複製代碼

這裏使用IncrementAndAddClosure來封裝響應和請求,並經過回調的方式進行異步回寫數據到client。而後實例化Task實例,序列化請求數據,調用node的apply方法。

而後設置了CounterStateMachine狀態機,並設值了日誌,元信息和快照的存儲路徑。 CounterStateMachine實現了StateMachineAdapter抽象類,並重寫了3個方法: onApply用來處理具體的業務 onSnapshotSave保存快照 onSnapshotLoad加載快照 在保存和加載快照的地方使用了CounterSnapshotFile類來進行輔助。

CounterStateMachine

public class CounterStateMachine extends StateMachineAdapter {
	...
	private final AtomicLong    value      = new AtomicLong(0);
	
	public void onApply(final Iterator iter) {
	    //獲取processor中封裝的數據
	    while (iter.hasNext()) {
	        long delta = 0;
	
	        //用於封裝請求數據和回調結果
	        IncrementAndAddClosure closure = null;
	        if (iter.done() != null) {
	            // This task is applied by this node, get value from closure to avoid additional parsing.
	            closure = (IncrementAndAddClosure) iter.done();
	            delta = closure.getRequest().getDelta();
	        } else {
	            // Have to parse FetchAddRequest from this user log.
	            final ByteBuffer data = iter.getData();
	            try {
	                final IncrementAndGetRequest request = SerializerManager.getSerializer(SerializerManager.Hessian2)
	                        .deserialize(data.array(), IncrementAndGetRequest.class.getName());
	                delta = request.getDelta();
	            } catch (final CodecException e) {
	                LOG.error("Fail to decode IncrementAndGetRequest", e);
	            }
	        }
	        //獲取當前值
	        final long prev = this.value.get();
	        //將當前值加上delta
	        final long updated = value.addAndGet(delta);
	        //設置響應,並調用run方法回寫響應方法
	        if (closure != null) {
	            closure.getResponse().setValue(updated);
	            closure.getResponse().setSuccess(true);
	            closure.run(Status.OK());
	        }
	        LOG.info("Added value={} by delta={} at logIndex={}", prev, delta, iter.getIndex());
	        iter.next();
	    }
	}
}
複製代碼

這裏的onApply方法首先會獲取processor中封裝的數據,而後獲取processor中傳入的closure實例,而後處理好業務邏輯後調用closure的run進行回調返回數據到客戶端。

客戶端

CounterClient

public static void main(final String[] args) throws Exception {
    if (args.length != 2) {
        System.out.println("Useage : java com.alipay.sofa.jraft.example.counter.CounterClient {groupId} {conf}");
        System.out
            .println("Example: java com.alipay.sofa.jraft.example.counter.CounterClient counter 127.0.0.1:8081,127.0.0.1:8082,127.0.0.1:8083");
        System.exit(1);
    }
    final String groupId = args[0];
    final String confStr = args[1];

    final Configuration conf = new Configuration();
    if (!conf.parse(confStr)) {
        throw new IllegalArgumentException("Fail to parse conf:" + confStr);
    }
    // 更新raft group配置
    RouteTable.getInstance().updateConfiguration(groupId, conf);
    //接下來初始化 RPC 客戶端並更新路由表
    final BoltCliClientService cliClientService = new BoltCliClientService();
    cliClientService.init(new CliOptions());

    if (!RouteTable.getInstance().refreshLeader(cliClientService, groupId, 1000).isOk()) {
        throw new IllegalStateException("Refresh leader failed");
    }
    //獲取 leader 後發送請求
    final PeerId leader = RouteTable.getInstance().selectLeader(groupId);
    System.out.println("Leader is " + leader);
    final int n = 1000;
    final CountDownLatch latch = new CountDownLatch(n);
    final long start = System.currentTimeMillis();
    for (int i = 0; i < n; i++) {
        incrementAndGet(cliClientService, leader, i, latch);
    }
    latch.await();
    System.out.println(n + " ops, cost : " + (System.currentTimeMillis() - start) + " ms.");
    System.exit(0);
}
複製代碼

客戶端先是根據groupId和IP綁定server,而後更新路由表,獲取leader

private static void incrementAndGet(final BoltCliClientService cliClientService, final PeerId leader, final long delta, CountDownLatch latch) throws RemotingException, InterruptedException {
    final IncrementAndGetRequest request = new IncrementAndGetRequest();
    request.setDelta(delta);
    cliClientService.getRpcClient().invokeWithCallback(leader.getEndpoint().toString(), request,
        new InvokeCallback() {

            @Override
            public void onResponse(Object result) {
                latch.countDown();
                System.out.println("incrementAndGet result:" + result);
            }

            @Override
            public void onException(Throwable e) {
                e.printStackTrace();
                latch.countDown();

            }

            @Override
            public Executor getExecutor() {
                return null;
            }
        }, 5000);
}
複製代碼

而後調用incrementAndGet方法。incrementAndGet方法中使用cliClientService獲取client而後傳入request請求並設值回調函數。

整體流程

這裏總結一下整個server和client的調用流程

首先是CounterClient綁定server後,獲取server的leader節點,而後發送一個IncrementAndGetRequest的request請求到server。

Server接收到請求後根據請求的類型交給IncrementAndGetRequestProcessor處理,並調用handleRequest方法。

而後handleRequest會將數據封裝調用狀態機的onApply方法,處理業務數據後調用closure進行回調。

closure回調後會封裝一個ValueResponse發送響應請求給客戶端。

客戶端會回調onResponse方法。

到這裏整個counter的例子就講解完畢了

相關文章
相關標籤/搜索