配置文件熱加載的go語言實現

一般咱們更新應用程序的配置文件,都須要手動重啓程序或手動從新加載配置。假設一組服務部署在10臺機器上,你須要藉助批量運維工具執行重啓命令,並且10臺同時重啓可能還會形成服務短暫不可用。要是更新配置後,服務自動刷新配置多好...今天咱們就用go實現配置文件熱加載的小功能,之後更新配置不再用手動重啓了...git

1 基本思路

一般應用程序啓動的流程:加載配置,而後run()。咱們怎麼作到熱加載呢?咱們的思路是這樣的:github

【1】在加載配置文件以後,啓動一個線程app

【2】該線程定時監聽這個配置文件是否有改動運維

【3】若是配置文件有變更,就從新加載一下函數

【4】從新加載以後通知須要使用這些配置的應用程序(進程或線程),實際上就是刷新內存中配置工具

2 加載配置

首先咱們要實現加載配置功能。假設配置文件是k=v格式的,以下:測試

那咱們得寫一個解析配置的包了...讓咱們一塊兒面向對象:atom

type Config struct{
	filename string
	data map[string]string
	lastModifyTime int64
	rwLock sync.RWMutex
	notifyList []Notifyer
}

filename string 配置文件名稱線程

data map[string]string  將配置文件中的k/v解析存放到map中3d

lastModifyTime int64   記錄配置文件上一次更改時間

rwLock sync.RWMutex   讀寫鎖,處理這樣一種競爭狀況:更新這個結構體時其餘線程正在讀取改結構體中的內容,後續用到的時候會講

notifyList []Notifyer  存放全部觀察者,此處咱們用到了觀察者模式,也就是須要用到這個配置的對象,咱們就把它加到這個切片。當配置更新以後,通知切片中的對象配置更新了。

接下來咱們能夠給這個結構體添加一些方法了:

2.1 構造函數

func NewConfig(file string)(conf *Config, err error){
	conf = &Config{
		filename: file,
		data: make(map[string]string, 1024),
	}

	m, err := conf.parse()
	if err != nil {
		fmt.Printf("parse conf error:%v\n", err)
		return
	}

	// 將解析配置文件後的數據更新到結構體的map中,寫鎖
	conf.rwLock.Lock()
	conf.data = m
	conf.rwLock.Unlock()

	// 啓一個後臺線程去檢測配置文件是否更改
	go conf.reload()
	return
}

構造函數作了三件事:【1】初始化Config 【2】調用parse()函數,解析配置文件,並把解析後的map更新到Config 【3】啓動一個線程,準確說是啓動一個goroutine,即reload() 

注意此處更新map時加了寫鎖了,目的在於不影響擁有讀鎖的線程讀取數據。

2.2 parse()

解析函數比較簡單,主要是讀取配置文件,一行行解析,數據存放在map中。

func (c *Config) parse() (m map[string]string, err error) {
	// 若是在parse()中定義一個map,這樣就是一個新的map不用加鎖
	m = make(map[string]string, 1024)

	f, err := os.Open(c.filename)
	if err != nil {
		return
	}
	defer f.Close()

	reader := bufio.NewReader(f)
	// 聲明一個變量存放讀取行數
	var lineNo int
	for {
		line, errRet := reader.ReadString('\n')
		if errRet == io.EOF {
			// 這裏有一個坑,最後一行若是不是\n結尾會漏讀
			lineParse(&lineNo, &line, &m)
			break
		}
		if errRet != nil {
			err = errRet
			return
		}
		
		lineParse(&lineNo, &line, &m)
	}

	return 
}

func lineParse(lineNo *int, line *string, m *map[string]string) {
		*lineNo++

		l := strings.TrimSpace(*line)
		// 若是空行 或者 是註釋 跳過
		if len(l) == 0 || l[0] =='\n' || l[0]=='#' || l[0]==';' {
			return
		}

		itemSlice := strings.Split(l, "=")
		// =
		if len(itemSlice) == 0 {
			fmt.Printf("invalid config, line:%d", lineNo)
			return
		}

		key := strings.TrimSpace(itemSlice[0])
		if len(key) == 0 {
			fmt.Printf("invalid config, line:%d", lineNo)
			return
		}
		if len(key) == 1 {
			(*m)[key] = ""
			return
		}

		value := strings.TrimSpace(itemSlice[1])
		(*m)[key] = value	

		return
}

