Docker自發現註冊服務regd研發

0. 前言

[技術源於藝術, 藝術源於生活]
1) 這是我第一次發佈程序相關的技術文章, 10年前發表過不少關於3dsmax和maya的技術文章
2) 有人無故轉載個人文章, 因此這裏留一個個人聯繫方式, 歡迎討論 
郵箱: kekuer@gmail.com       qq: 5513219node

 

1. 背景

註冊服務其實目前已經有不少項目都在作, 好比: https://github.com/gliderlabs/registrator, 爲何我這裏還要本身作一套? 很簡單, 過重, 太年輕, 太不適應個人需求c++

 

2. 準備

1) 首先須要瞭解Docker Remote API, 比較簡單, REST+JSON, 能夠瀏覽一下官方網站:https://docs.docker.com/engine/reference/api/docker_remote_api_v1.23/git

2) etcd, 該服務生來就是爲服務自發現而作的, 官方介紹說得很簡單且清晰: consistent key-value store for shared configuration and service discovery, 也很簡單, REST+JSONgithub

能夠經過官方API來了解: https://github.com/coreos/etcd/blob/master/Documentation/v2/api.mdgolang

本文假設你已經對以上兩點有比較清晰的認知docker

 

3. 需求分析

要完成該服務, 咱們須要分析幾個事情:shell

1) 用什麼語言來實現

因爲docker和etcd這些應用都是golang寫的, 所以我毅然的決定就用golang來作了, 沒有多想, 讀者能夠考慮用其餘任何語言來實現, 好比c, c++, node.js等json

golang其實在該項目上有幾個好處: a) 輕量, b) 內存消耗少, c) 開發速度快, d) 什麼系統均可以跑api

2) 須要哪些模塊

所有都是golang的內構庫bash

a) tcp, http (用於請求)

都是REST的, 爲何不所有使用http, 而要退到tcp呢? 

由於docker API中的events接口是用的http的Chunked Encoding (Transfer-Encoding: chunked)

具體的能夠本身看一下http的協議定義: https://www.w3.org/Protocols/rfc2616/rfc2616.txt 中的(3.6.1 Chunked Transfer Coding), 都很簡單

4. 邏輯分析

1) 監聽docker events

a) docker API 中 GET /events, 上面咱們已經分析過, 這裏是Transfer-Encoding: chunked的一個http協議

b) 因爲我不知道golang是否有對chunked encoding支持 (看了源碼沒有找到, 若是有人知道的話, 提供一下指引, 謝謝), 所以這裏只有用tcp來封一個

c) 根據咱們業務需求, 咱們只須要對docker的start 和disconnect 2個事件進行監聽

爲何是start而不是connect? 由於connect作了之後, 業務不必定啓動起來了

爲何是disconnect而不是stop? 由於業務需求的是網絡自發現, 因此不能等待到stop, 只要網絡斷了, 必須立刻通知相應業務

func monitorDockerEvent() {
	log("info", "monitorevent", "connecting to docker API server", &logDetailStruct{
		Server: dockerAPIAddr,
	})
	reqContent := "GET /events?filters={%22event%22:[%22start%22,%22disconnect%22]} HTTP/1.1\n\n"
	tcpAddr, err := net.ResolveTCPAddr("tcp", dockerAPIAddr)
	if err != nil {
		catchErr()
		log("error", "monitorevent", "resolve tcp address failed: "+err.Error(), &logDetailStruct{
			Server: dockerAPIAddr,
		})
		return
	}

	conn, err := net.DialTCP("tcp", nil, tcpAddr)
	if err != nil {
		catchErr()
		log("error", "monitorevent", "dial tcp failed: "+err.Error(), &logDetailStruct{
			Server: dockerAPIAddr,
		})
		return
	}
	defer conn.Close()

	// write request header to server
	_, err = conn.Write([]byte(reqContent))
	if err != nil {
		catchErr()
		log("error", "monitorevent", "write to server failed: "+err.Error(), &logDetailStruct{
			Server: dockerAPIAddr,
		})
		return
	}

	reply := make([]byte, 1024)
	for {
		_, err = conn.Read(reply)
		if err != nil {
			catchErr()
			log("error", "monitorevent", "read from server failed: "+err.Error(), &logDetailStruct{
				Server: dockerAPIAddr,
			})
			break
		}

		// header is received
		if strings.HasPrefix(string(reply), "HTTP") {
			log("info", "monitorevent", "docker API server is connected", &logDetailStruct{
				Server: dockerAPIAddr,
			})
			continue
		}

		res := strings.Split(string(reply), "\n")
		// chunk is received, first line is length
		if len(res) > 1 {
			body := res[1]

			jsonMap, err := jsonutil.ParseJsonObject(body)
			if err != nil {
				catchErr()
				log("error", "monitorevent", "body json decode failed: "+err.Error(), &logDetailStruct{
					Server: dockerAPIAddr,
				})
				break
			}

			action, err := jsonutil.GetJsonStringValueViaPath(&jsonMap, "Action")
			if err != nil {
				catchErr()
				log("error", "monitorevent", "get container event action failed: "+err.Error(), &logDetailStruct{
					Server: dockerAPIAddr,
				})
				break
			}

			if action == "disconnect" {
				id, err := jsonutil.GetJsonStringValueViaPath(&jsonMap, "Actor", "Attributes", "container")
				if err != nil {
					catchErr()
					log("error", "monitorevent", "get disconnect container id failed: "+err.Error(), &logDetailStruct{
						Server: dockerAPIAddr,
					})
					break
				}
				deregisterContainer(id)
			} else if action == "start" {
				id, err := jsonutil.GetJsonStringValueViaPath(&jsonMap, "Actor", "ID")
				if err != nil {
					catchErr()
					log("error", "monitorevent", "get start container id failed: "+err.Error(), &logDetailStruct{
						Server: dockerAPIAddr,
					})
					break
				}
				if info, err := inspectContainer(id); err == nil {
					registerContainer(id, info)
				}
			}
		}
	}
}

