衆所周知,Spark是一個快速、通用的大規模數據處理平臺,和Hadoop的MapReduce計算框架相似。可是相對於MapReduce,Spark憑藉其可伸縮、基於內存計算等特色,以及能夠直接讀寫Hadoop上任何格式數據的優點,使批處理更加高效,並有更低的延遲。實際上,Spark已經成爲輕量級大數據快速處理的統一平臺。
Spark做爲一個數據計算平臺和框架,更多的是關注Spark Application的管理,而底層實際的資源調度和管理更多的是依靠外部平臺的支持:html
Spark官方支持四種Cluster Manager:Spark standalone cluster manager、Mesos、YARN和Kubernetes。因爲咱們TalkingData是使用Kubernetes做爲資源的調度和管理平臺,因此Spark On Kubernetes對於咱們是最好的解決方案。node
目前市面上有不少搭建Kubernetes的方法,好比Scratch、Kubeadm、Minikube或者各類託管方案。由於咱們須要簡單快速地搭建功能驗證集羣,因此選擇了Kubeadm做爲集羣的部署工具。部署步驟很簡單,在master上執行:git
kubeadm init
在node上執行:github
kubeadm join --token : --discovery-token-ca-cert-hash sha256:
具體配置可見官方文檔:https://kubernetes.io/docs/setup/independent/create-cluster-kubeadm/。
須要注意的是因爲國內網絡限制,不少鏡像沒法從k8s.gcr.io獲取,咱們須要將之替換爲第三方提供的鏡像,好比:https://hub.docker.com/u/mirrorgooglecontainers/。docker
Kubernetes網絡默認是經過CNI實現,主流的CNI plugin有:Linux Bridge、MACVLAN、Flannel、Calico、Kube-router、Weave Net等。Flannel主要是使用VXLAN tunnel來解決pod間的網絡通訊,Calico和Kube-router則是使用BGP。因爲軟VXLAN對宿主機的性能和網絡有不小的損耗,BGP則對硬件交換機有必定的要求,且咱們的基礎網絡是VXLAN實現的大二層,因此咱們最終選擇了MACVLAN。
CNI MACVLAN的配置示例以下:apache
{ "name": "mynet", "type": "macvlan", "master": "eth0", "ipam": { "type": "host-local", "subnet": "10.0.0.0/17", "rangeStart": "10.0.64.1", "rangeEnd": "10.0.64.126", "gateway": "10.0.127.254", "routes": [ { "dst": "0.0.0.0/0" }, { "dst": "10.0.80.0/24", "gw": "10.0.0.61" } ] } }
Pod subnet是10.0.0.0/17,實際pod ip pool是10.0.64.0/20。cluster cidr是10.0.80.0/24。咱們使用的IPAM是host-local,規則是在每一個Kubernetes node上創建/25的子網,能夠提供126個IP。咱們還配置了一條到cluster cidr的靜態路由10.0.80.0/24,網關是宿主機。這是由於容器在macvlan配置下egress並不會經過宿主機的iptables,這點和Linux Bridge有較大區別。在Linux Bridge模式下,只要指定內核參數net.bridge.bridge-nf-call-iptables = 1,全部進入bridge的流量都會經過宿主機的iptables。通過分析kube-proxy,咱們發現可使用KUBE-FORWARD這個chain來進行pod到service的網絡轉發:後端
-A FORWARD -m comment --comment "kubernetes forward rules" -j KUBE-FORWARD -A KUBE-FORWARD -m comment --comment "kubernetes forwarding rules" -m mark --mark 0x4000/0x4000 -j ACCEPT -A KUBE-FORWARD -s 10.0.0.0/17 -m comment --comment "kubernetes forwarding conntrack pod source rule" -m conntrack --ctstate RELATED,ESTABLISHED -j ACCEPT -A KUBE-FORWARD -d 10.0.0.0/17 -m comment --comment "kubernetes forwarding conntrack pod destination rule" -m conntrack --ctstate RELATED,ESTABLISHED -j ACCEPT
最後經過KUBE-SERVICES使用DNAT到後端的pod。pod訪問其餘網段的話,就經過物理網關10.0.127.254。
還有一個須要注意的地方是出於kernel security的考慮,link物理接口的macvlan是沒法直接和物理接口通訊的,這就致使容器並不能將宿主機做爲網關。咱們採用了一個小技巧,避開了這個限制。咱們從物理接口又建立了一個macvlan,將物理IP移到了這個接口上,物理接口只做爲網絡入口:api
$ cat /etc/sysconfig/network-scripts/ifcfg-eth0 DEVICE=eth0 IPV6INIT=no BOOTPROTO=none $ cat /etc/sysconfig/network-scripts/ifcfg-macvlan DEVICE=macvlan NAME=macvlan BOOTPROTO=none ONBOOT=yes TYPE=macvlan DEVICETYPE=macvlan DEFROUTE=yes PEERDNS=yes PEERROUTES=yes IPV4_FAILURE_FATAL=no IPADDR=10.0.0.61 PREFIX=17 GATEWAY=10.0.127.254 MACVLAN_PARENT=eth0 MACVLAN_MODE=bridge
這樣兩個macvlan是能夠互相通訊的。網絡
默認配置下,Kubernetes使用kube-dns進行DNS解析和服務發現。但在實際使用時,咱們發如今pod上經過service domain訪問service老是有5秒的延遲。使用tcpdump抓包,發現延遲出如今DNS AAAA。進一步排查,發現問題是因爲netfilter在conntrack和SNAT時的Race Condition致使。簡言之,DNS A和AAAA記錄請求報文是並行發出的,這會致使netfilter在_nf_conntrack_confirm時認爲第二個包是重複的(由於有相同的五元組),從而丟包。具體可看我提的issue:https://github.com/kubernetes/kubernetes/issues/62628。一個簡單的解決方案是在/etc/resolv.conf中增長options single-request-reopen,使DNS A和AAAA記錄請求報文使用不一樣的源端口。我提的PR在:https://github.com/kubernetes/kubernetes/issues/62628,你們能夠參考。咱們的解決方法是不使用Kubernetes service,設置hostNetwork=true使用宿主機網絡提供DNS服務。由於咱們的基礎網絡是大二層,因此pod和node能夠直接通訊,這就避免了conntrack和SNAT。框架
因爲Spark的抽象設計,咱們可使用第三方資源管理平臺調度和管理Spark做業,好比Yarn、Mesos和Kubernetes。目前官方有一個experimental項目,能夠將Spark運行在Kubernetes之上:https://spark.apache.org/docs/latest/running-on-kubernetes.html。
當咱們經過spark-submit將Spark做業提交到Kubernetes集羣時,會執行如下流程:
因爲Spark driver和executor都運行在Kubernetes pod中,而且咱們使用Docker做爲container runtime enviroment,因此首先咱們須要創建Spark的Docker鏡像。
在Spark distribution中已包含相應腳本和Dockerfile,能夠經過如下命令構建鏡像:
$ ./bin/docker-image-tool.sh -r <repo> -t my-tag build $ ./bin/docker-image-tool.sh -r <repo> -t my-tag push
在構建Spark鏡像後,咱們能夠經過如下命令提交做業:
$ bin/spark-submit \ --master k8s://https://: \ --deploy-mode cluster \ --name spark-pi \ --class org.apache.spark.examples.SparkPi \ --jars https://path/to/dependency1.jar,https://path/to/dependency2.jar --files hdfs://host:port/path/to/file1,hdfs://host:port/path/to/file2 --conf spark.executor.instances=5 \ --conf spark.kubernetes.container.image= \ https://path/to/examples.jar
其中,Spark master是Kubernetes api server的地址,能夠經過如下命令獲取:
$ kubectl cluster-info Kubernetes master is running at http://127.0.0.1:6443
Spark的做業代碼和依賴,咱們能夠在--jars、--files和最後位置指定,協議支持http、https和HDFS。
執行提交命令後,會有如下輸出:
任務結束,會輸出:
咱們能夠在本地使用kubectl port-forward訪問Driver UI:
$ kubectl port-forward <driver-pod-name> 4040:4040
執行完後經過http://localhost:4040訪問。
Spark的全部日誌均可以經過Kubernetes API和kubectl CLI進行訪問:
$ kubectl -n=<namespace> logs -f <driver-pod-name>
在Kubernetes中,咱們可使用namespace在多用戶間實現資源分配、隔離和配額。Spark On Kubernetes一樣支持配置namespace建立Spark做業。
首先,建立一個Kubernetes namespace:
$ kubectl create namespace spark
因爲咱們的Kubernetes集羣使用了RBAC,因此還需建立serviceaccount和綁定role:
$ kubectl create serviceaccount spark -n spark $ kubectl create clusterrolebinding spark-role --clusterrole=edit --serviceaccount=spark:spark --namespace=spark
並在spark-submit中新增如下配置:
$ bin/spark-submit \ --conf spark.kubernetes.authenticate.driver.serviceAccountName=spark \ --conf spark.kubernetes.namespace=spark \ ...
考慮到咱們Spark做業的一些特色和計算資源隔離,前期咱們仍是選擇了較穩妥的物理隔離方案。具體作法是爲每一個組提供單獨的Kubernetes namespace,計算任務都在各自namespace裏提交。計算資源以物理機爲單位,折算成cpu和內存,歸入Kubernetes統一管理。在Kubernetes集羣裏,經過node label和PodNodeSelector將計算資源和namespace關聯。從而實如今提交Spark做業時,計算資源老是選擇namespace關聯的node。
具體作法以下:
一、建立node label
$ kubectl label nodes <node_name> spark:spark
二、開啓Kubernetes admission controller
咱們是使用kubeadm安裝Kubernetes集羣,因此修改/etc/kubernetes/manifests/kube-apiserver.yaml,在--admission-control後添加PodNodeSelector。
$ cat /etc/kubernetes/manifests/kube-apiserver.yaml apiVersion: v1 kind: Pod metadata: annotations: scheduler.alpha.kubernetes.io/critical-pod: "" creationTimestamp: null labels: component: kube-apiserver tier: control-plane name: kube-apiserver namespace: kube-system spec: containers: - command: - kube-apiserver - --secure-port=6443 - --proxy-client-cert-file=/etc/kubernetes/pki/front-proxy-client.crt - --admission-control=Initializers,NamespaceLifecycle,LimitRanger,ServiceAccount,DefaultStorageClass,DefaultTolerationSeconds,NodeRestriction,ResourceQuota,MutatingAdmissionWebhook,ValidatingAdmissionWebhook,PodNodeSelector ...
三、配置PodNodeSelector
在namespace的annotations中添加scheduler.alpha.kubernetes.io/node-selector: spark=spark。
apiVersion: v1 kind: Namespace metadata: annotations: scheduler.alpha.kubernetes.io/node-selector: spark=spark name: spark
完成以上配置後,能夠經過spark-submit測試結果:
$ spark-submit --conf spark.kubernetes.authenticate.driver.serviceAccountName=spark --conf spark.kubernetes.namespace=spark --master k8s://https://xxxx:6443 --deploy-mode cluster --name spark-pi --class org.apache.spark.examples.SparkPi --conf spark.executor.instances=5 --conf spark.kubernetes.container.image=xxxx/library/spark:v2.3 http://xxxx:81/spark-2.3.0-bin-hadoop2.7/examples/jars/spark-examples_2.11-2.3.0.jar
咱們能夠看到,Spark做業全分配到了關聯的hadooptest-001到003三個node上。
Kubernetes的集羣狀態基本都保存在etcd中,因此etcd是HA的關鍵所在。因爲咱們目前還處在半生產狀態,HA這方面未過多考慮。有興趣的同窗能夠查看:https://kubernetes.io/docs/setup/independent/high-availability/。
在Spark On Yarn下,能夠開啓yarn.log-aggregation-enable將日誌收集聚合到HDFS中,以供查看。可是在Spark On Kubernetes中,則缺乏這種日誌收集機制,咱們只能經過Kubernetes pod的日誌輸出,來查看Spark的日誌:
$ kubectl -n=<namespace> logs -f <driver-pod-name>
收集和聚合日誌,咱們後面會和ES結合。
監控
咱們TalkingData內部有本身的監控平臺OWL[2](已開源),將來咱們計劃編寫metric plugin,將Kubernetes接入OWL中。
混合部署
爲了保證Spark做業時刻有可用的計算資源,咱們前期採用了物理隔離的方案。顯而易見,這種方式大幅下降了物理資源的使用率。下一步咱們計劃採用混部方案,經過如下三種方式實現:
在採用如下兩種方法增長資源使用率時,集羣可能會面臨資源短缺和可用性的問題:
這會致使運行資源大於實際物理資源的狀況(我稱之爲資源擠兌)。一種作法是給資源劃分等級,優先保證部分等級的資源供給。另外一種作法是實現資源的水平擴展,動態補充可用資源,並在峯值事後自動釋放。
原文連接 本文爲雲棲社區原創內容,未經容許不得轉載。