這裏我寫了兩個函數。lineParse()是解析每一行配置的。parse()就是解析的主函數,在parse()中我調用了兩次lineParse()。緣由是在使用bufio按行讀取配置文件的時候,有時候會出現這樣的狀況:有的人在寫配置文件的時候,最後一行沒有換行,也就是沒有‘\n’,而後咱們就直接讀到io.EOF了,這時候若是直接break就會致使最後一行丟失。因此這種狀況下咱們應該在break以前調用lineParse()把最後一行處理了。

3 封裝接口

上面咱們已經實現了讀取配置文件,並放到一個Config示例中,咱們須要爲這個Config封裝一些接口,方便用戶經過接口訪問Config的內容。這步比較簡單:

func (c *Config) GetInt(key string)(value int, err error){
	c.rwLock.RLock()
	defer c.rwLock.RUnlock()

	str, ok := c.data[key]
	if !ok {
		err = fmt.Errorf("key [%s] not found", key)
	}
	value, err = strconv.Atoi(str)
	return
}

func (c *Config) GetIntDefault(key string, defaultInt int)(value int){
	c.rwLock.RLock()
	defer c.rwLock.RUnlock()

	str, ok := c.data[key]
	if !ok {
		value = defaultInt
		return
	}
	value, err := strconv.Atoi(str)
	if err != nil {
		value = defaultInt
	}
	return
}

func (c *Config) GetString(key string)(value string, err error){
	c.rwLock.RLock()
	defer c.rwLock.RUnlock()

	value, ok := c.data[key]
	if !ok {
		err = fmt.Errorf("key [%s] not found", key)
	}
	return
}

func (c *Config) GetIStringDefault(key string, defaultStr string)(value string){
	c.rwLock.RLock()
	defer c.rwLock.RUnlock()

	value, ok := c.data[key]
	if !ok {
		value = defaultStr
		return
	}
	return
}

如上,一共封裝了4個接口:

GetInt(key string)(value int, err error)   經過key獲取value,並將value轉成int類型

GetIntDefault(key string, defaultInt int)(value int)    經過key獲取value,並將value轉成int類型;若是獲取失敗,使用默認值

GetString(key string)(value string, err error)    經過key獲取value,默認value爲string類型

GetIStringDefault(key string, defaultStr string)(value string)   經過key獲取value,默認value爲string類型;若是獲取失敗,使用默認值

注意:四個接口都用了讀鎖

4 reload()

上面咱們已經實現瞭解析,加載配置文件,併爲Config封裝了比較友好的接口。接下來,咱們能夠仔細看一下咱們以前啓動的goroutine了,即reload()方法。

func (c *Config) reload(){
	// 定時器
	ticker := time.NewTicker(time.Second * 5) 
	for _ = range ticker.C {
		// 打開文件
		// 爲何使用匿名函數? 當匿名函數退出時可用defer去關閉文件
		// 若是不用匿名函數,在循環中很差關閉文件,一不當心就內存泄露
		func (){
			f, err := os.Open(c.filename)
			if err != nil {
				fmt.Printf("open file error:%s\n", err)
				return
			}
			defer f.Close()

			fileInfo, err := f.Stat()
			if err != nil {
				fmt.Printf("stat file error:%s\n", err)
				return
			}
			// 或取當前文件修改時間
			curModifyTime := fileInfo.ModTime().Unix() 
			if curModifyTime > c.lastModifyTime {
				// 從新解析時,要考慮應用程序正在讀取這個配置所以應該加鎖
				m, err := c.parse()
				if err != nil {
					fmt.Printf("parse config error:%v\n", err)
					return
				}

				c.rwLock.Lock()
				c.data = m
				c.rwLock.Unlock()

				c.lastModifyTime = curModifyTime

				// 配置更新通知全部觀察者
				for _, n := range c.notifyList {
					n.Callback(c)
				}
			}
		}()
	}
}

reload()函數中作了這幾件事:

【1】用time.NewTicker每隔5秒去檢查一下配置文件

【2】若是配置文件的修改時間比上一次修改時間大,咱們認爲配置文件更新了。那麼咱們調用parse()解析配置文件,並更新conf實例中的數據。而且更新配置文件的修改時間。

