python學習筆記_week10

1、多進程multiprocessing

io 操做不佔用cpu,計算佔cpu(如1+1),上下文切換耗資源(多線程可能不如單線程快),python多線程不適合cup密集操做型的任務,適合io操做密集型的任務html

multiprocessing is a package that supports spawning processes using an API similar to the threading module. The multiprocessing package offers both local and remote concurrency, effectively side-stepping the Global Interpreter Lock by using subprocesses instead of threads. Due to this, the multiprocessing module allows the programmer to fully leverage multiple processors on a given machine. It runs on both Unix and Windows.python

 1 import multiprocessing
 2 import time,threading
 3 def threading_run():
 4     print(threading.get_ident())
 5 def run(name):
 6     time.sleep(2)
 7     print('hello', name)
 8     t=threading.Thread(target=threading_run,)
 9     t.start()
10 if __name__ == '__main__':
11     for i in range(10):
12         p=multiprocessing.Process(target=run,args=('bob %s'%i,))
13         p.start()
14         # p.join()
多進程

To show the individual process IDs involved, here is an expanded example:linux

 1 from multiprocessing import Process
 2 import os
 3 def info(title):
 4     print(title)
 5     print('module name:', __name__)
 6     print('parent process:', os.getppid())
 7     print('process id:', os.getpid())
 8     print("\n\n")
 9 def f(name):
10     info('\033[31;1mcalled from child process function f\033[0m')
11     print('hello', name)
12 if __name__ == '__main__':
13     info('\033[32;1mmain process line\033[0m')
14     p = Process(target=f, args=('bob',))
15     p.start()
16     # p.join()
get進程id

2、進程間通信

同一進程下的線程能夠經過線程queue通信,但不一樣進程想要通信必須找中間人(翻譯)---進程Queue(不一樣於線程queue)  git

不一樣進程間內存是不共享的,要想實現兩個進程間的數據交換,能夠用如下方法:程序員

Queues:使用方法跟threading裏的queue差很少github

 1 from multiprocessing import Process, Queue
 2 import threading
 3 #import queue
 4 # def f(q):
 5 #     q.put([42, None, 'hello'])
 6 def f(qq):
 7     print("in child:",qq.qsize())
 8     qq.put([42, None, 'hello'])
 9 if __name__ == '__main__':
10     q = Queue()
11     q.put("test123")
12     #p = threading.Thread(target=f,)
13     p = Process(target=f, args=(q,))
14     p.start()
15     p.join()
16     print("444",q.get_nowait())
17     print("444",q.get_nowait())
18      # prints "[42, None, 'hello']"
19     #print(q.get())  # prints "[42, None, 'hello']"
進程queue

Pipes:The Pipe() function returns a pair of connection objects connected by a pipe which by default is duplex (two-way).web

 1 from multiprocessing import Process, Pipe
 2 def f(conn):
 3     conn.send([42, None, 'hello from child']) #發幾回收幾回,發多了,多的就收不到,發少了對方會等着
 4     conn.send([42, None, 'hello from child2'])
 5     print("from parent:",conn.recv())
 6     conn.close()
 7 if __name__ == '__main__':
 8     parent_conn, child_conn = Pipe()
 9     p = Process(target=f, args=(child_conn,))
10     p.start()
11     print(parent_conn.recv())  # prints "[42, None, 'hello']"
12     print(parent_conn.recv())  # prints "[42, None, 'hello']"
13     parent_conn.send("張洋可好") # prints "[42, None, 'hello']"
14     p.join()
pipe

The two connection objects returned by Pipe() represent the two ends of the pipe. Each connection object has send() and recv() methods (among others). Note that data in a pipe may become corrupted if two processes (or threads) try to read from or write to the same end of the pipe at the same time. Of course there is no risk of corruption from processes using different ends of the pipe at the same time.數據庫

Managers:A manager object returned by Manager() controls a server process which holds Python objects and allows other processes to manipulate them using proxies.編程

A manager returned by Manager() will support types listdictNamespaceLockRLockSemaphoreBoundedSemaphoreConditionEventBarrierQueueValue and Array.windows

 1 from multiprocessing import Process, Manager
 2 import os
 3 def f(d, l):
 4     d[os.getpid()] = os.getpid()
 5     l.append(os.getpid())
 6     print(l)
 7 if __name__ == '__main__':
 8     with Manager() as manager: #不用加鎖,manager內部有鎖,控制數據不會亂
 9         d = manager.dict() #{}能夠在多個進程之間傳遞和共享的字典
