一般咱們更新應用程序的配置文件,都須要手動重啓程序或手動從新加載配置。假設一組服務部署在10臺機器上,你須要藉助批量運維工具執行重啓命令,並且10臺同時重啓可能還會形成服務短暫不可用。要是更新配置後,服務自動刷新配置多好...今天咱們就用go實現配置文件熱加載的小功能,之後更新配置不再用手動重啓了...git
一般應用程序啓動的流程:加載配置,而後run()。咱們怎麼作到熱加載呢?咱們的思路是這樣的:github
【1】在加載配置文件以後,啓動一個線程app
【2】該線程定時監聽這個配置文件是否有改動運維
【3】若是配置文件有變更,就從新加載一下函數
【4】從新加載以後通知須要使用這些配置的應用程序(進程或線程),實際上就是刷新內存中配置工具
首先咱們要實現加載配置功能。假設配置文件是k=v格式的,以下:測試
那咱們得寫一個解析配置的包了...讓咱們一塊兒面向對象:atom
type Config struct{ filename string data map[string]string lastModifyTime int64 rwLock sync.RWMutex notifyList []Notifyer }
filename string 配置文件名稱線程
data map[string]string 將配置文件中的k/v解析存放到map中3d
lastModifyTime int64 記錄配置文件上一次更改時間
rwLock sync.RWMutex 讀寫鎖,處理這樣一種競爭狀況:更新這個結構體時其餘線程正在讀取改結構體中的內容,後續用到的時候會講
notifyList []Notifyer 存放全部觀察者,此處咱們用到了觀察者模式,也就是須要用到這個配置的對象,咱們就把它加到這個切片。當配置更新以後,通知切片中的對象配置更新了。
接下來咱們能夠給這個結構體添加一些方法了:
func NewConfig(file string)(conf *Config, err error){ conf = &Config{ filename: file, data: make(map[string]string, 1024), } m, err := conf.parse() if err != nil { fmt.Printf("parse conf error:%v\n", err) return } // 將解析配置文件後的數據更新到結構體的map中,寫鎖 conf.rwLock.Lock() conf.data = m conf.rwLock.Unlock() // 啓一個後臺線程去檢測配置文件是否更改 go conf.reload() return }
構造函數作了三件事:【1】初始化Config 【2】調用parse()函數,解析配置文件,並把解析後的map更新到Config 【3】啓動一個線程,準確說是啓動一個goroutine,即reload()
注意此處更新map時加了寫鎖了,目的在於不影響擁有讀鎖的線程讀取數據。
解析函數比較簡單,主要是讀取配置文件,一行行解析,數據存放在map中。
func (c *Config) parse() (m map[string]string, err error) { // 若是在parse()中定義一個map,這樣就是一個新的map不用加鎖 m = make(map[string]string, 1024) f, err := os.Open(c.filename) if err != nil { return } defer f.Close() reader := bufio.NewReader(f) // 聲明一個變量存放讀取行數 var lineNo int for { line, errRet := reader.ReadString('\n') if errRet == io.EOF { // 這裏有一個坑,最後一行若是不是\n結尾會漏讀 lineParse(&lineNo, &line, &m) break } if errRet != nil { err = errRet return } lineParse(&lineNo, &line, &m) } return } func lineParse(lineNo *int, line *string, m *map[string]string) { *lineNo++ l := strings.TrimSpace(*line) // 若是空行 或者 是註釋 跳過 if len(l) == 0 || l[0] =='\n' || l[0]=='#' || l[0]==';' { return } itemSlice := strings.Split(l, "=") // = if len(itemSlice) == 0 { fmt.Printf("invalid config, line:%d", lineNo) return } key := strings.TrimSpace(itemSlice[0]) if len(key) == 0 { fmt.Printf("invalid config, line:%d", lineNo) return } if len(key) == 1 { (*m)[key] = "" return } value := strings.TrimSpace(itemSlice[1]) (*m)[key] = value return }
這裏我寫了兩個函數。lineParse()是解析每一行配置的。parse()就是解析的主函數,在parse()中我調用了兩次lineParse()。緣由是在使用bufio按行讀取配置文件的時候,有時候會出現這樣的狀況:有的人在寫配置文件的時候,最後一行沒有換行,也就是沒有‘\n’,而後咱們就直接讀到io.EOF了,這時候若是直接break就會致使最後一行丟失。因此這種狀況下咱們應該在break以前調用lineParse()把最後一行處理了。
上面咱們已經實現了讀取配置文件,並放到一個Config示例中,咱們須要爲這個Config封裝一些接口,方便用戶經過接口訪問Config的內容。這步比較簡單:
func (c *Config) GetInt(key string)(value int, err error){ c.rwLock.RLock() defer c.rwLock.RUnlock() str, ok := c.data[key] if !ok { err = fmt.Errorf("key [%s] not found", key) } value, err = strconv.Atoi(str) return } func (c *Config) GetIntDefault(key string, defaultInt int)(value int){ c.rwLock.RLock() defer c.rwLock.RUnlock() str, ok := c.data[key] if !ok { value = defaultInt return } value, err := strconv.Atoi(str) if err != nil { value = defaultInt } return } func (c *Config) GetString(key string)(value string, err error){ c.rwLock.RLock() defer c.rwLock.RUnlock() value, ok := c.data[key] if !ok { err = fmt.Errorf("key [%s] not found", key) } return } func (c *Config) GetIStringDefault(key string, defaultStr string)(value string){ c.rwLock.RLock() defer c.rwLock.RUnlock() value, ok := c.data[key] if !ok { value = defaultStr return } return }
如上,一共封裝了4個接口:
GetInt(key string)(value int, err error) 經過key獲取value,並將value轉成int類型
GetIntDefault(key string, defaultInt int)(value int) 經過key獲取value,並將value轉成int類型;若是獲取失敗,使用默認值
GetString(key string)(value string, err error) 經過key獲取value,默認value爲string類型
GetIStringDefault(key string, defaultStr string)(value string) 經過key獲取value,默認value爲string類型;若是獲取失敗,使用默認值
注意:四個接口都用了讀鎖
上面咱們已經實現瞭解析,加載配置文件,併爲Config封裝了比較友好的接口。接下來,咱們能夠仔細看一下咱們以前啓動的goroutine了,即reload()方法。
func (c *Config) reload(){ // 定時器 ticker := time.NewTicker(time.Second * 5) for _ = range ticker.C { // 打開文件 // 爲何使用匿名函數? 當匿名函數退出時可用defer去關閉文件 // 若是不用匿名函數,在循環中很差關閉文件,一不當心就內存泄露 func (){ f, err := os.Open(c.filename) if err != nil { fmt.Printf("open file error:%s\n", err) return } defer f.Close() fileInfo, err := f.Stat() if err != nil { fmt.Printf("stat file error:%s\n", err) return } // 或取當前文件修改時間 curModifyTime := fileInfo.ModTime().Unix() if curModifyTime > c.lastModifyTime { // 從新解析時,要考慮應用程序正在讀取這個配置所以應該加鎖 m, err := c.parse() if err != nil { fmt.Printf("parse config error:%v\n", err) return } c.rwLock.Lock() c.data = m c.rwLock.Unlock() c.lastModifyTime = curModifyTime // 配置更新通知全部觀察者 for _, n := range c.notifyList { n.Callback(c) } } }() } }
reload()函數中作了這幾件事:
【1】用time.NewTicker每隔5秒去檢查一下配置文件
【2】若是配置文件的修改時間比上一次修改時間大,咱們認爲配置文件更新了。那麼咱們調用parse()解析配置文件,並更新conf實例中的數據。而且更新配置文件的修改時間。
【3】通知全部觀察者,即通知全部使用配置文件的程序、進程或實例,配置更新了。
咱們反覆提到觀察者,反覆提到通知全部觀察者配置文件更新。那麼咱們就要實現這個觀察者:
type Notifyer interface { Callback(*Config) }
定義這樣一個Notifyer接口,只要實現了Callback方法的對象,就都實現了這個Notifyer接口。實現了這個接口的對象,若是都須要被通知配置文件更新,那這些對象均可以加入到前面定義的notifyList []Notifyer這個切片中,等待被通知配置文件更新。
好了,此處咱們是否少寫了添加觀察者的方法呢??是的,立刻寫:
// 添加觀察者 func (c *Config) AddObserver(n Notifyer) { c.notifyList = append(c.notifyList, n) }
通過上面一番折騰,我們的熱加載就快實現了,咱們來測一測:
一般咱們在應用程序中怎麼使用配置文件?【1】加載配置文件,加載以後數據放在一個全局結構體中 【2】run()
也就是run()中咱們要使用全局的結構體,可是這個全局結構體會由於配置文件的更改被更新。此時又存在須要加鎖的狀況了。我擦,是否是很麻煩。。不用擔憂,咱們用一個原子操做搞定。
假設咱們的配置文件中存放的是hostname/port/kafkaAddr/kafkaPort這幾個字段。。
type AppConfig struct { hostname string port int kafkaAddr string kafkaPort int }
接下來咱們要用原子操做保證數據一致性了:
// reload()協程寫 和 for循環的讀,都是對Appconfig對象,所以有讀寫衝突 type AppConfigMgr struct { config atomic.Value } // 初始化結構體 var appConfigMgr = &AppConfigMgr{}
atomic.Value能保證存放數據和讀取出數據不會有衝突。因此當咱們更新數據時存放到atomic.Value中,咱們使用數據從atomic.Value加載出來,這樣不用加鎖就能保證數據的一致性了。完美~~
咱們須要AppConfigMgr實現Callback方法,即實現Notifyer接口,這樣才能被通知配置更新:
func (a *AppConfigMgr)Callback(conf *reconf.Config) { appConfig := &AppConfig{} hostname, err := conf.GetString("hostname") if err != nil { fmt.Printf("get hostname err: %v\n", err) return } appConfig.hostname = hostname kafkaPort, err := conf.GetInt("kafkaPort") if err != nil { fmt.Printf("get kafkaPort err: %v\n", err) return } appConfig.kafkaPort = kafkaPort appConfigMgr.config.Store(appConfig) }
這個Callback實現功能是:當被通知配置更新時,立刻讀取更新的數據並存放到config atomic.Value 中。
好了,咱們要寫主函數了。
func initConfig(file string) { // [1] 打開配置文件 conf, err := reconf.NewConfig(file) if err != nil { fmt.Printf("read config file err: %v\n", err) return } // 添加觀察者 conf.AddObserver(appConfigMgr) // [2]第一次讀取配置文件 var appConfig AppConfig appConfig.hostname, err = conf.GetString("hostname") if err != nil { fmt.Printf("get hostname err: %v\n", err) return } fmt.Println("Hostname:", appConfig.hostname) appConfig.kafkaPort, err = conf.GetInt("kafkaPort") if err != nil { fmt.Printf("get kafkaPort err: %v\n", err) return } fmt.Println("kafkaPort:", appConfig.kafkaPort) // [3] 把讀取到的配置文件數據存儲到atomic.Value appConfigMgr.config.Store(&appConfig) fmt.Println("first load sucess.") } func run(){ for { appConfig := appConfigMgr.config.Load().(*AppConfig) fmt.Println("Hostname:", appConfig.hostname) fmt.Println("kafkaPort:", appConfig.kafkaPort) fmt.Printf("%v\n", "--------------------") time.Sleep(5 * time.Second) } } func main() { confFile := "../parseConfig/test.cfg" initConfig(confFile) // 應用程序 不少配置已經不是存在文件中而是etcd run() }
主函數中調用了initConfig()和run()。
initConfig()中:reconf.NewConfig(file)的時候咱們已經第一次解析配置,並啓動線程不斷更新配置了。此外initConfig()還作了一些事,就是經過Config提供的接口,將配置文件中的數據讀取到appConfig 中,而後再將appConfig 存儲到 atomic.Value中。
run()就是模擬應用程序在運行過程當中使用配置的過程:run()中獲取配置信息就是從 atomic.Value加載出來,這樣保證數據一致性。
編譯運行,而後不斷更改配置文件中kafkaAddr,測試結果以下:
這樣配置文集熱加載就實現了。
附一下全部代碼:
reconf/reconf.go:
package reconf // 實現一個解析配置文件的包 import ( "time" "io" "fmt" "os" "bufio" "strings" "strconv" "sync" ) type Config struct{ filename string data map[string]string lastModifyTime int64 rwLock sync.RWMutex notifyList []Notifyer } func NewConfig(file string)(conf *Config, err error){ conf = &Config{ filename: file, data: make(map[string]string, 1024), } m, err := conf.parse() if err != nil { fmt.Printf("parse conf error:%v\n", err) return } // 將解析配置文件後的數據更新到結構體的map中,寫鎖 conf.rwLock.Lock() conf.data = m conf.rwLock.Unlock() // 啓一個後臺線程去檢測配置文件是否更改 go conf.reload() return } // 添加觀察者 func (c *Config) AddObserver(n Notifyer) { c.notifyList = append(c.notifyList, n) } func (c *Config) reload(){ // 定時器 ticker := time.NewTicker(time.Second * 5) for _ = range ticker.C { // 打開文件 // 爲何使用匿名函數? 當匿名函數退出時可用defer去關閉文件 // 若是不用匿名函數,在循環中很差關閉文件,一不當心就內存泄露 func (){ f, err := os.Open(c.filename) if err != nil { fmt.Printf("open file error:%s\n", err) return } defer f.Close() fileInfo, err := f.Stat() if err != nil { fmt.Printf("stat file error:%s\n", err) return } // 或取當前文件修改時間 curModifyTime := fileInfo.ModTime().Unix() if curModifyTime > c.lastModifyTime { // 從新解析時,要考慮應用程序正在讀取這個配置所以應該加鎖 m, err := c.parse() if err != nil { fmt.Printf("parse config error:%v\n", err) return } c.rwLock.Lock() c.data = m c.rwLock.Unlock() c.lastModifyTime = curModifyTime // 配置更新通知全部觀察者 for _, n := range c.notifyList { n.Callback(c) } } }() } } func (c *Config) parse() (m map[string]string, err error) { // 若是在parse()中定義一個map,這樣就是一個新的map不用加鎖 m = make(map[string]string, 1024) f, err := os.Open(c.filename) if err != nil { return } defer f.Close() reader := bufio.NewReader(f) // 聲明一個變量存放讀取行數 var lineNo int for { line, errRet := reader.ReadString('\n') if errRet == io.EOF { // 這裏有一個坑,最後一行若是不是\n結尾會漏讀 lineParse(&lineNo, &line, &m) break } if errRet != nil { err = errRet return } lineParse(&lineNo, &line, &m) } return } func lineParse(lineNo *int, line *string, m *map[string]string) { *lineNo++ l := strings.TrimSpace(*line) // 若是空行 或者 是註釋 跳過 if len(l) == 0 || l[0] =='\n' || l[0]=='#' || l[0]==';' { return } itemSlice := strings.Split(l, "=") // = if len(itemSlice) == 0 { fmt.Printf("invalid config, line:%d", lineNo) return } key := strings.TrimSpace(itemSlice[0]) if len(key) == 0 { fmt.Printf("invalid config, line:%d", lineNo) return } if len(key) == 1 { (*m)[key] = "" return } value := strings.TrimSpace(itemSlice[1]) (*m)[key] = value return } func (c *Config) GetInt(key string)(value int, err error){ c.rwLock.RLock() defer c.rwLock.RUnlock() str, ok := c.data[key] if !ok { err = fmt.Errorf("key [%s] not found", key) } value, err = strconv.Atoi(str) return } func (c *Config) GetIntDefault(key string, defaultInt int)(value int){ c.rwLock.RLock() defer c.rwLock.RUnlock() str, ok := c.data[key] if !ok { value = defaultInt return } value, err := strconv.Atoi(str) if err != nil { value = defaultInt } return } func (c *Config) GetString(key string)(value string, err error){ c.rwLock.RLock() defer c.rwLock.RUnlock() value, ok := c.data[key] if !ok { err = fmt.Errorf("key [%s] not found", key) } return } func (c *Config) GetIStringDefault(key string, defaultStr string)(value string){ c.rwLock.RLock() defer c.rwLock.RUnlock() value, ok := c.data[key] if !ok { value = defaultStr return } return }
reconf/notify.go:
package reconf // 通知應用程序文件改變 type Notifyer interface { Callback(*Config) }
reconf_test/main.go:
package main import ( "time" "go_dev/s23_conf/reconf" "sync/atomic" "fmt" ) type AppConfig struct { hostname string port int kafkaAddr string kafkaPort int } // reload()協程寫 和 for循環的讀,都是對Appconfig對象,所以有讀寫衝突 type AppConfigMgr struct { config atomic.Value } // 初始化結構體 var appConfigMgr = &AppConfigMgr{} func (a *AppConfigMgr)Callback(conf *reconf.Config) { appConfig := &AppConfig{} hostname, err := conf.GetString("hostname") if err != nil { fmt.Printf("get hostname err: %v\n", err) return } appConfig.hostname = hostname kafkaPort, err := conf.GetInt("kafkaPort") if err != nil { fmt.Printf("get kafkaPort err: %v\n", err) return } appConfig.kafkaPort = kafkaPort appConfigMgr.config.Store(appConfig) } func initConfig(file string) { // [1] 打開配置文件 conf, err := reconf.NewConfig(file) if err != nil { fmt.Printf("read config file err: %v\n", err) return } // 添加觀察者 conf.AddObserver(appConfigMgr) // [2]第一次讀取配置文件 var appConfig AppConfig appConfig.hostname, err = conf.GetString("hostname") if err != nil { fmt.Printf("get hostname err: %v\n", err) return } fmt.Println("Hostname:", appConfig.hostname) appConfig.kafkaPort, err = conf.GetInt("kafkaPort") if err != nil { fmt.Printf("get kafkaPort err: %v\n", err) return } fmt.Println("kafkaPort:", appConfig.kafkaPort) // [3] 把讀取到的配置文件數據存儲到atomic.Value appConfigMgr.config.Store(&appConfig) fmt.Println("first load sucess.") } func run(){ for { appConfig := appConfigMgr.config.Load().(*AppConfig) fmt.Println("Hostname:", appConfig.hostname) fmt.Println("kafkaPort:", appConfig.kafkaPort) fmt.Printf("%v\n", "--------------------") time.Sleep(5 * time.Second) } } func main() { confFile := "../parseConfig/test.cfg" initConfig(confFile) // 應用程序 不少配置已經不是存在文件中而是etcd run() }
本篇全部代碼都上傳到github上了:https://github.com/zingp/goclub/tree/master/go_dev/s23_conf。參見該目錄下的reconf 和reconf_test兩個目錄。