前言html
使用過簡單的python的ZMQ:server開啓3個線程,client發送心跳包,若是服務端超過n秒沒應答,則從新鏈接。python
網上找的案例,server使用的zmq.device,可是一直不明白什麼含義。git
案例連接:http://nphard.me/2016/03/05/pyzmq-demo/api
client使用了超時重連,傳輸數據時,若是server沒有響應則從新鏈接並從新發送數據,這樣會致使,客戶端重複發送多條數據,被服務端接收處理,可是服務端並無回覆。(如下準備解釋,可是沒成功)多線程
如下爲參考網站,不少英文的看不懂。frontend
http://api.zeromq.org/socket
這裏是翻譯的目錄tcp
http://pyzmq.readthedocs.io/en/latest/ide
http://zguide.zeromq.org/py:all測試
https://learning-0mq-with-pyzmq.readthedocs.io/en/latest/index.html
https://wizardforcel.gitbooks.io/zmq-guide/content/chapter1.html
具體細節參考的中文資料在對應目錄內給出。
zmq的router和dealer是什麼
網上有不少資料介紹zmq的幾種模式,如下介紹req/rep應答模式。
首先確認一個概念:XREQ/XREP are aliases for ROUTER/DEALER. XREQ/XREP were used in ZeroMQ 2.x.
那麼router和dealer是什麼?直接上連接:
http://www.cnblogs.com/fengbohello/p/4743868.html
官網介紹了應答模式的幾種區別:http://zguide.zeromq.org/py:chapter3
如下是本身的理解,若是有問題請留言,謝謝。
參考畫圖:https://stackoverflow.com/questions/23581172/what-is-zmq-router-and-zmq-dealer-in-python-zeromq
zmq的device
參考官方api:http://pyzmq.readthedocs.io/en/latest/api/zmq.devices.html
參考device詳細介紹:http://pjwqq.iteye.com/blog/2260254
根據上圖給出代碼,運行device_func,frontend面向客戶端(發起請求方),綁定router接收請求,device內部經過backend與服務端鏈接,dealer向服務端發起請求,並將數據傳達給server。
因此流程就是這樣:client (req) -----> router/dealer (queue) -----> server (rep)
device 和 server 端:經過多線程寫在一塊兒的。運行前後順序是 一、device --- 二、server / client (隨便哪個先啓動都會等待後啓動鏈接,若是中途server異常,client須要超時重試, !!!!!^~^)
def device_func(): thread_num = 3 context = zmq.Context() url_router = 'inproc://ping-server' url_dealer = 'tcp://*:5559' # Socket do cliente frontend = context.socket(zmq.ROUTER) # 或者是 zmq.ROUTER zmq.XREP frontend.bind(url_dealer) # Socket do servidor backend = context.socket(zmq.DEALER) # 或者是 zmq.DEALER zmq.XREQ backend.bind(url_router) for i in range(1, thread_num + 1): thread = threading.Thread(target=server_func, args=(i, url_router, context)) thread.start() # property找不到 Device()報錯 zmq.device(zmq.QUEUE, frontend, backend) frontend.close() backend.close() context.term() def server_func(name, url_router, context): print(">>> start %d %s" % (name, '.' * 50)) socket = context.socket(zmq.REP) socket.connect(url_router) while True: try: message = socket.recv() r_data = msgpack.unpackb(message, encoding='utf-8') print('server %d received:' % name, r_data) data = 'server %d send: %s' % (name, utils.time_now()) print(data) s_data = msgpack.packb(data) socket.send(s_data) except: traceback.print_exc() socket.close() break
client端,鏈接device的frontend端(dealer):
def client2(): url_dealer = 'tcp://localhost:5559' context = zmq.Context() socket = context.socket(zmq.REQ) print("Collecting data from server…") socket.connect(url_dealer) pid = os.getpid() i = 0 while 1: i += 1 # data = input('\n>>:').strip() data = ('send - msg%d' % i) # print(data) s_data = msgpack.packb(data.encode()) socket.send(s_data) ret = socket.recv() r_data = msgpack.unpackb(ret, encoding='utf-8') print('>>>received msg%d server info: %s \n' % (i, r_data))
以上一塊兒理解了zmq的device、router、dealer。
官網指出:function zmq.
device
(device_type, frontend, backend) Deprecated since version libzmq-3.2: Use zmq.proxy 取而代之的 zmq.
proxy
(frontend, backend, capture)
同時又有 class zmq.devices.
Device
(device_type=3, in_type=None, out_type=None) 測試了跑不起來
備註:
inproc 是zmq本地 進程/線程 的傳輸方式。能夠自定義。
zmq的Polling
主要功能應該是 一個程序種須要創建多個socket鏈接時,須要接收數據,又要轉發數據等。參考如下連接:
http://pjwqq.iteye.com/blog/2260791
我這裏主要是使用到client的超時重試功能。重現客戶端不斷超時重試發送數據,服務端不斷接收回復。數據大量冗餘重複。
server:同上,不過加了一句 sleep:
time.sleep(6) message = socket.recv()
client:
def conn_socket(url_dealer): """ 創建socket鏈接""" try: context = zmq.Context() socket = context.socket(zmq.REQ) socket.connect(url_dealer) poller = zmq.Poller() poller.register(socket, zmq.POLLIN) print('[%d] socket conn success' % os.getpid()) return socket, poller except: traceback.print_exc() def close_socket(socket, poller): """ 關閉socket鏈接""" try: socket.close() poller.unregister(socket) except: traceback.print_exc() def client1(): """ """ url_dealer = 'tcp://localhost:5559' socket, poller = conn_socket(url_dealer) try: i = 0 while 1: i += 1 # data = input('\n>>:').strip() data = 'msg%d' % i print('send:%s' % data) # print(data) s_data = msgpack.packb(data.encode()) socket.send(s_data) while True: # 超時後從新鏈接,參數是毫秒 if poller.poll(5 * 1000): ret = socket.recv() r_data = msgpack.unpackb(ret, encoding='utf-8') print('>>>received:%s' % r_data) break else: close_socket(socket, poller) socket, poller = conn_socket(url_dealer) print('*resend:%s' % data) socket.send(s_data) except Exception as e: traceback.print_exc() finally: close_socket(socket, poller) pass
注意看:我故意將server的sleep值 > client 的超時重試的值,那麼問題就重現了:
客戶端不停的鏈接重試重發數據:
服務端不停的接收回復:
看了官方文檔的後續的更多的高級功能介紹,發現並無找到解決方案,暫時智能把超時時間設置的足夠大一些,並在返回數據中添加一個狀態,重試1次加一個1,方便反查和監控報警。