10         l = manager.list(range(5))#能夠在多個進程之間傳遞和共享的列表
11         p_list = [] #多個進程,爲join用的
12         for i in range(10):
13             p = Process(target=f, args=(d, l))
14             p.start()
15             p_list.append(p)
16         for res in p_list:#等待結果
17             res.join()
18         print(d)
19         print(l)
manager

進程同步:Without using the lock output from the different processes is liable to get all mixed up.

1 from multiprocessing import Process, Lock
2 def f(l, i):
3     l.acquire()#得到鎖,共享屏幕,防止打印時亂掉
4     print('hello world', i)
5     l.release()
6 if __name__ == '__main__':
7     lock = Lock()
8     for num in range(10):
9         Process(target=f, args=(lock, num)).start()
lock

3、進程池

---也能夠有線程池(經過信號量),但線程開銷不大,線程多最多會致使cpu切換過於頻繁,可是進程多了容易把系統搞癱。  

進程池內部維護一個進程序列,當使用時,則去進程池中獲取一個進程,若是進程池序列中沒有可供使用的進程,那麼程序就會等待,直到進程池中有可用進程爲止。

進程池內部維護一個進程序列,當使用時,則去進程池中獲取一個進程,若是進程池序列中沒有可供使用的進進程,那麼程序就會等待,直到進程池中有可用進程爲止。

進程池中有兩個方法:apply(串行,同步執行)、apply_async(並行,異步執行)

 1 from  multiprocessing import Process,Pool
 2 import time,os
 3 def Foo(i):
 4     time.sleep(2)
 5     print("in process",os.getpid())
 6     return i+100
 7 def Bar(arg):
 8     print('-->exec done:',arg,os.getpid())
 9 #在windows上啓動多進程必須寫if __name__ == '__main__':
10 if __name__ == '__main__':#手動執行會執行下面的代碼,被當作模塊導入時不會執行
11     pool = Pool(5) #process=5,容許進程池裏同時放入5個進程
12     print("主進程的pid:", os.getpid())
13     for i in range(10):
14         pool.apply_async(func=Foo, args=(i,),callback=Bar)#callback,回調,執行完Foo以後執行Bar,Foo不執行完,不執行Bar
15         #回調的內存id和主進程同樣,好比備份數據庫時能夠在主進程和MySQL創建一條連接,以後子進程回調進行備份,若是沒有回調,每一條子進程都要創建一條連接,效率低
16         # pool.apply(func=Foo, args=(i,)) #串行
17         # pool.apply_async(func=Foo, args=(i,)) #並行
18     pool.close() #得先close再join---死記
19     pool.join() #進程池中進程執行完畢後再關閉,若是註釋,那麼程序直接關閉。
20     print('end')
進程池

3、協程

協程,又稱微線程,纖程。英文名Coroutine。一句話說明什麼是協程:協程是一種用戶態的輕量級線程。(cpu都不知道協程的存在,協程是用戶本身控制的,在單線程下實現併發的效果,遇到io操做就切換)

協程擁有本身的寄存器上下文和棧。協程調度切換時,將寄存器上下文和棧保存到其餘地方,在切回來的時候,恢復先前保存的寄存器上下文和棧。所以:

協程能保留上一次調用時的狀態(即全部局部狀態的一個特定組合),每次過程重入時,就至關於進入上一次調用的狀態,換種說法:進入上一次離開時所處邏輯流的位置。

協程的好處:

1.無需線程上下文切換的開銷

2.無需原子操做鎖定及同步的開銷

  "原子操做(atomic operation)是不須要synchronized",所謂原子操做是指不會被線程調度機制打斷的操做;這種操做一旦開始,就一直運行到結束,中間不會有任何 context switch (切換到另外一個線程)。原子操做能夠是一個步驟,也能夠是多個操做步驟,可是其順序是不能夠被打亂,或者切割掉只執行部分。視做總體是原子性的核心。

3.方便切換控制流,簡化編程模型

4.高併發+高擴展性+低成本:一個CPU支持上萬的協程都不是問題。因此很適合用於高併發處理。

缺點:

1.沒法利用多核資源:協程的本質是個單線程,它不能同時將單個CPU 的多個核用上,協程須要和進程配合才能運行在多CPU上.固然咱們平常所編寫的絕大部分應用都沒有這個必要,除非是cpu密集型應用。

