冰凍三尺非一日之寒--有點難度

第九章+ 異步IO/隊列(day10)html

  1. Gevent協程
  2. Select\Poll\Epoll異步IO與事件驅動

 攜程python

  協程,又稱微線程,纖程。英文名Coroutine。一句話說明什麼是線程:協程是一種用戶態的輕量級線程git

  協程擁有本身的寄存器上下文和棧。協程調度切換時,將寄存器上下文和棧保存到其餘地方,在切回來的時候,恢復先前保存的寄存器上下文和棧。所以:程序員

  協程能保留上一次調用時的狀態(即全部局部狀態的一個特定組合),每次過程重入時,就至關於進入上一次調用的狀態,換種說法:進入上一次離開時所處邏輯流的位置github

協程的好處:編程

  • 無需線程上下文切換的開銷
  • 無需原子操做鎖定及同步的開銷
  • 方便切換控制流,簡化編程模型
  • 高併發+高擴展性+低成本:一個CPU支持上萬的協程都不是問題。因此很適合用於高併發處理。

缺點:數組

  • 沒法利用多核資源:協程的本質是個單線程,它不能同時將 單個CPU 的多個核用上,協程須要和進程配合才能運行在多CPU上.固然咱們平常所編寫的絕大部分應用都沒有這個必要,除非是cpu密集型應用。
  • 進行阻塞(Blocking)操做(如IO時)會阻塞掉整個程序

使用yield實現協程操做例子:安全

import time
import queue
def consumer(name):
    print("--->starting eating baozi...")
    while True:
        new_baozi = yield
        print("[%s] is eating baozi %s" % (name,new_baozi))
        #time.sleep(1)
 
def producer():
 
    r = con.__next__()
    r = con2.__next__()
    n = 0
    while n < 5:
        n +=1
        con.send(n)
        con2.send(n)
        print("\033[32;1m[producer]\033[0m is making baozi %s" %n )
 
 
if __name__ == '__main__':
    con = consumer("c1")
    con2 = consumer("c2")
    p = producer()

greenlet服務器

 1 #!/usr/bin/env python
 2 # -*- coding:utf-8 -*-
 3   
 4   
 5 from greenlet import greenlet
 6   
 7   
 8 def test1():
 9     print 12
10     gr2.switch()
11     print 34
12     gr2.switch()
13   
14   
15 def test2():
16     print 56
17     gr1.switch()
18     print 78
19   
20 gr1 = greenlet(test1)
21 gr2 = greenlet(test2)
22 gr1.switch()

gevent網絡

Gevent 是一個第三方庫,能夠輕鬆經過gevent實現併發同步或異步編程,在gevent中用到的主要模式是Greenlet, 它是以C擴展模塊形式接入Python的輕量級協程。 Greenlet所有運行在主程序操做系統進程的內部,但它們被協做式地調度。

 1 import gevent
 2  
 3 def foo():
 4     print('Running in foo')
 5     gevent.sleep(0)
 6     print('Explicit context switch to foo again')
 7  
 8 def bar():
 9     print('Explicit context to bar')
10     gevent.sleep(0)
11     print('Implicit context switch back to bar')
12  
13 gevent.joinall([
14     gevent.spawn(foo),
15     gevent.spawn(bar),
16 ])

輸出:

Running in foo

Explicit context to bar

Explicit context switch to foo again

Implicit context switch back to bar

同步與異步的性能區別

 1 import gevent
 2  
 3 def task(pid):
 4     """
 5     Some non-deterministic task
 6     """
 7     gevent.sleep(0.5)
 8     print('Task %s done' % pid)
 9  
10 def synchronous():
11     for i in range(1,10):
12         task(i)
13  
14 def asynchronous():
15     threads = [gevent.spawn(task, i) for i in range(10)]
16     gevent.joinall(threads)
17  
18 print('Synchronous:')
19 synchronous()
20  
21 print('Asynchronous:')
22 asynchronous()
View Code

上面程序的重要部分是將task函數封裝到Greenlet內部線程的gevent.spawn。 初始化的greenlet列表存放在數組threads中,此數組被傳給gevent.joinall 函數,後者阻塞當前流程,並執行全部給定的greenlet。執行流程只會在 全部greenlet執行完後纔會繼續向下走。

