Compared with iptables mode, ipvs mode offers better performance and stability, so this article focuses on the source code of the ipvs mode. If you want to understand how iptables mode works, you can read its implementation the same way; architecturally there is no difference.
kube-proxy's main job is to watch service and endpoint events and then push the resulting proxy rules down to the node. Under the hood it calls docker/libnetwork, and libnetwork in turn uses netlink and netns to actually create the ipvs rules and perform related operations.
<!--more-->
Code entry point: cmd/kube-proxy/app/server.go
The Run() function
The command-line flags are used to initialize the proxyServer configuration:
```go
proxyServer, err := NewProxyServer(o)
```
```go
type ProxyServer struct {
	// k8s clients
	Client      clientset.Interface
	EventClient v1core.EventsGetter

	// ipvs-related interfaces
	IptInterface   utiliptables.Interface
	IpvsInterface  utilipvs.Interface
	IpsetInterface utilipset.Interface

	// the handler that does the actual sync work
	Proxier proxy.ProxyProvider

	// proxy mode: one of ipvs, iptables, userspace, kernelspace (windows)
	ProxyMode string

	// configuration sync period
	ConfigSyncPeriod time.Duration

	// service and endpoint event handlers
	ServiceEventHandler   config.ServiceHandler
	EndpointsEventHandler config.EndpointsHandler
}
```
Proxier is the main entry point; it abstracts two functions:
```go
type ProxyProvider interface {
	// Sync immediately synchronizes the ProxyProvider's current state to iptables.
	Sync()
	// SyncLoop runs periodically.
	SyncLoop()
}
```
The ipvs Interface, which is important:
```go
type Interface interface {
	// Flush deletes all rules.
	Flush() error
	// AddVirtualServer creates a virtual server.
	AddVirtualServer(*VirtualServer) error
	UpdateVirtualServer(*VirtualServer) error
	DeleteVirtualServer(*VirtualServer) error
	GetVirtualServer(*VirtualServer) (*VirtualServer, error)
	GetVirtualServers() ([]*VirtualServer, error)
	// AddRealServer adds a real server to a virtual server, e.g. the
	// VirtualServer is a cluster IP and the RealServer is a pod
	// (or a custom endpoint).
	AddRealServer(*VirtualServer, *RealServer) error
	GetRealServers(*VirtualServer) ([]*RealServer, error)
	DeleteRealServer(*VirtualServer, *RealServer) error
}
```
We will look in detail at how ipvs_linux implements this interface further down.
For a virtual server and a real server, the most important piece is the ip:port pair, plus a few proxying options such as the scheduler and session affinity flags:
```go
type VirtualServer struct {
	Address   net.IP
	Protocol  string
	Port      uint16
	Scheduler string
	Flags     ServiceFlags
	Timeout   uint32
}

type RealServer struct {
	Address net.IP
	Port    uint16
	Weight  int
}
```
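To make the relationship concrete, here is a minimal sketch (not kube-proxy code) of how a caller holding any implementation of the Interface above might map a made-up ClusterIP to two pod backends. The `ensureService` helper and the addresses are invented for illustration, and the import path follows the kubernetes tree this post is based on:

```go
package demo

import (
	"net"

	utilipvs "k8s.io/kubernetes/pkg/util/ipvs"
)

// ensureService is a hypothetical helper: it creates one virtual server
// (the "cluster IP") and attaches each pod IP as a real server.
func ensureService(handle utilipvs.Interface, podIPs []string) error {
	vs := &utilipvs.VirtualServer{
		Address:   net.ParseIP("10.96.0.10"), // made-up cluster IP
		Port:      53,
		Protocol:  "UDP",
		Scheduler: "rr", // round robin
	}
	if err := handle.AddVirtualServer(vs); err != nil {
		return err
	}
	for _, ip := range podIPs {
		rs := &utilipvs.RealServer{Address: net.ParseIP(ip), Port: 53, Weight: 1}
		if err := handle.AddRealServer(vs, rs); err != nil {
			return err
		}
	}
	return nil
}
```

The ipvs proxier does essentially this, just driven by service and endpoint events; the concrete Interface implementation lives in ipvs_linux.go, which we get to below.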
Create the apiserver clients:
```go
client, eventClient, err := createClients(config.ClientConnection, master)
```
Create the Proxier. Here we only look at the ipvs-mode proxier:
```go
else if proxyMode == proxyModeIPVS {
	glog.V(0).Info("Using ipvs Proxier.")
	proxierIPVS, err := ipvs.NewProxier(
		iptInterface,
		ipvsInterface,
		ipsetInterface,
		utilsysctl.New(),
		execer,
		config.IPVS.SyncPeriod.Duration,
		config.IPVS.MinSyncPeriod.Duration,
		config.IPTables.MasqueradeAll,
		int(*config.IPTables.MasqueradeBit),
		config.ClusterCIDR,
		hostname,
		getNodeIP(client, hostname),
		recorder,
		healthzServer,
		config.IPVS.Scheduler,
	)
	...
	proxier = proxierIPVS
	serviceEventHandler = proxierIPVS
	endpointsEventHandler = proxierIPVS
```
This Proxier has the following methods:
```
+OnEndpointsAdd(endpoints *api.Endpoints)
+OnEndpointsDelete(endpoints *api.Endpoints)
+OnEndpointsSynced()
+OnEndpointsUpdate(oldEndpoints, endpoints *api.Endpoints)
+OnServiceAdd(service *api.Service)
+OnServiceDelete(service *api.Service)
+OnServiceSynced()
+OnServiceUpdate(oldService, service *api.Service)
+Sync()
+SyncLoop()
```
So the ipvs Proxier implements almost all of the interfaces we need.
To summarize:
```
                +-----------> endpointHandler
                |
                +-----------> serviceHandler
                |                  ^
                |                  |
                |        +-------------> sync (periodic resync, etc.)
                |        |
ProxyServer---------> Proxier --------> service event callbacks
                         |
                         +-------------> endpoint event callbacks
                         |                        | trigger
                         +-----> ipvs interface   |
                                 ipvs handler <---+
```
We don't need to pay much attention to steps 1 through 4; step 5 is the one worth a close look:
```go
informerFactory := informers.NewSharedInformerFactory(s.Client, s.ConfigSyncPeriod)

serviceConfig := config.NewServiceConfig(informerFactory.Core().InternalVersion().Services(), s.ConfigSyncPeriod)
// Register the service handler and start it.
serviceConfig.RegisterEventHandler(s.ServiceEventHandler)
// This just hands ServiceEventHandler to the informer as a callback.
go serviceConfig.Run(wait.NeverStop)

endpointsConfig := config.NewEndpointsConfig(informerFactory.Core().InternalVersion().Endpoints(), s.ConfigSyncPeriod)
// Register the endpoints handler.
endpointsConfig.RegisterEventHandler(s.EndpointsEventHandler)
go endpointsConfig.Run(wait.NeverStop)

go informerFactory.Start(wait.NeverStop)
```
serviceConfig.Run and endpointsConfig.Run do little more than wire up the callbacks, so the registered handlers end up attached to the informer, and the informer calls them back whenever it observes an event:
```go
for i := range c.eventHandlers {
	glog.V(3).Infof("Calling handler.OnServiceSynced()")
	c.eventHandlers[i].OnServiceSynced()
}
```
So the question is: what exactly is the handler that gets registered? Recall from above:
```go
serviceEventHandler = proxierIPVS
endpointsEventHandler = proxierIPVS
```
So in both cases it is proxierIPVS.
These are the handler callbacks the informer will invoke, so when writing our own code all we need to do is implement this interface and register it (a minimal sketch follows the interface definition):
```go
type ServiceHandler interface {
	// OnServiceAdd is called whenever creation of new service object
	// is observed.
	OnServiceAdd(service *api.Service)
	// OnServiceUpdate is called whenever modification of an existing
	// service object is observed.
	OnServiceUpdate(oldService, service *api.Service)
	// OnServiceDelete is called whenever deletion of an existing service
	// object is observed.
	OnServiceDelete(service *api.Service)
	// OnServiceSynced is called once all the initial even handlers were
	// called and the state is fully propagated to local cache.
	OnServiceSynced()
}
```
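As a rough sketch, a hypothetical handler that only logs events could look like the following. The type name and log messages are made up, and the api import path may differ depending on the kubernetes version:

```go
package demo

import (
	"github.com/golang/glog"

	api "k8s.io/kubernetes/pkg/apis/core" // adjust to your kubernetes version
)

// loggingServiceHandler is a made-up ServiceHandler that just logs events.
type loggingServiceHandler struct{}

func (h *loggingServiceHandler) OnServiceAdd(service *api.Service) {
	glog.Infof("service added: %s/%s", service.Namespace, service.Name)
}

func (h *loggingServiceHandler) OnServiceUpdate(oldService, service *api.Service) {
	glog.Infof("service updated: %s/%s", service.Namespace, service.Name)
}

func (h *loggingServiceHandler) OnServiceDelete(service *api.Service) {
	glog.Infof("service deleted: %s/%s", service.Namespace, service.Name)
}

func (h *loggingServiceHandler) OnServiceSynced() {
	glog.Info("initial services synced")
}
```

Registering it works the same way as for the proxier: call serviceConfig.RegisterEventHandler(&loggingServiceHandler{}) before serviceConfig.Run is started.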
```go
go informerFactory.Start(wait.NeverStop)
```
Once this runs, creating or deleting a service or endpoint will be observed and the callbacks fired. Looking back at the diagram above, everything ultimately lands in the Proxier, so from here on we focus on the Proxier.
```go
s.Proxier.SyncLoop()
```
Then SyncLoop starts, which is covered below.
When we create a service, OnServiceAdd is called. It records two things, the previous state and the current state, and then sends a signal to the syncRunner to let it do the processing:
```go
func (proxier *Proxier) OnServiceAdd(service *api.Service) {
	namespacedName := types.NamespacedName{Namespace: service.Namespace, Name: service.Name}
	if proxier.serviceChanges.update(&namespacedName, nil, service) && proxier.isInitialized() {
		proxier.syncRunner.Run()
	}
}
```
Recording the service info: as you can see it doesn't do much, it just stores the service in a map, and if nothing changed it simply deletes the map entry and does no further processing:
```go
change, exists := scm.items[*namespacedName]
if !exists {
	change = &serviceChange{}
	// the old service info
	change.previous = serviceToServiceMap(previous)
	scm.items[*namespacedName] = change
}
// the service info we just observed
change.current = serviceToServiceMap(current)
// if they are identical, just delete the entry
if reflect.DeepEqual(change.previous, change.current) {
	delete(scm.items, *namespacedName)
}
```
proxier.syncRunner.Run() just sends a signal:
```go
select {
case bfr.run <- struct{}{}:
default:
}
```
The signal is handled here:
```go
s.Proxier.SyncLoop()

func (proxier *Proxier) SyncLoop() {
	// Update healthz timestamp at beginning in case Sync() never succeeds.
	if proxier.healthzServer != nil {
		proxier.healthzServer.UpdateTimestamp()
	}
	proxier.syncRunner.Loop(wait.NeverStop)
}
```
The runner executes when it receives the signal, and also executes periodically even if no signal arrives:
```go
func (bfr *BoundedFrequencyRunner) Loop(stop <-chan struct{}) {
	glog.V(3).Infof("%s Loop running", bfr.name)
	bfr.timer.Reset(bfr.maxInterval)
	for {
		select {
		case <-stop:
			bfr.stop()
			glog.V(3).Infof("%s Loop stopping", bfr.name)
			return
		case <-bfr.timer.C():
			// periodic run
			bfr.tryRun()
		case <-bfr.run:
			// run triggered by an event signal
			bfr.tryRun()
		}
	}
}
```
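Stripped of the timer and rate limiter, the signalling between Run() and Loop() boils down to a buffered channel of size 1 plus a non-blocking send. A standalone toy version (not kube-proxy code) showing how multiple triggers coalesce:

```go
package main

import (
	"fmt"
	"time"
)

func main() {
	run := make(chan struct{}, 1) // same shape as bfr.run

	// trigger mimics BoundedFrequencyRunner.Run(): non-blocking send,
	// extra signals are dropped while one is already pending.
	trigger := func() {
		select {
		case run <- struct{}{}:
		default:
		}
	}

	// mimics Loop(): block until a signal (or, in the real code, a timer) fires.
	go func() {
		for range run {
			fmt.Println("sync would run here")
			time.Sleep(100 * time.Millisecond)
		}
	}()

	for i := 0; i < 5; i++ {
		trigger() // five events, but at most one pending sync is queued
	}
	time.Sleep(500 * time.Millisecond)
}
```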
The thing to pay most attention to in this bfr runner is a callback function; tryRun checks whether that callback is eligible to be run:
```go
type BoundedFrequencyRunner struct {
	name        string        // the name of this instance
	minInterval time.Duration // the min time between runs, modulo bursts
	maxInterval time.Duration // the max time between runs

	run chan struct{} // try an async run

	mu      sync.Mutex  // guards runs of fn and all mutations
	fn      func()      // function to run -- this is the callback
	lastRun time.Time   // time of last run
	timer   timer       // timer for deferred runs
	limiter rateLimiter // rate limiter for on-demand runs
}

// The function passed in is proxier.syncProxyRules:
proxier.syncRunner = async.NewBoundedFrequencyRunner("sync-runner", proxier.syncProxyRules, minSyncPeriod, syncPeriod, burstSyncs)
```
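The runner itself is a general-purpose utility in pkg/util/async, so the same wiring can be reused outside kube-proxy. A hedged sketch of standalone usage, assuming the constructor signature shown above (the import paths follow the kubernetes tree this post is based on):

```go
package main

import (
	"fmt"
	"time"

	"k8s.io/apimachinery/pkg/util/wait"
	"k8s.io/kubernetes/pkg/util/async"
)

func main() {
	doSync := func() { fmt.Println("syncing rules...") }

	// At most one on-demand run per second, at least one run every 30s,
	// burst of 2 -- the same shape as kube-proxy's "sync-runner".
	runner := async.NewBoundedFrequencyRunner("demo-sync-runner", doSync, time.Second, 30*time.Second, 2)
	go runner.Loop(wait.NeverStop)

	runner.Run() // what an event handler would call to request a sync
	time.Sleep(2 * time.Second)
}
```

In kube-proxy the fn is proxier.syncProxyRules, which is what we look at next.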
syncProxyRules itself is a roughly 600-line beast of a function, and it is where the main logic lives.
...(it is long and smelly with a lot of duplicated code; I could not bring myself to walk through all of it, so go dig into the details yourself)
```go
serv := &utilipvs.VirtualServer{
	Address:   net.ParseIP(ingress.IP),
	Port:      uint16(svcInfo.port),
	Protocol:  string(svcInfo.protocol),
	Scheduler: proxier.ipvsScheduler,
}
if err := proxier.syncService(svcNameString, serv, false); err == nil {
	if err := proxier.syncEndpoint(svcName, svcInfo.onlyNodeLocalEndpoints, serv); err != nil {
	}
}
```
Look at the implementation: if the virtual server does not exist it is created, if it already exists it is updated, and the service's cluster IP is bound to the dummy network interface:
```go
func (proxier *Proxier) syncService(svcName string, vs *utilipvs.VirtualServer, bindAddr bool) error {
	appliedVirtualServer, _ := proxier.ipvs.GetVirtualServer(vs)
	if appliedVirtualServer == nil || !appliedVirtualServer.Equal(vs) {
		if appliedVirtualServer == nil {
			if err := proxier.ipvs.AddVirtualServer(vs); err != nil {
				return err
			}
		} else {
			if err := proxier.ipvs.UpdateVirtualServer(appliedVirtualServer); err != nil {
				return err
			}
		}
	}

	// bind service address to dummy interface even if service not changed,
	// in case that service IP was removed by other processes
	if bindAddr {
		_, err := proxier.netlinkHandle.EnsureAddressBind(vs.Address.String(), DefaultDummyDevice)
		if err != nil {
			return err
		}
	}
	return nil
}
```
Now we can look at how ipvs implements AddVirtualServer; it works mainly by talking to the kernel over a socket. In pkg/util/ipvs/ipvs_linux.go the runner struct implements these methods, using the docker/libnetwork/ipvs library:
```go
// runner implements Interface.
type runner struct {
	exec       utilexec.Interface
	ipvsHandle *ipvs.Handle
}

// New returns a new Interface which will call ipvs APIs.
func New(exec utilexec.Interface) Interface {
	ihandle, err := ipvs.New("") // github.com/docker/libnetwork/ipvs
	if err != nil {
		glog.Errorf("IPVS interface can't be initialized, error: %v", err)
		return nil
	}
	return &runner{
		exec:       exec,
		ipvsHandle: ihandle,
	}
}
```
New creates a special socket. This is no different from ordinary socket programming; the key is the syscall.AF_NETLINK parameter, which means we are talking to the kernel:
```go
sock, err := nl.GetNetlinkSocketAt(n, netns.None(), syscall.NETLINK_GENERIC)

func getNetlinkSocket(protocol int) (*NetlinkSocket, error) {
	fd, err := syscall.Socket(syscall.AF_NETLINK, syscall.SOCK_RAW|syscall.SOCK_CLOEXEC, protocol)
	if err != nil {
		return nil, err
	}
	s := &NetlinkSocket{
		fd: int32(fd),
	}
	s.lsa.Family = syscall.AF_NETLINK
	if err := syscall.Bind(fd, &s.lsa); err != nil {
		syscall.Close(fd)
		return nil, err
	}
	return s, nil
}
```
Creating a service converts it to the docker (libnetwork) service format and calls straight into the library:
```go
// AddVirtualServer is part of Interface.
func (runner *runner) AddVirtualServer(vs *VirtualServer) error {
	eSvc, err := toBackendService(vs)
	if err != nil {
		return err
	}
	return runner.ipvsHandle.NewService(eSvc)
}
```
Then it is just a matter of packing the service information and writing it to the socket:
```go
func (i *Handle) doCmdwithResponse(s *Service, d *Destination, cmd uint8) ([][]byte, error) {
	req := newIPVSRequest(cmd)
	req.Seq = atomic.AddUint32(&i.seq, 1)

	if s == nil {
		req.Flags |= syscall.NLM_F_DUMP                    //Flag to dump all messages
		req.AddData(nl.NewRtAttr(ipvsCmdAttrService, nil)) //Add a dummy attribute
	} else {
		req.AddData(fillService(s)) // pack the service into the request
	}

	if d == nil {
		if cmd == ipvsCmdGetDest {
			req.Flags |= syscall.NLM_F_DUMP
		}
	} else {
		req.AddData(fillDestinaton(d))
	}

	// send the service information to the kernel
	res, err := execute(i.sock, req, 0)
	if err != nil {
		return [][]byte{}, err
	}

	return res, nil
}
```
Constructing the request:
```go
func newIPVSRequest(cmd uint8) *nl.NetlinkRequest {
	return newGenlRequest(ipvsFamily, cmd)
}
```
When constructing the request, the ipvs protocol family is passed in, and then a netlink message header for talking to the kernel is built:
```go
func NewNetlinkRequest(proto, flags int) *NetlinkRequest {
	return &NetlinkRequest{
		NlMsghdr: syscall.NlMsghdr{
			Len:   uint32(syscall.SizeofNlMsghdr),
			Type:  uint16(proto),
			Flags: syscall.NLM_F_REQUEST | uint16(flags),
			Seq:   atomic.AddUint32(&nextSeqNr, 1),
		},
	}
}
```
Data is then appended to the message. Data is a slice whose elements must implement two methods:
```go
type NetlinkRequestData interface {
	Len() int // length
	// Serialize converts to the wire format the kernel expects;
	// the service information has to implement this as well.
	Serialize() []byte
}
```
For example, this is how the header is serialized. It stopped me cold the first time I saw it, and it took a while to work out. Breaking it down:

- `[unsafe.Sizeof(*hdr)]byte` is a byte array type whose length is the size of the struct
- `unsafe.Pointer(hdr)` turns the struct pointer into an untyped pointer, which is then cast to a pointer to that byte array
- the leading `*` dereferences it to get the array value
- `[:]` slices the array into a `[]byte` and returns it
```go
func (hdr *genlMsgHdr) Serialize() []byte {
	return (*(*[unsafe.Sizeof(*hdr)]byte)(unsafe.Pointer(hdr)))[:]
}
```
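To see the trick in isolation, here is a tiny standalone program (the demoHdr struct is made up) that applies the same cast to a toy header:

```go
package main

import (
	"fmt"
	"unsafe"
)

// demoHdr is a toy fixed-layout header, standing in for genlMsgHdr.
type demoHdr struct {
	Cmd     uint8
	Version uint8
}

// Serialize reinterprets the struct's memory as a byte slice: a pointer to a
// [Sizeof]byte array, dereferenced and then sliced with [:].
func (hdr *demoHdr) Serialize() []byte {
	return (*(*[unsafe.Sizeof(*hdr)]byte)(unsafe.Pointer(hdr)))[:]
}

func main() {
	h := &demoHdr{Cmd: 1, Version: 2}
	fmt.Println(h.Serialize()) // [1 2]
}
```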
Sending the service information to the kernel is then a perfectly ordinary socket send and receive:
```go
func execute(s *nl.NetlinkSocket, req *nl.NetlinkRequest, resType uint16) ([][]byte, error) {
	var (
		err error
	)

	if err := s.Send(req); err != nil {
		return nil, err
	}

	pid, err := s.GetPid()
	if err != nil {
		return nil, err
	}

	var res [][]byte

done:
	for {
		msgs, err := s.Receive()
		if err != nil {
			return nil, err
		}
		for _, m := range msgs {
			if m.Header.Seq != req.Seq {
				continue
			}
			if m.Header.Pid != pid {
				return nil, fmt.Errorf("Wrong pid %d, expected %d", m.Header.Pid, pid)
			}
			if m.Header.Type == syscall.NLMSG_DONE {
				break done
			}
			if m.Header.Type == syscall.NLMSG_ERROR {
				error := int32(native.Uint32(m.Data[0:4]))
				if error == 0 {
					break done
				}
				return nil, syscall.Errno(-error)
			}
			if resType != 0 && m.Header.Type != resType {
				continue
			}
			res = append(res, m.Data)
			if m.Header.Flags&syscall.NLM_F_MULTI == 0 {
				break done
			}
		}
	}
	return res, nil
}
```
Packing the Service data

This part is fairly detailed. The core idea is that the kernel only accepts data in a fixed, standard format, so we pack the service information according to that format and send it to the kernel. I will not go into the details of how the packing is done.
```go
func fillService(s *Service) nl.NetlinkRequestData {
	cmdAttr := nl.NewRtAttr(ipvsCmdAttrService, nil)
	nl.NewRtAttrChild(cmdAttr, ipvsSvcAttrAddressFamily, nl.Uint16Attr(s.AddressFamily))
	if s.FWMark != 0 {
		nl.NewRtAttrChild(cmdAttr, ipvsSvcAttrFWMark, nl.Uint32Attr(s.FWMark))
	} else {
		nl.NewRtAttrChild(cmdAttr, ipvsSvcAttrProtocol, nl.Uint16Attr(s.Protocol))
		nl.NewRtAttrChild(cmdAttr, ipvsSvcAttrAddress, rawIPData(s.Address))

		// Port needs to be in network byte order.
		portBuf := new(bytes.Buffer)
		binary.Write(portBuf, binary.BigEndian, s.Port)
		nl.NewRtAttrChild(cmdAttr, ipvsSvcAttrPort, portBuf.Bytes())
	}
	nl.NewRtAttrChild(cmdAttr, ipvsSvcAttrSchedName, nl.ZeroTerminated(s.SchedName))
	if s.PEName != "" {
		nl.NewRtAttrChild(cmdAttr, ipvsSvcAttrPEName, nl.ZeroTerminated(s.PEName))
	}
	f := &ipvsFlags{
		flags: s.Flags,
		mask:  0xFFFFFFFF,
	}
	nl.NewRtAttrChild(cmdAttr, ipvsSvcAttrFlags, f.Serialize())
	nl.NewRtAttrChild(cmdAttr, ipvsSvcAttrTimeout, nl.Uint32Attr(s.Timeout))
	nl.NewRtAttrChild(cmdAttr, ipvsSvcAttrNetmask, nl.Uint32Attr(s.Netmask))
	return cmdAttr
}
```
Service整體來說代碼比較簡單,可是以爲有些地方實現的有點繞,不夠簡單直接。 整體來講就是監聽apiserver事件,而後比對 處理,按期也會去執行同步策略.