2.進行阻塞(Blocking)操做(如IO時)會阻塞掉整個程序

使用yield實現協程操做例子:

 1 import time
 2 import queue
 3 def consumer(name):
 4     print("--->starting eating baozi...")
 5     while True:
 6         new_baozi = yield #程序返回,喚醒時接收數據
 7         print("[%s] is eating baozi %s" % (name, new_baozi))
 8         # time.sleep(1)
 9 def producer():
10     r = con.__next__() #調用yield第一次是生成器
11     r = con2.__next__()
12     n = 0
13     while n < 5:
14         n += 1
15         con.send(n)
16         con2.send(n)
17         time.sleep(1)
18         print("\033[32;1m[producer]\033[0m is making baozi %s" % n)
19 if __name__ == '__main__':
20     con = consumer("c1")
21     con2 = consumer("c2")
22     p = producer()
yield

看樓上的例子,我問你這算不算作是協程呢?你說,我他媽哪知道呀,你前面說了一堆廢話,可是並沒告訴我協程的標準形態呀,我腚眼一想,以爲你說也對,那好,咱們先給協程一個標準定義,即符合什麼條件就能稱之爲協程:

1.必須在只有一個單線程裏實現併發

2.修改共享數據不需加鎖

3.用戶程序裏本身保存多個控制流的上下文棧

4.一個協程遇到IO操做自動切換到其它協程

基於上面這4點定義,咱們剛纔用yield實現的程並不能算是合格的線程,由於它有一點功能沒實現,哪一點呢?

Greenlet:

greenlet是一個用C實現的協程模塊,相比與python自帶的yield,它可使你在任意函數之間隨意切換,而不需把這個函數先聲明爲generator

 1 from greenlet import greenlet
 2 def test1():
 3     print(12)
 4     gr2.switch()
 5     print(34)
 6     gr2.switch()
 7 def test2():
 8     print(56)
 9     gr1.switch()
10     print(78)
11 gr1 = greenlet(test1) #啓動一個協程
12 gr2 = greenlet(test2)
13 gr1.switch() #手動切換
greenlet

感受確實用着比generator還簡單了呢,但好像尚未解決一個問題,就是遇到IO操做,自動切換,對不對?

Gevent :

Gevent 是一個第三方庫,能夠輕鬆經過gevent實現併發同步或異步編程,在gevent中用到的主要模式是Greenlet, 它是以C擴展模塊形式接入Python的輕量級協程。 Greenlet所有運行在主程序操做系統進程的內部,但它們被協做式地調度。

 1 import gevent
 2 def func1():
 3     print('\033[31;1m李闖在跟海濤搞...\033[0m')
 4     gevent.sleep(2)#自動判斷io,程序花最長io這麼多時間,要是串行會花3s
 5     print('\033[31;1m李闖又回去跟繼續跟海濤搞...\033[0m')
 6 def func2():
 7     print('\033[32;1m李闖切換到了跟海龍搞...\033[0m')
 8     gevent.sleep(1)
 9     print('\033[32;1m李闖搞完了海濤,回來繼續跟海龍搞...\033[0m')
10 def func3():
11     print("running func3")
12     gevent.sleep(0) #io遇到會切換
13     print("running func3 again")
14 gevent.joinall([
15     gevent.spawn(func1),#生成一個協程
16     gevent.spawn(func2),
17     gevent.spawn(func3),
18 ])
gevent

同步與異步的性能區別 

 1 from gevent import monkey
 2 monkey.patch_all()
 3 import gevent
 4 from  urllib.request import urlopen
 5 import time
 6 def f(url):
 7     print('GET: %s' % url)
 8     resp = urlopen(url)
 9     data = resp.read()
10     print('%d bytes received from %s.' % (len(data), url))
11 urls = [ 'https://www.python.org/',
12          'https://www.yahoo.com/',
13          'https://github.com/'
14          ]
15 time_start = time.time()
16 for url in urls:
17     f(url)
18 print("同步cost",time.time() - time_start)
19 async_time_start = time.time()
20 gevent.joinall([
21     gevent.spawn(f, 'https://www.python.org/'),
22     gevent.spawn(f, 'https://www.yahoo.com/'),
23     gevent.spawn(f, 'https://github.com/'),
24 ])
25 print("異步cost",time.time()-async_time_start )
同步異步