【3】通知全部觀察者,即通知全部使用配置文件的程序、進程或實例,配置更新了。

5 觀察者模式

咱們反覆提到觀察者,反覆提到通知全部觀察者配置文件更新。那麼咱們就要實現這個觀察者:

type Notifyer  interface {
	Callback(*Config)
}  

定義這樣一個Notifyer接口,只要實現了Callback方法的對象,就都實現了這個Notifyer接口。實現了這個接口的對象,若是都須要被通知配置文件更新,那這些對象均可以加入到前面定義的notifyList []Notifyer這個切片中,等待被通知配置文件更新。

好了,此處咱們是否少寫了添加觀察者的方法呢??是的,立刻寫:

// 添加觀察者
func (c *Config) AddObserver(n Notifyer) {
	c.notifyList = append(c.notifyList, n)
}

6 測試

通過上面一番折騰,我們的熱加載就快實現了,咱們來測一測:

一般咱們在應用程序中怎麼使用配置文件?【1】加載配置文件,加載以後數據放在一個全局結構體中 【2】run()

也就是run()中咱們要使用全局的結構體,可是這個全局結構體會由於配置文件的更改被更新。此時又存在須要加鎖的狀況了。我擦,是否是很麻煩。。不用擔憂,咱們用一個原子操做搞定。

假設咱們的配置文件中存放的是hostname/port/kafkaAddr/kafkaPort這幾個字段。。

type AppConfig struct {
	hostname string
	port int
	kafkaAddr string
	kafkaPort int
}  

接下來咱們要用原子操做保證數據一致性了:

// reload()協程寫 和 for循環的讀,都是對Appconfig對象,所以有讀寫衝突
type AppConfigMgr struct {
	config atomic.Value
}

// 初始化結構體
var appConfigMgr = &AppConfigMgr{}  

atomic.Value能保證存放數據和讀取出數據不會有衝突。因此當咱們更新數據時存放到atomic.Value中,咱們使用數據從atomic.Value加載出來,這樣不用加鎖就能保證數據的一致性了。完美~~

咱們須要AppConfigMgr實現Callback方法,即實現Notifyer接口,這樣才能被通知配置更新:

func (a *AppConfigMgr)Callback(conf *reconf.Config) {
	appConfig := &AppConfig{}
	hostname, err := conf.GetString("hostname")
	if err != nil {
		fmt.Printf("get hostname err: %v\n", err)
		return
	}
	appConfig.hostname = hostname

	kafkaPort, err := conf.GetInt("kafkaPort")
	if err != nil {
		fmt.Printf("get kafkaPort err: %v\n", err)
		return
	}
	appConfig.kafkaPort = kafkaPort

	appConfigMgr.config.Store(appConfig)

}

這個Callback實現功能是:當被通知配置更新時,立刻讀取更新的數據並存放到config   atomic.Value 中。

好了,咱們要寫主函數了。

func initConfig(file string) {
	// [1] 打開配置文件
	conf, err := reconf.NewConfig(file)
	if err != nil {
		fmt.Printf("read config file err: %v\n", err)
		return
	}

	// 添加觀察者
	conf.AddObserver(appConfigMgr)

	// [2]第一次讀取配置文件
	var appConfig AppConfig
	appConfig.hostname, err = conf.GetString("hostname")
	if err != nil {
		fmt.Printf("get hostname err: %v\n", err)
		return
	}
	fmt.Println("Hostname:", appConfig.hostname)

	appConfig.kafkaPort, err = conf.GetInt("kafkaPort")
	if err != nil {
		fmt.Printf("get kafkaPort err: %v\n", err)
		return
	}
	fmt.Println("kafkaPort:", appConfig.kafkaPort)

	// [3] 把讀取到的配置文件數據存儲到atomic.Value
	appConfigMgr.config.Store(&appConfig)
	fmt.Println("first load sucess.")

}

func run(){
	for {
		appConfig := appConfigMgr.config.Load().(*AppConfig)

		fmt.Println("Hostname:", appConfig.hostname)
		fmt.Println("kafkaPort:", appConfig.kafkaPort)
		fmt.Printf("%v\n", "--------------------")
		time.Sleep(5 * time.Second)
	}
}

func main() { 
	confFile := "../parseConfig/test.cfg"
	initConfig(confFile)
	// 應用程序 不少配置已經不是存在文件中而是etcd
	run() 
}

