事件驅動python
簡而言之,事件驅動分爲二個部分:第一,註冊事件;第二,觸發事件。react
自定義事件驅動框架git
1 #!/usr/bin/env python 2 # -*- coding:utf-8 -*- 3 #!/usr/bin/env python 4 # -*- coding:utf-8 -*- 5 6 # event_drive.py 7 event_list = [] 8 def run(): 9 for event in event_list: 10 obj = event() 11 obj.execute() 12 class BaseHandler(object): 13 """ 14 用戶必須繼承該類,從而規範全部類的方法(相似於接口的功能) 15 """ 16 def execute(self): 17 raise Exception('you must overwrite execute')
程序員使用上面定義的框架:程序員
1 #!/usr/bin/env python 2 # -*- coding:utf-8 -*- 3 #!/usr/bin/env python 4 # -*- coding:utf-8 -*- 5 6 from fram import event_drive 7 8 class MyHandler(event_drive.BaseHandler): 9 10 def execute(self): 11 print ('event-drive execute MyHandler') 12 13 14 event_drive.event_list.append(MyHandler) 15 event_drive.run()
Protocols描述瞭如何以異步的方式處理網絡中的事件。HTTP、DNS以及IMAP是應用層協議中的例子。Protocols實現了IProtocol接口,它包含以下的方法:redis
makeConnection 在transport對象和服務器之間創建一條鏈接 connectionMade 鏈接創建起來後調用 dataReceived 接收數據時調用 connectionLost 關閉鏈接時調用
Transports表明網絡中兩個通訊結點之間的鏈接。Transports負責描述鏈接的細節,好比鏈接是面向流式的仍是面向數據報的,流控以及可靠性。TCP、UDP和Unix套接字可做爲transports的例子。它們被設計爲「知足最小功能單元,同時具備最大程度的可複用性」,並且從協議實現中分離出來,這讓許多協議能夠採用相同類型的傳輸。Transports實現了ITransports接口,它包含以下的方法:緩存
write 以非阻塞的方式按順序依次將數據寫到物理鏈接上 writeSequence 將一個字符串列表寫到物理鏈接上 loseConnection 將全部掛起的數據寫入,而後關閉鏈接 getPeer 取得鏈接中對端的地址信息 getHost 取得鏈接中本端的地址信息
將transports從協議中分離出來也使得對這兩個層次的測試變得更加簡單。能夠經過簡單地寫入一個字符串來模擬傳輸,用這種方式來檢查。服務器
1 #!/usr/bin/env python 2 # -*- coding:utf-8 -*- 3 from twisted.internet import protocol 4 from twisted.internet import reactor 5 6 class Echo(protocol.Protocol): 7 def dataReceived(self, data):#只要twisted一收到數據 ,就會調用此方法 8 self.transport.write(data) # 把收到的數據 返回給客戶端 9 10 def main(): 11 factory = protocol.ServerFactory() #定義基礎工廠類 12 factory.protocol = Echo #socketserver中handle 13 14 reactor.listenTCP(9000,factory) 15 reactor.run() 16 17 if __name__ == '__main__': 18 main()
1 #!/usr/bin/env python 2 # -*- coding:utf-8 -*- 3 from twisted.internet import reactor, protocol 4 5 # a client protocol 6 7 class EchoClient(protocol.Protocol): 8 """Once connected, send a message, then print the result.""" 9 def connectionMade(self): #鏈接創建成功,就會自動調用此方法 10 print("connection is build, sending data...") 11 self.transport.write("hello alex!") 12 13 def dataReceived(self, data):#一接收到數據就把它寫回 14 "As soon as any data is received, write it back." 15 print "Server said:", data 16 #self.transport.loseConnection() 17 exit('exit') 18 19 def connectionLost(self, reason):#關閉鏈接 20 print "====connection lost===" 21 22 class EchoFactory(protocol.ClientFactory): 23 protocol = EchoClient #handle 24 25 def clientConnectionFailed(self, connector, reason): 26 print "Connection failed - goodbye!" 27 reactor.stop() 28 29 def clientConnectionLost(self, connector, reason): 30 print "Connection lost - goodbye!" 31 reactor.stop() 32 33 34 # this connects the protocol to a server running on port 8000 35 def main(): 36 f = EchoFactory() 37 reactor.connectTCP("localhost", 9000, f) 38 reactor.run() 39 40 # this only runs if the module was *not* imported 41 if __name__ == '__main__': 42 main()
運行服務器端腳本將啓動一個TCP服務器,監聽端口1234上的鏈接。服務器採用的是Echo協議,數據經TCP transport對象寫出。運行客戶端腳本將對服務器發起一個TCP鏈接,回顯服務器端的迴應而後終止鏈接並中止reactor事件循環。這裏的Factory用來對鏈接的雙方生成protocol對象實例。兩端的通訊是異步的,connectTCP負責註冊回調函數到reactor事件循環中,當socket上有數據可讀時通知回調處理。網絡
一個傳送文件的例子:app
服務器端,負載均衡
1 #!/usr/bin/env python 2 # -*- coding:utf-8 -*- 3 #_*_coding:utf-8_*_ 4 # This is the Twisted Fast Poetry Server, version 1.0 5 6 import optparse, os 7 8 from twisted.internet.protocol import ServerFactory, Protocol 9 10 11 def parse_args(): 12 usage = """usage: %prog [options] poetry-file 13 14 This is the Fast Poetry Server, Twisted edition. 15 Run it like this: 16 17 python twisted_sendfile.py <path-to-poetry-file> 18 19 If you are in the base directory of the twisted-intro package, 20 you could run it like this: 21 22 python twisted-server-1/fastpoetry.py poetry/ecstasy.txt 23 24 to serve up John Donne's Ecstasy, which I know you want to do. 25 """ 26 27 parser = optparse.OptionParser(usage) 28 29 help = "The port to listen on. Default to a random available port." 30 parser.add_option('--port', type='int', help=help) 31 32 help = "The interface to listen on. Default is localhost." 33 parser.add_option('--iface', help=help, default='localhost') 34 35 options, args = parser.parse_args() 36 #print("--arg:",options,args) 37 #print("-->",options.port) 38 39 if len(args) != 1: 40 parser.error('Provide exactly one poetry file.') 41 poetry_file = args[0] 42 43 if not os.path.exists(args[0]): 44 parser.error('No such file: %s' % poetry_file) 45 46 return options, poetry_file 47 48 49 class PoetryProtocol(Protocol): #handle 50 def connectionMade(self): 51 self.transport.write(self.factory.poem) 52 self.transport.loseConnection() 53 54 55 class PoetryFactory(ServerFactory): #基礎類 56 protocol = PoetryProtocol 57 def __init__(self, poem): 58 self.poem = poem 59 60 def main(): 61 options, poetry_file = parse_args() 62 poem = open(poetry_file).read() 63 factory = PoetryFactory(poem) 64 from twisted.internet import reactor 65 port = reactor.listenTCP(options.port or 9000, factory, 66 interface=options.iface) 67 print 'Serving %s on %s.' % (poetry_file, port.getHost()) 68 reactor.run() 69 70 71 if __name__ == '__main__': 72 main()
客戶端:
1 #!/usr/bin/env python 2 # -*- coding:utf-8 -*- 3 # This is the Twisted Get Poetry Now! client, version 3.0. 4 5 # NOTE: This should not be used as the basis for production code. 6 7 import optparse 8 9 from twisted.internet.protocol import Protocol, ClientFactory 10 11 12 def parse_args(): 13 usage = """usage: %prog [options] [hostname]:port ... 14 15 This is the Get Poetry Now! client, Twisted version 3.0 16 Run it like this: 17 18 python get-poetry-1.py port1 port2 port3 ... 19 """ 20 21 parser = optparse.OptionParser(usage) 22 _, addresses = parser.parse_args() 23 print('==addr:',_,addresses) 24 if not addresses: 25 print parser.format_help() 26 parser.exit() 27 28 def parse_address(addr): 29 if ':' not in addr: 30 host = '127.0.0.1' 31 port = addr 32 else: 33 host, port = addr.split(':', 1) 34 if not port.isdigit(): 35 parser.error('Ports must be integers.') 36 return host, int(port) 37 #return parse_address(addresses) 38 return map(parse_address, addresses) 39 40 class PoetryProtocol(Protocol): 41 42 poem = '' 43 def dataReceived(self, data): 44 self.poem += data 45 #self.factory = PoetryClientFactory 46 print('[%s] recv:[%s]' %(self.transport.getPeer(),len(self.poem))) 47 def connectionLost(self, reason): 48 self.poemReceived(self.poem) 49 50 def poemReceived(self, poem): 51 self.factory.poem_finished(poem) 52 53 54 class PoetryClientFactory(ClientFactory): 55 protocol = PoetryProtocol #handle method 56 def __init__(self, callback): 57 self.callback = callback 58 def poem_finished(self, poem): 59 self.callback(poem) 60 #self.get_poem(poem) 61 62 63 def get_poetry(host, port, callback): 64 """ 65 Download a poem from the given host and port and invoke 66 callback(poem) 67 when the poem is complete. 68 """ 69 from twisted.internet import reactor 70 factory = PoetryClientFactory(callback) 71 reactor.connectTCP(host, port, factory) 72 73 74 def poetry_main(): 75 addresses = parse_args() #((172.0.0.1,9000),(...)) 76 from twisted.internet import reactor 77 poems = [] 78 79 def got_poem(poem): 80 poems.append(poem) 81 if len(poems) == len(addresses): 82 reactor.stop() 83 84 for address in addresses: 85 host, port = address 86 get_poetry(host, port, got_poem) 87 reactor.run() 88 89 print("main loop done...") 90 #for poem in poems: 91 # Eprint poem 92 93 if __name__ == '__main__': 94 poetry_main()
Redis:
redis是一個key-value存儲系統。和Memcached相似,它支持存儲的value類型相對更多,包括string(字符串)、list(鏈表)、set(集合)、zset(sorted set --有序集合)和hash(哈希類型)。這些數據類型都支持push/pop、add/remove及取交集並集和差集及更豐富的操做,並且這些操做都是原子性的。在此基礎上,redis支持各類不一樣方式的排序。與memcached同樣,爲了保證效率,數據都是緩存在內存中。區別的是redis會週期性的把更新的數據寫入磁盤或者把修改操做寫入追加的記錄文件,而且在此基礎上實現了master-slave(主從)同步。
Python操做Redis:
API使用
redis-py 的API的使用能夠分類爲:
1.基本操做
以前咱們已經知道,redis是以key-value的形式存儲的,因此咱們在操做的時候。首先咱們將redis所在主機的ip和發佈端口做爲參數實例化了一個對象r,而後執行set('name','Peony_Y'),這樣咱們就在內存中存儲了一個key爲name,值爲‘Peony_Y’的項。咱們能夠理解爲{'name':'Peony_Y'},當咱們要讀取的以後,只須要get('name'),就會獲得'Peony_Y'的值。
2.鏈接池
redis-py使用connection pool來管理對一個redis server的全部鏈接,避免每次創建、釋放鏈接的開銷。默認,每一個Redis實例都會維護一個本身的鏈接池。能夠直接創建一個鏈接池,而後做爲參數Redis,這樣就能夠實現多個Redis實例共享一個鏈接池。
三、管道
redis-py默認在執行每次請求都會建立(鏈接池申請鏈接)和斷開(歸還鏈接池)一次鏈接操做,若是想要在一次請求中指定多個命令,則可使用pipline實現一次請求指定多個命令,而且默認狀況下一次pipline 是原子性操做。
四、發佈訂閱
發佈者:服務器
訂閱者:Dashboad和數據處理
Demo以下:
定義一個redishelper類,創建與redis鏈接,定義頻道爲fm92.4,定義發佈public及訂閱subscribe方法。
訂閱者:導入剛剛咱們寫好的類,實例化對象,調用訂閱方法,就可使用while True接收信息了。
發佈者:導入剛剛咱們寫好的類,實例化對象,調用發佈方法,下例發佈了一條消息‘hello’
RabbitMQ隊列
RabbitMQ是一個在AMQP基礎上完整的,可複用的企業消息系統。他遵循Mozilla Public License開源協議。
MQ全稱爲Message Queue, 消息隊列(MQ)是一種應用程序對應用程序的通訊方法。應用程序經過讀寫出入隊列的消息(針對應用程序的數據)來通訊,而無需專用鏈接來連接它們。消 息傳遞指的是程序之間經過在消息中發送數據進行通訊,而不是經過直接調用彼此來通訊,直接調用一般是用於諸如遠程過程調用的技術。排隊指的是應用程序經過 隊列來通訊。隊列的使用除去了接收和發送應用程序同時執行的要求。
實現最簡單的隊列通訊
發送(send)端
1 import pika 2 3 connection = pika.BlockingConnection(pika.ConnectionParameters( 4 'localhost')) 5 channel = connection.channel() 6 7 #聲明queue 8 #hannel.queue_declare(queue='task_q',durable=True) 9 10 #n RabbitMQ a message can never be sent directly to the queue, it always needs to go through an exchange. 11 channel.basic_publish(exchange='', 12 routing_key='task_q', 13 body='Hello World! 35', 14 properties=pika.BasicProperties( 15 delivery_mode = 2, # make message persistent 16 ) 17 ) 18 print(" [x] Sent 'Hello World!'") 19 connection.close()
接收(recive)端
1 import pika 2 3 connection = pika.BlockingConnection(pika.ConnectionParameters( 4 'localhost')) 5 channel = connection.channel() 6 7 8 #You may ask why we declare the queue again ‒ we have already declared it in our previous code. 9 # We could avoid that if we were sure that the queue already exists. For example if send.py program 10 #was run before. But we're not yet sure which program to run first. In such cases it's a good 11 # practice to repeat declaring the queue in both programs. 12 #channel.queue_declare(queue='task_q',durable=True) 13 14 def callback(ch, method, properties, body): 15 print("-->ch") 16 print(" [x] Received %r" % body) 17 ch.basic_ack(delivery_tag = method.delivery_tag) 18 19 20 channel.basic_consume(callback, 21 queue='task_q', 22 no_ack=True) 23 24 print(' [*] Waiting for messages. To exit press CTRL+C') 25 channel.start_consuming()
工做隊列
在這種模式下,RabbitMQ會默認把p發的消息依次分發給各個消費者(c),跟負載均衡差很少
消費的提供者的代碼
1 import pika 2 3 connection = pika.BlockingConnection(pika.ConnectionParameters( 4 'localhost')) 5 channel = connection.channel() 6 7 #聲明queue 8 channel.queue_declare(queue='task_queue') 9 10 #n RabbitMQ a message can never be sent directly to the queue, it always needs to go through an exchange. 11 import sys 12 13 message = ' '.join(sys.argv[1:]) or "Hello World!" 14 channel.basic_publish(exchange='', 15 routing_key='task_queue', 16 body=message, 17 properties=pika.BasicProperties( 18 delivery_mode = 2, # make message persistent 19 )) 20 print(" [x] Sent %r" % message) 21 connection.close()
消費者的代碼
1 import pika,time 2 3 connection = pika.BlockingConnection(pika.ConnectionParameters( 4 'localhost')) 5 channel = connection.channel() 6 7 8 9 def callback(ch, method, properties, body): 10 print(" [x] Received %r" % body) 11 time.sleep(body.count(b'.')) 12 print(" [x] Done") 13 ch.basic_ack(delivery_tag = method.delivery_tag) 14 15 16 channel.basic_consume(callback, 17 queue='task_queue', 18 ) 19 20 print(' [*] Waiting for messages. To exit press CTRL+C') 21 channel.start_consuming()
此時,先啓動消息生產者,而後再分別啓動3個消費者,經過生產者多發送幾條消息,你會發現,這幾條消息會被依次分配到各個消費者身上
一、acknowledgment 消息不丟失
no-ack = False,若是生產者遇到狀況(its channel is closed, connection is closed, or TCP connection is lost)掛掉了,那麼,RabbitMQ會從新將該任務添加到隊列中。
1 import pika 2 3 connection = pika.BlockingConnection(pika.ConnectionParameters( 4 host='10.211.55.4')) 5 channel = connection.channel() 6 7 channel.queue_declare(queue='hello') 8 9 def callback(ch, method, properties, body): 10 print(" [x] Received %r" % body) 11 import time 12 time.sleep(10) 13 print 'ok' 14 ch.basic_ack(delivery_tag = method.delivery_tag) 15 16 channel.basic_consume(callback, 17 queue='hello', 18 no_ack=False) 19 20 print(' [*] Waiting for messages. To exit press CTRL+C') 21 channel.start_consuming()
二、durable 消息不丟失
1 import pika 2 3 connection = pika.BlockingConnection(pika.ConnectionParameters(host='10.211.55.4')) 4 channel = connection.channel() 5 6 # make message persistent 7 channel.queue_declare(queue='hello', durable=True) 8 9 channel.basic_publish(exchange='', 10 routing_key='hello', 11 body='Hello World!', 12 properties=pika.BasicProperties( 13 delivery_mode=2, # make message persistent 14 )) 15 print(" [x] Sent 'Hello World!'") 16 connection.close()
1 import pika 2 3 connection = pika.BlockingConnection(pika.ConnectionParameters(host='10.211.55.4')) 4 channel = connection.channel() 5 6 # make message persistent 7 channel.queue_declare(queue='hello', durable=True) 8 9 10 def callback(ch, method, properties, body): 11 print(" [x] Received %r" % body) 12 import time 13 time.sleep(10) 14 print 'ok' 15 ch.basic_ack(delivery_tag = method.delivery_tag) 16 17 channel.basic_consume(callback, 18 queue='hello', 19 no_ack=False) 20 21 print(' [*] Waiting for messages. To exit press CTRL+C') 22 channel.start_consuming()
三、消息獲取順序
默認消息隊列裏的數據是按照順序被消費者拿走,例如:消費者1 去隊列中獲取 奇數 序列的任務,消費者1去隊列中獲取 偶數 序列的任務。
channel.basic_qos(prefetch_count=1) 表示誰來誰取,再也不按照奇偶數排列
1 import pika 2 3 connection = pika.BlockingConnection(pika.ConnectionParameters(host='10.211.55.4')) 4 channel = connection.channel() 5 6 # make message persistent 7 channel.queue_declare(queue='hello') 8 9 10 def callback(ch, method, properties, body): 11 print(" [x] Received %r" % body) 12 import time 13 time.sleep(10) 14 print 'ok' 15 ch.basic_ack(delivery_tag = method.delivery_tag) 16 17 channel.basic_qos(prefetch_count=1) 18 19 channel.basic_consume(callback, 20 queue='hello', 21 no_ack=False) 22 23 print(' [*] Waiting for messages. To exit press CTRL+C') 24 channel.start_consuming()
四、發佈訂閱
發佈訂閱和簡單的消息隊列區別在於,發佈訂閱會將消息發送給全部的訂閱者,而消息隊列中的數據被消費一次便消失。因此,RabbitMQ實現發佈和訂閱時,會爲每個訂閱者建立一個隊列,而發佈者發佈消息時,會將消息放置在全部相關隊列中。
exchange type = fanout
1 #!/usr/bin/env python 2 import pika 3 import sys 4 5 connection = pika.BlockingConnection(pika.ConnectionParameters( 6 host='localhost')) 7 channel = connection.channel() 8 9 channel.exchange_declare(exchange='logs', 10 type='fanout') 11 12 message = ' '.join(sys.argv[1:]) or "info: Hello World!" 13 channel.basic_publish(exchange='logs', 14 routing_key='', 15 body=message) 16 print(" [x] Sent %r" % message) 17 connection.close()
1 #!/usr/bin/env python 2 import pika 3 4 connection = pika.BlockingConnection(pika.ConnectionParameters( 5 host='localhost')) 6 channel = connection.channel() 7 8 channel.exchange_declare(exchange='logs', 9 type='fanout') 10 11 result = channel.queue_declare(exclusive=True) 12 queue_name = result.method.queue 13 14 channel.queue_bind(exchange='logs', 15 queue=queue_name) 16 17 print(' [*] Waiting for logs. To exit press CTRL+C') 18 19 def callback(ch, method, properties, body): 20 print(" [x] %r" % body) 21 22 channel.basic_consume(callback, 23 queue=queue_name, 24 no_ack=True) 25 26 channel.start_consuming()
五、關鍵字發送
exchange type = direct
以前事例,發送消息時明確指定某個隊列並向其中發送消息,RabbitMQ還支持根據關鍵字發送,即:隊列綁定關鍵字,發送者將數據根據關鍵字發送到消息exchange,exchange根據 關鍵字 斷定應該將數據發送至指定隊列。
五、關鍵字發送
exchange type = direct
以前事例,發送消息時明確指定某個隊列並向其中發送消息,RabbitMQ還支持根據關鍵字發送,即:隊列綁定關鍵字,發送者將數據根據關鍵字發送到消息exchange,exchange根據 關鍵字 斷定應該將數據發送至指定隊列。
1 #!/usr/bin/env python 2 import pika 3 import sys 4 5 connection = pika.BlockingConnection(pika.ConnectionParameters( 6 host='localhost')) 7 channel = connection.channel() 8 9 channel.exchange_declare(exchange='direct_logs', 10 type='direct') 11 12 result = channel.queue_declare(exclusive=True) 13 queue_name = result.method.queue 14 15 severities = sys.argv[1:] 16 if not severities: 17 sys.stderr.write("Usage: %s [info] [warning] [error]\n" % sys.argv[0]) 18 sys.exit(1) 19 20 for severity in severities: 21 channel.queue_bind(exchange='direct_logs', 22 queue=queue_name, 23 routing_key=severity) 24 25 print(' [*] Waiting for logs. To exit press CTRL+C') 26 27 def callback(ch, method, properties, body): 28 print(" [x] %r:%r" % (method.routing_key, body)) 29 30 channel.basic_consume(callback, 31 queue=queue_name, 32 no_ack=True) 33 34 channel.start_consuming() 35 36 消費者
1 import pika 2 import sys 3 4 connection = pika.BlockingConnection(pika.ConnectionParameters( 5 host='localhost')) 6 channel = connection.channel() 7 8 channel.exchange_declare(exchange='direct_logs', 9 type='direct') 10 11 severity = sys.argv[1] if len(sys.argv) > 1 else 'info' 12 message = ' '.join(sys.argv[2:]) or 'Hello World!' 13 channel.basic_publish(exchange='direct_logs', 14 routing_key=severity, 15 body=message) 16 print(" [x] Sent %r:%r" % (severity, message)) 17 connection.close()
六、模糊匹配
exchange type = topic
在topic類型下,可讓隊列綁定幾個模糊的關鍵字,以後發送者將數據發送到exchange,exchange將傳入」路由值「和 」關鍵字「進行匹配,匹配成功,則將數據發送到指定隊列。
1 發送者路由值 隊列中 2 old.boy.python old.* -- 不匹配 3 old.boy.python old.# -- 匹配
1 #!/usr/bin/env python 2 import pika 3 import sys 4 5 connection = pika.BlockingConnection(pika.ConnectionParameters( 6 host='localhost')) 7 channel = connection.channel() 8 9 channel.exchange_declare(exchange='topic_logs', 10 type='topic') 11 12 result = channel.queue_declare(exclusive=True) 13 queue_name = result.method.queue 14 15 binding_keys = sys.argv[1:] 16 if not binding_keys: 17 sys.stderr.write("Usage: %s [binding_key]...\n" % sys.argv[0]) 18 sys.exit(1) 19 20 for binding_key in binding_keys: 21 channel.queue_bind(exchange='topic_logs', 22 queue=queue_name, 23 routing_key=binding_key) 24 25 print(' [*] Waiting for logs. To exit press CTRL+C') 26 27 def callback(ch, method, properties, body): 28 print(" [x] %r:%r" % (method.routing_key, body)) 29 30 channel.basic_consume(callback, 31 queue=queue_name, 32 no_ack=True) 33 34 channel.start_consuming() 35 36 消費者
1 #!/usr/bin/env python 2 import pika 3 import sys 4 5 connection = pika.BlockingConnection(pika.ConnectionParameters( 6 host='localhost')) 7 channel = connection.channel() 8 9 channel.exchange_declare(exchange='topic_logs', 10 type='topic') 11 12 routing_key = sys.argv[1] if len(sys.argv) > 1 else 'anonymous.info' 13 message = ' '.join(sys.argv[2:]) or 'Hello World!' 14 channel.basic_publish(exchange='topic_logs', 15 routing_key=routing_key, 16 body=message) 17 print(" [x] Sent %r:%r" % (routing_key, message)) 18 connection.close() 19 20 生產者