上一節編寫了一個使用REST服務進行對象存取得單機程序git
本節接續對其進行擴展,爲了知足加入新的節點就能夠自由擴展服務器集羣的需求,咱們須要將單機版的接口與數據存儲進行解耦.github
讓接口與數據存儲成爲互相獨立的服務節點,二者互相合做提供對象存儲服務。這樣節點就能夠按照須要添加,而且分佈在不一樣的服務器上。api
架構圖服務器
如圖 接口層服務提供對外的REST接口,數據服務層則提供數據的存儲功能.架構
接口服務和數據服務之間有兩種接口:1 REST服務接口 2 消息隊裏,採用RabbitMQ(也能夠採用其餘消息隊列甚至是其餘方式)框架
每種接口有格子的特色和用處 REST適合大數量的數據傳輸 消息隊列則能夠知足1:1 1:n n:1 n:n的消息傳輸ide
心跳消息測試
dataServer每隔5秒 經過消息隊列想全部 apiServer發送心跳包spa
apiServer接收心跳包 而且每隔10秒移除未及時更新心跳包的dataServer設計
1 dataServer hearbeat 2 func StartHeartbeat() { 3 q := rabbitmq.New(os.Getenv("RABBITMQ_SERVER")) 4 defer q.Close() 5 for { 6 q.Publish("apiServers", os.Getenv("LISTEN_ADDRESS")) 7 time.Sleep(5 * time.Second) 8 } 9 } 10 11 apiServer hearbeat 12 func ListenHeartbeat() { 13 q := rabbitmq.New(os.Getenv("RABBITMQ_SERVER")) 14 defer q.Close() 15 q.Bind("apiServers") 16 c := q.Consume() 17 go removeExpiredDataServer() 18 for msg := range c { 19 dataServer, e := strconv.Unquote(string(msg.Body)) 20 if e != nil { 21 panic(e) 22 } 23 mutex.Lock() 24 dataServers[dataServer] = time.Now() 25 mutex.Unlock() 26 } 27 } 28 29 func removeExpiredDataServer() { 30 for { 31 time.Sleep(5 * time.Second) 32 mutex.Lock() 33 for s, t := range dataServers { 34 if t.Add(10 * time.Second).Before(time.Now()) { 35 delete(dataServers, s) 36 } 37 } 38 mutex.Unlock() 39 } 40 }
定位消息
apiServer locate func Locate(name string) string { q := rabbitmq.New(os.Getenv("RABBITMQ_SERVER")) q.Publish("dataServers", name) c := q.Consume() //廣播消息 等待1秒內的回覆信息 go func() { time.Sleep(time.Second) q.Close() }() msg := <-c s, _ := strconv.Unquote(string(msg.Body)) return s } dataServer locate func Locate(name string) bool { _, err := os.Stat(name) return !os.IsNotExist(err) } func StartLocate() { q := rabbitmq.New(os.Getenv("RABBITMQ_SERVER")) defer q.Close() q.Bind("dataServers") c := q.Consume() //綁定消息隊列 等待詢問定位信息 而後在本地查找 若找到則返回監聽地址 for msg := range c { object, e := strconv.Unquote(string(msg.Body)) if e != nil { panic(e) } if Locate(os.Getenv("STORAGE_ROOT") + "/objects/" + object) { q.Send(msg.ReplyTo, os.Getenv("LISTEN_ADDRESS")) } } }
put get操做與上節相似
代碼地址 https://github.com/stuarthu/go-implement-your-object-storage/tree/master/chapter2
由於測試機器有限 須要在一臺機器上開啓6個apiServer 2個dataServer 和 開啓RabbitMQ
因此咱們須要在同一臺機器上綁定多個地址
執行命令以下:
安裝配置rabbitmq 這裏就再也不介紹了
而後建立各個apiServer須要的存儲文件夾 開啓6個apiServer 2個dataServer 對應模擬不用的ip地址
咱們上傳一個test2對象
下面進行測試
咱們上傳一個叫作test2的對象 再locate定位他在哪一個數據服務節點 而後換另外一個數據服務節點查詢 看看是否成功
OK 所有成功
下一節 解決這樣一個問題 若是咱們屢次重複上傳同一個對象 那麼這個對象可能在全部服務器上都有存儲 甚至多是不同的數據(存儲是隨機選擇數據節點服務器)
咱們添加版本控制來解決這個問題 記錄版本等數據的重要信息 也就是元數據