遇到IO阻塞時會自動切換任務

 1 from gevent import monkey; monkey.patch_all()
 2 import gevent
 3 from  urllib.request import urlopen
 4  
 5 def f(url):
 6     print('GET: %s' % url)
 7     resp = urlopen(url)
 8     data = resp.read()
 9     print('%d bytes received from %s.' % (len(data), url))
10  
11 gevent.joinall([
12         gevent.spawn(f, 'https://www.python.org/'),
13         gevent.spawn(f, 'https://www.yahoo.com/'),
14         gevent.spawn(f, 'https://github.com/'),
15 ])
View Code

經過gevent實現單線程下的多socket併發

 1 import sys
 2 import socket
 3 import time
 4 import gevent
 5  
 6 from gevent import socket,monkey
 7 monkey.patch_all()
 8  
 9  
10 def server(port):
11     s = socket.socket()
12     s.bind(('0.0.0.0', port))
13     s.listen(500)
14     while True:
15         cli, addr = s.accept()
16         gevent.spawn(handle_request, cli)
17  
18  
19  
20 def handle_request(conn):
21     try:
22         while True:
23             data = conn.recv(1024)
24             print("recv:", data)
25             conn.send(data)
26             if not data:
27                 conn.shutdown(socket.SHUT_WR)
28  
29     except Exception as  ex:
30         print(ex)
31     finally:
32         conn.close()
33 if __name__ == '__main__':
34     server(8001)
 1 import socket
 2  
 3 HOST = 'localhost'    # The remote host
 4 PORT = 8001           # The same port as used by the server
 5 s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
 6 s.connect((HOST, PORT))
 7 while True:
 8     msg = bytes(input(">>:"),encoding="utf8")
 9     s.sendall(msg)
10     data = s.recv(1024)
11     #print(data)
12  
13     print('Received', repr(data))
14 s.close()
一般,咱們寫服務器處理模型的程序時,有如下幾種模型:
(1)每收到一個請求,建立一個新的進程,來處理該請求;
(2)每收到一個請求,建立一個新的線程,來處理該請求;
(3)每收到一個請求,放入一個事件列表,讓主進程經過非阻塞I/O方式來處理請求
上面的幾種方式,各有千秋,
第(1)中方法,因爲建立新的進程的開銷比較大,因此,會致使服務器性能比較差,但實現比較簡單。
第(2)種方式,因爲要涉及到線程的同步,有可能會面臨 死鎖等問題。
第(3)種方式,在寫應用程序代碼時,邏輯比前面兩種都複雜。
綜合考慮各方面因素,通常廣泛認爲第(3)種方式是大多數 網絡服務器採用的方式
 

看圖說話講事件驅動模型

在UI編程中,經常要對鼠標點擊進行相應,首先如何得到鼠標點擊呢?
方式一:建立一個線程,該線程一直循環檢測是否有鼠標點擊,那麼這個方式有如下幾個缺點
1. CPU資源浪費,可能鼠標點擊的頻率很是小,可是掃描線程仍是會一直循環檢測,這會形成不少的CPU資源浪費;若是掃描鼠標點擊的接口是阻塞的呢?
2. 若是是堵塞的,又會出現下面這樣的問題,若是咱們不但要掃描鼠標點擊,還要掃描鍵盤是否按下,因爲掃描鼠標時被堵塞了,那麼可能永遠不會去掃描鍵盤;
3. 若是一個循環須要掃描的設備很是多,這又會引來響應時間的問題;
因此,該方式是很是很差的。

方式二:就是事件驅動模型
目前大部分的UI編程都是事件驅動模型,如不少UI平臺都會提供onClick()事件,這個事件就表明鼠標按下事件。事件驅動模型大致思路以下:
1. 有一個事件(消息)隊列;
2. 鼠標按下時,往這個隊列中增長一個點擊事件(消息);
3. 有個循環,不斷從隊列取出事件,根據不一樣的事件,調用不一樣的函數,如onClick()、onKeyDown()等;
4. 事件(消息)通常都各自保存各自的處理函數指針,這樣,每一個消息都有獨立的處理函數;

 

 

 

事件驅動編程是一種編程範式,這裏程序的執行流由外部事件來決定。它的特色是包含一個事件循環,當外部事件發生時使用回調機制來觸發相應的處理。另外兩種常見的編程範式是(單線程)同步以及多線程編程。

