groupcache源碼解讀

Simple usage

groupcache的官方網站是 https://github.com/golang/groupcache

consistenthash模塊

一致性hash算法,一般用在查找一個合適的下載節點時,使負載更平均,但是對於相同的請求始終返回同樣的結果。

// Map is the consistent-hash structure discussed in this section.
type Map struct {
    // hash is the hash function; the Hash type is declared outside this excerpt.
    hash     Hash
    // replicas controls how many virtual buckets are added so that data is
    // distributed more evenly.
    replicas int
    keys     []int // Sorted
    // NOTE(review): presumably maps each hash in keys back to the key that
    // owns it — confirm against the consistenthash source.
    hashMap  map[int]string
}

Map結構中replicas的含義是增加虛擬桶,使數據分佈更加均勻。

// New creates a Map.
func New(replicas int, fn Hash) *Map
// Add adds new keys.
func (m *Map) Add(keys ...string)
// Get returns the value selected by hash(key).
func (m *Map) Get(key string) string
// IsEmpty reports whether the Map is empty.
func (m *Map) IsEmpty() bool
// Surprisingly, no Remove method is provided O_O

簡單用法例子

package main

import (
	"fmt"

	"github.com/golang/groupcache/consistenthash"
)

// main builds a consistent-hash ring with five entries and prints which entry
// each lookup key resolves to. Repeated keys are included on purpose so the
// output shows that the same key always resolves to the same entry.
func main() {
	ring := consistenthash.New(70, nil)
	ring.Add("A", "B", "C", "D", "E")

	lookups := []string{"what", "nice", "what", "nice", "good", "yes!"}
	for i := range lookups {
		fmt.Printf("%s -> %s\n", lookups[i], ring.Get(lookups[i]))
	}
}

// Expected output
// -------------
// what -> C
// nice -> A
// what -> C
// nice -> A
// good -> D
// yes! -> E

singleflight 模塊

// Group is the receiver type of Do; its fields are not shown in this excerpt.
type Group struct {
}

// Do makes sure fn is called only once when several requests for the same
// key arrive at the same time.
func (g *Group) Do(key string, fn func() (interface{}, error)) (interface{}, error)

使用例子

package main

import (
	"fmt"
	"sync"
	"time"

	"github.com/golang/groupcache/singleflight"
)

// NewDelayReturn builds a function that sleeps for dur and then yields n
// together with a nil error.
func NewDelayReturn(dur time.Duration, n int) func() (interface{}, error) {
	delayed := func() (result interface{}, err error) {
		time.Sleep(dur)
		result = n
		return result, err
	}
	return delayed
}

// main issues two overlapping Do calls for the same key from separate
// goroutines and prints what each caller receives. The recorded output in
// this article shows both callers getting the first call's result.
func main() {
	var (
		group   singleflight.Group
		pending sync.WaitGroup
	)
	pending.Add(2)

	// First caller: starts right away with a one-second function returning 1.
	go func() {
		defer pending.Done()
		val, err := group.Do("key", NewDelayReturn(1*time.Second, 1))
		if err != nil {
			panic(err)
		}
		fmt.Printf("key-1 get %v\n", val)
	}()

	// Second caller: waits briefly so that its call is made after the first.
	go func() {
		defer pending.Done()
		time.Sleep(100 * time.Millisecond)
		val, err := group.Do("key", NewDelayReturn(2*time.Second, 2))
		if err != nil {
			panic(err)
		}
		fmt.Printf("key-2 get %v\n", val)
	}()

	pending.Wait()
}

執行結果(耗時: 1.019s)

key-2 get 1
key-1 get 1

lru 模塊

LRU 最初是用在內存管理上的一個算法,根據歷史的請求數,分析出最熱門的數據,並保存下來。

// Cache is the LRU cache type of the lru package, as listed in this article.
type Cache struct {
    // MaxEntries is the maximum number of cache entries before
    // an item is evicted. Zero means no limit.
    MaxEntries int

    // OnEvicted optionally specifies a callback function to be
    // executed when an entry is purged from the cache.
    OnEvicted func(key Key, value interface{})
    // contains filtered or unexported fields
}

// New returns a *Cache. In this article's example, lru.New(2) gives a cache
// whose length stays at 2 after three Adds.
func New(maxEntries int) *Cache
// Add stores value under key.
func (c *Cache) Add(key Key, value interface{})
// Get returns the value stored under key and whether it was found.
func (c *Cache) Get(key Key) (value interface{}, ok bool)
// Len returns the number of entries currently in the cache.
func (c *Cache) Len() int
// NOTE(review): not exercised in this article; presumably deletes the entry
// stored under key — confirm against the lru package documentation.
func (c *Cache) Remove(key Key)
// NOTE(review): not exercised in this article; presumably evicts the least
// recently used entry — confirm against the lru package documentation.
func (c *Cache) RemoveOldest()

