The previous post covered the PAIR/PAIR and REQ/REP patterns; this one looks at PUB/SUB and PUSH/PULL.
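PUB/SUB: the publish-subscribe pattern, much like subscribing to a news feed. It uses asynchronous I/O and supports many-to-many connections. Messages the server sends while nothing is subscribed are simply discarded.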
pub_server.py
    import random
    import sys
    import time

    import zmq

    port = "5556"
    if len(sys.argv) > 1:
        port = sys.argv[1]
        int(port)  # raises ValueError if the port argument is not a number

    context = zmq.Context()
    socket = context.socket(zmq.PUB)
    socket.bind("tcp://*:%s" % port)

    while True:
        # Publish a random reading under one of six topics (9999..10004)
        topic = random.randrange(9999, 10005)
        messagedata = random.randrange(1, 215) - 80
        print("%d %d" % (topic, messagedata))
        socket.send_string("%d %d" % (topic, messagedata))
        time.sleep(1)
sub_client.py
    import time

    import zmq

    port = "5556"

    # Socket to talk to server
    context = zmq.Context()
    socket = context.socket(zmq.SUB)
    print("Collecting updates from weather server...")
    socket.connect("tcp://localhost:%s" % port)

    # Receive only messages that start with this topic
    topicfilter = "10001"
    socket.setsockopt_string(zmq.SUBSCRIBE, topicfilter)

    while True:
        string = socket.recv_string()
        topic, messagedata = string.split()
        print(topic, messagedata)
        time.sleep(1)
zmq.SUBSCRIBE specifies which messages to subscribe to; here the client subscribes to messages that begin with 10001.
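A subscription filter is matched against the start of each message, so a filter of "10001" would also let through a message such as "100012 5". The sketch below imitates that prefix rule in plain Python without using ZeroMQ at all; `matches` is a hypothetical helper written for illustration, not a pyzmq function.

```python
def matches(topicfilter: bytes, message: bytes) -> bool:
    """Imitate a SUB socket's filter: a message passes if it starts with the filter."""
    return message.startswith(topicfilter)


messages = [b"10001 42", b"10002 -7", b"100012 5", b"9999 10001"]
received = [m for m in messages if matches(b"10001", m)]
print(received)  # [b'10001 42', b'100012 5']
```

Note that the last message is rejected even though it contains 10001, because the match only looks at the prefix. An empty filter matches every message.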
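PUSH/PULL: the task distribution pattern, used mainly for distributed computing. Many tasks are distributed to workers, and each worker then sends its result to a result collector.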
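A PUSH socket hands its outgoing messages to the connected PULL peers in round-robin order. Here is a minimal pure-Python simulation of that fan-out and fan-in, assuming every worker stays connected and keeps up; `run_pipeline` and its parameters are hypothetical names, and no ZeroMQ calls are involved.

```python
from itertools import cycle


def run_pipeline(tasks, n_workers):
    """Simulate PUSH/PULL: deal tasks round-robin, let workers filter, collect results."""
    inboxes = [[] for _ in range(n_workers)]
    # Fan-out: the producer deals tasks to the workers in turn.
    for inbox, task in zip(cycle(inboxes), tasks):
        inbox.append(task)
    # Each worker forwards only even numbers, as consumer.py does.
    forwarded = [[t for t in inbox if t % 2 == 0] for inbox in inboxes]
    # Fan-in: the collector receives whatever the workers forwarded.
    results = [t for worker_out in forwarded for t in worker_out]
    return inboxes, results


inboxes, results = run_pipeline([1, 2, 3, 4, 5, 6], n_workers=2)
print(inboxes)       # [[1, 3, 5], [2, 4, 6]]
print(sum(results))  # 12
```

With real sockets the arrival order at the collector depends on timing, so only the totals, not the ordering, should be expected to match this simulation.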
producer.py
    import random
    import time

    import zmq

    context = zmq.Context()

    # Socket on which tasks are sent to the workers
    sender = context.socket(zmq.PUSH)
    sender.bind("tcp://*:5557")

    # Socket used to signal the start of the batch to the result collector
    sink = context.socket(zmq.PUSH)
    sink.connect("tcp://localhost:5558")

    # Make sure all workers have connected before starting
    print("Press Enter when the workers are ready:")
    _ = input()
    print("Sending tasks to workers...")

    sink.send(b"0")

    for task_nbr in range(1000000):
        workload = random.randint(1, 10)
        sender.send_string("%i" % workload)

    # Ten "0" messages mark the end of the batch
    for i in range(10):
        sender.send_string("0")

    # Pause before exiting so queued messages can be delivered
    time.sleep(1)
consumer.py
    import zmq

    context = zmq.Context()

    # Socket to receive messages on
    receiver = context.socket(zmq.PULL)
    receiver.connect("tcp://localhost:5557")

    # Socket to send messages to
    sender = context.socket(zmq.PUSH)
    sender.connect("tcp://localhost:5558")

    while True:
        a_str = receiver.recv_string()
        num = int(a_str)
        # Forward even numbers; this includes the "0" end-of-batch markers
        if num % 2 == 0:
            sender.send_string(a_str)
result.py
    import time

    import zmq

    context = zmq.Context()

    # Socket to receive messages on
    receiver = context.socket(zmq.PULL)
    receiver.bind("tcp://*:5558")

    # Wait for start of batch
    s = receiver.recv()

    total = 0
    flag = 0  # number of "0" end-of-batch markers seen so far

    # Start our clock now
    tstart = time.time()

    while True:
        a_str = receiver.recv_string()
        total += int(a_str)
        if a_str == "0":
            flag += 1
            if flag == 10:
                break

    tend = time.time()
    total_msec = (tend - tstart) * 1000
    print("Total elapsed time: %d msec" % total_msec)
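These timings are not precise. They were measured with 1, 2, 3 and 4 consumer.py processes running, and they show that adding workers shortens the elapsed time.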
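The collector stops once it has seen ten "0" messages, which the producer sends after the real tasks. That stopping rule can be sketched as a plain function, assuming the messages arrive as strings; `collect` and `n_markers` are illustrative names that do not appear in the scripts.

```python
def collect(messages, n_markers=10):
    """Sum incoming values and stop after n_markers "0" messages, like result.py's loop."""
    total = 0
    seen = 0
    for text in messages:
        total += int(text)
        if text == "0":
            seen += 1
            if seen == n_markers:
                break
    return total, seen


stream = ["4", "2", "0", "8", "0", "6"]
print(collect(stream, n_markers=2))  # (14, 2)
```

The trailing "6" is never read because the loop exits on the second marker.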
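Queue, Forwarder and Streamer are the proxies for REQ/REP, PUB/SUB and PUSH/PULL respectively; they relay messages between machines on different network segments.
How to use these proxies is not covered here; see the address below.
Reference: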
http://learning-0mq-with-pyzmq.readthedocs.org/en/latest/pyzmq/pyzmq.html