如上, 我本身封了一個超簡單的log方法, 就是把對象轉成json string (我比較喜歡json log, 後期會寫一些日誌收集分析的文章)

type logDetailStruct struct {
	Server    string `json:"server,omitempty"`
	Container string `json:"id,omitempty"`
	IP        string `json:"ip,omitempty"`
}

type logStruct struct {
	Level  string           `json:"level"`
	Action string           `json:"action"`
	Msg    string           `json:"msg"`
	Detail *logDetailStruct `json:"detail,omitempty"`
}

func log(level, action, msg string, detail *logDetailStruct) {
	log := &logStruct{
		Level:  level,
		Action: action,
		Msg:    msg,
		Detail: detail,
	}
	logString, _ := json.Marshal(log)
	fmt.Printf("%s\n", logString)
}

2) 對docker events返回JSON進行分析

a) 這裏其實就是json的parse(decode, unmarshal), 反正這個方法名字隨便你怎麼叫了, 就是從string搞成object的意思

b) 爲了簡單, 我這裏抽象了幾個json的方法來完成重複調用和獲取對應路徑的值

func ParseJsonObject(jsonString string) (map[string]interface{}, error) {
	var jsonMap map[string]interface{}
	decoder := json.NewDecoder(bytes.NewBuffer([]byte(jsonString)))
	decoder.UseNumber()

	err := decoder.Decode(&jsonMap)
	if err != nil {
		return nil, err
	}

	return jsonMap, nil
}

func GetJsonObjectValueMap(jsonMap *map[string]interface{}, key string) (interface{}, error) {
	jsonObjectValueMap := (*jsonMap)[key]
	if jsonObjectValueMap == nil {
		return nil, errors.New("key is not exists")
	}

	return jsonObjectValueMap, nil
}

func GetJsonValue(jsonMap *map[string]interface{}, key string) (interface{}, error) {
	value := (*jsonMap)[key]
	if value == nil {
		return "", errors.New("key is not exists")
	}

	return value, nil
}

func GetJsonStringValue(jsonMap *map[string]interface{}, key string) (string, error) {
	value, err := GetJsonValue(jsonMap, key)
	if err != nil {
		return "", nil
	}

	ret, ok := value.(string)

	if !ok {
		return "", errors.New("value type is not string")
	}

	return ret, nil
}

func GetJsonValueViaPath(jsonMap *map[string]interface{}, keys ...string) (interface{}, error) {
	jsonObjectValueMap := jsonMap
	for index, key := range keys {
		if index == len(keys)-1 {
			break
		}
		jsonObjectMap, err := GetJsonObjectValueMap(jsonObjectValueMap, key)
		if err != nil {
			return nil, err
		}
		a := jsonObjectMap.(map[string]interface{})
		jsonObjectValueMap = &a
	}

	if jsonObjectValueMap == nil {
		return nil, errors.New("key is not exists")
	}

	return GetJsonValue(jsonObjectValueMap, keys[len(keys)-1])
}

func GetJsonStringValueViaPath(jsonMap *map[string]interface{}, keys ...string) (string, error) {
	value, err := GetJsonValueViaPath(jsonMap, keys...)
	if err != nil {
		return "", err
	}

	ret, ok := value.(string)

	if !ok {
		return "", errors.New("value type is not string")
	}

	return ret, nil
}

c) 爲了後期使用數據的方便, 我把host的ip和host的gateway放入了docker inspect返回的json中, 所以須要先獲取ip和gateway