用法舉例

package main

import (
        "fmt"

        "github.com/golang/groupcache/lru"
)

// main demonstrates eviction in a two-entry LRU cache: x and y are added, y is
// read, then z is added. Per the expected output listed after this example,
// the cache still holds two entries and x is the one that was dropped.
func main() {
	cache := lru.New(2)
	cache.Add("x", "x0")
	cache.Add("y", "y0")

	// Reading y here happens before z is inserted.
	if yval, found := cache.Get("y"); found {
		fmt.Printf("y is %v\n", yval)
	}
	cache.Add("z", "z0")

	fmt.Printf("cache length is %d\n", cache.Len())
	if _, found := cache.Get("x"); !found {
		fmt.Printf("x key was weeded out\n")
	}
}
// Expected output
//--------------
// y is y0
// cache length is 2
// x key was weeded out

HTTPPool 模塊

// HTTPPool is the HTTP-based peer pool type, as listed in this article.
type HTTPPool struct {
    // Optional: the Context wrapped for each incoming request.
    Context func(*http.Request) groupcache.Context

    // Optional. The article's author was unsure what this is for;
    // the upstream comment says it is what gets used when making requests.
    Transport func(groupcache.Context) http.RoundTripper
}

// self must be a valid URL that points at the current server, e.g. "http://10.0.0.1:8000".
// By default this function registers a route:
//     http.Handle("/_groupcache/", poolInstance)
// The route is mainly used for fetching data between peers.
// The function must not be called more than once, otherwise it panics.
func NewHTTPPool(self string) *HTTPPool

// Set updates the peer list.
// It uses consistenthash.
// Oddly, there is only a function for adding peers and none for removing them.
func (p *HTTPPool) Set(peers ...string)

// PickPeer chooses a peer using the consistent-hash algorithm.
func (p *HTTPPool) PickPeer(key string) (groupcache.ProtoGetter, bool)

// ServeHTTP handles requests that arrive over HTTP.
// NOTE(review): the original note calls these "grpc" requests, but the pool
// is registered as a plain net/http handler — confirm the wire format.
func (p *HTTPPool) ServeHTTP(w http.ResponseWriter, r *http.Request)

其餘 (待補充)

原本想在官網上找個例子,下面是找到的代碼片段(並不完整,僅供參考):

package main

// A SizeReaderAt is a ReaderAt with a Size method.
//
// An io.SectionReader implements SizeReaderAt.
type SizeReaderAt interface {
    // Size is what NewMultiReaderAt sums to place each part at its offset.
    Size() int64
    io.ReaderAt
}

// NewMultiReaderAt concatenates parts into a single SizeReaderAt, the
// ReaderAt counterpart of io.MultiReader. Each part is recorded together with
// the offset at which it starts, and the combined size is the sum of all
// part sizes.
func NewMultiReaderAt(parts ...SizeReaderAt) SizeReaderAt {
    var total int64
    sources := make([]offsetAndSource, 0, len(parts))
    for i := range parts {
        // The running total is the starting offset of this part.
        sources = append(sources, offsetAndSource{total, parts[i]})
        total += parts[i].Size()
    }
    return &multi{parts: sources, size: total}
}

// NewChunkAlignedReaderAt returns a ReaderAt wrapper that is backed
// by a ReaderAt r of size totalSize where the wrapper guarantees that
// all ReadAt calls are aligned to chunkSize boundaries and of size
// chunkSize (except for the final chunk, which may be shorter).
//
// A chunk-aligned reader is good for caching, letting upper layers have
// any access pattern, but guarantees that the wrapped ReaderAt sees
// only nicely-cacheable access patterns & sizes.
func NewChunkAlignedReaderAt(r SizeReaderAt, chunkSize int) SizeReaderAt {
    // ... (implementation omitted in this excerpt)
}

// part wraps the string s in a section reader that covers all of its bytes.
func part(s string) SizeReaderAt {
    src := strings.NewReader(s)
    length := int64(len(s))
    return io.NewSectionReader(src, 0, length)
}

func handler(w http.ResponseWriter, r *http.Request) {
    sra := NewMultiReaderAt(
        part("Hello, "), part(" world! "),
        part("You requested "+r.URL.Path+"\n"),
    )
    rs := io.NewSectionReader(sra, 0, sra.Size())
    http.ServeContent(w, r, "foo.txt", modTime, rs)
}