遇到IO阻塞時會自動切換任務

 1 from urllib import request
 2 import gevent
 3 from gevent import monkey
 4 monkey.patch_all() #把當前程序的全部的io操做給我單獨作上標記
 5 def f(url):
 6     print('GET: %s' % url)
 7     resp = request.urlopen(url)#請求一個url
 8     data = resp.read()#取到結果,data就是要下載的網頁
 9     print('%d bytes received from %s.' % (len(data), url))
10 gevent.joinall([
11     gevent.spawn(f, 'https://www.python.org/'),
12     gevent.spawn(f, 'https://www.yahoo.com/'),
13     gevent.spawn(f, 'https://github.com/'),
14 ])
爬蟲

4、論事件驅動與異步IO

一般,咱們寫服務器處理模型的程序時,有如下幾種模型:
(1)每收到一個請求,建立一個新的進程,來處理該請求;ForkingTCPServer(通常不用,耗資源)
(2)每收到一個請求,建立一個新的線程,來處理該請求;ThreadingTCPServer
(3)每收到一個請求,放入一個事件列表,讓主進程經過非阻塞I/O方式來處理請求
上面的幾種方式,各有千秋,
第(1)中方法,因爲建立新的進程的開銷比較大,因此,會致使服務器性能比較差,但實現比較簡單。
第(2)種方式,因爲要涉及到線程的同步,有可能會面臨死鎖等問題。
第(3)種方式,在寫應用程序代碼時,邏輯比前面兩種都複雜。(事件驅動模式)
綜合考慮各方面因素,通常廣泛認爲第(3)種方式是大多數網絡服務器採用的方式(Nginx,python不少模型)

看圖說話講事件驅動模型

在UI編程中,經常要對鼠標點擊進行響應,首先如何得到鼠標點擊呢?
方式一:建立一個線程,該線程一直循環檢測是否有鼠標點擊,那麼這個方式有如下幾個缺點
1. CPU資源浪費,可能鼠標點擊的頻率很是小,可是掃描線程仍是會一直循環檢測,這會形成不少的CPU資源浪費;若是掃描鼠標點擊的接口是阻塞的呢?
2. 若是是堵塞的,又會出現下面這樣的問題,若是咱們不但要掃描鼠標點擊,還要掃描鍵盤是否按下,因爲掃描鼠標時被堵塞了,那麼可能永遠不會去掃描鍵盤;
3. 若是一個循環須要掃描的設備很是多,這又會引來響應時間的問題;
因此,該方式是很是很差的。
方式二:就是事件驅動模型
目前大部分的UI編程都是事件驅動模型,如不少UI平臺都會提供onClick()事件,這個事件就表明鼠標按下事件。事件驅動模型大致思路以下:
1. 有一個事件(消息)隊列;
2. 鼠標按下時,往這個隊列中增長一個點擊事件(消息);
3. 有個循環,不斷從隊列取出事件,根據不一樣的事件,調用不一樣的函數,如onClick()、onKeyDown()等;
4. 事件(消息)通常都各自保存各自的處理函數指針,這樣,每一個消息都有獨立的處理函數;

事件驅動編程是一種編程範式,這裏程序的執行流由外部事件來決定。它的特色是包含一個事件循環,當外部事件發生時使用回調機制來觸發相應的處理。另外兩種常見的編程範式是(單線程)同步以及多線程編程。

讓咱們用例子來比較和對比一下單線程、多線程以及事件驅動編程模型。下圖展現了隨着時間的推移,這三種模式下程序所作的工做。這個程序有3個任務須要完成,每一個任務都在等待I/O操做時阻塞自身。阻塞在I/O操做上所花費的時間已經用灰色框標示出來了。

在單線程同步模型中,任務按照順序執行。若是某個任務由於I/O而阻塞,其餘全部的任務都必須等待,直到它完成以後它們才能依次執行。這種明確的執行順序和串行化處理的行爲是很容易推斷得出的。若是任務之間並無互相依賴的關係,但仍然須要互相等待的話這就使得程序沒必要要的下降了運行速度。

在多線程版本中,這3個任務分別在獨立的線程中執行。這些線程由操做系統來管理,在多處理器系統上能夠並行處理,或者在單處理器系統上交錯執行。這使得當某個線程阻塞在某個資源的同時其餘線程得以繼續執行。與完成相似功能的同步程序相比,這種方式更有效率,但程序員必須寫代碼來保護共享資源,防止其被多個線程同時訪問。多線程程序更加難以推斷,由於這類程序不得不經過線程同步機制如鎖、可重入函數、線程局部存儲或者其餘機制來處理線程安全問題,若是實現不當就會致使出現微妙且使人痛不欲生的bug。

