瞭解saltstack的通訊協議zeromq(二)

上文討論了PAIR/PAIR,REQ/REP兩種模式,如今看看PUB/SUB和PUSH/PULL模式。
html

PUB/SUB:發佈訂閱模式,跟咱們訂閱新聞相似的,採用異步IO,多對多模式,若是沒有訂閱,服務端發送的消息直接丟棄掉。python

wKioL1OtEcmDfgbUAACa4-9bWCI546.jpg

pub_server.py
git

import zmq
import random
import sys
import time

port = "5556"
if len(sys.argv) > 1:
        port =  sys.argv[1]
        int(port)

context = zmq.Context()
socket = context.socket(zmq.PUB)
socket.bind("tcp://*:%s" % port)

while True:
        topic = random.randrange(9999,10005)
        messagedata = random.randrange(1,215) - 80
        print "%d %d" % (topic, messagedata)
        socket.send("%d %d" % (topic, messagedata))
        time.sleep(1)

sub_client.pygithub

import sys
import time
import zmq

port = "5556"
# Socket to talk to server
context = zmq.Context()
socket = context.socket(zmq.SUB)

print "Collecting updates from weather server..."
socket.connect("tcp://localhost:%s" % port)



#socket.set(zmq.UNSUBSCRIBE, messagedata)
topicfilter = "10001"
socket.set(zmq.SUBSCRIBE, topicfilter)

#Process 5 updates
total_value = 0
#for update_nbr in range (5):
while True:
        string = socket.recv()
        topic, messagedata = string.split()
#       total_value += int(messagedata)
        print topic, messagedata
        time.sleep(1)

zmq.SUBCRIBE是用來指明訂閱某種消息,這裏訂閱的是出現10001的信息dom


PUSH/PULL:任務分發模式,主要用於分佈式計算的,將不少個任務分發到worker,而後worker將計算結果發送到結果收集器。異步

wKioL1OtGOHjf4EhAACmQcZJt9k590.jpg

producer.pysocket

import time
import zmq
import random

context = zmq.Context()
sender = context.socket(zmq.PUSH)
sender.bind('tcp://*:5557')

# sync start of batch
# be sure all worker connect success
sink = context.socket(zmq.PUSH)
sink.connect('tcp://0.0.0.0:5558')
print 'Press Enter when the workers are ready:'
_ = raw_input()
print 'Sending tasks to workers...'
sink.send(b'0')


for task_nbr in xrange(1000000):
        workload = random.randint(1,10)
        sender.send_string(u'%i' % workload)

for i in range(10):
        sender.send_string(u'0')
time.sleep(1)

consumer.pytcp

import sys
import time
import zmq

context = zmq.Context()

# Socket to recevie messages on
receiver = context.socket(zmq.PULL)
receiver.connect('tcp://localhost:5557')

# socket to send messages
sender = context.socket(zmq.PUSH)
sender.connect('tcp://localhost:5558')

while True:
        a_str = receiver.recv_string()
        num = int(a_str)
        if num % 2 == 0 or a_str == u'0':
                sender.send_string(a_str)

result.py分佈式

import sys
import time
import zmq

context = zmq.Context()
# Socket to receive messages on
receiver = context.socket(zmq.PULL)
receiver.bind("tcp://*:5558")

# Wait for start of batch
s = receiver.recv()
sum = 0
flag = 0
# Start our clock now
tstart = time.time()
while True:
        a_str = receiver.recv_string()
        num = int(a_str)
        sum += num
        if a_str == '0':
                flag += 1
        if flag == 10:
                break

tend = time.time()
tdiff = tend - tstart
total_msec = tdiff * 1000
print "Total elapsed time: %d msec" % total_msec

wKioL1OtPO6w0QYIAAES_IQ-b9I110.jpg

這個結果並不精確,分別是啓動1個、2個、3個、4個consumer.py進程的測試結果,說明計算縮短了時間。ide


Queue,Forwarder,Streamer分別是REQ/REP、PUB/SUB、PUSH/PULL的代理,用於代理不一樣網段的機器。

關於代理的用法,這裏不講述。請參考下面地址。


參考地址:

http://learning-0mq-with-pyzmq.readthedocs.org/en/latest/pyzmq/pyzmq.html

https://github.com/anjuke/zguide-cn

http://zguide.zeromq.org/

相關文章
相關標籤/搜索