func GetGateway() (string, error) {
	dat, err := ioutil.ReadFile("/proc/net/route")
	if err != nil {
		return "", err
	}
	routes := strings.Split(string(dat), "\n")
	for index, route := range routes {
		if index == 0 {
			continue
		}
		fields := strings.Split(route, "\t")
		if len(fields) > 3 {
			gateway := fields[2]
			if gateway != "00000000" {
				ipSegs, _ := hex.DecodeString(gateway)
				return fmt.Sprintf("%v.%v.%v.%v", ipSegs[3], ipSegs[2], ipSegs[1], ipSegs[0]), nil
			}
		}
	}
	return "", errors.New("can't get gateway")
}

func GetIps() ([]string, error) {
	ifaces, err := net.Interfaces()
	if err != nil {
		return nil, err
	}

	ips := []string{}
	// handle err
	for _, i := range ifaces {
		// fmt.Println(ifaces)
		addrs, err := i.Addrs()
		if err != nil {
			return nil, err
		}
		// handle err
		for _, addr := range addrs {
			if ipnet, ok := addr.(*net.IPNet); ok && !ipnet.IP.IsLoopback() {
				if ipnet.IP.To4() != nil {
					ips = append(ips, ipnet.IP.String())
				}
			}
		}
	}

	return ips, nil
}

3) 註冊docker container信息到etcd服務

b) etcd API 中 PUT http://127.0.0.1:2379/v2/keys/xxxx, form body是須要set的key和value

etcd的地址根據你本身服務來定, 通常都是一個host一個etcd, 而後註冊到cluster中去;

xxxx爲key, 這裏你能夠根據本身業務來定義一個詳細路徑; 在這裏我使用docker container id來做爲key, 而後前面加上prefix, 例如: abcd.com/docker/container/xxxxxxxxxxxxxxxxxxxxxxxxxxxxxx

http請求中, header確定須要該條: Content-Type: application/x-www-form-urlencoded

b) golang是默認包含http put方法的, 所以咱們能夠偷懶直接用net/http模塊來實現, 在這裏我簡單封裝了一個etcd Set的方法

func Set(address, key, value string) error {
	client := http.Client{
		Timeout: time.Duration(5 * time.Second),
	}
	req, err := http.NewRequest("PUT",
		fmt.Sprintf("%s/v2/keys/%s", address, key),
		strings.NewReader("value="+value))
	req.Header.Set("Content-Type", "application/x-www-form-urlencoded")
	if err != nil {
		return err
	}
	res, err := client.Do(req)
	if err != nil {
		return err
	}

	defer res.Body.Close()
	content, err := ioutil.ReadAll(res.Body)
	if err != nil {
		return err
	}

	var jsonMap map[string]interface{}
	err = json.Unmarshal(content, &jsonMap)
	if err != nil {
		return err
	}

	errorCode := jsonMap["errorCode"]
	message := jsonMap["message"]
	if errorCode != nil {
		return fmt.Errorf("set failed: [%v] %v", errorCode, message)
	}

	return nil
}

c) 註冊方法

func registerContainer(id, info string) error {
	detailJsonMap, err := jsonutil.ParseJsonObject(info)
	if err != nil {
		catchErr()
		log("error", "register", "unmarshal info failed: "+err.Error(), &logDetailStruct{
			Container: id,
		})
		return err
	}

	ip, err := jsonutil.GetJsonStringValueViaPath(&detailJsonMap, "NetworkSettings", "IPAddress")
	if err != nil {
		catchErr()
		log("error", "register", "get ip address failed: "+err.Error(), &logDetailStruct{
			Container: id,
		})
		return errors.New("network settings is missing in docker info")
	}

	gateway, err := jsonutil.GetJsonStringValueViaPath(&detailJsonMap, "NetworkSettings", "Gateway")
	if err != nil {
		catchErr()
		log("error", "register", "get gateway failed: "+err.Error(), &logDetailStruct{
			Container: id,
		})
		return errors.New("network settings is missing in docker info")
	}

	image, err := jsonutil.GetJsonStringValueViaPath(&detailJsonMap, "Config", "Image")
	if err != nil {
		catchErr()
		log("error", "register", "get image failed: "+err.Error(), &logDetailStruct{
			Container: id,
		})
		return errors.New("network settings is missing in docker info")
	}

	newInfo, err := json.Marshal(struct {
		HostGateway string
		HostIP      []string
		IP          string
		Gateway     string
		Image       string
		Detail      map[string]interface{}
	}{
		HostGateway: _gateway,
		HostIP:      _hostIP,
		IP:          ip,
		Gateway:     gateway,
		Image:       image,
		Detail:      detailJsonMap,
	})

	if err != nil {
		catchErr()
		log("error", "register", "add addon info failed: "+err.Error(), &logDetailStruct{
			Container: id,
		})
		return err
	}

	if err := etcd.Set(etcdAPIUrl, "gs.io/docker/containers/"+id, string(newInfo)); err != nil {
		catchErr()
		log("error", "register", "etcd set failed: "+err.Error(), &logDetailStruct{
			Container: id,
		})
		return err
	}

	log("info", "register", "register container success", &logDetailStruct{
		Container: id,
		IP:        ip,
	})

	return nil
}