讓咱們用例子來比較和對比一下單線程、多線程以及事件驅動編程模型。下圖展現了隨着時間的推移,這三種模式下程序所作的工做。這個程序有3個任務須要完成,每一個任務都在等待I/O操做時阻塞自身。阻塞在I/O操做上所花費的時間已經用灰色框標示出來了。

 

在單線程同步模型中,任務按照順序執行。若是某個任務由於I/O而阻塞,其餘全部的任務都必須等待,直到它完成以後它們才能依次執行。這種明確的執 行順序和串行化處理的行爲是很容易推斷得出的。若是任務之間並無互相依賴的關係,但仍然須要互相等待的話這就使得程序沒必要要的下降了運行速度。

在多線程版本中,這3個任務分別在獨立的線程中執行。這些線程由操做系統來管理,在多處理器系統上能夠並行處理,或者在單處理器系統上交錯執行。這 使得當某個線程阻塞在某個資源的同時其餘線程得以繼續執行。與完成相似功能的同步程序相比,這種方式更有效率,但程序員必須寫代碼來保護共享資源,防止其 被多個線程同時訪問。多線程程序更加難以推斷,由於這類程序不得不經過線程同步機制如鎖、可重入函數、線程局部存儲或者其餘機制來處理線程安全問題,若是 實現不當就會致使出現微妙且使人痛不欲生的bug。

在事件驅動版本的程序中,3個任務交錯執行,但仍然在一個單獨的線程控制中。當處理I/O或者其餘昂貴的操做時,註冊一個回調到事件循環中,而後當 I/O操做完成時繼續執行。回調描述了該如何處理某個事件。事件循環輪詢全部的事件,當事件到來時將它們分配給等待處理事件的回調函數。這種方式讓程序盡 可能的得以執行而不須要用到額外的線程。事件驅動型程序比多線程程序更容易推斷出行爲,由於程序員不須要關心線程安全問題。

當咱們面對以下的環境時,事件驅動模型一般是一個好的選擇:

  1. 程序中有許多任務,並且…
  2. 任務之間高度獨立(所以它們不須要互相通訊,或者等待彼此)並且…
  3. 在等待事件到來時,某些任務會阻塞。

當應用程序須要在任務間共享可變的數據時,這也是一個不錯的選擇,由於這裏不須要採用同步處理。

網絡應用程序一般都有上述這些特色,這使得它們可以很好的契合事件驅動編程模型。

Select\Poll\Epoll異步IO 

http://www.cnblogs.com/alex3714/p/4372426.html 

番外篇 http://www.cnblogs.com/alex3714/articles/5876749.html 

select 多併發socket 例子

 1 #_*_coding:utf-8_*_
 2 __author__ = 'Alex Li'
 3 
 4 import select
 5 import socket
 6 import sys
 7 import queue
 8 
 9 
