MIT-6.824 Lab 3: Fault-tolerant Key/Value Service

概述

lab2中實現了raft協議,本lab將在raft之上實現一個可容錯的k/v存儲服務,第一部分是實現一個不帶日誌壓縮的版本,第二部分是實現日誌壓縮。時間緣由我只完成了第一部分。git

設計思路

kvserver

如上圖,lab2實現了raft協議,本lab將實現kvserver。每一個raft都關聯一個kvserver,Clerks發送Put(), Append(), Get() RPC給leader服務器中的kvserver,kvserver收到請求後將操做打包成Log Entry提交給raft,而後阻塞等待raft將這個Entry拷貝到其它server,當Log Entry被拷貝到大部分的server後,leader 的raft會通知kvserver(raft往管道中塞comitted Entry,kvserver經過讀這個管道獲取通知),kvserver執行命令,而後響應Clerk。github

Clerk

客戶端經過Clerk發送請求,來看下Clerk代碼:緩存

type Clerk struct {
    servers []*labrpc.ClientEnd
    // You will have to modify this struct.

    lastLeader  int
    cid         int64
    seq         int
}

func (ck *Clerk) Get(key string) string {

    // You will have to modify this function.
    // 參數: 要讀的key, 當前clerk的id,  請求序列號
    getArgs := GetArgs{Key: key, Cid:ck.cid, Seq:ck.seq}
    reply := GetReply{}

    for {
        doneCh := make(chan bool, 1)
        go func() {
           //發送Get() RPC
            ok := ck.servers[ck.lastLeader].Call("KVServer.Get", &getArgs, &reply)
            doneCh <- ok
        }()

        select {
        case <-time.After(600 * time.Millisecond):
            DPrintf("clerk(%d) retry PutAppend after timeout\n", ck.cid)
            continue
        case ok := <- doneCh:
           //收到響應後,而且是leader返回的,那麼說明這個命令已經執行了
            if ok && reply.WrongLeader != WrongLeader {
                //請求序列號加1
              ck.seq++
                return reply.Value
            }
        }

       //換一個server重試
        ck.lastLeader++
        ck.lastLeader %= len(ck.servers)
    }

    return ""
}

這裏只給出了Get()的代碼,Put()和Append()相似,發送KVServer.Get給一個server,若是這個server不是leader,換一個server重試。直到發給真正的leader,而且leader將這個命令拷貝到大部分其它server後,而後成功執行該命令,Clerk.Get()纔會返回。服務器

KVServer

再來看下服務端的代碼,KVServer處理Clerk的RPC請求:app

type KVServer struct {
    mu      sync.Mutex
    me      int
    rf      *raft.Raft
    applyCh chan raft.ApplyMsg

    maxraftstate int // snapshot if log grows this big

    // Your definitions here.
   // 保存鍵值對
    db      map[string]string
    latestReplies map[int64]*LatestReply
    notify map[int]chan struct{}
}

func (kv *KVServer) Get(args *GetArgs, reply *GetReply) {
    // Your code here.
    if _, isLeader := kv.rf.GetState(); !isLeader {
        reply.WrongLeader = WrongLeader
        reply.Err = ""
        return
    }

    // 防止重複請求
    kv.mu.Lock()
    if latestReply, ok := kv.latestReplies[args.Cid]; ok && args.Seq <= latestReply.Seq {
        reply.WrongLeader = IsLeader
        reply.Value = latestReply.Reply.Value
        reply.Err = latestReply.Reply.Err
        kv.mu.Unlock()
        return
    }
    kv.mu.Unlock()

    command := Op{Operation:"Get", Key:args.Key, Cid:args.Cid, Seq:args.Seq}
    index, term, _ := kv.rf.Start(command)

    // 阻塞等待結果
    kv.mu.Lock()
    ch := make(chan struct{})
    kv.notify[index] = ch
    kv.mu.Unlock()

    select {
    case <-ch:
        curTerm, isLeader := kv.rf.GetState()
        DPrintf("%v got notify at index %v, isLeader = %v\n", kv.me, index, isLeader)
        if !isLeader || curTerm != term {
            reply.WrongLeader = WrongLeader
            reply.Err = ""
        } else {
            reply.WrongLeader = IsLeader
            kv.mu.Lock()
            if value, ok := kv.db[args.Key]; ok {
                reply.Value = value
                reply.Err = OK
            } else {
                reply.Err = ErrNoKey
            }
            kv.mu.Unlock()
        }

    }

}

