分佈式對象存儲 讀書筆記(三) 元數據

上節的代碼沒法區分同一個對象的不一樣版本,爲了記錄對象版本以及其餘一些元數據,本節中會加入一個新組件:元數據服務數據庫

元數據服務就是提供對元數據的存取功能的服務。元數據指的是對象的描述信息,好比對象的名字、版本、大小以及散列值.json

新增長服務後的結構圖以下api

 

須要新安裝一個ElasticSearch搜索引擎。安裝過程可自行百度app

ElasticSearch搜索引擎中,咱們須要定義這樣一個映射。curl

'{"mappings":{"objects":{"properties":{"name":{"type":"string","index":"not_analyzed"},"version":{"type":"integer"},"size":{"type":"integer"},"hash":{"type":"string"}}}}}'ide

ES的索引至關於數據庫而類型至關於數據庫的表,映射則至關於定義表結構。該索引只有一個類型就是objects,包括4個屬性分別是name,version,size和hash函數

接口服務代碼添加了versions訪問處理函數搜索引擎

1 func main() {
2     go heartbeat.ListenHeartbeat()
3     http.HandleFunc("/objects/", objects.Handler)
4     http.HandleFunc("/locate/", locate.Handler)
5     http.HandleFunc("/versions/", versions.Handler)
6     log.Fatal(http.ListenAndServe(os.Getenv("LISTEN_ADDRESS"), nil))
7 }
View Code

version處理函數僅處理Get請求url

調用ES的API進行version查詢最後返回spa

 1 func Handler(w http.ResponseWriter, r *http.Request) {
 2     m := r.Method
 3     if m != http.MethodGet {
 4         w.WriteHeader(http.StatusMethodNotAllowed)
 5         return
 6     }
 7     from := 0
 8     size := 1000
 9     name := strings.Split(r.URL.EscapedPath(), "/")[2]
10     for {
11         metas, e := es.SearchAllVersions(name, from, size)
12         if e != nil {
13             log.Println(e)
14             w.WriteHeader(http.StatusInternalServerError)
15             return
16         }
17         for i := range metas {
18             b, _ := json.Marshal(metas[i])
19             w.Write(b)
20             w.Write([]byte("\n"))
21         }
22         if len(metas) != size {
23             return
24         }
25         from += size
26     }
27 }
View Code

OBJECT處理函數中 添加了DELETE處理

在ES中添加新版本,數據爲0 ,哈希爲"",表示刪除操做

OBJECT處理函數中 PUT操做沒有較大改變 可是添加了在ES中記錄元數據的代碼

OBJECT處理函數中 GET操做會根據version查詢ES 獲取元數據 纔開始獲取對象數據 

 1 func get(w http.ResponseWriter, r *http.Request) {
 2     name := strings.Split(r.URL.EscapedPath(), "/")[2]
 3     versionId := r.URL.Query()["version"]
 4     version := 0
 5     var e error
 6     if len(versionId) != 0 {
 7         version, e = strconv.Atoi(versionId[0])
 8         if e != nil {
 9             log.Println(e)
10             w.WriteHeader(http.StatusBadRequest)
11             return
12         }
13     }
14     meta, e := es.GetMetadata(name, version)
15     if e != nil {
16         log.Println(e)
17         w.WriteHeader(http.StatusInternalServerError)
18         return
19     }
20     if meta.Hash == "" {
21         w.WriteHeader(http.StatusNotFound)
22         return
23     }
24     object := url.PathEscape(meta.Hash)
25     stream, e := getStream(object)
26     if e != nil {
27         log.Println(e)
28         w.WriteHeader(http.StatusNotFound)
29         return
30     }
31     io.Copy(w, stream)
32 }
33 
34 func put(w http.ResponseWriter, r *http.Request) {
35     hash := utils.GetHashFromHeader(r.Header)
36     if hash == "" {
37         log.Println("missing object hash in digest header")
38         w.WriteHeader(http.StatusBadRequest)
39         return
40     }
41 
42     c, e := storeObject(r.Body, url.PathEscape(hash))
43     if e != nil {
44         log.Println(e)
45         w.WriteHeader(c)
46         return
47     }
48     if c != http.StatusOK {
49         w.WriteHeader(c)
50         return
51     }
52 
53     name := strings.Split(r.URL.EscapedPath(), "/")[2]
54     size := utils.GetSizeFromHeader(r.Header)
55     e = es.AddVersion(name, hash, size)
56     if e != nil {
57         log.Println(e)
58         w.WriteHeader(http.StatusInternalServerError)
59     }
60 }
View Code

 

實操驗證

開啓 rabbitmq ES 設置ES映射

開啓 apiserver dataserver

使用curl 上傳一個test3對象  這時候會顯示400 bad request。 由於沒提供test3 對象的散列值 

先計算對象的哈希值的 而後再添加哈希PUT一次

OK!!

相關文章
相關標籤/搜索