第九章+ 異步IO/隊列(day10)html
攜程python
協程,又稱微線程,纖程。英文名Coroutine。一句話說明什麼是線程:協程是一種用戶態的輕量級線程。git
協程擁有本身的寄存器上下文和棧。協程調度切換時,將寄存器上下文和棧保存到其餘地方,在切回來的時候,恢復先前保存的寄存器上下文和棧。所以:程序員
協程能保留上一次調用時的狀態(即全部局部狀態的一個特定組合),每次過程重入時,就至關於進入上一次調用的狀態,換種說法:進入上一次離開時所處邏輯流的位置github
協程的好處:編程
缺點:數組
使用yield實現協程操做例子:安全
import time import queue def consumer(name): print("--->starting eating baozi...") while True: new_baozi = yield print("[%s] is eating baozi %s" % (name,new_baozi)) #time.sleep(1) def producer(): r = con.__next__() r = con2.__next__() n = 0 while n < 5: n +=1 con.send(n) con2.send(n) print("\033[32;1m[producer]\033[0m is making baozi %s" %n ) if __name__ == '__main__': con = consumer("c1") con2 = consumer("c2") p = producer()
greenlet服務器
1 #!/usr/bin/env python 2 # -*- coding:utf-8 -*- 3 4 5 from greenlet import greenlet 6 7 8 def test1(): 9 print 12 10 gr2.switch() 11 print 34 12 gr2.switch() 13 14 15 def test2(): 16 print 56 17 gr1.switch() 18 print 78 19 20 gr1 = greenlet(test1) 21 gr2 = greenlet(test2) 22 gr1.switch()
gevent網絡
Gevent 是一個第三方庫,能夠輕鬆經過gevent實現併發同步或異步編程,在gevent中用到的主要模式是Greenlet, 它是以C擴展模塊形式接入Python的輕量級協程。 Greenlet所有運行在主程序操做系統進程的內部,但它們被協做式地調度。
1 import gevent 2 3 def foo(): 4 print('Running in foo') 5 gevent.sleep(0) 6 print('Explicit context switch to foo again') 7 8 def bar(): 9 print('Explicit context to bar') 10 gevent.sleep(0) 11 print('Implicit context switch back to bar') 12 13 gevent.joinall([ 14 gevent.spawn(foo), 15 gevent.spawn(bar), 16 ])
輸出:
Running in foo
Explicit context to bar
Explicit context switch to foo again
Implicit context switch back to bar
同步與異步的性能區別
1 import gevent 2 3 def task(pid): 4 """ 5 Some non-deterministic task 6 """ 7 gevent.sleep(0.5) 8 print('Task %s done' % pid) 9 10 def synchronous(): 11 for i in range(1,10): 12 task(i) 13 14 def asynchronous(): 15 threads = [gevent.spawn(task, i) for i in range(10)] 16 gevent.joinall(threads) 17 18 print('Synchronous:') 19 synchronous() 20 21 print('Asynchronous:') 22 asynchronous()
上面程序的重要部分是將task函數封裝到Greenlet內部線程的gevent.spawn
。 初始化的greenlet列表存放在數組threads
中,此數組被傳給gevent.joinall
函數,後者阻塞當前流程,並執行全部給定的greenlet。執行流程只會在 全部greenlet執行完後纔會繼續向下走。
遇到IO阻塞時會自動切換任務
1 from gevent import monkey; monkey.patch_all() 2 import gevent 3 from urllib.request import urlopen 4 5 def f(url): 6 print('GET: %s' % url) 7 resp = urlopen(url) 8 data = resp.read() 9 print('%d bytes received from %s.' % (len(data), url)) 10 11 gevent.joinall([ 12 gevent.spawn(f, 'https://www.python.org/'), 13 gevent.spawn(f, 'https://www.yahoo.com/'), 14 gevent.spawn(f, 'https://github.com/'), 15 ])
經過gevent實現單線程下的多socket併發
1 import sys 2 import socket 3 import time 4 import gevent 5 6 from gevent import socket,monkey 7 monkey.patch_all() 8 9 10 def server(port): 11 s = socket.socket() 12 s.bind(('0.0.0.0', port)) 13 s.listen(500) 14 while True: 15 cli, addr = s.accept() 16 gevent.spawn(handle_request, cli) 17 18 19 20 def handle_request(conn): 21 try: 22 while True: 23 data = conn.recv(1024) 24 print("recv:", data) 25 conn.send(data) 26 if not data: 27 conn.shutdown(socket.SHUT_WR) 28 29 except Exception as ex: 30 print(ex) 31 finally: 32 conn.close() 33 if __name__ == '__main__': 34 server(8001)
1 import socket 2 3 HOST = 'localhost' # The remote host 4 PORT = 8001 # The same port as used by the server 5 s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) 6 s.connect((HOST, PORT)) 7 while True: 8 msg = bytes(input(">>:"),encoding="utf8") 9 s.sendall(msg) 10 data = s.recv(1024) 11 #print(data) 12 13 print('Received', repr(data)) 14 s.close()
在UI編程中,經常要對鼠標點擊進行相應,首先如何得到鼠標點擊呢?
方式一:建立一個線程,該線程一直循環檢測是否有鼠標點擊,那麼這個方式有如下幾個缺點:
1. CPU資源浪費,可能鼠標點擊的頻率很是小,可是掃描線程仍是會一直循環檢測,這會形成不少的CPU資源浪費;若是掃描鼠標點擊的接口是阻塞的呢?
2. 若是是堵塞的,又會出現下面這樣的問題,若是咱們不但要掃描鼠標點擊,還要掃描鍵盤是否按下,因爲掃描鼠標時被堵塞了,那麼可能永遠不會去掃描鍵盤;
3. 若是一個循環須要掃描的設備很是多,這又會引來響應時間的問題;
因此,該方式是很是很差的。
方式二:就是事件驅動模型
目前大部分的UI編程都是事件驅動模型,如不少UI平臺都會提供onClick()事件,這個事件就表明鼠標按下事件。事件驅動模型大致思路以下:
1. 有一個事件(消息)隊列;
2. 鼠標按下時,往這個隊列中增長一個點擊事件(消息);
3. 有個循環,不斷從隊列取出事件,根據不一樣的事件,調用不一樣的函數,如onClick()、onKeyDown()等;
4. 事件(消息)通常都各自保存各自的處理函數指針,這樣,每一個消息都有獨立的處理函數;
事件驅動編程是一種編程範式,這裏程序的執行流由外部事件來決定。它的特色是包含一個事件循環,當外部事件發生時使用回調機制來觸發相應的處理。另外兩種常見的編程範式是(單線程)同步以及多線程編程。
讓咱們用例子來比較和對比一下單線程、多線程以及事件驅動編程模型。下圖展現了隨着時間的推移,這三種模式下程序所作的工做。這個程序有3個任務須要完成,每一個任務都在等待I/O操做時阻塞自身。阻塞在I/O操做上所花費的時間已經用灰色框標示出來了。
在單線程同步模型中,任務按照順序執行。若是某個任務由於I/O而阻塞,其餘全部的任務都必須等待,直到它完成以後它們才能依次執行。這種明確的執 行順序和串行化處理的行爲是很容易推斷得出的。若是任務之間並無互相依賴的關係,但仍然須要互相等待的話這就使得程序沒必要要的下降了運行速度。
在多線程版本中,這3個任務分別在獨立的線程中執行。這些線程由操做系統來管理,在多處理器系統上能夠並行處理,或者在單處理器系統上交錯執行。這 使得當某個線程阻塞在某個資源的同時其餘線程得以繼續執行。與完成相似功能的同步程序相比,這種方式更有效率,但程序員必須寫代碼來保護共享資源,防止其 被多個線程同時訪問。多線程程序更加難以推斷,由於這類程序不得不經過線程同步機制如鎖、可重入函數、線程局部存儲或者其餘機制來處理線程安全問題,若是 實現不當就會致使出現微妙且使人痛不欲生的bug。
在事件驅動版本的程序中,3個任務交錯執行,但仍然在一個單獨的線程控制中。當處理I/O或者其餘昂貴的操做時,註冊一個回調到事件循環中,而後當 I/O操做完成時繼續執行。回調描述了該如何處理某個事件。事件循環輪詢全部的事件,當事件到來時將它們分配給等待處理事件的回調函數。這種方式讓程序盡 可能的得以執行而不須要用到額外的線程。事件驅動型程序比多線程程序更容易推斷出行爲,由於程序員不須要關心線程安全問題。
當咱們面對以下的環境時,事件驅動模型一般是一個好的選擇:
當應用程序須要在任務間共享可變的數據時,這也是一個不錯的選擇,由於這裏不須要採用同步處理。
網絡應用程序一般都有上述這些特色,這使得它們可以很好的契合事件驅動編程模型。
http://www.cnblogs.com/alex3714/p/4372426.html
番外篇 http://www.cnblogs.com/alex3714/articles/5876749.html
1 #_*_coding:utf-8_*_ 2 __author__ = 'Alex Li' 3 4 import select 5 import socket 6 import sys 7 import queue 8 9 10 server = socket.socket() 11 server.setblocking(0) 12 13 server_addr = ('localhost',10000) 14 15 print('starting up on %s port %s' % server_addr) 16 server.bind(server_addr) 17 18 server.listen(5) 19 20 21 inputs = [server, ] #本身也要監測呀,由於server自己也是個fd 22 outputs = [] 23 24 message_queues = {} 25 26 while True: 27 print("waiting for next event...") 28 29 readable, writeable, exeptional = select.select(inputs,outputs,inputs) #若是沒有任何fd就緒,那程序就會一直阻塞在這裏 30 31 for s in readable: #每一個s就是一個socket 32 33 if s is server: #別忘記,上面咱們server本身也當作一個fd放在了inputs列表裏,傳給了select,若是這個s是server,表明server這個fd就緒了, 34 #就是有活動了, 什麼狀況下它纔有活動? 固然 是有新鏈接進來的時候 呀 35 #新鏈接進來了,接受這個鏈接 36 conn, client_addr = s.accept() 37 print("new connection from",client_addr) 38 conn.setblocking(0) 39 inputs.append(conn) #爲了避免阻塞整個程序,咱們不會馬上在這裏開始接收客戶端發來的數據, 把它放到inputs裏, 下一次loop時,這個新鏈接 40 #就會被交給select去監聽,若是這個鏈接的客戶端發來了數據 ,那這個鏈接的fd在server端就會變成就續的,select就會把這個鏈接返回,返回到 41 #readable 列表裏,而後你就能夠loop readable列表,取出這個鏈接,開始接收數據了, 下面就是這麼幹 的 42 43 message_queues[conn] = queue.Queue() #接收到客戶端的數據後,不馬上返回 ,暫存在隊列裏,之後發送 44 45 else: #s不是server的話,那就只能是一個 與客戶端創建的鏈接的fd了 46 #客戶端的數據過來了,在這接收 47 data = s.recv(1024) 48 if data: 49 print("收到來自[%s]的數據:" % s.getpeername()[0], data) 50 message_queues[s].put(data) #收到的數據先放到queue裏,一會返回給客戶端 51 if s not in outputs: 52 outputs.append(s) #爲了避免影響處理與其它客戶端的鏈接 , 這裏不馬上返回數據給客戶端 53 54 55 else:#若是收不到data表明什麼呢? 表明客戶端斷開了呀 56 print("客戶端斷開了",s) 57 58 if s in outputs: 59 outputs.remove(s) #清理已斷開的鏈接 60 61 inputs.remove(s) #清理已斷開的鏈接 62 63 del message_queues[s] ##清理已斷開的鏈接 64 65 66 for s in writeable: 67 try : 68 next_msg = message_queues[s].get_nowait() 69 70 except queue.Empty: 71 print("client [%s]" %s.getpeername()[0], "queue is empty..") 72 outputs.remove(s) 73 74 else: 75 print("sending msg to [%s]"%s.getpeername()[0], next_msg) 76 s.send(next_msg.upper()) 77 78 79 for s in exeptional: 80 print("handling exception for ",s.getpeername()) 81 inputs.remove(s) 82 if s in outputs: 83 outputs.remove(s) 84 s.close() 85 86 del message_queues[s] 87 88 select socket server
1 #_*_coding:utf-8_*_ 2 __author__ = 'Alex Li' 3 4 5 import socket 6 import sys 7 8 messages = [ b'This is the message. ', 9 b'It will be sent ', 10 b'in parts.', 11 ] 12 server_address = ('localhost', 10000) 13 14 # Create a TCP/IP socket 15 socks = [ socket.socket(socket.AF_INET, socket.SOCK_STREAM), 16 socket.socket(socket.AF_INET, socket.SOCK_STREAM), 17 ] 18 19 # Connect the socket to the port where the server is listening 20 print('connecting to %s port %s' % server_address) 21 for s in socks: 22 s.connect(server_address) 23 24 for message in messages: 25 26 # Send messages on both sockets 27 for s in socks: 28 print('%s: sending "%s"' % (s.getsockname(), message) ) 29 s.send(message) 30 31 # Read responses on both sockets 32 for s in socks: 33 data = s.recv(1024) 34 print( '%s: received "%s"' % (s.getsockname(), data) ) 35 if not data: 36 print(sys.stderr, 'closing socket', s.getsockname() ) 37 38 select socket client
selectors模塊
This module allows high-level and efficient I/O multiplexing, built upon the select
module primitives. Users are encouraged to use this module instead, unless they want precise control over the OS-level primitives used.
1 import selectors 2 import socket 3 4 sel = selectors.DefaultSelector() 5 6 def accept(sock, mask): 7 conn, addr = sock.accept() # Should be ready 8 print('accepted', conn, 'from', addr) 9 conn.setblocking(False) 10 sel.register(conn, selectors.EVENT_READ, read) 11 12 def read(conn, mask): 13 data = conn.recv(1000) # Should be ready 14 if data: 15 print('echoing', repr(data), 'to', conn) 16 conn.send(data) # Hope it won't block 17 else: 18 print('closing', conn) 19 sel.unregister(conn) 20 conn.close() 21 22 sock = socket.socket() 23 sock.bind(('localhost', 10000)) 24 sock.listen(100) 25 sock.setblocking(False) 26 sel.register(sock, selectors.EVENT_READ, accept) 27 28 while True: 29 events = sel.select() 30 for key, mask in events: 31 callback = key.data 32 callback(key.fileobj, mask)