kubeproxy源碼分析

kubernetes離線安裝包,僅需三步

kube-proxy源碼解析

ipvs相對於iptables模式具有較高的性能與穩定性, 本文講以此模式的源碼解析爲主,若是想去了解iptables模式的原理,能夠去參考其實現,架構上無差異。linux

kube-proxy主要功能是監聽service和endpoint的事件,而後下放代理策略到機器上。 底層調用docker/libnetwork, 而libnetwork最終調用了netlink 與netns來實現ipvs的建立等動做
<!--more-->git

初始化配置

代碼入口:cmd/kube-proxy/app/server.go Run() 函數github

經過命令行參數去初始化proxyServer的配置docker

proxyServer, err := NewProxyServer(o)
type ProxyServer struct {
    // k8s client
    Client                 clientset.Interface
    EventClient            v1core.EventsGetter

    // ipvs 相關接口
    IptInterface           utiliptables.Interface
    IpvsInterface          utilipvs.Interface
    IpsetInterface         utilipset.Interface

    // 處理同步時的處理器
    Proxier                proxy.ProxyProvider

    // 代理模式,ipvs iptables userspace kernelspace(windows)四種
    ProxyMode              string
    // 配置同步週期
    ConfigSyncPeriod       time.Duration

    // service 與 endpoint 事件處理器
    ServiceEventHandler    config.ServiceHandler
    EndpointsEventHandler  config.EndpointsHandler
}

Proxier是主要入口,抽象了兩個函數:編程

type ProxyProvider interface {
    // Sync immediately synchronizes the ProxyProvider's current state to iptables.
    Sync()
    // 按期執行
    SyncLoop()
}

ipvs 的interface 這個很重要:windows

type Interface interface {
    // 刪除全部規則
    Flush() error
    // 增長一個virtual server
    AddVirtualServer(*VirtualServer) error

    UpdateVirtualServer(*VirtualServer) error
    DeleteVirtualServer(*VirtualServer) error
    GetVirtualServer(*VirtualServer) (*VirtualServer, error)
    GetVirtualServers() ([]*VirtualServer, error)

    // 給virtual server加個realserver, 如 VirtualServer就是一個clusterip realServer就是pod(或者自定義的endpoint)
    AddRealServer(*VirtualServer, *RealServer) error
    GetRealServers(*VirtualServer) ([]*RealServer, error)
    DeleteRealServer(*VirtualServer, *RealServer) error
}

咱們在下文再詳細看ipvs_linux是如何實現上面接口的api

virtual server與realserver, 最重要的是ip:port,而後就是一些代理的模式如sessionAffinity等:數組

type VirtualServer struct {
    Address   net.IP
    Protocol  string
    Port      uint16
    Scheduler string
    Flags     ServiceFlags
    Timeout   uint32
}