4) 取消註冊docker container

a) etcd API 中 DELETE http://127.0.0.1:2379/v2/keys/xxxx

b) golang是默認包含http delete方法的, 所以在這裏我也簡單封裝了一個etcd Del的方法

func Del(address string, key string) error {
	client := http.Client{
		Timeout: time.Duration(5 * time.Second),
	}
	req, err := http.NewRequest("DELETE",
		fmt.Sprintf("%s/v2/keys/%s", address, key),
		nil)
	if err != nil {
		return err
	}
	res, err := client.Do(req)
	if err != nil {
		return err
	}

	defer res.Body.Close()
	content, err := ioutil.ReadAll(res.Body)
	if err != nil {
		return err
	}

	var jsonMap map[string]interface{}
	err = json.Unmarshal(content, &jsonMap)
	if err != nil {
		return err
	}

	errorCode := jsonMap["errorCode"]
	message := jsonMap["message"]
	if errorCode != nil {
		return fmt.Errorf("delete failed: [%v] %v", errorCode, message)
	}

	return nil
}

 c) 取消註冊方法

func deregisterContainer(id string) error {
	if err := etcd.Del(etcdAPIUrl, "gs.io/docker/containers/"+id); err != nil {
		// Key not found is valid
		if !strings.Contains(err.Error(), "100") {
			catchErr()
			log("error", "register", "etcd delete failed: "+err.Error(), &logDetailStruct{
				Container: id,
			})
			return err
		}
	}

	log("info", "deregister", "deregister container success", &logDetailStruct{
		Container: id,
	})

	return nil
}

5. 源碼

整理以後會發布源碼到碼雲

6. 構造

1) 先作一個docker文件

FROM alpine:3.3
MAINTAINER Docker Containers Registrator Maintainers "kekuer@gmail.com"

ADD ./regd /usr/bin
RUN chmod +x /usr/bin/regd
ENTRYPOINT ["/usr/bin/regd"]

2) 作一個build.sh

這裏使用了一個我作的go-build的鏡像

FROM alpine:3.3
MAINTAINER Docker Golang build Maintainers "kekuer@gmail.com"

RUN apk --update add curl git mercurial bzr go && rm -rf /var/cache/apk/*

WORKDIR "/app"

ENV GOROOT /usr/lib/go
ENV GOPATH /gopath
ENV GOBIN /gopath/bin
ENV PATH $PATH:$GOROOT/bin:$GOPATH/bin

經過shell構建註冊鏡像

#!/bin/bash

VER=1.1

docker run --rm -v $(pwd):/app funwun.io/go-build:1.0  go build regd.go
docker build -t funwun.io/regd:${VER} -f ${VER}.Dockerfile .

7. 結語

此項目, 咱們牽涉到了幾個點, 1) docker api, 2) etcd, 3) golang, 4) http client, 固然都是基礎下的基礎

還有就是提供一個bash的方案, 比較輕, 功能也比較弱, 可是基本能完成:

#!/bin/bash

/usr/bin/docker events -f event=disconnect -f event=connect |
while IFS= read -r line
do
  ID=$(echo $line | grep -o "container=[0-9a-f]\{64\}" | sed -e "s/container=//g")
  REMOVE=false
  if [[ $line == *"disconnect"* ]]; then
  	REMOVE=true
  fi
  if [ ! -z "$ID" -a "$ID" != " " ]; then
  	ETCD_KEY=funwun.io/docker/containers/$ID
    if [ $REMOVE = true ]; then
      RET=$(etcdctl rm $ETCD_KEY)
      if [[ $RET == "Error*" ]]; then
      	echo $RET
      else
      	echo "[DOWN] /funwun.io/docker/containers/$ID"
      fi
    else
      GATEWAY=$(route -n | grep '^0\.0\.0\.0' | awk '{print $2}')
      INFO=$(docker inspect $ID)
      RET=$(etcdctl set $ETCD_KEY "{\"Info\": ${INFO}, \"Gateway\": \"$GATEWAY\"}")
      if [[ $RET == "Error*" ]]; then
      	echo $RET
      else
      	echo "[UP  ] /funwun.io/docker/containers/$ID"
      fi
    fi
  fi
done
相關文章
相關標籤/搜索