在事件驅動版本的程序中,3個任務交錯執行,但仍然在一個單獨的線程控制中。當處理I/O或者其餘昂貴的操做時,註冊一個回調到事件循環中,而後當I/O操做完成時繼續執行。回調描述了該如何處理某個事件。事件循環輪詢全部的事件,當事件到來時將它們分配給等待處理事件的回調函數。這種方式讓程序儘量的得以執行而不須要用到額外的線程。事件驅動型程序比多線程程序更容易推斷出行爲,由於程序員不須要關心線程安全問題。

當咱們面對以下的環境時,事件驅動模型一般是一個好的選擇:

1.程序中有許多任務,並且…

2.任務之間高度獨立(所以它們不須要互相通訊,或者等待彼此)並且…

3.在等待事件到來時,某些任務會阻塞。

當應用程序須要在任務間共享可變的數據時,這也是一個不錯的選擇,由於這裏不須要採用同步處理。

網絡應用程序一般都有上述這些特色,這使得它們可以很好的契合事件驅動編程模型。

此處要提出一個問題,就是,上面的事件驅動模型中,只要一遇到IO就註冊一個事件,而後主程序就能夠繼續幹其它的事情了,只到io處理完畢後,繼續恢復以前中斷的任務,這本質上是怎麼實現的呢?哈哈,下面咱們就來一塊兒揭開這神祕的面紗。。。。

----io多路複用:http://www.cnblogs.com/alex3714/articles/5876749.html 

同步IO和異步IO,阻塞IO和非阻塞IO分別是什麼,到底有什麼區別?不一樣的人在不一樣的上下文下給出的答案是不一樣的。因此先限定一下本文的上下文。如下討論的背景是Linux環境下的network IO。

在 Linux 的緩存 I/O 機制中,操做系統會將 I/O 的數據緩存在文件系統的頁緩存( page cache )中,也就是說,數據會先被拷貝到操做系統內核的緩衝區中,而後纔會從操做系統內核的緩衝區拷貝到應用程序的地址空間。從內核態到用戶態。

1.io模式

對於一次IO訪問(以read舉例),數據會先被拷貝到操做系統內核的緩衝區中,而後纔會從操做系統內核的緩衝區拷貝到應用程序的地址空間。因此說,當一個read操做發生時,它會經歷兩個階段:
1. 等待數據準備 (Waiting for the data to be ready)
2. 將數據從內核拷貝到進程中 (Copying the data from the kernel to the process)

正式由於這兩個階段,linux系統產生了下面五種網絡模式的方案。
- 阻塞 I/O(blocking IO)
- 非阻塞 I/O(nonblocking IO)
- I/O 多路複用( IO multiplexing)
- 信號驅動 I/O( signal driven IO)
- 異步 I/O(asynchronous IO)

注:因爲signal driven IO在實際中並不經常使用,因此我這隻說起剩下的四種IO Model。

阻塞 I/O(blocking IO)

在linux中,默認狀況下全部的socket都是blocking,一個典型的讀操做流程大概是這樣:

當用戶進程調用了recvfrom這個系統調用,kernel就開始了IO的第一個階段:準備數據(對於網絡IO來講,不少時候數據在一開始尚未到達。好比,尚未收到一個完整的UDP包。這個時候kernel就要等待足夠的數據到來)。這個過程須要等待,也就是說數據被拷貝到操做系統內核的緩衝區中是須要一個過程的。而在用戶進程這邊,整個進程會被阻塞(固然,是進程本身選擇的阻塞)。當kernel一直等到數據準備好了,它就會將數據從kernel中拷貝到用戶內存,而後kernel返回結果,用戶進程才解除block的狀態,從新運行起來。

因此,blocking IO的特色就是在IO執行的兩個階段都被block了。

單線程阻塞模式下沒辦法實現多路io

非阻塞 I/O(nonblocking IO)

linux下,能夠經過設置socket使其變爲non-blocking。當對一個non-blocking socket執行讀操做時,流程是這個樣子:

當用戶進程發出read操做時,若是kernel中的數據尚未準備好,那麼它並不會block用戶進程,而是馬上返回一個error。從用戶進程角度講 ,它發起一個read操做後,並不須要等待,而是立刻就獲得了一個結果。用戶進程判斷結果是一個error時,它就知道數據尚未準備好,因而它能夠再次發送read操做。一旦kernel中的數據準備好了,而且又再次收到了用戶進程的system call,那麼它立刻就將數據拷貝到了用戶內存,而後返回。