主函數中調用了initConfig()和run()。

initConfig()中:reconf.NewConfig(file)的時候咱們已經第一次解析配置,並啓動線程不斷更新配置了。此外initConfig()還作了一些事,就是經過Config提供的接口,將配置文件中的數據讀取到appConfig 中,而後再將appConfig 存儲到 atomic.Value中。

run()就是模擬應用程序在運行過程當中使用配置的過程:run()中獲取配置信息就是從 atomic.Value加載出來,這樣保證數據一致性。

編譯運行,而後不斷更改配置文件中kafkaAddr,測試結果以下:

這樣配置文集熱加載就實現了。

附一下全部代碼:

reconf/reconf.go:

package reconf

// 實現一個解析配置文件的包
import (
	"time"
	"io"
	"fmt"
	"os"
	"bufio"
	"strings"
	"strconv"
	"sync"
)

type Config struct{
	filename string
	data map[string]string
	lastModifyTime int64
	rwLock sync.RWMutex
	notifyList []Notifyer
}

func NewConfig(file string)(conf *Config, err error){
	conf = &Config{
		filename: file,
		data: make(map[string]string, 1024),
	}

	m, err := conf.parse()
	if err != nil {
		fmt.Printf("parse conf error:%v\n", err)
		return
	}

	// 將解析配置文件後的數據更新到結構體的map中,寫鎖
	conf.rwLock.Lock()
	conf.data = m
	conf.rwLock.Unlock()

	// 啓一個後臺線程去檢測配置文件是否更改
	go conf.reload()
	return
}

// 添加觀察者
func (c *Config) AddObserver(n Notifyer) {
	c.notifyList = append(c.notifyList, n)
}

func (c *Config) reload(){
	// 定時器
	ticker := time.NewTicker(time.Second * 5) 
	for _ = range ticker.C {
		// 打開文件
		// 爲何使用匿名函數? 當匿名函數退出時可用defer去關閉文件
		// 若是不用匿名函數,在循環中很差關閉文件,一不當心就內存泄露
		func (){
			f, err := os.Open(c.filename)
			if err != nil {
				fmt.Printf("open file error:%s\n", err)
				return
			}
			defer f.Close()

			fileInfo, err := f.Stat()
			if err != nil {
				fmt.Printf("stat file error:%s\n", err)
				return
			}
			// 或取當前文件修改時間
			curModifyTime := fileInfo.ModTime().Unix() 
			if curModifyTime > c.lastModifyTime {
				// 從新解析時,要考慮應用程序正在讀取這個配置所以應該加鎖
				m, err := c.parse()
				if err != nil {
					fmt.Printf("parse config error:%v\n", err)
					return
				}

				c.rwLock.Lock()
				c.data = m
				c.rwLock.Unlock()

				c.lastModifyTime = curModifyTime

				// 配置更新通知全部觀察者
				for _, n := range c.notifyList {
					n.Callback(c)
				}
			}
		}()
	}
}

func (c *Config) parse() (m map[string]string, err error) {
	// 若是在parse()中定義一個map,這樣就是一個新的map不用加鎖
	m = make(map[string]string, 1024)

	f, err := os.Open(c.filename)
	if err != nil {
		return
	}
	defer f.Close()

	reader := bufio.NewReader(f)
	// 聲明一個變量存放讀取行數
	var lineNo int
	for {
		line, errRet := reader.ReadString('\n')
		if errRet == io.EOF {
			// 這裏有一個坑,最後一行若是不是\n結尾會漏讀
			lineParse(&lineNo, &line, &m)
			break
		}
		if errRet != nil {
			err = errRet
			return
		}
		
		lineParse(&lineNo, &line, &m)
	}

	return 
}

func lineParse(lineNo *int, line *string, m *map[string]string) {
		*lineNo++

		l := strings.TrimSpace(*line)
		// 若是空行 或者 是註釋 跳過
		if len(l) == 0 || l[0] =='\n' || l[0]=='#' || l[0]==';' {
			return
		}

		itemSlice := strings.Split(l, "=")
		// =
		if len(itemSlice) == 0 {
			fmt.Printf("invalid config, line:%d", lineNo)
			return
		}

		key := strings.TrimSpace(itemSlice[0])
		if len(key) == 0 {
			fmt.Printf("invalid config, line:%d", lineNo)
			return
		}
		if len(key) == 1 {
			(*m)[key] = ""
			return
		}

		value := strings.TrimSpace(itemSlice[1])
		(*m)[key] = value	

		return
}