10 server = socket.socket()
11 server.setblocking(0)
12 
13 server_addr = ('localhost',10000)
14 
15 print('starting up on %s port %s' % server_addr)
16 server.bind(server_addr)
17 
18 server.listen(5)
19 
20 
21 inputs = [server, ] #本身也要監測呀,由於server自己也是個fd
22 outputs = []
23 
24 message_queues = {}
25 
26 while True:
27     print("waiting for next event...")
28 
29     readable, writeable, exeptional = select.select(inputs,outputs,inputs) #若是沒有任何fd就緒,那程序就會一直阻塞在這裏
30 
31     for s in readable: #每一個s就是一個socket
32 
33         if s is server: #別忘記,上面咱們server本身也當作一個fd放在了inputs列表裏,傳給了select,若是這個s是server,表明server這個fd就緒了,
34             #就是有活動了, 什麼狀況下它纔有活動? 固然 是有新鏈接進來的時候 呀
35             #新鏈接進來了,接受這個鏈接
36             conn, client_addr = s.accept()
37             print("new connection from",client_addr)
38             conn.setblocking(0)
39             inputs.append(conn) #爲了避免阻塞整個程序,咱們不會馬上在這裏開始接收客戶端發來的數據, 把它放到inputs裏, 下一次loop時,這個新鏈接
40             #就會被交給select去監聽,若是這個鏈接的客戶端發來了數據 ,那這個鏈接的fd在server端就會變成就續的,select就會把這個鏈接返回,返回到
41             #readable 列表裏,而後你就能夠loop readable列表,取出這個鏈接,開始接收數據了, 下面就是這麼幹 的
42 
43             message_queues[conn] = queue.Queue() #接收到客戶端的數據後,不馬上返回 ,暫存在隊列裏,之後發送
44 
45         else: #s不是server的話,那就只能是一個 與客戶端創建的鏈接的fd了
46             #客戶端的數據過來了,在這接收
47             data = s.recv(1024)
48             if data:
49                 print("收到來自[%s]的數據:" % s.getpeername()[0], data)
50                 message_queues[s].put(data) #收到的數據先放到queue裏,一會返回給客戶端
51                 if s not  in outputs:
52                     outputs.append(s) #爲了避免影響處理與其它客戶端的鏈接 , 這裏不馬上返回數據給客戶端
53 
54 
55             else:#若是收不到data表明什麼呢? 表明客戶端斷開了呀
56                 print("客戶端斷開了",s)
57 
58                 if s in outputs:
59                     outputs.remove(s) #清理已斷開的鏈接
60 
61                 inputs.remove(s) #清理已斷開的鏈接
62 
63                 del message_queues[s] ##清理已斷開的鏈接
64 
65 
66     for s in writeable:
67         try :
68             next_msg = message_queues[s].get_nowait()
69 
70         except queue.Empty:
71             print("client [%s]" %s.getpeername()[0], "queue is empty..")
72             outputs.remove(s)
73 
74         else:
75             print("sending msg to [%s]"%s.getpeername()[0], next_msg)
76             s.send(next_msg.upper())
77 
78 
79     for s in exeptional:
80         print("handling exception for ",s.getpeername())
81         inputs.remove(s)
82         if s in outputs:
83             outputs.remove(s)
84         s.close()
85 
86         del message_queues[s]
87 
88 select socket server
select socket server
 1 #_*_coding:utf-8_*_
 2 __author__ = 'Alex Li'
 3 
 4 
 5 import socket
 6 import sys
 7 
 8 messages = [ b'This is the message. ',
 9              b'It will be sent ',
10              b'in parts.',
11              ]
12 server_address = ('localhost', 10000)
13 
14 # Create a TCP/IP socket
15 socks = [ socket.socket(socket.AF_INET, socket.SOCK_STREAM),
16           socket.socket(socket.AF_INET, socket.SOCK_STREAM),
17           ]
18 
19 # Connect the socket to the port where the server is listening
20 print('connecting to %s port %s' % server_address)
21 for s in socks:
22     s.connect(server_address)
23 
24 for message in messages:
25 
26     # Send messages on both sockets
27     for s in socks:
28         print('%s: sending "%s"' % (s.getsockname(), message) )
29         s.send(message)
30 
31     # Read responses on both sockets
32     for s in socks:
33         data = s.recv(1024)
34         print( '%s: received "%s"' % (s.getsockname(), data) )
35         if not data:
36             print(sys.stderr, 'closing socket', s.getsockname() )
37 
38 select socket client
select socket client

selectors模塊

This module allows high-level and efficient I/O multiplexing, built upon the select module primitives. Users are encouraged to use this module instead, unless they want precise control over the OS-level primitives used.

 1 import selectors
 2 import socket
 3  
 4 sel = selectors.DefaultSelector()
 5  
 6 def accept(sock, mask):
 7     conn, addr = sock.accept()  # Should be ready
 8     print('accepted', conn, 'from', addr)
 9     conn.setblocking(False)
10     sel.register(conn, selectors.EVENT_READ, read)
11  
12 def read(conn, mask):
13     data = conn.recv(1000)  # Should be ready
14     if data:
15         print('echoing', repr(data), 'to', conn)
16         conn.send(data)  # Hope it won't block
17     else:
18         print('closing', conn)
19         sel.unregister(conn)
20         conn.close()
21  
22 sock = socket.socket()
23 sock.bind(('localhost', 10000))
24 sock.listen(100)
25 sock.setblocking(False)
26 sel.register(sock, selectors.EVENT_READ, accept)
27  
28 while True:
29     events = sel.select()
30     for key, mask in events:
31         callback = key.data
32         callback(key.fileobj, mask)
相關文章
相關標籤/搜索