分佈式對象存儲 讀書筆記(二) 解耦

上一節編寫了一個使用REST服務進行對象存取得單機程序git

本節接續對其進行擴展,爲了知足加入新的節點就能夠自由擴展服務器集羣的需求,咱們須要將單機版的接口與數據存儲進行解耦.github

讓接口與數據存儲成爲互相獨立的服務節點,二者互相合做提供對象存儲服務。這樣節點就能夠按照須要添加,而且分佈在不一樣的服務器上。api

 

1 總體框架

架構圖服務器

如圖 接口層服務提供對外的REST接口,數據服務層則提供數據的存儲功能.架構

接口服務和數據服務之間有兩種接口:1 REST服務接口 2 消息隊裏,採用RabbitMQ(也能夠採用其餘消息隊列甚至是其餘方式)框架

每種接口有格子的特色和用處 REST適合大數量的數據傳輸 消息隊列則能夠知足1:1 1:n n:1 n:n的消息傳輸ide

 

2 消息設計

心跳消息測試

dataServer每隔5秒 經過消息隊列想全部 apiServer發送心跳包spa

apiServer接收心跳包 而且每隔10秒移除未及時更新心跳包的dataServer設計

 1 dataServer hearbeat
 2 func StartHeartbeat() {
 3     q := rabbitmq.New(os.Getenv("RABBITMQ_SERVER"))
 4     defer q.Close()
 5     for {
 6         q.Publish("apiServers", os.Getenv("LISTEN_ADDRESS"))
 7         time.Sleep(5 * time.Second)
 8     }
 9 }
10 
11 apiServer hearbeat
12 func ListenHeartbeat() {
13     q := rabbitmq.New(os.Getenv("RABBITMQ_SERVER"))
14     defer q.Close()
15     q.Bind("apiServers")
16     c := q.Consume()
17     go removeExpiredDataServer()
18     for msg := range c {
19         dataServer, e := strconv.Unquote(string(msg.Body))
20         if e != nil {
21             panic(e)
22         }
23         mutex.Lock()
24         dataServers[dataServer] = time.Now()
25         mutex.Unlock()
26     }
27 }
28 
29 func removeExpiredDataServer() {
30     for {
31         time.Sleep(5 * time.Second)
32         mutex.Lock()
33         for s, t := range dataServers {
34             if t.Add(10 * time.Second).Before(time.Now()) {
35                 delete(dataServers, s)
36             }
37         }
38         mutex.Unlock()
39     }
40 }
View Code

 

定位消息

apiServer locate
func Locate(name string) string {
    q := rabbitmq.New(os.Getenv("RABBITMQ_SERVER"))
    q.Publish("dataServers", name)
    c := q.Consume()
        //廣播消息  等待1秒內的回覆信息
    go func() {
        time.Sleep(time.Second)
        q.Close()
    }()
    msg := <-c
    s, _ := strconv.Unquote(string(msg.Body))
    return s
}    

dataServer locate
func Locate(name string) bool {
    _, err := os.Stat(name)
    return !os.IsNotExist(err)
}

func StartLocate() {
    q := rabbitmq.New(os.Getenv("RABBITMQ_SERVER"))
    defer q.Close()
    q.Bind("dataServers")
    c := q.Consume()
        //綁定消息隊列 等待詢問定位信息 而後在本地查找 若找到則返回監聽地址
    for msg := range c {
        object, e := strconv.Unquote(string(msg.Body))
        if e != nil {
            panic(e)
        }
        if Locate(os.Getenv("STORAGE_ROOT") + "/objects/" + object) {
            q.Send(msg.ReplyTo, os.Getenv("LISTEN_ADDRESS"))
        }
    }
}

 put get操做與上節相似

代碼地址 https://github.com/stuarthu/go-implement-your-object-storage/tree/master/chapter2

 

3 操做驗證

由於測試機器有限 須要在一臺機器上開啓6個apiServer 2個dataServer 和 開啓RabbitMQ

因此咱們須要在同一臺機器上綁定多個地址

執行命令以下:

安裝配置rabbitmq 這裏就再也不介紹了

而後建立各個apiServer須要的存儲文件夾 開啓6個apiServer 2個dataServer 對應模擬不用的ip地址

 

咱們上傳一個test2對象

 

下面進行測試

咱們上傳一個叫作test2的對象 再locate定位他在哪一個數據服務節點 而後換另外一個數據服務節點查詢 看看是否成功

OK 所有成功

下一節 解決這樣一個問題 若是咱們屢次重複上傳同一個對象 那麼這個對象可能在全部服務器上都有存儲 甚至多是不同的數據(存儲是隨機選擇數據節點服務器)

咱們添加版本控制來解決這個問題 記錄版本等數據的重要信息 也就是元數據

相關文章
相關標籤/搜索