KVServer.db用於保存鍵值對。
KVServer.Get()首先判斷本身是否是leader,若是不是leader,直接返回,這樣Clerk好重試其它server。若是是leader,先在緩存中找,看這個請求是否已經執行過了。
由於可能出現這麼一種狀況:若是leader commit一個Entry後當即奔潰了,那麼Clerk就收不到響應,那麼Clerk會將這個請求發給新的leader,新的leader收到請求後若是不作任何措施,將會二次commit該Log Entry,對於Put()和Append()請求執行兩次是不正確的,因此須要一個辦法防止一個請求執行兩次。
能夠這麼作:每一個Clerk都分配一個惟一的cid,每一個請求分配一個惟一的序列號seq,每成功一個請求,該序列號加一。服務端記錄每一個客戶端cid最近一次apply的請求的序列號seq和對應的響應結果,根據這個信息可知,當再次收到這個客戶端的序列號小於seq的請求時,說明已經執行過了,直接返回結果。函數

若是以前沒有執行過,那麼調用this

kv.rf.Start(command)

將Log Entry提交給raft,而且阻塞等待raft將這個Entry拷貝到其它大部分server,從阻塞返回後,說明這個Entry已經被拷貝到大部分server了,而且已經執行了命令,這時能夠將結果返回給Clerk了。線程

那麼在哪裏接收raft的消息呢?KVServer在建立的時候會在一個線程中執行以下函數:設計

func (kv *KVServer) applyDaemon()  {
    for appliedEntry := range kv.applyCh {
        command := appliedEntry.Command.(Op)

        // 執行命令, 過濾已經執行過得命令
        kv.mu.Lock()
        if latestReply, ok := kv.latestReplies[command.Cid]; !ok || command.Seq > latestReply.Seq {
            switch command.Operation {
            case "Get":
                latestReply := LatestReply{Seq:command.Seq,}
                reply := GetReply{}
                if value, ok := kv.db[command.Key]; ok {
                    reply.Value = value
                } else {
                    reply.Err = ErrNoKey
                }
                latestReply.Reply = reply
                kv.latestReplies[command.Cid] = &latestReply
            case "Put":
                kv.db[command.Key] = command.Value
                latestReply := LatestReply{Seq:command.Seq}
                kv.latestReplies[command.Cid] = &latestReply
            case "Append":
                kv.db[command.Key] += command.Value
                latestReply := LatestReply{Seq:command.Seq}
                kv.latestReplies[command.Cid] = &latestReply
            default:
                panic("invalid command operation")
            }
        }

        DPrintf("%d applied index:%d, cmd:%v\n", kv.me, appliedEntry.CommandIndex, command)

        // 通知
        if ch, ok := kv.notify[appliedEntry.CommandIndex]; ok && ch != nil {
            DPrintf("%d notify index %d\n",kv.me, appliedEntry.CommandIndex)
            close(ch)
            delete(kv.notify, appliedEntry.CommandIndex)
        }
        kv.mu.Unlock()
    }
}

kv.applyCh這個chanel會在建立raft的時候傳給raft,當某個Log Entry能夠被commit的時候,raft會往這個chanel中塞,只要for循環這個kv.applyCh,就能知道已經被commit的Entry,拿到Entry後,根據其中的命令執行相應的操做,而後通知KVServer.Get()繼續執行。3d

具體代碼在:https://github.com/gatsbyd/mit_6.824_2018 若有錯誤,歡迎指正: 15313676365

相關文章
相關標籤/搜索