本文將從docker(1.12.6)源碼的角度分析docker daemon怎麼將容器的日誌收集出來並經過配置的log-driver發送出去,並結合示例介紹了好雨雲幫中實現的一個zmq-loger。閱讀本文,你也能夠實現適合本身業務場景的log-driver。php
本文適合可以閱讀和編寫golang代碼的同窗。 (1)首先你須要認知如下幾個關鍵詞:python
(2)你須要知道關於進程產生日誌的形式:
進程產生日誌有兩類輸出方式,一類是寫入到文件中。另外一類是直接寫到stdout或者stderr,例如php的echo
python的print
golang的fmt.Println("")
等等。
(3)是否知道docker-daemon與運行中container的關係? 一個container就是一個特殊的進程,它是由docker daemon建立並啓動,所以container是docker daemon的子進程。由docker daemon守護和管理。所以container的stdout可以被docker daemon獲取到。基於此理論,咱們來分析docker daemon相關代碼。golang
# /container/container.go:62 type CommonContainer struct{ StreamConfig *stream.Config ... } # /container/stream/streams.go:26 type Config struct { sync.WaitGroup stdout *broadcaster.Unbuffered stderr *broadcaster.Unbuffered stdin io.ReadCloser stdinPipe io.WriteCloser }
找到如上所示對應的代碼,顯示了每個container實例都有幾個屬性stdout,stderr,stdin,以及管道stdinPipe。這裏說下stdinPipe,當容器使用-i參數啓動時標準輸入將被運行,daemon將可以使用此管道向容器內寫入標準輸入。docker
![2017011930658image2017-1-18 17-18-38.png](http://7xqmjb.com1.z0.glb.clouddn.com/2017011930658image2017-1-18 17-18-38.png)json
咱們試想以上圖例,若是是你,你怎麼實現日誌收集轉發?安全
# /container/container.go:312 func (container *Container) StartLogger(cfg containertypes.LogConfig) (logger.Logger, error) { c, err := logger.GetLogDriver(cfg.Type) if err != nil { return nil, fmt.Errorf("Failed to get logging factory: %v", err) } ctx := logger.Context{ Config: cfg.Config, ContainerID: container.ID, ContainerName: container.Name, ContainerEntrypoint: container.Path, ContainerArgs: container.Args, ContainerImageID: container.ImageID.String(), ContainerImageName: container.Config.Image, ContainerCreated: container.Created, ContainerEnv: container.Config.Env, ContainerLabels: container.Config.Labels, DaemonName: "docker", } // Set logging file for "json-logger" if cfg.Type == jsonfilelog.Name { ctx.LogPath, err = container.GetRootResourcePath(fmt.Sprintf("%s-json.log", container.ID)) if err != nil { return nil, err } } return c(ctx) } #/container/container.go:978 func (container *Container) startLogging() error { if container.HostConfig.LogConfig.Type == "none" { return nil // do not start logging routines } l, err := container.StartLogger(container.HostConfig.LogConfig) if err != nil { return fmt.Errorf("Failed to initialize logging driver: %v", err) } copier := logger.NewCopier(map[string]io.Reader{"stdout": container.StdoutPipe(), "stderr": container.StderrPipe()}, l) container.LogCopier = copier copier.Run() container.LogDriver = l // set LogPath field only for json-file logdriver if jl, ok := l.(*jsonfilelog.JSONFileLogger); ok { container.LogPath = jl.LogPath() } return nil }
第一個方法是爲container查找log-driver。首先根據容器配置的log-driver類別調用:logger.GetLogDriver(cfg.Type)
返回一個方法類型:socket
/daemon/logger/factory.go:9 type Creator func(Context) (Logger, error)
實質就是從工廠類註冊的logdriver插件去查找,具體源碼下文分析。獲取到c方法後構建調用參數具體就是容器的一些信息。而後使用調用c方法返回driver。driver是個接口類型,咱們看看有哪些方法:源碼分析
# /daemon/logger/logger.go:61 type Logger interface { Log(*Message) error Name() string Close() error }
很簡單的三個方法,也很容易理解,Log()
發送日誌消息到driver,Close()
進行關閉操做(根據不一樣實現)。 也就是說咱們本身實現一個logdriver,只須要實現如上三個方法,而後註冊到logger工廠類中便可。下面咱們來看/daemon/logger/factory.go
插件
第二個方法就是處理日誌了,獲取到日誌driver,在建立一個Copier
,顧名思義就是複製日誌,分別從stdout 和stderr複製到logger driver。下面看看具體關鍵實現:線程
#/daemon/logger/copir.go:41 func (c *Copier) copySrc(name string, src io.Reader) { defer c.copyJobs.Done() reader := bufio.NewReader(src) for { select { case <-c.closed: return default: line, err := reader.ReadBytes('\n') line = bytes.TrimSuffix(line, []byte{'\n'}) // ReadBytes can return full or partial output even when it failed. // e.g. it can return a full entry and EOF. if err == nil || len(line) > 0 { if logErr := c.dst.Log(&Message{Line: line, Source: name, Timestamp: time.Now().UTC()}); logErr != nil { logrus.Errorf("Failed to log msg %q for logger %s: %s", line, c.dst.Name(), logErr) } } if err != nil { if err != io.EOF { logrus.Errorf("Error scanning log stream: %s", err) } return } } } }
每讀取一行數據,構建一個消息,調用logdriver的log方法發送到driver處理。
位於/daemon/logger/factory.go
的源碼實現即時日誌driver的註冊器,其中幾個重要的方法(上文已經提到一個):
# /daemon/logger/factory.go:21 func (lf *logdriverFactory) register(name string, c Creator) error { if lf.driverRegistered(name) { return fmt.Errorf("logger: log driver named '%s' is already registered", name) } lf.m.Lock() lf.registry[name] = c lf.m.Unlock() return nil } # /daemon/logger/factory.go:39 func (lf *logdriverFactory) registerLogOptValidator(name string, l LogOptValidator) error { lf.m.Lock() defer lf.m.Unlock() if _, ok := lf.optValidator[name]; ok { return fmt.Errorf("logger: log validator named '%s' is already registered", name) } lf.optValidator[name] = l return nil }
看起來很簡單,就是將一個Creator
方法類型添加到一個map結構中,將LogOptValidator
添加到另外一個map這裏注意加鎖的操做。
#/daemon/logger/factory.go:13 type LogOptValidator func(cfg map[string]string) error
這個主要是驗證driver的參數 ,dockerd和docker啓動參數中有:--log-opt
上文已經完整分析了docker daemon管理logdriver和處理日誌的整個流程。相信你已經比較明白了。下面咱們以zmq-driver爲例講講咱們怎麼實現本身的driver。直接接收容器的日誌。
上文咱們已經談了一個log-driver須要實現的幾個方法。 咱們能夠看看位於/daemon/logger
目錄下的已有的driver的實現,例如fluentd
,awslogs
等。 下面咱們來分析zmq-driver具體的代碼:
//定義一個struct,這裏包含一個zmq套接字 type ZmqLogger struct { writer *zmq.Socket containerId string tenantId string serviceId string felock sync.Mutex } //定義init方法調用logger註冊器的方法註冊當前driver //和參數驗證方法。 func init() { if err := logger.RegisterLogDriver(name, New); err != nil { logrus.Fatal(err) } if err := logger.RegisterLogOptValidator(name, ValidateLogOpt); err != nil { logrus.Fatal(err) } } //實現一個上文提到的Creator方法註冊logdriver. //這裏新建一個zmq套接字構建一個實例 func New(ctx logger.Context) (logger.Logger, error) { zmqaddress := ctx.Config[zmqAddress] puber, err := zmq.NewSocket(zmq.PUB) if err != nil { return nil, err } var ( env = make(map[string]string) tenantId string serviceId string ) for _, pair := range ctx.ContainerEnv { p := strings.SplitN(pair, "=", 2) //logrus.Errorf("ContainerEnv pair: %s", pair) if len(p) == 2 { key := p[0] value := p[1] env[key] = value } } tenantId = env["TENANT_ID"] serviceId = env["SERVICE_ID"] if tenantId == "" { tenantId = "default" } if serviceId == "" { serviceId = "default" } puber.Connect(zmqaddress) return &ZmqLogger{ writer: puber, containerId: ctx.ID(), tenantId: tenantId, serviceId: serviceId, felock: sync.Mutex{}, }, nil } //實現Log方法,這裏使用zmq socket發送日誌消息 //這裏必須注意,zmq socket是線程不安全的,咱們知道 //本方法可能被兩個線程(複製stdout和膚質stderr)調用//必須使用鎖保證線程安全。不然會發生錯誤。 func (s *ZmqLogger) Log(msg *logger.Message) error { s.felock.Lock() defer s.felock.Unlock() s.writer.Send(s.tenantId, zmq.SNDMORE) s.writer.Send(s.serviceId, zmq.SNDMORE) if msg.Source == "stderr" { s.writer.Send(s.containerId+": "+string(msg.Line), zmq.DONTWAIT) } else { s.writer.Send(s.containerId+": "+string(msg.Line), zmq.DONTWAIT) } return nil } //實現Close方法,這裏用來關閉zmq socket。 //一樣注意線程安全,調用此方法的是容器關閉協程。 func (s *ZmqLogger) Close() error { s.felock.Lock() defer s.felock.Unlock() if s.writer != nil { return s.writer.Close() } return nil } func (s *ZmqLogger) Name() string { return name } //驗證參數的方法,咱們使用參數傳入zmq pub的地址。 func ValidateLogOpt(cfg map[string]string) error { for key := range cfg { switch key { case zmqAddress: default: return fmt.Errorf("unknown log opt '%s' for %s log driver", key, name) } } if cfg[zmqAddress] == "" { return fmt.Errorf("must specify a value for log opt '%s'", zmqAddress) } return nil }
多研究源碼能夠方便咱們理解docker的工做原理。今天咱們分析了日誌部分。但願讀者對這部分功能可以理解得更清晰。