因此,nonblocking IO的特色是用戶進程須要不斷的主動詢問kernel數據好了沒有。

I/O 多路複用( IO multiplexing)

IO multiplexing就是咱們說的select,poll,epoll,有些地方也稱這種IO方式爲event driven IO。select/epoll的好處就在於單個process就能夠同時處理多個網絡鏈接的IO。它的基本原理就是select,poll,epoll這個function會不斷的輪詢所負責的全部socket,當某個socket有數據到達了,就通知用戶進程。

當用戶進程調用了select,那麼整個進程會被block,而同時,kernel會「監視」全部select負責的socket,當任何一個socket中的數據準備好了,select就會返回。這個時候用戶進程再調用read操做,將數據從kernel拷貝到用戶進程。

因此,I/O 多路複用的特色是經過一種機制一個進程能同時等待多個文件描述符,而這些文件描述符(套接字描述符)(索引值)其中的任意一個進入讀就緒狀態,select()函數就能夠返回。

這個圖和blocking IO的圖其實並無太大的不一樣,事實上,還更差一些。由於這裏須要使用兩個system call (select 和 recvfrom),而blocking IO只調用了一個system call (recvfrom)。可是,用select的優點在於它能夠同時處理多個connection。

因此,若是處理的鏈接數不是很高的話,使用select/epoll的web server不必定比使用multi-threading + blocking IO的web server性能更好,可能延遲還更大。select/epoll的優點並非對於單個鏈接能處理得更快,而是在於能處理更多的鏈接。)

在IO multiplexing Model中,實際中,對於每個socket,通常都設置成爲non-blocking,可是,如上圖所示,整個用戶的process實際上是一直被block的。只不過process是被select這個函數block,而不是被socket IO給block。

異步 I/O(asynchronous IO)

linux下的asynchronous IO其實用得不多。先看一下它的流程:

用戶進程發起read操做以後,馬上就能夠開始去作其它的事。而另外一方面,從kernel的角度,當它受到一個asynchronous read以後,首先它會馬上返回,因此不會對用戶進程產生任何block。而後,kernel會等待數據準備完成,而後將數據拷貝到用戶內存,當這一切都完成以後,kernel會給用戶進程發送一個signal,告訴它read操做完成了。

2.總結

blocking和non-blocking的區別

調用blocking IO會一直block住對應的進程直到操做完成,而non-blocking IO在kernel還準備數據的狀況下會馬上返回。

synchronous IO和asynchronous IO的區別

在說明synchronous IO和asynchronous IO的區別以前,須要先給出二者的定義。POSIX的定義是這樣子的:
- A synchronous I/O operation causes the requesting process to be blocked until that I/O operation completes;
- An asynchronous I/O operation does not cause the requesting process to be blocked;

二者的區別就在於synchronous IO作」IO operation」的時候會將process阻塞。按照這個定義,以前所述的blocking IO,non-blocking IO,IO multiplexing都屬於synchronous IO。

有人會說,non-blocking IO並無被block啊。這裏有個很是「狡猾」的地方,定義中所指的」IO operation」是指真實的IO操做,就是例子中的recvfrom這個system call。non-blocking IO在執行recvfrom這個system call的時候,若是kernel的數據沒有準備好,這時候不會block進程。可是,當kernel中數據準備好的時候,recvfrom會將數據從kernel拷貝到用戶內存中,這個時候進程是被block了,在這段時間內,進程是被block的。

而asynchronous IO則不同,當進程發起IO 操做以後,就直接返回不再理睬了,直到kernel發送一個信號,告訴進程說IO完成。在這整個過程當中,進程徹底沒有被block。

各個IO Model的比較如圖所示:

經過上面的圖片,能夠發現non-blocking IO和asynchronous IO的區別仍是很明顯的。在non-blocking IO中,雖然進程大部分時間都不會被block,可是它仍然要求進程去主動的check,而且當數據準備完成之後,也須要進程主動的再次調用recvfrom來將數據拷貝到用戶內存。而asynchronous IO則徹底不一樣。它就像是用戶進程將整個IO操做交給了他人(kernel)完成,而後他人作完後發信號通知。在此期間,用戶進程不須要去檢查IO操做的狀態,也不須要主動的去拷貝數據。

5、Select\Poll\Epoll異步IO http://www.cnblogs.com/alex3714/p/4372426.html 