type RealServer struct {
    Address net.IP
    Port    uint16
    Weight  int
}
建立apiserver client
client, eventClient, err := createClients(config.ClientConnection, master)
建立Proxier 這是僅僅關注ipvs模式的proxier
else if proxyMode == proxyModeIPVS {
        glog.V(0).Info("Using ipvs Proxier.")
        proxierIPVS, err := ipvs.NewProxier(
            iptInterface,
            ipvsInterface,
            ipsetInterface,
            utilsysctl.New(),
            execer,
            config.IPVS.SyncPeriod.Duration,
            config.IPVS.MinSyncPeriod.Duration,
            config.IPTables.MasqueradeAll,
            int(*config.IPTables.MasqueradeBit),
            config.ClusterCIDR,
            hostname,
            getNodeIP(client, hostname),
            recorder,
            healthzServer,
            config.IPVS.Scheduler,
        )
...
        proxier = proxierIPVS
        serviceEventHandler = proxierIPVS
        endpointsEventHandler = proxierIPVS

這個Proxier具有如下方法:服務器

+OnEndpointsAdd(endpoints *api.Endpoints)
   +OnEndpointsDelete(endpoints *api.Endpoints)
   +OnEndpointsSynced()
   +OnEndpointsUpdate(oldEndpoints, endpoints *api.Endpoints)
   +OnServiceAdd(service *api.Service)
   +OnServiceDelete(service *api.Service)
   +OnServiceSynced()
   +OnServiceUpdate(oldService, service *api.Service)
   +Sync()
   +SyncLoop()

因此ipvs的這個Proxier實現了咱們須要的絕大部分接口session

小結一下:

+-----------> endpointHandler
     |
     +-----------> serviceHandler
     |                ^
     |                | +-------------> sync 按期同步等
     |                | |
ProxyServer---------> Proxier --------> service 事件回調           
     |                  |                                                
     |                  +-------------> endpoint事件回調          
     |                                             |  觸發
     +-----> ipvs interface ipvs handler     <-----+

啓動proxyServer

  1. 檢查是否是帶了clean up參數,若是帶了那麼清除全部規則退出
  2. OOM adjuster貌似沒實現,忽略
  3. resouceContainer也沒實現,忽略
  4. 啓動metrics服務器,這個挺重要,好比咱們想監控時能夠傳入這個參數, 包含promethus的 metrics. metrics-bind-address參數
  5. 啓動informer, 開始監聽事件,分別啓動協程處理。

1 2 3 4咱們都不用太關注,細看5便可:

informerFactory := informers.NewSharedInformerFactory(s.Client, s.ConfigSyncPeriod)

serviceConfig := config.NewServiceConfig(informerFactory.Core().InternalVersion().Services(), s.ConfigSyncPeriod)
// 註冊 service handler並啓動
serviceConfig.RegisterEventHandler(s.ServiceEventHandler)
// 這裏面僅僅是把ServiceEventHandler賦值給informer回調 
go serviceConfig.Run(wait.NeverStop)

endpointsConfig := config.NewEndpointsConfig(informerFactory.Core().InternalVersion().Endpoints(), s.ConfigSyncPeriod)
// 註冊endpoint 
endpointsConfig.RegisterEventHandler(s.EndpointsEventHandler)
go endpointsConfig.Run(wait.NeverStop)

go informerFactory.Start(wait.NeverStop)

serviceConfig.Run與endpointConfig.Run僅僅是給回調函數賦值, 因此註冊的handler就給了informer, informer監聽到事件時就會回調:

for i := range c.eventHandlers {
    glog.V(3).Infof("Calling handler.OnServiceSynced()")
    c.eventHandlers[i].OnServiceSynced()
}

那麼問題來了,註冊進去的這個handler是啥? 回顧一下上文的

serviceEventHandler = proxierIPVS
        endpointsEventHandler = proxierIPVS

因此都是這個proxierIPVS

handler的回調函數, informer會回調這幾個函數,因此咱們在本身開發時實現這個interface註冊進去便可:

type ServiceHandler interface {
    // OnServiceAdd is called whenever creation of new service object
    // is observed.
    OnServiceAdd(service *api.Service)
    // OnServiceUpdate is called whenever modification of an existing
    // service object is observed.
    OnServiceUpdate(oldService, service *api.Service)
    // OnServiceDelete is called whenever deletion of an existing service
    // object is observed.
    OnServiceDelete(service *api.Service)
    // OnServiceSynced is called once all the initial even handlers were
    // called and the state is fully propagated to local cache.
    OnServiceSynced()
}

開始監聽

go informerFactory.Start(wait.NeverStop)

這裏執行後,咱們建立刪除service endpoint等動做都會被監聽到,而後回調,回顧一下上面的圖,最終都是由Proxier去實現,因此後面咱們重點關注Proxier便可

s.Proxier.SyncLoop()

而後開始SyncLoop,下文開講

Proxier 實現

咱們建立一個service時OnServiceAdd方法會被調用, 這裏記錄一下以前的狀態與當前狀態兩個東西,而後發個信號給syncRunner讓它去處理:

func (proxier *Proxier) OnServiceAdd(service *api.Service) {
    namespacedName := types.NamespacedName{Namespace: service.Namespace, Name: service.Name}
    if proxier.serviceChanges.update(&namespacedName, nil, service) && proxier.isInitialized() {
        proxier.syncRunner.Run()
    }
}

記錄service 信息,能夠看到沒作什麼事,就是把service存在map裏, 若是沒變直接刪掉map信息不作任何處理:

change, exists := scm.items[*namespacedName]
if !exists {
    change = &serviceChange{}
    // 老的service信息
    change.previous = serviceToServiceMap(previous)
    scm.items[*namespacedName] = change
}
// 當前監聽到的service信息
change.current = serviceToServiceMap(current)

若是同樣,直接刪除
if reflect.DeepEqual(change.previous, change.current) {
    delete(scm.items, *namespacedName)
}

proxier.syncRunner.Run() 裏面就發送了一個信號

select {
case bfr.run <- struct{}{}:
default:
}

這裏面處理了這個信號

s.Proxier.SyncLoop()

func (proxier *Proxier) SyncLoop() {
    // Update healthz timestamp at beginning in case Sync() never succeeds.
    if proxier.healthzServer != nil {
        proxier.healthzServer.UpdateTimestamp()
    }
    proxier.syncRunner.Loop(wait.NeverStop)
}

runner裏收到信號執行,沒收到信號會按期執行:

func (bfr *BoundedFrequencyRunner) Loop(stop <-chan struct{}) {
    glog.V(3).Infof("%s Loop running", bfr.name)
    bfr.timer.Reset(bfr.maxInterval)
    for {
        select {
        case <-stop:
            bfr.stop()
            glog.V(3).Infof("%s Loop stopping", bfr.name)
            return
        case <-bfr.timer.C():  // 按期執行
            bfr.tryRun()
        case <-bfr.run:
            bfr.tryRun()       // 收到事件信號執行
        }
    }
}

這個bfr runner裏咱們最須要主意的是一個回調函數,tryRun裏檢查這個回調是否知足被調度的條件:

type BoundedFrequencyRunner struct {
    name        string        // the name of this instance
    minInterval time.Duration // the min time between runs, modulo bursts
    maxInterval time.Duration // the max time between runs

    run chan struct{} // try an async run

    mu      sync.Mutex  // guards runs of fn and all mutations
    fn      func()      // function to run, 這個回調
    lastRun time.Time   // time of last run
    timer   timer       // timer for deferred runs
    limiter rateLimiter // rate limiter for on-demand runs
}

// 傳入的proxier.syncProxyRules這個函數
proxier.syncRunner = async.NewBoundedFrequencyRunner("sync-runner", proxier.syncProxyRules, minSyncPeriod, syncPeriod, burstSyncs)

這是個600行左右的搓逼函數,也是處理主要邏輯的地方。

syncProxyRules

  1. 設置一些iptables規則,如mark與comment
  2. 肯定機器上有網卡,ipvs須要綁定地址到上面
  3. 肯定有ipset,ipset是iptables的擴展,能夠給一批地址設置iptables規則

...(又臭又長,重複代碼多,看不下去了,細節問題本身去看吧)

  1. 咱們最關注的,如何去處理VirtualServer的
serv := &utilipvs.VirtualServer{
    Address:   net.ParseIP(ingress.IP),
    Port:      uint16(svcInfo.port),
    Protocol:  string(svcInfo.protocol),
    Scheduler: proxier.ipvsScheduler,
}
if err := proxier.syncService(svcNameString, serv, false); err == nil {
    if err := proxier.syncEndpoint(svcName, svcInfo.onlyNodeLocalEndpoints, serv); err != nil {
    }
}

看下實現, 若是沒有就建立,若是已存在就更新, 給網卡綁定service的cluster ip:

func (proxier *Proxier) syncService(svcName string, vs *utilipvs.VirtualServer, bindAddr bool) error {
    appliedVirtualServer, _ := proxier.ipvs.GetVirtualServer(vs)
    if appliedVirtualServer == nil || !appliedVirtualServer.Equal(vs) {
        if appliedVirtualServer == nil {
            if err := proxier.ipvs.AddVirtualServer(vs); err != nil {
                return err
            }
        } else {
            if err := proxier.ipvs.UpdateVirtualServer(appliedVirtualServer); err != nil {
                return err
            }
        }
    }

    // bind service address to dummy interface even if service not changed,
    // in case that service IP was removed by other processes
    if bindAddr {
        _, err := proxier.netlinkHandle.EnsureAddressBind(vs.Address.String(), DefaultDummyDevice)
        if err != nil {
            return err
        }
    }
    return nil
}

建立service實現

如今能夠去看ipvs的AddVirtualServer的實現了,主要是利用socket與內核進程通訊作到的。
pkg/util/ipvs/ipvs_linux.go 裏 runner結構體實現了這些方法, 這裏用到了 docker/libnetwork/ipvs庫:

// runner implements Interface.
type runner struct {
    exec       utilexec.Interface
    ipvsHandle *ipvs.Handle
}

// New returns a new Interface which will call ipvs APIs.
func New(exec utilexec.Interface) Interface {
    ihandle, err := ipvs.New("") // github.com/docker/libnetwork/ipvs
    if err != nil {
        glog.Errorf("IPVS interface can't be initialized, error: %v", err)
        return nil
    }
    return &runner{
        exec:       exec,
        ipvsHandle: ihandle,
    }
}

New的時候建立了一個特殊的socket, 這裏與咱們普通的socket編程無差異,關鍵是syscall.AF_NETLINK這個參數,表明與內核進程通訊:

sock, err := nl.GetNetlinkSocketAt(n, netns.None(), syscall.NETLINK_GENERIC)

func getNetlinkSocket(protocol int) (*NetlinkSocket, error) {
    fd, err := syscall.Socket(syscall.AF_NETLINK, syscall.SOCK_RAW|syscall.SOCK_CLOEXEC, protocol)
    if err != nil {
        return nil, err
    }
    s := &NetlinkSocket{
        fd: int32(fd),
    }
    s.lsa.Family = syscall.AF_NETLINK
    if err := syscall.Bind(fd, &s.lsa); err != nil {
        syscall.Close(fd)
        return nil, err
    }

    return s, nil
}

建立一個service, 轉換成docker service格式,直接調用:

// AddVirtualServer is part of Interface.
func (runner *runner) AddVirtualServer(vs *VirtualServer) error {
    eSvc, err := toBackendService(vs)
    if err != nil {
        return err
    }
    return runner.ipvsHandle.NewService(eSvc)
}

而後就是把service信息打包,往socket裏面寫便可:

func (i *Handle) doCmdwithResponse(s *Service, d *Destination, cmd uint8) ([][]byte, error) {
    req := newIPVSRequest(cmd)
    req.Seq = atomic.AddUint32(&i.seq, 1)

    if s == nil {
        req.Flags |= syscall.NLM_F_DUMP                    //Flag to dump all messages
        req.AddData(nl.NewRtAttr(ipvsCmdAttrService, nil)) //Add a dummy attribute
    } else {
        req.AddData(fillService(s))
    } // 把service塞到請求中

    if d == nil {
        if cmd == ipvsCmdGetDest {
            req.Flags |= syscall.NLM_F_DUMP
        }

    } else {
        req.AddData(fillDestinaton(d))
    }

    // 給內核進程發送service信息
    res, err := execute(i.sock, req, 0)
    if err != nil {
        return [][]byte{}, err
    }

    return res, nil
}
構造請求
func newIPVSRequest(cmd uint8) *nl.NetlinkRequest {
    return newGenlRequest(ipvsFamily, cmd)
}

在構造請求時傳入的是ipvs協議簇

而後構造一個與內核通訊的消息頭

func NewNetlinkRequest(proto, flags int) *NetlinkRequest {
    return &NetlinkRequest{
        NlMsghdr: syscall.NlMsghdr{
            Len:   uint32(syscall.SizeofNlMsghdr),
            Type:  uint16(proto),
            Flags: syscall.NLM_F_REQUEST | uint16(flags),
            Seq:   atomic.AddUint32(&nextSeqNr, 1),
        },
    }
}
給消息加Data,這個Data是個數組,須要實現兩個方法:
type NetlinkRequestData interface {
    Len() int  // 長度
    Serialize() []byte // 序列化, 內核通訊也須要必定的數據格式,service信息也須要實現
}

好比 header是這樣序列化的, 一看愣住了,思考很久纔看懂:
拆下看:
([unsafe.Sizeof(hdr)]byte) 一個*[]byte類型,長度就是結構體大小
(unsafe.Pointer(hdr))把結構體轉成byte指針類型
加個*取它的值
用[:]轉成byte返回

func (hdr *genlMsgHdr) Serialize() []byte {
    return (*(*[unsafe.Sizeof(*hdr)]byte)(unsafe.Pointer(hdr)))[:]
}
發送service信息給內核

一個很普通的socket發送接收數據

func execute(s *nl.NetlinkSocket, req *nl.NetlinkRequest, resType uint16) ([][]byte, error) {
    var (
        err error
    )

    if err := s.Send(req); err != nil {
        return nil, err
    }

    pid, err := s.GetPid()
    if err != nil {
        return nil, err
    }

    var res [][]byte

done:
    for {
        msgs, err := s.Receive()
        if err != nil {
            return nil, err
        }
        for _, m := range msgs {
            if m.Header.Seq != req.Seq {
                continue
            }
            if m.Header.Pid != pid {
                return nil, fmt.Errorf("Wrong pid %d, expected %d", m.Header.Pid, pid)
            }
            if m.Header.Type == syscall.NLMSG_DONE {
                break done
            }
            if m.Header.Type == syscall.NLMSG_ERROR {
                error := int32(native.Uint32(m.Data[0:4]))
                if error == 0 {
                    break done
                }
                return nil, syscall.Errno(-error)
            }
            if resType != 0 && m.Header.Type != resType {
                continue
            }
            res = append(res, m.Data)
            if m.Header.Flags&syscall.NLM_F_MULTI == 0 {
                break done
            }
        }
    }
    return res, nil
}
Service 數據打包
這裏比較細,核心思想就是內核只認必定格式的標準數據,咱們把service信息按其標準打包發送給內核便可。
至於怎麼打包的就不詳細講了。
func fillService(s *Service) nl.NetlinkRequestData {
    cmdAttr := nl.NewRtAttr(ipvsCmdAttrService, nil)
    nl.NewRtAttrChild(cmdAttr, ipvsSvcAttrAddressFamily, nl.Uint16Attr(s.AddressFamily))
    if s.FWMark != 0 {
        nl.NewRtAttrChild(cmdAttr, ipvsSvcAttrFWMark, nl.Uint32Attr(s.FWMark))
    } else {
        nl.NewRtAttrChild(cmdAttr, ipvsSvcAttrProtocol, nl.Uint16Attr(s.Protocol))
        nl.NewRtAttrChild(cmdAttr, ipvsSvcAttrAddress, rawIPData(s.Address))

        // Port needs to be in network byte order.
        portBuf := new(bytes.Buffer)
        binary.Write(portBuf, binary.BigEndian, s.Port)
        nl.NewRtAttrChild(cmdAttr, ipvsSvcAttrPort, portBuf.Bytes())
    }

    nl.NewRtAttrChild(cmdAttr, ipvsSvcAttrSchedName, nl.ZeroTerminated(s.SchedName))
    if s.PEName != "" {
        nl.NewRtAttrChild(cmdAttr, ipvsSvcAttrPEName, nl.ZeroTerminated(s.PEName))
    }
    f := &ipvsFlags{
        flags: s.Flags,
        mask:  0xFFFFFFFF,
    }
    nl.NewRtAttrChild(cmdAttr, ipvsSvcAttrFlags, f.Serialize())
    nl.NewRtAttrChild(cmdAttr, ipvsSvcAttrTimeout, nl.Uint32Attr(s.Timeout))
    nl.NewRtAttrChild(cmdAttr, ipvsSvcAttrNetmask, nl.Uint32Attr(s.Netmask))
    return cmdAttr
}

總結

Service整體來說代碼比較簡單,可是以爲有些地方實現的有點繞,不夠簡單直接。 整體來講就是監聽apiserver事件,而後比對 處理,按期也會去執行同步策略.

掃碼關注sealyun
探討可加QQ羣:98488045

相關文章
相關標籤/搜索