單機服務的可靠性及可擴展性有限,某臺服務宕機可能會影響整個系統的正常使用;分佈式服務可以有效地解決這一問題,但同時分佈式服務也會帶來一些新的問題,如:服務發現(新增或者刪除了服務如何確保能讓客戶端知道),容災(某些服務出現故障如何讓客戶端只訪問正常的服務);ZooKeeper的提出主要是爲了解決分佈式服務的治理問題,它在分佈式環境中協調和管理服務。
node
Zookeeper協調管理服務的過程以下圖:python
服務端:每臺服務器都要向註冊中心Zookeeper進行註冊登記,而且保持與Zookeeper的鏈接,若是服務器與Zookeeper斷開了鏈接,Zookeeper將刪除該服務器的地址。
客戶端:須要服務的時候先向Zookeeper訂閱服務器的地址信息,Zookeeper返回給客戶端已註冊的服務器信息列表,客戶端從服務器信息列表中選擇服務器進行服務調用,若是Zookeeper記錄的服務器信息發生了變動,服務器會通知客戶端變動事件,客戶端能夠獲取最新的服務器信息。
ZooKeeper文件系統的數據結構是個樹狀結構,它的每一個節點(znode)由一個名稱標識,並用路徑/分割:json
ZooKeeper的節點類型有:服務器
1. 持久節點(ZooKeeper默認的節點類型,建立該節點的客戶端斷開鏈接後,持久節點仍然存在)數據結構
2. 順序節點(將10位的序列號附加到原始名稱來設置節點的路徑,如:/server0000000001)多線程
3. 臨時節點(當客戶端與ZooKeeper斷開鏈接時,臨時節點會自動刪除)app
RPC服務註冊到ZooKeeperdom
服務端:socket
1 import threading 2 import json 3 import socket 4 import sys 5 from kazoo.client import KazooClient 6 from divide_rpc import ServerStub 7 from divide_rpc import InvalidOperation 8 9 10 class ThreadServer(object): 11 def __init__(self, host, port, handlers): 12 self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) 13 self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) 14 self.host = host 15 self.port = port 16 self.sock.bind((host, port)) 17 self.handlers = handlers 18 19 def serve(self): 20 """ 21 開始服務 22 """ 23 self.sock.listen(128) 24 self.register_zk() 25 print("開始監聽") 26 while True: 27 conn, addr = self.sock.accept() 28 print("創建連接%s" % str(addr)) 29 t = threading.Thread(target=self.handle, args=(conn,)) 30 t.start() 31 32 def handle(self, client): 33 stub = ServerStub(client, self.handlers) 34 try: 35 while True: 36 stub.process() 37 except EOFError: 38 print("客戶端關閉鏈接") 39 40 client.close() 41 42 def register_zk(self): 43 """ 44 註冊到zookeeper 45 """ 46 self.zk = KazooClient(hosts='127.0.0.1:2181') 47 self.zk.start() 48 self.zk.ensure_path('/rpc') # 建立根節點 49 value = json.dumps({'host': self.host, 'port': self.port}) 50 # 建立服務子節點 51 self.zk.create('/rpc/server', value.encode(), ephemeral=True, sequence=True) 52 53 54 class Handlers: 55 @staticmethod 56 def divide(num1, num2=1): 57 """ 58 除法 59 :param num1: 60 :param num2: 61 :return: 62 """ 63 if num2 == 0: 64 raise InvalidOperation() 65 val = num1 / num2 66 return val 67 68 69 if __name__ == '__main__': 70 if len(sys.argv) < 3: 71 print("usage:python server.py [host] [port]") 72 exit(1) 73 host = sys.argv[1] 74 port = sys.argv[2] 75 server = ThreadServer(host, int(port), Handlers) 76 server.serve()
服務端經過kazoo鏈接zookeeper,依次建立根節點和服務的子節點,當啓動多線程服務器的時候,會根據ip和端口建立不一樣的節點,依次啓動兩個server(800一、8002),查看zookeeper的節點信息:tcp
1 >>> from kazoo.client import KazooClient 2 >>> zk = KazooClient(hosts='127.0.0.1:2181') 3 >>> zk.start() 4 >>> children = zk.get_children("/rpc") 5 >>> print(children) 6 ['server0000000001', 'server0000000000']
客戶端:
1 import random 2 import time 3 import json 4 import socket 5 from divide_rpc import ( 6 ClientStub, InvalidOperation 7 ) 8 from kazoo.client import KazooClient 9 10 11 class DistributedChannel(object): 12 def __init__(self): 13 self._zk = KazooClient(hosts='127.0.0.1:2181') 14 self._zk.start() 15 self._get_servers() 16 17 def _get_servers(self, event=None): 18 """ 19 從zookeeper獲取服務器地址信息列表 20 """ 21 servers = self._zk.get_children('/rpc', watch=self._get_servers) 22 print(servers) 23 self._servers = [] 24 for server in servers: 25 data = self._zk.get('/rpc/' + server)[0] 26 if data: 27 addr = json.loads(data.decode()) 28 self._servers.append(addr) 29 30 def _get_server(self): 31 """ 32 隨機選出一個可用的服務器 33 """ 34 return random.choice(self._servers) 35 36 def get_connection(self): 37 """ 38 提供一個可用的tcp鏈接 39 """ 40 while True: 41 server = self._get_server() 42 print(server) 43 try: 44 sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) 45 sock.connect((server['host'], server['port'])) 46 except ConnectionRefusedError: 47 time.sleep(1) 48 continue 49 else: 50 break 51 return sock 52 53 54 channel = DistributedChannel() 55 56 for i in range(50): 57 try: 58 stub = ClientStub(channel) 59 val = stub.divide(i) 60 except InvalidOperation as e: 61 print(e.message) 62 else: 63 print(val) 64 time.sleep(1)
客戶端鏈接zookeeper,經過get_children來獲取服務器信息,並watch監聽服務器的變化狀況,啓動客戶端會發現它會調用8001端口的server和8002端口的server:
此時服務端新增長一個結點,8003,客戶端變化狀況:
能夠看出zookeeper總共有三個節點了,前面調用的server都是8001和8002,當8003加入後,zookeeper會發現並調用它
此時服務端斷開一個server,8001,客戶端變化狀況:
斷開server前客戶端會調用800一、800二、8003這三個服務,當斷開server 8001之後,zookeeper只會調用8002和8003這兩個server了