windows不支持epoll,支持select。epoll如今用的最多,監視的文件描述符的數量沒有限制。市面上Nginx,Tornado,Twisted所謂的異步io實際上是io多路複用。

select,epoll至關於底層的東西,平時用的時候都是用封裝好的,下面只講下select

Python select 

Python的select()方法直接調用操做系統的IO接口,它監控sockets,open files, and pipes(全部帶fileno()方法的文件句柄)什麼時候變成readable 和writeable, 或者通訊錯誤,select()使得同時監控多個鏈接變的簡單,而且這比寫一個長循環來等待和監控多客戶端鏈接要高效,由於select直接經過操做系統提供的C的網絡接口進行操做,而不是經過Python的解釋器。

注意:Using Python’s file objects with select() works for Unix, but is not supported under Windows.

接下來經過echo server例子要以瞭解select 是如何經過單進程實現同時處理多個非阻塞的socket鏈接的

 1 import select
 2 import socket
 3 import queue
 4 server=socket.socket()
 5 server.bind(("localhost",9000))
 6 server.listen(1000)
 7 server.setblocking(False) #設置爲不阻塞
 8 msg_dic={}
 9 inputs=[server,]
10 #inputs=[server,conn,coon2] #[conn] server活躍表明來了新連接,conn活躍表明傳數據
11 outputs=[] #r1
12 while True:
13     readable,writeable,exceptional=select.select(inputs,outputs,inputs) #交給select監測
14     print(readable,writeable,exceptional)
15     for r in readable:
16         if r is server:#表明來了一個新連接
17             conn,addr=server.accept()#沒有連接就報錯
18             print("來了個新連接",addr)
19             inputs.append(conn) #是由於這個新創建的連接還沒發數據過來,如今就接收的話程序就報錯了,
20             #因此要想實現這個客戶端發數據來時server端能知道,就須要讓select再監測這個conn
21             msg_dic[conn]=queue.Queue()#初始化一個隊列,後面要存返回給這個客戶端的數據
22         else:
23             data=r.recv(1024) #conn.recv的話,conn2來數據會出錯
24             print("收到數據",data)
25             msg_dic[r].put(data)
26             outputs.append(r) #放入返回的連接隊列裏
27             # r.send(data) 發給客戶端不會出錯,可是客戶端不收的話數據就沒了
28             # print("send done")
29     for w in writeable: #要返回給客戶端的連接列表
30         data_to_client=msg_dic[w].get()
31         w.send(data_to_client) #返回給客戶端源數據,不會當即返回,而是下一次循環返回
32         outputs.remove(w) #確保下次循環的時候writeable不會返回這個已經處理完的連接了
33     for e in exceptional:
34         if e in outputs:
35             outputs.remove(e)
36         inputs.remove(e)
37         del msg_dic[e]
python_select_server
 1 import socket
 2 HOST = 'localhost'  # The remote host
 3 PORT = 9000  # The same port as used by the server
 4 s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
 5 s.connect((HOST, PORT))
 6 while True:
 7     msg = bytes(input(">>:"), encoding="utf8")
 8     s.sendall(msg)
 9     data = s.recv(1024)
10     print('Received',data) #repr格式化輸出
11 s.close()
client

 完整版:

 1 import select
 2 import socket
 3 import sys
 4 import queue
 5 server = socket.socket()
 6 server.setblocking(0)
 7 server_addr = ('localhost',9998)
 8 print('starting up on %s port %s' % server_addr)
 9 server.bind(server_addr)