// main sketches how groupcache is wired up: create the HTTP pool, set the
// peer list, define a group with a getter, then fetch a key through the group
// and serve the bytes.
// NOTE(review): this is an incomplete excerpt — ctx, w, r, modTime and
// generateThumbnail are not defined anywhere in the visible snippet.
func main(){
    me := "http://10.0.0.1"
    peers := groupcache.NewHTTPPool(me)

    // Whenever peers change:
    peers.Set("http://10.0.0.1", "http://10.0.0.2", "http://10.0.0.3")
        
    // The getter passed to NewGroup fills dest with the thumbnail generated for the key.
    var thumbNails = groupcache.NewGroup("thumbnail", 64<<20, groupcache.GetterFunc(
        func(ctx groupcache.Context, key string, dest groupcache.Sink) error {
            fileName := key
            dest.SetBytes(generateThumbnail(fileName))
            return nil
        }))

    // Fetch through the group; the result is written into data.
    var data []byte
    err := thumbNails.Get(ctx, "big-file.jpg",
        groupcache.AllocatingByteSliceSink(&data))
    // ... (err is not checked in this excerpt)
    http.ServeContent(w, r, "big-file-thumb.jpg", modTime, bytes.NewReader(data))


            
}

groupcache 使用例子

package main

import (
        "errors"
        "flag"
        "log"
        "net/http"
        "strconv"
        "strings"

        "github.com/golang/groupcache"
)

// localStorage is the in-memory key/value table that the example reads from.
var localStorage map[string]string

// init seeds localStorage with the two sample entries.
func init() {
	localStorage = map[string]string{
		"hello": "world",
		"info":  "This is an example",
	}
}

// main starts one node of the example groupcache cluster.
//
// It listens on the port given by -port (default 4100), registers itself and
// http://localhost:4101 as peers, and answers /<key> by fetching the key
// through the "helloworld" group, whose getter reads from localStorage.
// After each successful response it logs the group's statistics counters.
// If the HTTP server cannot start or stops, the error is logged and the
// process exits with a non-zero status.
func main() {
	port := flag.Int("port", 4100, "Listen port")
	flag.Parse()

	// The peer name has to start with http://
	self := "http://localhost:" + strconv.Itoa(*port)
	pool := groupcache.NewHTTPPool(self)
	pool.Set(self, "http://localhost:4101")

	helloworld := groupcache.NewGroup("helloworld", 10<<20, groupcache.GetterFunc(
		func(ctx groupcache.Context, key string, dest groupcache.Sink) error {
			log.Printf("groupcache get key: %v", key)
			value, exists := localStorage[key]
			if !exists {
				// Report the miss through the error alone; the sink is left
				// untouched because its content is meaningless on failure.
				return errors.New(key + " NotExist")
			}
			// Propagate a failure to fill the sink instead of dropping it.
			return dest.SetString(value)
		}))

	http.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
		key := strings.TrimPrefix(r.RequestURI, "/")
		log.Printf("Request(%v) key(%v)", r.RemoteAddr, key)
		if key == "" {
			http.Error(w, "Bad Request", http.StatusBadRequest)
			return
		}
		var data []byte
		err := helloworld.Get(nil, key, groupcache.AllocatingByteSliceSink(&data))
		if err != nil {
			http.Error(w, err.Error(), http.StatusBadRequest)
			return
		}
		log.Printf("cache data: %v", data)
		if _, err := w.Write(data); err != nil {
			// The client most likely went away; nothing more can be sent.
			log.Printf("write response: %v", err)
		}
		log.Println("Gets: ", helloworld.Stats.Gets.String())
		log.Println("CacheHits: ", helloworld.Stats.CacheHits.String())
		log.Println("Loads: ", helloworld.Stats.Loads.String())
		log.Println("LocalLoads: ", helloworld.Stats.LocalLoads.String())
		log.Println("PeerErrors: ", helloworld.Stats.PeerErrors.String())
		log.Println("PeerLoads: ", helloworld.Stats.PeerLoads.String())
	})

	// ListenAndServe always returns a non-nil error (for example when the
	// port is already in use); surface it instead of exiting silently.
	log.Fatal(http.ListenAndServe(":"+strconv.Itoa(*port), nil))
}

參考文章

  1. http://talks.golang.org/2013/oscon-dl.slide#1
  2. http://xuchongfeng.github.io/2016/02/21/GroupCache%E6%BA%90%E7%A0%81%E9%98%85%E8%AF%BB/
相關文章
相關標籤/搜索