以前就瞭解過kafka,看的似懂非懂,最近項目組中引入了kafka,恰好接着這個機會再次學習下。
Kafka在不少公司被用做分佈式高性能消息隊列,kafka以前我只用過redis的list來作簡單的隊列處理,也還算好用,可能數據量比較小,也是單機運行,未出現過問題,用做輕量級消息隊列仍是比較好用的。而redis的做者antirez,設計redis的初衷並非用來作消息隊列,但用它作消息隊列的人貌似還挺多,以致於後來antirez後來新開了個項目disque,專門用來作消息隊列,但這個不是本文的重點。node
在瞭解kafka的時候,發現他與zookeeper綁定的比較緊密,爲了更好的理解kafka,我必須先將zookeeper搞明白。
ZooKeeper是一種分佈式協調服務,用於管理大型主機。在分佈式環境中協調和管理服務是一個複雜的過程。ZooKeeper經過其簡單的架構和API解決了這個問題。 ZooKeeper容許開發人員專一於核心應用程序邏輯,而沒必要擔憂應用程序的分佈式特性。python
這是從互聯網上引用的一段話,分佈式應用不一樣於單機引用,維護起來很是複雜,如今的分佈式系統大部分已經離不開zookeeper(或者相似的解決方案)了,zookeeper簡化了分佈式應用的管理和部署,本文就經過實例來探討學習下zookeeper。nginx
本人也是持着學習的態度來寫本篇文章的,後文的實例都未在生產環境中使用過,都是學習以後的實踐整理,偏向於應用,對其中的算法原理並未深究。有瑕疵遺漏的地方還望斧正。git
假如,咱們線上有個服務器集羣,成百上千臺服務器,若是更新代碼的時候怎麼更新呢,一臺臺機器去更新?就算是強大的麒麟臂爬也要累折了o(╯□╰)o,今天咱們就試試用zookeeper來給服務器集羣部署代碼。github
zookeeper提供了節點watch的功能,zookeeper的client(對外提供服務的server)監控zookeeper上的節點(znode),當節點變更的時候,client會收到變更事件和變更後的內容,基於zookeeper的這個特性,咱們能夠給服務器集羣中的全部機器(client)都註冊watch事件,監控特定znode,節點中存儲部署代碼的配置信息,須要更新代碼的時候,修改znode中的值,服務器集羣中的每一臺server都會收到代碼更新事件,而後觸發調用,更新目標代碼。也能夠很容易的橫向擴展,能夠隨意的增刪機器,機器啓動的時候註冊監控節點事件便可。redis
個人機器數量有限,在本地模擬zookeeper集羣和服務器集羣,原理都是同樣的,可能具體實施的時候有些小異。算法
在本機經過3個端口模擬zookeeper集羣,多個目錄模擬服務器集羣。docker
本文只是模擬,爲了方便,全部的節點全在一臺機器上,效果是相似的。apache
建立/path/to/zookeeper/conf/zoo1.cfg
,/path/to/zookeeper/conf/zoo2.cfg
,/path/to/zookeeper/conf/zoo3.cfg
三個文件,配置分別以下:json
zoo1.cfg
tickTime=2000 initLimit=10 syncLimit=5 dataDir=/tmp/zk1/data dataLogDir=/tmp/zk1/log clientPort=2181 server.1=localhost:2888:3888 server.2=localhost:2899:3899 server.3=localhost:2877:3877
zoo2.cfg
tickTime=2000 initLimit=10 syncLimit=5 dataDir=/tmp/zk2/data dataLogDir=/tmp/zk2/log clientPort=2182 server.1=localhost:2888:3888 server.2=localhost:2899:3899 server.3=localhost:2877:3877
zoo3.cfg
tickTime=2000 initLimit=10 syncLimit=5 dataDir=/tmp/zk3/data dataLogDir=/tmp/zk3/log clientPort=2183 server.1=localhost:2888:3888 server.2=localhost:2899:3899 server.3=localhost:2877:3877
配置文件中dataDir
,dataLogDir
,clientPort
這三個配置是有差異的。
分別在3個節點對應的dataDir
中創建myid
文件,裏面輸入服務器標識號
echo 1 > /tmp/zk1/data/myid echo 2 > /tmp/zk2/data/myid echo 3 > /tmp/zk3/data/myid
啓動三個節點
bin/zkServer.sh start conf/zoo1.cfg bin/zkServer.sh start conf/zoo2.cfg bin/zkServer.sh start conf/zoo3.cfg
查看三個節點,能夠看到一、3號接節點是follower節點,2號節點是leader節點
➜ zookeeper bin/zkServer.sh status conf/zoo3.cfg ZooKeeper JMX enabled by default Using config: conf/zoo3.cfg Mode: follower ➜ zookeeper bin/zkServer.sh status conf/zoo2.cfg ZooKeeper JMX enabled by default Using config: conf/zoo2.cfg Mode: leader ➜ zookeeper bin/zkServer.sh status conf/zoo1.cfg ZooKeeper JMX enabled by default Using config: conf/zoo1.cfg Mode: follower
from kazoo.client import KazooClient import time import json import subprocess import os zk = KazooClient(hosts="10.222.76.148:2181, 10.222.76.148:2182, 10.222.76.148:2183") zk.start() FILE_DIR = os.path.split(os.path.realpath(__file__))[0] '''切換到指定文件夾,不存在的話建立並切換''' def go_dir(dir_name): if os.path.exists(dir_name): pass else: os.makedirs(dir_name) os.chdir(dir_name) '''從git獲取代碼''' def handle_watch(data): try: info = json.loads(data) if not isinstance(info, dict): raise Exception("節點數據不是json穿") if not "relativePath" in info: raise Exception("節點json缺乏[relativePath]字段") if not "url" in info: raise Exception("節點json缺乏[url]字段") if not "commitId" in info: raise Exception("節點json缺乏[commitId]字段") chdir = os.path.join(FILE_DIR, info["relativePath"]) go_dir(chdir) print("開始執行git clone ...") res = subprocess.call(['git', 'status']) if 0 == res: res = subprocess.call(['git', 'pull']) else: res = subprocess.call(['git', 'clone', info["url"], '.']) if 0 != res: raise Exception("clone/pull代碼失敗") commitId = subprocess.check_output(["git", "rev-parse", "HEAD"]) commitId = commitId.decode() commitId = commitId.strip() if commitId != info["commitId"]: raise Exception("正確版本Id[%s],當前版本Id[%s]" % (commitId, info["commitId"])) except Exception as e: print(e) print("更新失敗") return 1 else: print("正確版本Id[%s],當前版本Id[%s]" % (commitId, info["commitId"])) print("更新成功") return 0 finally: pass @zk.DataWatch("/app/business/config") def watch_node(data, stat): if data: data = data.decode("utf-8") handle_watch(data) else: print("數據爲空") while True: time.sleep(100) print('tick')
新建2個文件夾模擬server集羣,複製client.py
到每一個服務器中
mkdir /tmp/server1 mkdir /tmp/server2
分別運行服務器上監控zookeeper節點變更的代碼:
python3 /tmp/server1/client.py python3 /tmp/server2/client.py
啓動以後,像znode節點/app/business/config
中寫入信息:
from kazoo.client import KazooClient import json zk = KazooClient(hosts="192.168.0.105:2181, 192.168.0.105:2182, 192.168.0.105:2183") zk.start() znode = { "url": "https://github.com/aizuyan/daemon.git", "commitId": "d5f5f144c66f0a36d452e9e13067b21d3d89b743", "relativePath": "daemon" } znode = json.dumps(znode) znode = bytes(znode, encoding="utf-8") zk.set("/app/business/config", znode);
寫完以後,會看到上面兩個模擬的服務器會立刻收到信息:
開始執行git clone ... On branch master Your branch is up-to-date with 'origin/master'. nothing to commit, working tree clean Already up-to-date. 正確版本Id[aea4096f490ff9556124fa5059ca702cc2acdf0e],當前版本Id[aea4096f490ff9556124fa5059ca702cc2acdf0e] 更新成功 開始執行git clone ... On branch master Your branch is up-to-date with 'origin/master'. nothing to commit, working tree clean Already up-to-date. 正確版本Id[aea4096f490ff9556124fa5059ca702cc2acdf0e],當前版本Id[aea4096f490ff9556124fa5059ca702cc2acdf0e] 更新成功
配合上git的hook機制,能夠作一個完整的系統,當代碼有更新的時候更新保存代碼信息znode上的數據,zookeeper push到全部watch這個節點的服務器,服務器更新代碼,全部服務器完成一次更新操做。
註冊一個持久節點/service/business/what
,他下面的每一個子節點都是一個可用服務,保存了服務的地址端口等信息,服務調用者經過zookeeper獲取/service/business/what
全部子節點信息來獲得可用的服務。下面的節點都是臨時節點,服務器啓動的時候會過來註冊一個臨時節點,服務器掛掉以後或主動關閉以後,臨時節點會自動移除,這樣就能夠保證使用者獲取的what服務都是可用的,並且能夠動態的擴容縮容。
我在本地經過docker來模擬服務器集羣,集羣中的全部nginx都經過各自的80端口對外提供服務。經過python-nmap
定時掃描端口占用狀況,若是是open
狀態則可對外提供服務,若是是closed
狀態,則中止對外提供服務。若是因爲網絡抖動刪除了臨時節點,網絡恢復以後,會從新掃描到自身服務可用,而後建立臨時節點。
容器中啓動一個nginx,經過一個進程監控nginx綁定的端口,當端口對外提供服務時,我就認爲服務可用,當端口中止對外提供服務時,我就認爲服務不可用,相應的刪除或者建立臨時節點,代碼以下所示:
from kazoo.client import KazooClient import time import nmap import os import json ZNODE_BASE_PATH="/service/business/what/" zk = KazooClient( hosts="192.168.0.105:2181, 192.168.0.105:2182, 192.168.0.105:2183" ) zk.start() znode = ZNODE_BASE_PATH+"/s"+os.environ["PORT"] def get_server_info(): server_info = (os.environ["URL"], os.environ["PORT"]) return server_info def is_port_run(ip, port): nm = nmap.PortScanner() info = nm.scan(ip, port) state = info['scan'][ip]['tcp'][int(port)]['state'] ret = False if state == "open": ret = True return ret server_info = get_server_info() server_info = json.dumps(server_info).encode("utf-8") while True: time.sleep(2) is_alive = is_port_run("127.0.0.1", "80") if is_alive: if not zk.exists(znode): zk.create(znode, server_info, ephemeral=True, makepath=True) else: if zk.exists(znode): zk.delete(znode)
每一個服務器綁定的端口信息經過docker運行的時候傳入參數決定,這樣就能夠經過同一個鏡像方便的建立多個容器實例了,方便快捷,下面是dockerfile
:
FROM python:latest MAINTAINER Liam Yan # 擴充源 RUN grep '^deb ' /etc/apt/sources.list | sed 's/^deb/deb-src/g' > /etc/apt/sources.list.d/deb-src.list RUN apt-get update -y RUN apt-get install nginx -y RUN mkdir /usr/share/nginx/logs RUN apt-get install nmap -y RUN pip3 install python-nmap RUN pip3 install kazoo ADD nginx.conf /etc/nginx/nginx.conf ADD is_alive.py /usr/local/is_alive.py ADD run.sh /usr/local/run.sh EXPOSE 80 CMD ["/bin/bash", "/usr/local/run.sh"]
其中nginx.conf
是容器中的nginx配置文件,最簡單的就能夠,只要能夠驗證該服務器是否可用便可,但必定要注意,要在nginx配置文件中加入daemon off;
,否則docker可能會啓動以後立刻退出。is_alive.py
就是上面的用來檢測容器中的服務是否可用。run.sh
內容以下,啓動一個後臺監控進程以後,再啓動nginx。
nohup python3 /usr/local/is_alive.py & nginx
經過dockerfile建立鏡像docker build --rm -t zookeeper_test .
,建立成功以後運行5個服務器:
docker run -e "URL=127.0.0.1" -e "PORT=9099" --name yrt5 -p 9099:80 -d nzookeeper_test docker run -e "URL=127.0.0.1" -e "PORT=9098" --name yrt4 -p 9098:80 -d nzookeeper_test docker run -e "URL=127.0.0.1" -e "PORT=9097" --name yrt3 -p 9097:80 -d nzookeeper_test docker run -e "URL=127.0.0.1" -e "PORT=9096" --name yrt2 -p 9096:80 -d nzookeeper_test docker run -e "URL=127.0.0.1" -e "PORT=9095" --name yrt1 -p 9095:80 -d nzookeeper_test
啓動以後運行docker ps -a
,能夠看到,端口能夠隨便取,只要別衝突就行,
➜ zookeeper git:(master) docker ps -a CONTAINER ID IMAGE COMMAND CREATED STATUS PORTS NAMES 5ae23ae351ed nginx_python_alive "/bin/bash /usr/loca…" Less than a second ago Up 2 seconds 0.0.0.0:9096->80/tcp yrt5 e4a961e7853e nginx_python_alive "/bin/bash /usr/loca…" 44 seconds ago Up 49 seconds 0.0.0.0:9095->80/tcp yrt4 f96650b188be nginx_python_alive "/bin/bash /usr/loca…" 35 minutes ago Up 35 minutes 0.0.0.0:9099->80/tcp yrt3 084f71db25f2 nginx_python_alive "/bin/bash /usr/loca…" 35 minutes ago Up 35 minutes 0.0.0.0:9090->80/tcp yrt2 159199bee2ed nginx_python_alive "/bin/bash /usr/loca…" 36 minutes ago Up 36 minutes 0.0.0.0:8080->80/tcp yrt1
經過讀取/service/business/what
節點下的全部子節點就能夠獲取到全部的可用服務,代碼以下:
from kazoo.client import KazooClient import json def get_servers(): zk = KazooClient(hosts="192.168.0.105:2181, 192.168.0.105:2182, 192.168.0.105:2183") zk.start() ZNODE = "/service/business/what" children = zk.get_children(ZNODE) servers = [] for child in children: child_znode = ZNODE + "/" + child child_server_info, stat = zk.get(child_znode) child_server_info = child_server_info.decode() child_server_info = json.loads(child_server_info) servers.append(child_server_info[0] + ":" + child_server_info[1]) return servers
運行以後獲得可用服務列表['127.0.0.1:9096', '127.0.0.1:9095', '127.0.0.1:8080', '127.0.0.1:9099', '127.0.0.1:9090']
,使用者只須要隨機選擇一個使用就能夠了。
除此以外,還能夠在從zookeeper獲取可用服務列表的時候加一層緩存,提升性能,額外一個進程watch/service/business/what
的子節點變更,當有子節點變更的時候,刪除緩存,這樣就能夠作到緩存中的內容'時時'和zookeeper中保持一致了
至此大概對zookeeper在實際應用中的做用有了大概瞭解,這對我理解他在kafka中的做用有很大的幫助。在kafka中,zookeeper負責的是存儲kafka中的元數據信息,隊列的數據是不會存儲到zookeeper的,kafka是分佈式的,zookeeper協調broker、producer、consumer之間的關係,當有新的角色加入的時候,更新zookeeper中的數據,其餘角色就能夠獲得通知,並做出相應的調整,不須要停機更新配置,作到動態擴容。下圖來自互聯網,比較清晰的展現了zookeeper中存儲的kafka元信息數據。
zookeeper在kafka中充當的更像是分佈式服務中配置中心的角色,全部配置信息、公共信息都丟到這裏來了,此爲吾之愚見,望斧正。