10 server.listen(5)
11 inputs = [server, ] #本身也要監測呀,由於server自己也是個fd
12 outputs = []
13 message_queues = {}
14 while True:
15     print("waiting for next event...")
16     readable, writeable, exeptional = select.select(inputs,outputs,inputs) #若是沒有任何fd就緒,那程序就會一直阻塞在這裏
17     for s in readable: #每一個s就是一個socket
18         if s is server: #別忘記,上面咱們server本身也當作一個fd放在了inputs列表裏,傳給了select,若是這個s是server,表明server這個fd就緒了,
19             #就是有活動了, 什麼狀況下它纔有活動? 固然 是有新鏈接進來的時候 呀
20             #新鏈接進來了,接受這個鏈接
21             conn, client_addr = s.accept()
22             print("new connection from",client_addr)
23             conn.setblocking(0)
24             inputs.append(conn) #爲了避免阻塞整個程序,咱們不會馬上在這裏開始接收客戶端發來的數據, 把它放到inputs裏, 下一次loop時,這個新鏈接
25             #就會被交給select去監聽,若是這個鏈接的客戶端發來了數據 ,那這個鏈接的fd在server端就會變成就續的,select就會把這個鏈接返回,返回到
26             #readable 列表裏,而後你就能夠loop readable列表,取出這個鏈接,開始接收數據了, 下面就是這麼幹 的
27             message_queues[conn] = queue.Queue() #接收到客戶端的數據後,不馬上返回 ,暫存在隊列裏,之後發送
28         else: #s不是server的話,那就只能是一個 與客戶端創建的鏈接的fd了
29             #客戶端的數據過來了,在這接收
30             data = s.recv(1024)
31             if data:
32                 print("收到來自[%s]的數據:" % s.getpeername()[0], data)
33                 message_queues[s].put(data) #收到的數據先放到queue裏,一會返回給客戶端
34                 if s not  in outputs:
35                     outputs.append(s) #爲了避免影響處理與其它客戶端的鏈接 , 這裏不馬上返回數據給客戶端
36             else:#若是收不到data表明什麼呢? 表明客戶端斷開了呀
37                 print("客戶端斷開了",s)
38                 if s in outputs:
39                     outputs.remove(s) #清理已斷開的鏈接
40                 inputs.remove(s) #清理已斷開的鏈接
41                 del message_queues[s] ##清理已斷開的鏈接
42     for s in writeable:
43         try :
44             next_msg = message_queues[s].get_nowait()
45         except queue.Empty:
46             print("client [%s]" %s.getpeername()[0], "queue is empty..")
47             outputs.remove(s)
48         else:
49             print("sending msg to [%s]"%s.getpeername()[0], next_msg)
50             s.send(next_msg.upper())
51     for s in exeptional:
52         print("handling exception for ",s.getpeername())
53         inputs.remove(s)
54         if s in outputs:
55             outputs.remove(s)
56         s.close()
57         del message_queues[s]
server
 1 import socket
 2 import sys
 3 messages = [ b'This is the message. ',
 4              b'It will be sent ',
 5              b'in parts.',
 6              ]
 7 server_address = ('localhost', 9998)
 8 # Create a TCP/IP socket
 9 socks = [ socket.socket(socket.AF_INET, socket.SOCK_STREAM) for i in range(400)]
10 print(socks)
11 # Connect the socket to the port where the server is listening
12 print('connecting to %s port %s' % server_address)
13 for s in socks:
14     s.connect(server_address)
15 for message in messages:
16     # Send messages on both sockets
17     for s in socks:
18         print('%s: sending "%s"' % (s.getsockname(), message) )
19         s.send(message)
20     # Read responses on both sockets
21     for s in socks:
22         data = s.recv(1024)
23         print( '%s: received "%s"' % (s.getsockname(), data) )
24         if not data:
25             print('closing socket', s.getsockname() )
client

selectors模塊

This module allows high-level and efficient I/O multiplexing, built upon the select module primitives. Users are encouraged to use this module instead, unless they want precise control over the OS-level primitives used.

 1 import selectors
 2 import socket
 3 sel = selectors.DefaultSelector()
 4 def accept(sock, mask):
 5     conn, addr = sock.accept()  # Should be ready
 6     print('accepted', conn, 'from', addr,mask)
 7     conn.setblocking(False)
 8     sel.register(conn, selectors.EVENT_READ, read) #新連接註冊read回調函數
 9 def read(conn, mask):
10     data = conn.recv(1024)  # Should be ready
11     if data:
12         print('echoing', repr(data), 'to', conn)
13         conn.send(data)  # Hope it won't block
14     else:
15         print('closing', conn)
16         sel.unregister(conn)
17         conn.close()
18 sock = socket.socket()
19 sock.bind(('localhost', 9999))
20 sock.listen(5000)
21 sock.setblocking(False)
22 sel.register(sock, selectors.EVENT_READ, accept) #註冊事件
23 while True:
24     events = sel.select() #有多是epoll,看系統支持啥。默認是阻塞,有活動連接就返回活動的連接列表
25     for key, mask in events:
26         callback = key.data #至關於調accept
27         callback(key.fileobj, mask) #key.fileobj文件句柄,至關於連接還沒創建好的r
selector

做業:用select或selector作一個ftp多併發的上傳和下載

相關文章
相關標籤/搜索