使用Kazoo操做ZooKeeper服務治理

單機服務的可靠性及可擴展性有限,某臺服務宕機可能會影響整個系統的正常使用;分佈式服務可以有效地解決這一問題,但同時分佈式服務也會帶來一些新的問題,如:服務發現(新增或者刪除了服務如何確保能讓客戶端知道),容災(某些服務出現故障如何讓客戶端只訪問正常的服務);ZooKeeper的提出主要是爲了解決分佈式服務的治理問題,它在分佈式環境中協調和管理服務。
node

Zookeeper協調管理服務的過程以下圖:python

 

服務端:每臺服務器都要向註冊中心Zookeeper進行註冊登記,而且保持與Zookeeper的鏈接,若是服務器與Zookeeper斷開了鏈接,Zookeeper將刪除該服務器的地址。
客戶端:須要服務的時候先向Zookeeper訂閱服務器的地址信息,Zookeeper返回給客戶端已註冊的服務器信息列表,客戶端從服務器信息列表中選擇服務器進行服務調用,若是Zookeeper記錄的服務器信息發生了變動,服務器會通知客戶端變動事件,客戶端能夠獲取最新的服務器信息。
ZooKeeper文件系統的數據結構是個樹狀結構,它的每一個節點(znode)由一個名稱標識,並用路徑/分割:json

 

 ZooKeeper的節點類型有:服務器

  1. 持久節點(ZooKeeper默認的節點類型,建立該節點的客戶端斷開鏈接後,持久節點仍然存在)數據結構

  2. 順序節點(將10位的序列號附加到原始名稱來設置節點的路徑,如:/server0000000001)多線程

  3. 臨時節點(當客戶端與ZooKeeper斷開鏈接時,臨時節點會自動刪除)app

 

RPC服務註冊到ZooKeeperdom

服務端:socket

 1 import threading
 2 import json
 3 import socket
 4 import sys
 5 from kazoo.client import KazooClient
 6 from divide_rpc import ServerStub
 7 from divide_rpc import InvalidOperation
 8 
 9 
10 class ThreadServer(object):
11     def __init__(self, host, port, handlers):
12         self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
13         self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
14         self.host = host
15         self.port = port
16         self.sock.bind((host, port))
17         self.handlers = handlers
18 
19     def serve(self):
20         """
21         開始服務
22         """
23         self.sock.listen(128)
24         self.register_zk()
25         print("開始監聽")
26         while True:
27             conn, addr = self.sock.accept()
28             print("創建連接%s" % str(addr))
29             t = threading.Thread(target=self.handle, args=(conn,))
30             t.start()
31 
32     def handle(self, client):
33         stub = ServerStub(client, self.handlers)
34         try:
35             while True:
36                 stub.process()
37         except EOFError:
38             print("客戶端關閉鏈接")
39 
40         client.close()
41 
42     def register_zk(self):
43         """
44         註冊到zookeeper
45         """
46         self.zk = KazooClient(hosts='127.0.0.1:2181')
47         self.zk.start()
48         self.zk.ensure_path('/rpc')  # 建立根節點
49         value = json.dumps({'host': self.host, 'port': self.port})
50         # 建立服務子節點
51         self.zk.create('/rpc/server', value.encode(), ephemeral=True, sequence=True)
52 
53 
54 class Handlers:
55     @staticmethod
56     def divide(num1, num2=1):
57         """
58         除法
59         :param num1:
60         :param num2:
61         :return:
62         """
63         if num2 == 0:
64             raise InvalidOperation()
65         val = num1 / num2
66         return val
67 
68 
69 if __name__ == '__main__':
70     if len(sys.argv) < 3:
71         print("usage:python server.py [host] [port]")
72         exit(1)
73     host = sys.argv[1]
74     port = sys.argv[2]
75     server = ThreadServer(host, int(port), Handlers)
76     server.serve()

服務端經過kazoo鏈接zookeeper,依次建立根節點和服務的子節點,當啓動多線程服務器的時候,會根據ip和端口建立不一樣的節點,依次啓動兩個server(800一、8002),查看zookeeper的節點信息:tcp

1 >>> from kazoo.client import KazooClient
2 >>> zk = KazooClient(hosts='127.0.0.1:2181')
3 >>> zk.start() 
4 >>> children = zk.get_children("/rpc")
5 >>> print(children)
6 ['server0000000001', 'server0000000000']

 

客戶端:

 1 import random
 2 import time
 3 import json
 4 import socket
 5 from divide_rpc import (
 6     ClientStub, InvalidOperation
 7 )
 8 from kazoo.client import KazooClient
 9 
10 
11 class DistributedChannel(object):
12     def __init__(self):
13         self._zk = KazooClient(hosts='127.0.0.1:2181')
14         self._zk.start()
15         self._get_servers()
16 
17     def _get_servers(self, event=None):
18         """
19         從zookeeper獲取服務器地址信息列表
20         """
21         servers = self._zk.get_children('/rpc', watch=self._get_servers)
22         print(servers)
23         self._servers = []
24         for server in servers:
25             data = self._zk.get('/rpc/' + server)[0]
26             if data:
27                 addr = json.loads(data.decode())
28                 self._servers.append(addr)
29 
30     def _get_server(self):
31         """
32         隨機選出一個可用的服務器
33         """
34         return random.choice(self._servers)
35 
36     def get_connection(self):
37         """
38         提供一個可用的tcp鏈接
39         """
40         while True:
41             server = self._get_server()
42             print(server)
43             try:
44                 sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
45                 sock.connect((server['host'], server['port']))
46             except ConnectionRefusedError:
47                 time.sleep(1)
48                 continue
49             else:
50                 break
51         return sock
52 
53 
54 channel = DistributedChannel()
55 
56 for i in range(50):
57     try:
58         stub = ClientStub(channel)
59         val = stub.divide(i)
60     except InvalidOperation as e:
61         print(e.message)
62     else:
63         print(val)
64     time.sleep(1)

 

客戶端鏈接zookeeper,經過get_children來獲取服務器信息,並watch監聽服務器的變化狀況,啓動客戶端會發現它會調用8001端口的server和8002端口的server:

此時服務端新增長一個結點,8003,客戶端變化狀況:

能夠看出zookeeper總共有三個節點了,前面調用的server都是8001和8002,當8003加入後,zookeeper會發現並調用它

此時服務端斷開一個server,8001,客戶端變化狀況:

斷開server前客戶端會調用800一、800二、8003這三個服務,當斷開server 8001之後,zookeeper只會調用8002和8003這兩個server了

相關文章
相關標籤/搜索