func (c *Config) GetInt(key string)(value int, err error){
	c.rwLock.RLock()
	defer c.rwLock.RUnlock()

	str, ok := c.data[key]
	if !ok {
		err = fmt.Errorf("key [%s] not found", key)
	}
	value, err = strconv.Atoi(str)
	return
}

func (c *Config) GetIntDefault(key string, defaultInt int)(value int){
	c.rwLock.RLock()
	defer c.rwLock.RUnlock()

	str, ok := c.data[key]
	if !ok {
		value = defaultInt
		return
	}
	value, err := strconv.Atoi(str)
	if err != nil {
		value = defaultInt
	}
	return
}

func (c *Config) GetString(key string)(value string, err error){
	c.rwLock.RLock()
	defer c.rwLock.RUnlock()

	value, ok := c.data[key]
	if !ok {
		err = fmt.Errorf("key [%s] not found", key)
	}
	return
}

func (c *Config) GetIStringDefault(key string, defaultStr string)(value string){
	c.rwLock.RLock()
	defer c.rwLock.RUnlock()

	value, ok := c.data[key]
	if !ok {
		value = defaultStr
		return
	}
	return
}

reconf/notify.go:

package reconf

// 通知應用程序文件改變

type Notifyer  interface {
	Callback(*Config)
}

reconf_test/main.go:

package main

import (
	"time"
	"go_dev/s23_conf/reconf"
	"sync/atomic"
	"fmt"
)

type AppConfig struct {
	hostname string
	port int
	kafkaAddr string
	kafkaPort int
}

// reload()協程寫 和 for循環的讀,都是對Appconfig對象,所以有讀寫衝突
type AppConfigMgr struct {
	config atomic.Value
}

// 初始化結構體
var appConfigMgr = &AppConfigMgr{}

func (a *AppConfigMgr)Callback(conf *reconf.Config) {
	appConfig := &AppConfig{}
	hostname, err := conf.GetString("hostname")
	if err != nil {
		fmt.Printf("get hostname err: %v\n", err)
		return
	}
	appConfig.hostname = hostname

	kafkaPort, err := conf.GetInt("kafkaPort")
	if err != nil {
		fmt.Printf("get kafkaPort err: %v\n", err)
		return
	}
	appConfig.kafkaPort = kafkaPort

	appConfigMgr.config.Store(appConfig)

}

func initConfig(file string) {
	// [1] 打開配置文件
	conf, err := reconf.NewConfig(file)
	if err != nil {
		fmt.Printf("read config file err: %v\n", err)
		return
	}

	// 添加觀察者
	conf.AddObserver(appConfigMgr)

	// [2]第一次讀取配置文件
	var appConfig AppConfig
	appConfig.hostname, err = conf.GetString("hostname")
	if err != nil {
		fmt.Printf("get hostname err: %v\n", err)
		return
	}
	fmt.Println("Hostname:", appConfig.hostname)

	appConfig.kafkaPort, err = conf.GetInt("kafkaPort")
	if err != nil {
		fmt.Printf("get kafkaPort err: %v\n", err)
		return
	}
	fmt.Println("kafkaPort:", appConfig.kafkaPort)

	// [3] 把讀取到的配置文件數據存儲到atomic.Value
	appConfigMgr.config.Store(&appConfig)
	fmt.Println("first load sucess.")

}

func run(){
	for {
		appConfig := appConfigMgr.config.Load().(*AppConfig)

		fmt.Println("Hostname:", appConfig.hostname)
		fmt.Println("kafkaPort:", appConfig.kafkaPort)
		fmt.Printf("%v\n", "--------------------")
		time.Sleep(5 * time.Second)
	}
}

func main() { 
	confFile := "../parseConfig/test.cfg"
	initConfig(confFile)
	// 應用程序 不少配置已經不是存在文件中而是etcd
	run() 
}  

本篇全部代碼都上傳到github上了https://github.com/zingp/goclub/tree/master/go_dev/s23_conf。參見該目錄下的reconf 和reconf_test兩個目錄。

相關文章
相關標籤/搜索