Analysis | A Brief Walk Through the OpenShift Source: Pod Network Configuration (Part 2)

[Editor's note] Under the hood, OpenShift manages pods through the kubelet, and the kubelet configures pod networking through CNI plugins. When an OpenShift node starts, it launches the kubelet in a goroutine, and the kubelet then takes care of pod management.

This article takes a source-code-level look at how, in an OpenShift environment, the kubelet configures pod networking by calling the openshift-sdn plugin.

The previous part analyzed how the openshift-sdn plugin configures pod networking. This part analyzes how the cniServer handles the openshift-sdn plugin's requests for pod IPs.
The CNIServer Flow

From the earlier analysis we know that the openshift-sdn plugin requests an IP from the cniServer via the doCNIServerAdd method. So how does the cniServer handle such a request? Let's start with the cniServer's own logic.
The CNIServer is defined in pkg/network/node/cniserver/cniserver.go in the OpenShift code base, as follows:

1type CNIServer struct {
2 http.Server
3 requestFunc cniRequestFunc
4 rundir string
5 config *Config
6}
It embeds an http.Server, holds a request-handling callback of type cniRequestFunc, and carries a few configuration-related fields.
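Because the server listens on a unix socket under rundir (see the constructor comment below), a client such as the openshift-sdn plugin's doCNIServerAdd only needs to issue an HTTP POST over that socket. The following is a minimal, hypothetical sketch of such a client; the socket file name and the request body are assumptions for illustration, not the plugin's actual code:

package main

import (
	"bytes"
	"context"
	"fmt"
	"io/ioutil"
	"net"
	"net/http"
)

func main() {
	// Assumed socket path under the server's rundir; the real path is
	// whatever the node process passes to NewCNIServer.
	socketPath := "/var/run/openshift-sdn/cni-server.sock"

	// Dial the unix socket instead of a TCP address.
	client := &http.Client{
		Transport: &http.Transport{
			DialContext: func(ctx context.Context, _, _ string) (net.Conn, error) {
				return net.Dial("unix", socketPath)
			},
		},
	}

	// Hypothetical request body; the real plugin sends the CNI environment
	// and network configuration as JSON.
	body := bytes.NewBufferString(`{"env":{"CNI_COMMAND":"ADD"},"config":""}`)

	// The host part of the URL is ignored because we dial the socket directly.
	resp, err := client.Post("http://cni-server/", "application/json", body)
	if err != nil {
		fmt.Println("request failed:", err)
		return
	}
	defer resp.Body.Close()
	data, _ := ioutil.ReadAll(resp.Body)
	fmt.Println(resp.Status, string(data))
}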
The cniServer's constructor lives at pkg/network/node/cniserver/cniserver.go#L120:

1// Create and return a new CNIServer object which will listen on a socket in the given path
2func NewCNIServer(rundir string, config *Config) *CNIServer {
3 router := mux.NewRouter()
4
5 s := &CNIServer{
6 Server: http.Server{
7 Handler: router,
8 },
9 rundir: rundir,
10 config: config,
11 }
12 router.NotFoundHandler = http.HandlerFunc(http.NotFound)
13 router.HandleFunc("/", s.handleCNIRequest).Methods("POST")
14 return s
15}
As line 13 above shows, the server handles only a single POST route, and its handler is the handleCNIRequest method, which is defined at pkg/network/node/cniserver/cniserver.go#L277:

1// Dispatch a pod request to the request handler and return the result to the
2// CNI server client
3func (s *CNIServer) handleCNIRequest(w http.ResponseWriter, r *http.Request) {
4 req, err := cniRequestToPodRequest(r)
5 if err != nil {
6 http.Error(w, fmt.Sprintf("%v", err), http.StatusBadRequest)
7 return
8 }
9
10 glog.V(5).Infof("Waiting for %s result for pod %s/%s", req.Command, req.PodNamespace, req.PodName)
11 result, err := s.requestFunc(req)
12 if err != nil {
13 http.Error(w, fmt.Sprintf("%v", err), http.StatusBadRequest)
14 } else {
15 // Empty response JSON means success with no body
16 w.Header().Set("Content-Type", "application/json")
17 if _, err := w.Write(result); err != nil {
18 glog.Warningf("Error writing %s HTTP response: %v", req.Command, err)
19 }
20 }
21}
As line 11 shows, the method delegates the actual processing to requestFunc and, once the request has been handled, returns the response to the caller via w.Write or http.Error. requestFunc is supplied through the cniServer's Start method, and what actually gets passed in is the podManager's handleCNIRequest method, located at pkg/network/node/pod.go#L25:

1// Enqueue incoming pod requests from the CNI server, wait on the result,
2// and return that result to the CNI client
3func (m *podManager) handleCNIRequest(request *cniserver.PodRequest) ([]byte, error) {
4 glog.V(5).Infof("Dispatching pod network request %v", request)
5 m.addRequest(request)
6 result := m.waitRequest(request)
7 glog.V(5).Infof("Returning pod network request %v, result %s err %v", request, string(result.Response), result.Err)
8 return result.Response, result.Err
9}
On line 5 the method first puts the request into a queue via addRequest, and on line 6 it calls waitRequest to wait for the request to complete.
addRequest is defined at pkg/network/node/pod.go#L240:

1// Add a request to the podManager CNI request queue
2func (m *podManager) addRequest(request *cniserver.PodRequest) {
3 m.requests <- request
4}
So the request is pushed into the m.requests channel; that channel is what serves as the queue here.
waitRequest reads the result back out of a channel; it is defined at pkg/network/node/pod.go#L245:

1// Wait for and return the result of a pod request
2func (m *podManager) waitRequest(request *cniserver.PodRequest) *cniserver.PodResult {
3 return <-request.Result
4}
As just described, addRequest puts requests into the m.requests queue, so how do the queued requests get executed? The answer is that when the podManager starts, it runs processCNIRequests in a goroutine, and that method loops forever, pulling requests off the m.requests channel and executing them. processCNIRequests is defined at pkg/network/node/pod.go#L286:

1// Process all CNI requests from the request queue serially. Our OVS interaction
2// and scripts currently cannot run in parallel, and doing so greatly complicates
3// setup/teardown logic
4func (m *podManager) processCNIRequests() {
5 for request := range m.requests {
6 glog.V(5).Infof("Processing pod network request %v", request)
7 result := m.processRequest(request)
8 glog.V(5).Infof("Processed pod network request %v, result %s err %v", request, string(result.Response), result.Err)
9 request.Result <- result
10 }
11 panic("stopped processing CNI pod requests!")
12}
The method keeps taking requests out of m.requests in a for loop, handles each one with processRequest, and finally pushes the result into request.Result, where the waitRequest call above picks it up.
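This enqueue/worker pattern over channels is the heart of the podManager's request handling. The following standalone sketch reproduces the same idea in miniature; the types and names are invented for the example and are not openshift's own:

package main

import "fmt"

// request mimics cniserver.PodRequest: it carries its own result channel.
type request struct {
	payload string
	result  chan string
}

type manager struct {
	requests chan *request
}

// addRequest enqueues a request, like podManager.addRequest.
func (m *manager) addRequest(r *request) { m.requests <- r }

// waitRequest blocks until the worker publishes a result, like podManager.waitRequest.
func (m *manager) waitRequest(r *request) string { return <-r.result }

// process drains the queue serially, like podManager.processCNIRequests.
func (m *manager) process() {
	for r := range m.requests {
		r.result <- "handled: " + r.payload // the real code calls processRequest here
	}
}

func main() {
	m := &manager{requests: make(chan *request, 10)}
	go m.process()

	r := &request{payload: "CNI_ADD pod demo/nginx", result: make(chan string, 1)}
	m.addRequest(r)
	fmt.Println(m.waitRequest(r)) // handled: CNI_ADD pod demo/nginx
}

A single worker goroutine draining the channel is what serializes all CNI operations, which is exactly why the comment above processCNIRequests notes that the OVS interaction and scripts cannot run in parallel.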
Let's now walk through processRequest, which is defined at pkg/network/node/pod.go#L296:

1func (m *podManager) processRequest(request *cniserver.PodRequest) *cniserver.PodResult {
2 m.runningPodsLock.Lock()
3 defer m.runningPodsLock.Unlock()
4
5 pk := getPodKey(request)
6 result := &cniserver.PodResult{}
7 switch request.Command {
8 case cniserver.CNI_ADD:
9 ipamResult, runningPod, err := m.podHandler.setup(request)
10 if ipamResult != nil {
11 result.Response, err = json.Marshal(ipamResult)
12 if err == nil {
13 m.runningPods[pk] = runningPod
14 if m.ovs != nil {
15 m.updateLocalMulticastRulesWithLock(runningPod.vnid)
16 }
17 }
18 }
19 if err != nil {
20 PodOperationsErrors.WithLabelValues(PodOperationSetup).Inc()
21 result.Err = err
22 }
23 case cniserver.CNI_UPDATE:
24 vnid, err := m.podHandler.update(request)
25 if err == nil {
26 if runningPod, exists := m.runningPods[pk]; exists {
27 runningPod.vnid = vnid
28 }
29 }
30 result.Err = err
31 case cniserver.CNI_DEL:
32 if runningPod, exists := m.runningPods[pk]; exists {
33 delete(m.runningPods, pk)
34 if m.ovs != nil {
35 m.updateLocalMulticastRulesWithLock(runningPod.vnid)
36 }
37 }
38 result.Err = m.podHandler.teardown(request)
39 if result.Err != nil {
40 PodOperationsErrors.WithLabelValues(PodOperationTeardown).Inc()
41 }
42 default:
43 result.Err = fmt.Errorf("unhandled CNI request %v", request.Command)
44 }
45 return result
46}
The method has three branches for the three possible values of request.Command. We will focus on the Command == cniserver.CNI_ADD branch, i.e. the path taken when openshift-sdn was invoked with the ADD argument earlier. That branch mainly calls the podHandler's setup method on line 9, which is defined at pkg/network/node/pod.go#L497:

1// Set up all networking (host/container veth, OVS flows, IPAM, loopback, etc)
2func (m *podManager) setup(req *cniserver.PodRequest) (cnitypes.Result, *runningPod, error) {
3 defer PodOperationsLatency.WithLabelValues(PodOperationSetup).Observe(sinceInMicroseconds(time.Now()))
4
5 pod, err := m.kClient.Core().Pods(req.PodNamespace).Get(req.PodName, metav1.GetOptions{})
6 if err != nil {
7 return nil, nil, err
8 }
9
10 ipamResult, podIP, err := m.ipamAdd(req.Netns, req.SandboxID)
11 if err != nil {
12 return nil, nil, fmt.Errorf("failed to run IPAM for %v: %v", req.SandboxID, err)
13 }
14
15 // Release any IPAM allocations and hostports if the setup failed
16 var success bool
17 defer func() {
18 if !success {
19 m.ipamDel(req.SandboxID)
20 if mappings := m.shouldSyncHostports(nil); mappings != nil {
21 if err := m.hostportSyncer.SyncHostports(Tun0, mappings); err != nil {
22 glog.Warningf("failed syncing hostports: %v", err)
23 }
24 }
25 }
26 }()
27
28 // Open any hostports the pod wants
29 var v1Pod v1.Pod
30 if err := kapiv1.Convert_core_Pod_To_v1_Pod(pod, &v1Pod, nil); err != nil {
31 return nil, nil, err
32 }
33 podPortMapping := kubehostport.ConstructPodPortMapping(&v1Pod, podIP)
34 if mappings := m.shouldSyncHostports(podPortMapping); mappings != nil {
35 if err := m.hostportSyncer.OpenPodHostportsAndSync(podPortMapping, Tun0, mappings); err != nil {
36 return nil, nil, err
37 }
38 }
39
40 vnid, err := m.policy.GetVNID(req.PodNamespace)
41 if err != nil {
42 return nil, nil, err
43 }
44
45 if err := maybeAddMacvlan(pod, req.Netns); err != nil {
46 return nil, nil, err
47 }
48
49 ofport, err := m.ovs.SetUpPod(req.SandboxID, req.HostVeth, podIP, vnid)
50 if err != nil {
51 return nil, nil, err
52 }
53 if err := setupPodBandwidth(m.ovs, pod, req.HostVeth, req.SandboxID); err != nil {
54 return nil, nil, err
55 }
56
57 m.policy.EnsureVNIDRules(vnid)
58 success = true
59 return ipamResult, &runningPod{podPortMapping: podPortMapping, vnid: vnid, ofport: ofport}, nil
60}
The method does two main things: on line 10 it calls m.ipamAdd to obtain an IP, which involves IPAM and is analyzed separately below, and on line 49 it calls ovs.SetUpPod to install the OVS rules, which is also analyzed separately below.
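One detail worth pointing out in setup is the cleanup-on-failure idiom on lines 16-26: a success flag combined with a deferred function that releases the IPAM allocation (and re-syncs hostports) if any later step fails. A stripped-down sketch of the same idiom, with placeholder functions standing in for ipamAdd/ipamDel and the OVS setup:

package main

import (
	"errors"
	"fmt"
)

func allocateIP() (string, error)  { return "10.128.0.114", nil }    // placeholder for ipamAdd
func releaseIP(ip string)          { fmt.Println("released", ip) }   // placeholder for ipamDel
func configureOVS(ip string) error { return errors.New("ovs failure") }

func setup() error {
	ip, err := allocateIP()
	if err != nil {
		return err
	}

	// Release the allocation if anything after this point fails.
	var success bool
	defer func() {
		if !success {
			releaseIP(ip)
		}
	}()

	if err := configureOVS(ip); err != nil {
		return err // the deferred cleanup fires because success is still false
	}

	success = true // from here on the deferred function is a no-op
	return nil
}

func main() {
	fmt.Println("setup error:", setup())
}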

This completes the analysis of how the cniServer handles openshift-sdn's requests for IPs. In the next part we look at how the cniServer calls the IPAM plugin to manage IPs.

The previous part analyzed how the cniServer handles openshift-sdn's IP requests; this part analyzes how the cniServer manages IPs through an IPAM plugin.

IPAM
The cniServer delegates IP management to the host-local IPAM plugin, a precompiled binary located in /opt/cni/bin. This part examines the interaction between the cniServer and host-local from two sides: IP allocation and IP release.

IP Allocation
As mentioned earlier, the cniServer obtains an IP by calling the podManager's ipamAdd method. How does that method talk to the host-local plugin? Let's take a closer look.
ipamAdd is defined at pkg/network/node/pod.go#L422:

1// Run CNI IPAM allocation for the container and return the allocated IP address
2func (m *podManager) ipamAdd(netnsPath string, id string) (*cni020.Result, net.IP, error) {
3 if netnsPath == "" {
4 return nil, nil, fmt.Errorf("netns required for CNI_ADD")
5 }
6
7 args := createIPAMArgs(netnsPath, m.cniBinPath, cniserver.CNI_ADD, id)
8 r, err := invoke.ExecPluginWithResult(m.cniBinPath+"/host-local", m.ipamConfig, args)
9 if err != nil {
10 return nil, nil, fmt.Errorf("failed to run CNI IPAM ADD: %v", err)
11 }
12
13 // We gave the IPAM plugin 0.2.0 config, so the plugin must return a 0.2.0 result
14 result, err := cni020.GetResult(r)
15 if err != nil {
16 return nil, nil, fmt.Errorf("failed to parse CNI IPAM ADD result: %v", err)
17 }
18 if result.IP4 == nil {
19 return nil, nil, fmt.Errorf("failed to obtain IP address from CNI IPAM")
20 }
21
22 return result, result.IP4.IP.IP, nil
23}
Line 7 above first builds an args variable through createIPAMArgs; the variable has the following shape:

1struct {
2 Command string
3 ContainerID string
4 NetNS string
5 PluginArgs [][2]string
6 PluginArgsStr string
7 IfName string
8 Path string
9}
In the value that gets built, Command is "ADD", so the subsequent host-local invocation performs the ADD operation.
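For illustration, the populated value might look roughly like the sketch below; it is a hypothetical example that merely mirrors the struct shape shown above (the container ID and netns path reuse the values from the sample command shown further down), not the actual createIPAMArgs implementation:

package main

import "fmt"

// cniArgs mirrors the struct shape shown above.
type cniArgs struct {
	Command       string
	ContainerID   string
	NetNS         string
	PluginArgs    [][2]string
	PluginArgsStr string
	IfName        string
	Path          string
}

func main() {
	// These values map onto the CNI_* environment variables that the
	// host-local binary reads (CNI_COMMAND, CNI_CONTAINERID, and so on).
	args := cniArgs{
		Command:     "ADD",
		ContainerID: "example",            // sandbox ID of the pod
		NetNS:       "/proc/48776/ns/net", // pod's network namespace
		IfName:      "eth0",
		Path:        "/opt/cni/bin", // where host-local is looked up
	}
	fmt.Printf("%+v\n", args)
}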
Line 8 then executes the host-local plugin via invoke.ExecPluginWithResult, passing in the args variable built above together with ipamConfig. ipamConfig contains the subnet configuration of the node the pod runs on plus some host-local settings, roughly like this:

1{
2 "cniVersion":"0.3.1",
3 "name":"examplenet",
4 "ipam":{
5 "type":"host-local",
6 "ranges":[
7 [
8 {
9 "subnet":"203.0.113.0/24"
10 }
11 ]
12 ],
13 "dataDir":"/tmp/cni-example"
14 }
15}
The resulting host-local invocation is roughly equivalent to the following command:

echo '{ "cniVersion": "0.3.1", "name": "examplenet", "ipam": { "type": "host-local", "ranges": [ [{"subnet": "203.0.113.0/24"}]], "dataDir": "/tmp/cni-example" } }' | CNI_COMMAND=ADD CNI_CONTAINERID=example CNI_NETNS=/proc/48776/ns/net CNI_IFNAME=eth0 CNI_PATH=/opt/cni/bin /opt/cni/bin/host-local
The returned result looks something like:

1{
2 "ips":[
3 {
4 "version":"4",
5 "address":"203.0.113.2/24",
6 "gateway":"203.0.113.1"
7 }
8 ]
9}
The IP and gateway information obtained here is returned to the caller on line 22 of the code above, i.e. back to line 10 of the podManager setup method analyzed in section three.
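To make that result format concrete, the short sketch below decodes JSON of the shape shown above with ad-hoc local types and extracts the address and gateway; the real code instead goes through the CNI library's typed result (cni020.GetResult) as shown in ipamAdd:

package main

import (
	"encoding/json"
	"fmt"
)

// ipEntry and ipamResult are ad-hoc types matching the JSON shape above.
type ipEntry struct {
	Version string `json:"version"`
	Address string `json:"address"`
	Gateway string `json:"gateway"`
}

type ipamResult struct {
	IPs []ipEntry `json:"ips"`
}

func main() {
	raw := []byte(`{"ips":[{"version":"4","address":"203.0.113.2/24","gateway":"203.0.113.1"}]}`)

	var res ipamResult
	if err := json.Unmarshal(raw, &res); err != nil {
		fmt.Println("parse error:", err)
		return
	}
	for _, ip := range res.IPs {
		fmt.Printf("allocated %s via gateway %s\n", ip.Address, ip.Gateway)
	}
}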

IP Release

When the cniServer receives a request to release an IP, it calls the podManager's ipamDel method, defined at pkg/network/node/pod.go#L445:

1// Run CNI IPAM release for the container
2func (m *podManager) ipamDel(id string) error {
3 args := createIPAMArgs("", m.cniBinPath, cniserver.CNI_DEL, id)
4 err := invoke.ExecPluginWithoutResult(m.cniBinPath+"/host-local", m.ipamConfig, args)
5 if err != nil {
6 return fmt.Errorf("failed to run CNI IPAM DEL: %v", err)
7 }
8 return nil
9}

The logic mirrors ipamAdd: it, too, goes through the host-local plugin, except that the args it passes in has Command set to CNI_DEL, so the host-local invocation carries out the IP release.
host-local records every IP it has allocated on local disk, under the dataDir configured in ipamConfig; in an OpenShift environment that is /var/lib/cni/networks/openshift-sdn. The directory contents look like this:

[root@master227 ~]# ls /var/lib/cni/networks/openshift-sdn
10.128.0.114 10.128.0.116 last_reserved_ip.0
[root@master227 ~]#
Each file named after an IP represents an allocated IP, and its content is the ID of the pod (sandbox) holding that IP, for example:

[root@master227 ~]# cat /var/lib/cni/networks/openshift-sdn/10.128.0.114
7a1c2e242c2a2d750382837b81283952ad9878ae496195560f9854935d7e4d31[root@master227 ~]#
When an IP is allocated, host-local adds a record to this directory; when the IP is released, the corresponding record is removed.
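As a small illustration of that bookkeeping, the sketch below walks the directory and prints each allocated IP together with the container ID stored in its file, assuming the default openshift-sdn dataDir shown above and skipping host-local's own helper files:

package main

import (
	"fmt"
	"io/ioutil"
	"path/filepath"
	"strings"
)

func main() {
	dir := "/var/lib/cni/networks/openshift-sdn" // dataDir used in this environment

	entries, err := ioutil.ReadDir(dir)
	if err != nil {
		fmt.Println("cannot read dataDir:", err)
		return
	}
	for _, e := range entries {
		// Skip host-local's own bookkeeping files such as last_reserved_ip.0.
		if strings.HasPrefix(e.Name(), "last_reserved_ip") {
			continue
		}
		data, err := ioutil.ReadFile(filepath.Join(dir, e.Name()))
		if err != nil {
			continue
		}
		fmt.Printf("%s -> %s\n", e.Name(), strings.TrimSpace(string(data)))
	}
}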

We won't analyze host-local's own logic here; a later part will cover it separately. If you are interested, the source is at https://github.com/containern...

That wraps up the IPAM analysis. In the next part we look at how the cniServer calls the ovs controller to set up the pod's OVS rules.

Above we analyzed how the cniServer manages IPs through the IPAM plugin; this part focuses on how the cniServer installs the pod-related OVS rules through the ovs controller.
Setting Up the OVS Rules

OpenShift's underlying networking is built on OVS, so once the pod IP is configured, how do the pod-related OVS rules get installed? Let's take a look.
When an OpenShift node starts, it creates an ovs controller that performs all OVS network configuration. As analyzed in section three, the cniServer installs a pod's OVS rules by calling the ovs controller's SetUpPod method; the call site is pkg/network/node/pod.go#L544:

ofport, err := m.ovs.SetUpPod(req.SandboxID, req.HostVeth, podIP, vnid)
SetUpPod is defined at pkg/network/node/ovscontroller.go#L267:

1func (oc *ovsController) SetUpPod(sandboxID, hostVeth string, podIP net.IP, vnid uint32) (int, error) {
2 ofport, err := oc.ensureOvsPort(hostVeth, sandboxID, podIP.String())
3 if err != nil {
4 return -1, err
5 }
6 return ofport, oc.setupPodFlows(ofport, podIP, vnid)
7}
On line 2 above, SetUpPod in turn calls ensureOvsPort, which is defined at pkg/network/node/ovscontroller.go#L227:

1func (oc *ovsController) ensureOvsPort(hostVeth, sandboxID, podIP string) (int, error) {
2 return oc.ovs.AddPort(hostVeth, -1,
3 fmt.Sprintf(`external-ids=sandbox="%s",ip="%s"`, sandboxID, podIP),
4 )
5}
As the code shows, this method in turn calls the ovs package's AddPort method; let's look at AddPort next. It is defined at pkg/util/ovs/ovs.go#L31:

1func (ovsif *ovsExec) AddPort(port string, ofportRequest int, properties ...string) (int, error) {
2 args := []string{"--may-exist", "add-port", ovsif.bridge, port}
3 if ofportRequest > 0 || len(properties) > 0 {
4 args = append(args, "--", "set", "Interface", port)
5 if ofportRequest > 0 {
6 args = append(args, fmt.Sprintf("ofport_request=%d", ofportRequest))
7 }
8 if len(properties) > 0 {
9 args = append(args, properties...)
10 }
11 }
12 _, err := ovsif.exec(OVS_VSCTL, args...)
13 if err != nil {
14 return -1, err
15 }
16 ofport, err := ovsif.GetOFPort(port)
17 if err != nil {
18 return -1, err
19 }
20 if ofportRequest > 0 && ofportRequest != ofport {
21 return -1, fmt.Errorf("allocated ofport (%d) did not match request (%d)", ofport, ofportRequest)
22 }
23 return ofport, nil
24}
Reading through the code, AddPort ultimately runs the underlying ovs-vsctl command to attach the host side of the pod's veth pair to the OVS bridge br0, so traffic on br0 can reach the pod through that interface. The call amounts to something like the following command line, assuming the pod's host-side interface is veth3258a5e2:

ovs-vsctl --may-exist add-port br0 veth3258a5e2
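For reference, here is a rough Go sketch of those two steps — adding the port and then reading back the assigned OpenFlow port number, as the GetOFPort call on line 16 of AddPort does — issued directly through os/exec; the bridge and interface names are the example ones from above, and the real code goes through the ovs wrapper's own exec helper instead:

package main

import (
	"fmt"
	"os/exec"
	"strings"
)

func main() {
	bridge, port := "br0", "veth3258a5e2" // example names from above

	// Equivalent of the add-port call issued by AddPort.
	if out, err := exec.Command("ovs-vsctl", "--may-exist", "add-port", bridge, port).CombinedOutput(); err != nil {
		fmt.Printf("add-port failed: %v (%s)\n", err, out)
		return
	}

	// Equivalent of the GetOFPort lookup that follows: ask OVS which
	// OpenFlow port number was assigned to the interface.
	out, err := exec.Command("ovs-vsctl", "get", "Interface", port, "ofport").Output()
	if err != nil {
		fmt.Println("ofport lookup failed:", err)
		return
	}
	fmt.Println("ofport:", strings.TrimSpace(string(out)))
}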
Back in SetUpPod, line 6 calls setupPodFlows to install the OVS flow rules for the pod IP; it is defined at pkg/network/node/ovscontroller.go#L233:

1func (oc *ovsController) setupPodFlows(ofport int, podIP net.IP, vnid uint32) error {
2 otx := oc.ovs.NewTransaction()
3
4 ipstr := podIP.String()
5 podIP = podIP.To4()
6 ipmac := fmt.Sprintf("00:00:%02x:%02x:%02x:%02x/00:00:ff:ff:ff:ff", podIP[0], podIP[1], podIP[2], podIP[3])
7
8 // ARP/IP traffic from container
9 otx.AddFlow("table=20, priority=100, in_port=%d, arp, nw_src=%s, arp_sha=%s, actions=load:%d->NXM_NX_REG0[], goto_table:21", ofport, ipstr, ipmac, vnid)
10 otx.AddFlow("table=20, priority=100, in_port=%d, ip, nw_src=%s, actions=load:%d->NXM_NX_REG0[], goto_table:21", ofport, ipstr, vnid)
11 if oc.useConnTrack {
12 otx.AddFlow("table=25, priority=100, ip, nw_src=%s, actions=load:%d->NXM_NX_REG0[], goto_table:30", ipstr, vnid)
13 }
14
15 // ARP request/response to container (not isolated)
16 otx.AddFlow("table=40, priority=100, arp, nw_dst=%s, actions=output:%d", ipstr, ofport)
17
18 // IP traffic to container
19 otx.AddFlow("table=70, priority=100, ip, nw_dst=%s, actions=load:%d->NXM_NX_REG1[], load:%d->NXM_NX_REG2[], goto_table:80", ipstr, vnid, ofport)
20
21 return otx.Commit()
22}
Lines 9 through 19 above call AddFlow to install the various OVS flow rules: lines 9-10 handle ARP/IP traffic leaving the pod, line 16 handles ARP traffic destined for the pod, and line 19 handles IP traffic destined for the pod. Under the hood, AddFlow shells out to the ovs-ofctl command-line tool to program these rules. We won't go into the individual rules here; interested readers can study them on their own.
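To make the flow templates above a little more tangible, the sketch below expands the table=20 and table=70 format strings for one made-up pod (IP 10.128.0.114, ofport 3, vnid 7); it only prints the resulting flow strings, while programming them into br0 remains ovs-ofctl's job:

package main

import (
	"fmt"
	"net"
)

func main() {
	// Example values only; in the real code they come from IPAM, AddPort and GetVNID.
	podIP := net.ParseIP("10.128.0.114").To4()
	ofport := 3
	vnid := uint32(7)

	ipstr := podIP.String()
	ipmac := fmt.Sprintf("00:00:%02x:%02x:%02x:%02x/00:00:ff:ff:ff:ff", podIP[0], podIP[1], podIP[2], podIP[3])

	// Flow for IP traffic leaving the pod (table 20) and IP traffic entering it (table 70).
	fromPod := fmt.Sprintf("table=20, priority=100, in_port=%d, ip, nw_src=%s, actions=load:%d->NXM_NX_REG0[], goto_table:21", ofport, ipstr, vnid)
	toPod := fmt.Sprintf("table=70, priority=100, ip, nw_dst=%s, actions=load:%d->NXM_NX_REG1[], load:%d->NXM_NX_REG2[], goto_table:80", ipstr, vnid, ofport)

	fmt.Println(fromPod)
	fmt.Println(toPod)
	fmt.Println("pod MAC match used in the ARP rule:", ipmac)
}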

With that, the analysis of how the OVS rules are set up is complete, and so is the analysis of the entire OpenShift pod network configuration flow.
