上次講了因爲GIL鎖的存在,Python的多線程是假的,用的仍是CPU的單核。Python的多線程只是利用了CPU的上下文切換,上下分切換也是佔用CPU的。那麼何時用多行程?
Python的多線程,適合IO密集型的任務,不適合CPU密集型的任務。
IO操做不佔用CPU,好比socket這種網絡編程的情景。
計算佔用CPU,因此大量計算的情景下多線程反而更慢,額外消耗了CPU切換上下文的計算。html
多進程的基本語法和多線程差很少:python
import multiprocessing import time def show(name): time.sleep(2) print('hello', name) if __name__ == '__main__': p = multiprocessing.Process(target=show, args=('Jack',)) p.start() p.join() # join的效果就是等待子進程執行完畢 print('執行結束')
上面的例子,只模塊名變了,其餘都和多線程差很少。linux
下面的例子打印了進程的id號:git
import multiprocessing import os, time def info(title): print(title, 'module name:', __name__) # 模塊名 time.sleep(0.3) # 加點停頓,能夠看出來,全部進程真的是並行處理的 print(title, 'parent process:', os.getppid()) # 父進程號 time.sleep(0.3) print(title, 'process id:', os.getpid()) # 進程號 def f(title): info(title) if __name__ == '__main__': info('main') for i in range(10): # 此次起10個進程 p = multiprocessing.Process(target=f, args=('p%s' % i,)) p.start()
能夠適當修改加長info的延時,,能夠去系統裏查看一下全部進程的狀況,以下圖:
上面起了10個子進程,加上主進程,一個11個python進程。
我是用pycharm執行的代碼,主進程的ID是8036,主進程的父進程是pycharm7832。
而後,全部的子進程,都是經過8036這個python的父進程開啓的。8036就是這些子進程的父進程。github
進程間的內存是獨立的,若是進程間須要交換數據,就須要藉助其餘方法web
下面的例子經過Queue實現了進程間的通訊,數據庫
from multiprocessing import Process, Queue import time def f(q): q.put([42, None, 'hello']) time.sleep(30) if __name__ == '__main__': q = Queue() p = Process(target=f, args=(q,)) p.start() print(q.get()) # prints "[42, None, 'hello']" time.sleep(30)
這裏的Queue是 multiprocessing 多線程模塊裏的,不是以前的獨立的queue模塊。編程
from multiprocessing import Process, Pipe def f(conn): conn.send([42, None, 'hello']) conn.close() if __name__ == '__main__': parent_conn, child_conn = Pipe() # 實例化管道後會生成2個對象,這2個對象是同樣的 p = Process(target=f, args=(child_conn,)) # 把管道的1頭交給子進程,本身操做另一頭 p.start() print(parent_conn.recv()) # prints "[42, None, 'hello']"
上面實例化管道以後的兩個對象是同樣的,不要被名字誤導,這麼取名是隨進程的關係。
實例化後的兩個對象就是管道的兩頭。通信的雙方任意各取一頭操做,就能實現管道兩頭的通信。這裏是父進程留下一頭,把另外一頭傳遞給子進程操做。
這裏管道的操做使用send和recv,相似socket(不過沒有黏包)。這邊send一次對端就recv一次獲取數據,若是一邊send屢次,那麼對端也只能一次recv取到一次的數據,因此也得recv屢次才能取到所有的數據。若是數據取完了,再recv則會阻塞,等待管道對端send數據進來。windows
上面的2個方法,只是實現了數據的傳遞。
經過Manager生成的數據對象,能夠在多個進程間共享。如下的數據類型均可以經過Manager來生成,實現共享:
list, dict, Namespace, Lock, RLock, Semaphore, BoundedSemaphore, Condition, Event, Barrier, Queue, Value and Array
舉例說明:緩存
from multiprocessing import Process, Manager import os def f(d, l): pid = os.getpid() d['pid'] = pid # 每次這個key都會被重寫,最後打印的必定是最後一個操做的進程的結果 d[pid] = pid # 每次都是生成一個新的key,添加到字典裏 l.append(pid) # 每次往列表中添加一個元素 if __name__ == '__main__': with Manager() as manager: # 等於manager = Manager(),最後還省了一個manager.close() d = manager.dict() # 生成一個共享的字典 l = manager.list() # 生成一個共享的列表 p_list = [] for i in range(10): p = Process(target=f, args=(d, l)) p.start() p_list.append(p) for res in p_list: res.join() print(d) print(l)
線程有線程鎖,進程也有進程鎖。用法也和線程鎖同樣,以下:
from multiprocessing import Process, Lock import time def f(l, n): l.acquire() print('hello world', n, end='', flush=True) time.sleep(0.1) print(" Finished", n) l.release() time.sleep(0.1) if __name__ == '__main__': lock = Lock() for n in range(10): Process(target=f, args=(lock, n)).start()
有鎖,可是咱們要鎖什麼?上面的代碼就算不加鎖應該也不必定會有什麼問題。線程鎖咱們鎖的是內存,由於線程共享的是同一塊內存空間。進程鎖鎖的是資源,進程就是各類資源的集合。好比例子中的print用到的就是屏幕輸出的資源,若是不加鎖,可能幾個進程同時想操做屏幕輸出內容,那麼就有可能會形成最終輸出的字符錯亂。不過打印數據應該是一串一次性進緩存的,應該也不會出現被插隊吧。
雖然試不出字符錯亂的狀況,可是對兩次print之間不要插入別的進程的內容。好比例子中去掉進程鎖就會出現兩段內容混亂的狀況。
和線程鎖的狀況同樣,加了鎖以後,其實就應該是暫時變成了串行執行了。
每起一個進程,就是克隆一份父進程的基本數據給子進程用。這樣開銷會很大(好比內存),系統資源是有限的,無限起進程,可能會致使系統癱瘓。因此要有進程池。
學習線程的時候也有相似的問題,不過線程佔用資源小,不容易致使系統癱瘓,可是必定會致使CPU頻繁切換上下文致使效率反而會下降。因此要有信號量。
設定進程池,能夠限制一次起的進程的數量。這點有點像信號量
進程池有兩個方法:
先看apply的例子:
from multiprocessing import Pool import time def Foo(i): time.sleep(1) print("In Foo", i) if __name__ == '__main__': pool = Pool(5) for i in range(10): pool.apply(func=Foo, args=(i,)) print('end') pool.close() # 老師說進程池這裏要先close再join,至於爲啥不知道 pool.join() # 進程池中進程執行完畢後再關閉,若是註釋,那麼程序直接關閉。
執行的時候就是同時起5個進程(Pool中設置了進程池的大小),可是是一個一個依次執行的。執行完畢後會退出而後能夠將以後的進程放入進程池等待執行。
注意最後要先close再join。能夠試驗一下,若是不執行close,可是直接執行join的話會報錯。
使用這個方法就是並行效果了,一次並行執行5個。每完成1個會再將下一個放入進程池立刻執行:
from multiprocessing import Pool import time def Foo(i): time.sleep(1) print("In Foo", i) if __name__ == '__main__': pool = Pool(5) for i in range(10): pool.apply_async(func=Foo, args=(i,)) print('end') pool.close() pool.join() # 進程池中進程執行完畢後再關閉,若是註釋,那麼程序直接關閉。
注意:仍是close和join的問題,剛纔異步的時候其實不寫也沒問題。可是這裏因爲是並行的,主進程執行完畢以後若是沒有join就會直接關閉了。子進程也不會再執行了,就像守護線程那樣。因此這裏必定要加上join,等待pool裏的進程都執行完畢。而後join前必需要close。
callback參數 :這裏還有一個callback參數,函數執行完成後能夠調用另外一個函數,這個叫回調函數。
回調函數 :就是前面的方法執行完以後,就會自動對用執行這個回調函數。而且是由主進程調用執行的。
舉例說明:
from multiprocessing import Pool import os import time def Foo(i): time.sleep(1) # raise return os.getpid(), os.getppid() def Bar(arg): print(os.getpid(), 'Foo 執行完畢,結果:', arg) if __name__ == '__main__': pool = Pool(5) for i in range(10): pool.apply_async(func=Foo, args=(i,), callback=Bar) print('end') pool.close() pool.join() # 進程池中進程執行完畢後再關閉,若是註釋,那麼程序直接關閉。
回調函數只有在函數正常執行完以後纔會被調用。Foo中有一句raise,主動拋出一個錯誤,若是去掉註釋致使函數沒有正常執行完成,rais以前的print仍是會正常執行,可是不會調用callback的函數執行。
另外,這裏打印了每一個進程id,從id中能夠看到,Foo函數是由主進程啓動的子進程執行的。而callback的函數是由主進程來執行的。Foo的父進程id就是Bar的進程id
回調函數的意義,主要就是由於回調函數是由主進程執行的。若是子進程的執行結果須要記錄保留,那麼這部分工做就經過調用回調函數,由回調函數在主進程中來處理。好比將結果寫入數據庫,咱們就要讓每一個子進程都鏈接數據庫寫入數據,而是在主進程裏創建一個與數據庫的鏈接,統一將執行結果寫入數據庫。雖然調用的是同一個函數,可是經過回調函數調用在主進程中執行效率會更高。好比例子中的作法,Foo負責返回數據,回調函數統一打印Foo的執行結果。
協程,又稱微線程,纖程。英文名Coroutine。一句話說明:協程是一種用戶態的輕量級線程。
協程的好處:
所謂原子操做是指不會被線程調度機制打斷的操做;這種操做一旦開始,就一直運行到結束,中間不會有任何
協程的缺點:
以前學習生成器的時候,經過yield實現了單線程下多併發。可是那也不是真正的協程。
協程的定義:
總結:使用協程就是爲了高效。協程如何實現高效?一遇到IO操做就切換,由於IO操做耗時可是不佔用CPU,此時切換到另外一個協程,高效的利用CPU。
問題:什麼時候切換回來?IO操做結束了就能夠切換回來。如何知道IO操做結束了?往下學 ...
這裏寫個例子,回顧一下yield的用法。可是yield並不知足咱們前面對協程的定義。
下面的例子會先啓動B,B會啓動A。B中打印後切換到A執行,A返回後循環。A中打印後經過yield返回,循環。A和B之間經過yield和send來傳遞count的值,每次都自增1。
import time def print_A(): count = 0 while True: print('A'.center(9, '-'), count) time.sleep(0.1) count = yield count+1 # 從B那裏send過來的值,賦值給count。而後回到開頭執行打印,把count值自增1後再返回給B def print_B(func): count = next(func) # 要先next一下,啓動A,這樣A會先運行一次到yield的地方返回 while True: print('B'.center(9, '-'), count) time.sleep(0.1) count = func.send(count+1) # 將值傳遞給A的yield,並獲取A的返回值 if __name__ == '__main__': a = print_A() print_B(a)
greenlet是一個用C實現的協程模塊,相比與python自帶的yield,它可使你在任意函數之間隨意切換,而不需把這個函數先聲明爲generator。
這個是第三方模塊,因此須要安裝。安裝的話直接安裝下面要講的gevent模塊就行了。
greenlet 模塊實現的是協程的手動切換,其實就和yield差很少。不過用起來更好理解了。
gevent 模塊才能實現咱們要的自動切換,可是gevent是在greenlet的基礎上進行了封裝,實現了自動切換。因此安裝的時候順便把有依賴關係的greenlet模塊一塊兒裝好了。如今天然也是先看一下greenlet模塊的用法。
from greenlet import greenlet def test1(): print('A1') gr2.switch() print('A2') gr2.switch() def test2(): print("B1") gr1.switch() print('B2') if __name__ == '__main__': gr1 = greenlet(test1) # 啓動一個協程 gr2 = greenlet(test2) # 再啓動一個協程 gr1.switch() # 手動切換一下
執行一下switch,就完成了切換的操做。比yield更直觀。不太重點是學下一個模塊。
Gevent 是一個第三方庫,能夠輕鬆經過gevent實現併發同步或異步編程,在gevent中用到的主要模式是Greenlet,它是以C擴展模塊形式接入Python的輕量級協程。Greenlet所有運行在主程序操做系統進程的內部,但它們被協做式地調度。
import gevent def func1(): print("This is in func1") gevent.sleep(2) print("End of func1") def func2(): print("This is in func2") gevent.sleep(1) print("End of func2") if __name__ == '__main__': gevent.joinall([gevent.spawn(func1), gevent.spawn(func2)]) # 上面是把要啓動的方法加到列表裏一塊兒處理了,推薦就這麼作 # 其實也可使用start和join一個一個啓動, f1 = gevent.spawn(func1) f2 = gevent.spawn(func2) f1.start() # 沒有start,貌似也同樣,註釋掉試一下 f2.start() f1.join() # 沒有join主線程會直接退出,就不會等待上面的協程的執行結果了 f2.join()
要啓動全部的協程,經過gevent.joinall。參數是一個列表,列表中依次啓動須要進行自動切換操做的協程。而且阻塞等待全部協程處理完畢。至關於start和join
上面的gevent.sleep是一個模擬IO操做,不會像time.sleep那樣停在那裏,而是會當作有一個幾秒的IO操做。上面的輸出結果是:
This is in func1 This is in func2 End of func2 End of func1
首先執行了func1,打印了第一行。而後以後是一個IO操做,因此切換到了下一個協程。
切換到func2,打印了一行,以後又是一個IO操做,此時再切換。不過此時已經沒有可操做的協程了。沒別的協程了,fun1也沒好。
以後是fun2的IO操做先執行完畢,因此最終切換到fun2的時候,打印了func2的第二行,打印前會頓1秒。
最後func1的IO操做也結束了,因而切換到fun1,打印fun1的第二行,打印前會再頓1秒。
這根本就不是爬蟲,這裏先講如何將一個網頁保存到本地,由於這就是一個比較耗時的IO操做。剛纔的例子中咱們是用sleep來模擬的。直接上代碼就行了:
from urllib import request def f(url): print('GET: %s' % url) resp = request.urlopen(url) # 給網址發一個請求 data = resp.read() # 讀取到的就是整個網頁的內容 with open('url.html', 'wb') as file: # 將網頁保存下來 file.write(data) print('%d betes received from %s' % (len(data), url)) if __name__ == '__main__': f('http://www.python.org/')
而後咱們來多爬幾個網頁,看下協程的效果。此次計算一下整個過程的時間:
from urllib import request import gevent import time def f(url): print('GET: %s' % url) resp = request.urlopen(url) # 給網址發一個請求 data = resp.read() # 讀取到的就是整個網頁的內容 # with open('url.html', 'wb') as file: # 保存網頁的操做就先不用了 # file.write(data) print('%d betes received from %s' % (len(data), url)) if __name__ == '__main__': g_list = [] url_list = ['http://www.python.org/', 'http://www.yahoo.com/', 'http://github.com/'] for url in url_list: g_list.append(gevent.spawn(f, url)) start_time = time.time() gevent.joinall(g_list) print('運行時間:', time.time()-start_time)
上面這段雖然用了gevent,可是仍是串行的。究竟是串行仍是並行,只要看f函數開始的時候的第一句print是何時出現的就知道了。這裏之因此是串行,是由於,並無看到IO切換的命令,就是f函數裏沒有相似gevent.sleep這樣的切換命令。可是,其實gevent是能夠自動判斷是否有IO操做的。因此這裏的問題是gevent發現不了urllib模塊裏的IO操做。
因此真正要作的是在開頭加上一句,讓gevent可以發現這些IO操做。
from urllib import request import gevent import time # 導入模塊,添加下面這2句 from gevent import monkey monkey.patch_all() # 把當前程序的全部的IO操做給我單獨的作上標記 def f(url): print('GET: %s' % url) resp = request.urlopen(url) # 給網址發一個請求 data = resp.read() # 讀取到的就是整個網頁的內容 # with open('url.html', 'wb') as file: # 保存網頁的操做就先不用了 # file.write(data) print('%d betes received from %s' % (len(data), url)) if __name__ == '__main__': g_list = [] url_list = ['http://www.python.org/', 'http://www.yahoo.com/', 'http://github.com/'] for url in url_list: g_list.append(gevent.spawn(f, url)) start_time = time.time() gevent.joinall(g_list) print('運行時間:', time.time()-start_time)
gevent裏有專門的socket模塊,固然其中大部分都是import原生的socket模塊。咱們可使用gevent的socket實現一個單線程下高併發的socket_server。
服務端:
# 就算這裏不導入socket,也會在gevent模塊裏導入。可是pycharm裏下面的socket.socket會顯示個錯誤,不影響運行 # 並且到下面也是要導入的,這裏顯示的聲明一下,提早導入也不會影響效率 # import socket # 建議導入 # 下面兩句也能夠不要,這裏不加也能識別到socket的IO操做 # 只因此能識別,是由於下面的socket已經不是原生的socket了,而是gevent修改後的socket # from gevent import monkey # monkey.patch_all() import gevent from gevent import socket def server(port): # 這裏pycharm裏會顯示個錯誤,不影響運行。由於gevent的socket模塊裏沒有socket這個方法 # 可是其實gevent會把原生的socket所有導入的。就是運行的時候會有socket.socket這個方法 server = socket.socket() server.bind(('localhost', port)) server.listen(500) print("監聽已經開始") while True: conn, addr = server.accept() print("發現鏈接請求:\n%s\n%s" % (conn, addr)) gevent.spawn(handle_request, conn) # 上面的函數創建了鏈接後,就將鏈接做爲handle_request的參數,啓動一個gevent的協程 # 下面的方法是經過協程啓動的,是協程併發運行的 def handle_request(conn): while True: data = conn.recv(1024) if not data: break print("recv:", data.decode('utf-8')) conn.send(data.upper()) conn.close() print("斷開鏈接:", conn) if __name__ == '__main__': server(8002)
客戶端只須要以前的客戶端就能夠了。這裏測試一下效率,起100個線程,每一個線程發送100條消息:
import socket import threading HOST = 'localhost' # The remote host PORT = 8002 # The same port as used by the server def client(i): client = socket.socket() client.connect((HOST, PORT)) for j in range(100): msg = "hello %s %s" % (i, j) client.send(msg.encode('utf-8')) data = client.recv(1024) print('Received:', data.decode('utf-8')) client.close() if __name__ == '__main__': for i in range(100): t = threading.Thread(target=client, args=(i,)) t.start()
這裏有一個問題,記一下。 gevent.spawn()
一般,咱們寫服務器處理模型的程序時,有如下幾種模型:
三種方法各有千秋,以前應該都說過,這裏就當總結一下:
事件驅動模型,目前大部分的UI編程都是事件驅動模型,如不少UI平臺都會提供onClick()事件(好比web頁面),這個事件就表明鼠標按下事件。事件驅動模型大致思路以下:
事件驅動編程是一種編程範式,這裏程序的執行流由外部事件來決定。它的特色是包含一個事件循環,當外部事件發生時使用回調機制來觸發相應的處理。另外兩種常見的編程範式是(單線程)同步以及多線程編程。
在面對以下的環境時,事件驅動模型一般是一個好的選擇:
此處重申一下協程開篇提出的問題,只要一遇到IO就註冊一個事件,而後主程序就能夠繼續幹其它的事情了,直到IO處理完畢後,繼續恢復以前中斷的任務,這本質上是怎麼實現的呢?
用戶空間與內核空間:如今操做系統都是採用虛擬存儲器,那麼對32位操做系統而言,它的尋址空間(虛擬存儲空間)爲4G(2的32次方)。操做系統的核心是內核,獨立於普通的應用程序,能夠訪問受保護的內存空間,也有訪問底層硬件設備的全部權限。爲了保證用戶進程不能直接操做內核(kernel),保證內核的安全,操做系統將虛擬空間劃分爲兩部分,一部分爲內核空間,一部分爲用戶空間。針對linux操做系統而言,將最高的1G字節(從虛擬地址0xC0000000到0xFFFFFFFF),供內核使用,稱爲內核空間,而將較低的3G字節(從虛擬地址0x00000000到0xBFFFFFFF),供各個進程使用,稱爲用戶空間。
緩存 I/O :又被稱做標準 I/O,大多數文件系統的默認 I/O 操做都是緩存 I/O。在 Linux 的緩存 I/O 機制中,操做系統會將 I/O 的數據緩存在文件系統的頁緩存( page cache )中,也就是說,數據會先被拷貝到操做系統內核的緩衝區中,而後纔會從操做系統內核的緩衝區拷貝到應用程序的地址空間。
對於一次IO訪問(以read舉例),數據會先被拷貝到操做系統內核的緩衝區中,而後纔會從操做系統內核的緩衝區拷貝到應用程序的地址空間。因此說,當一個read操做發生時,它會經歷兩個階段:
正是由於上面的兩個階段,linux系統產生了下面五種網絡模式的方案:
五種模型的比較:
用的最多的是IO多路複用。雖然看似異步IO更好,反正用的很少。另外多線程+阻塞模式也是一個方案,可是多線程的開銷較大(相對於單線程),更適合處理少許的併發(多少算少?看你係統能起多少個線程,不過和進程比線程的開銷還不算特別大)。要處理高併發,推薦仍是使用IO多路複用。
文件描述符(File descriptor) :是計算機科學中的一個術語,是一個用於表述指向文件的引用的抽象化概念。文件描述符在形式上是一個非負整數。實際上,它是一個索引值,指向內核爲每個進程所維護的該進程打開文件的記錄表。當程序打開一個現有文件或者建立一個新文件時,內核向進程返回一個文件描述符。在程序設計中,一些涉及底層的程序編寫每每會圍繞着文件描述符展開。可是文件描述符這一律念每每只適用於UNIX、Linux這樣的操做系統。
I/O多路複用就是經過一種機制,一個進程能夠監視多個描述符,一旦某個描述符就緒(通常是讀就緒或者寫就緒),就通知程序進行相應的讀寫操做。
IO多路複用的三種機制:
select :最先出現,有些缺點,優點就是幾乎在全部平臺上都支持。
pool :解決了部分缺點,可是本質上沒多大差異,能夠認爲是個過分階段。
epool :如今用這個,性能最好的。可是不是全部系統都支持,windows就不支持。
Python的select()方法直接調用操做系統的IO接口,它監控sockets,open files,and pipes(全部帶fileno()方法的文件句柄)什麼時候變成readable 和writeable,或者通訊錯誤,select()使得同時監控多個鏈接變的簡單,而且這比寫一個長循環來等待和監控多客戶端鏈接要高效,由於select直接經過操做系統提供的C的網絡接口進行操做,而不是經過Python的解釋器。
寫一個socket的例子來理解
注意:socket必須得運行在非阻塞模式下。以前用的都是默認的阻塞模式,阻塞模式下,若是沒有數據會等待。非阻塞模式下,若是沒有數據就會拋出異常。因此咱們須要用select來幫咱們監視
先寫到accept以前,accept以前只是設置,執行到accept在阻塞模式下會進入阻塞。咱們暫時只要能收到客戶端請求創建起鏈接就好。如今是阻塞模式,因此直接accept沒有數據就會報錯。因此就須要select來解決了,監視到有活動的鏈接再返回並繼續執行accept。
import select import socket server = socket.socket() server.bind(('localhost', 9000)) server.listen() print("監聽已經開啓") server.setblocking(False) # 設置爲非阻塞 inputs = [server, ] outputs = [] # 3個參數,讀列表,寫列表,異常列表,就是你想讓內核監視哪些連接 # 異常列表監視的仍是連接返回的內容,那麼仍是在inputs裏,因此第三個參數仍是填inputs # 開始什麼都沒有,只有server,先把server加到inputs裏 # 就是select監視到server活動了,就能夠返回了 # 返回3個數據,監視到有活動的3個列表(讀列表,寫列表,異常列表) readable, writeable, exceptional = select.select(inputs, outputs, inputs) # 這裏就是阻塞的,直到select監視到列表中有活動的連接,纔會繼續 # 非阻塞socket就是經過select來實現阻塞 print(readable, writeable, exceptional) # 下面有打印的內容 # [<socket.socket fd=472, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=0, laddr=('127.0.0.1', 9000)>] [] [] # 第一個fd,就是文件描述符,非負整數。 conn, addr = server.accept() print(conn, addr)
測試的客戶端,只發不收:
import socket HOST, PORT = "localhost", 9000 client = socket.socket() client.connect((HOST, PORT)) while True: msg = input(">>:").strip() if len(msg) == 0: break client.send(msg.encode('utf-8')) # 下面2行是接收服務的返回是要用的,暫時先註釋掉,到後面再啓用測試返回數據 # data = client.recv(1024) # print('Received:', data.decode('utf-8')) client.close()
上面連接已經能夠創建起來了,那麼來接收一條數據吧。這裏recv以前固然仍是要用select來監視是否有數據進來,有才會執行到recv。
import select import socket server = socket.socket() server.bind(('localhost', 9000)) server.listen() print("監聽已經開啓") server.setblocking(False) # 設置爲非阻塞 inputs = [server, ] outputs = [] # 3個參數,讀列表,寫列表,異常列表,就是你想讓內核監視哪些連接 # 異常列表監視的仍是連接返回的內容,那麼仍是在inputs裏,因此第三個參數仍是填inputs # 若是inputs裏是讀的消息,會返回到readable列表裏,若是是異常消息,就返回到exceptional列表裏 # 開始什麼都沒有,只有server,先把server加到inputs裏 # 就是select監視到server活動了,就能夠返回了 # 返回3個數據,監視到活動的3個列表(讀列表,寫列表,異常列表) readable, writeable, exceptional = select.select(inputs, outputs, inputs) # 這裏就是阻塞的,直到select監視到列表中有活動的連接,纔會繼續 # 非阻塞socket就是經過select來實現阻塞 print(readable, writeable, exceptional) # 打印一下監視到的活動連接 for r in readable: # 可能一次返回多個鏈接啊,因此得寫個循環 conn, addr = server.accept() # 已是非阻塞了,不通過select監視就會報錯。能夠註釋掉select試一下 print(conn, addr) inputs.append(conn) # 把連接加入監視列表 # 如今conn加到了inputs的監視列表裏了,就能夠經過select監視conn是否有數據進來了 readable, writeable, exceptional = select.select(inputs, outputs, inputs) print(readable, writeable, exceptional) # 看看打印內容,inputs裏的活動連接是conn,其實也多是server data = conn.recv(1024) print(data.decode('utf-8'))
這裏能夠再試一下,應該能夠接收到數據。可是這裏有個問題。由於如今inputs列表裏監視的內容是2個了,一個是server,一個是conn。若是再監視到server活動,說明又有新連接進來,若是監視到的是conn的活動,那麼纔是收到數據了。因此咱們要在for循環裏判斷收到的活動連接是誰的。
再多作一步,加上一層while循環,讓服務端始終處於這麼一個循環之中:select返回活動連接 ==> for循環處理全部的活動連接 循環繼續。
import select import socket server = socket.socket() server.bind(('localhost', 9000)) server.listen() print("監聽已經開啓") server.setblocking(False) # 設置爲非阻塞 inputs = [server, ] outputs = [] # 3個參數,讀列表,寫列表,異常列表,就是你想讓內核監視哪些連接 # 異常列表監視的仍是連接返回的內容,那麼仍是在inputs裏,因此第三個參數仍是填inputs # 開始什麼都沒有,只有server,先把server加到inputs裏 # 就是select監視到server活動了,就能夠返回了 # 返回3個數據,監視到活動的3個列表(讀列表,寫列表,異常列表) while True: # select返回活動連接==>for循環處理全部的活動連接,循環往復 readable, writeable, exceptional = select.select(inputs, outputs, inputs) # 這裏就是阻塞的,直到select監視到列表中有活動的連接,纔會繼續 # 非阻塞socket就是經過select來實現阻塞 print(readable, writeable, exceptional) # 下面有打印的內容 # [<socket.socket fd=472, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=0, laddr=('127.0.0.1', 9000)>] [] [] # 第一個fd,就是文件描述符,非負整數。 for r in readable: # 可能一次返回多個鏈接啊,因此得寫個循環 # 如今inputs裏有server 和 conn了, # 若是readable返回的是server的活動,表示來了一個新連接 # 若是readable返回的是conn的活動,表示收到了conn發來的數據 if r is server: conn, addr = server.accept() # 已是非阻塞了,不通過select監視就會報錯。能夠註釋掉select試一下 print(conn, addr) inputs.append(conn) # 把連接加入監視列表 else: data = conn.recv(1024) print(data.decode('utf-8'))
如今新連接也能再連上來了,可是問題是舊的鏈接沒有另外保存,最新連上的連接會再次賦值給conn。就連接發來的數據,致使select返回,可是會用conn去嘗試recv。如今conn是新的鏈接,因此是空的,因而就報錯。那麼解決這個事情就是要保存每個conn,就是說要再用一個列表保存全部的conn,再寫一個for循環?我一開始是這麼想的。
其實全部的鏈接都保存在inputs裏了,for循環的時候就是取出每個連接,在for循環裏面應該使用變量r,而不是conn。難怪以前用conn的時候,pycharm會提示 ‘name can not be defined’ 。第一次循環的時候沒有conn這個變量,不過也不會進到那個if裏
接下來繼續,咱們能夠直接把數據發回去,這裏沒有阻塞的問題,就不搞了。換個方法,不直接發回去,先把消息保存到隊列裏。而後統一發送,這樣就是發消息也是多路複用的形式。
import select import socket import queue server = socket.socket() server.bind(('localhost', 9000)) server.listen() print("監聽已經開啓") server.setblocking(False) # 設置爲非阻塞 inputs = [server, ] outputs = [] data_queue = {} # 存放返回給客戶端消息的隊列,每一個客戶端鏈接一個隊列,就是一個item # 3個參數,讀列表,寫列表,異常列表,就是你想讓內核監視哪些連接 # 異常列表監視的仍是連接返回的內容,那麼仍是在inputs裏,因此第三個參數仍是填inputs # 開始什麼都沒有,只有server,先把server加到inputs裏 # 就是select監視到server活動了,就能夠返回了 # 返回3個數據,監視到活動的3個列表(讀列表,寫列表,異常列表) while True: # select返回活動連接==>for循環處理全部的活動連接,循環往復 readable, writeable, exceptional = select.select(inputs, outputs, inputs) # 這裏就是阻塞的,直到select監視到列表中有活動的連接,纔會繼續 # 非阻塞socket就是經過select來實現阻塞 print('select返回:\n', readable, writeable, exceptional) # 下面有打印的內容 # [<socket.socket fd=472, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=0, laddr=('127.0.0.1', 9000)>] [] [] # 第一個fd,就是文件描述符,非負整數。 for r in readable: # 可能一次返回多個鏈接啊,因此得寫個循環 # 如今inputs裏有server 和 conn了, # 若是readable返回的是server的活動,表示來了一個新連接 # 若是readable返回的是conn的活動,表示收到了conn發來的數據 if r is server: conn, addr = r.accept() # r就是server print('接收到新的客戶端鏈接:\n', conn, addr) inputs.append(conn) data_queue[conn] = queue.Queue() # 建立鏈接的消息隊列 else: data = r.recv(1024) # r就是conn print(data.decode('utf-8')) # 接下來處理髮數據 # 把有數據返回的鏈接當道outputs列表裏,這樣下次循環select就會監視到 # 要發的數據也得保存,這裏用隊列來保存準備發送的數據 # 要爲每一個鏈接分別建一個隊列,不能把消息搞混,這裏用字典 # 字典的key就是鏈接,value就是該鏈接的消息隊列 outputs.append(r) # data_queue = {} 在使用前,要先創建一個空字典。這句放到While循環外 # data_queue[conn] = queue.Queue # 在客戶端創建鏈接時,就建立好鏈接的消息隊列。這句放在上面處理server.accept()裏面 data_queue[r].put(data.upper()) # 而後就先無論了。等到select再監視的時候,會返回到writeable列表裏。 # 因此後面還要寫一個writeable的for循環 # 雖然是在上面的for循環裏添加的,可是要等到在執行一次select後纔會在writeable裏有返回值 for w in writeable: data = data_queue[w].get() # 從隊列裏取出數據。這裏get了以後,這條消息就從隊列裏移除了 w.send(data) # 發數據,注意data的數據類型 outputs.remove(w) # 從outputs裏移除這個活動的鏈接,不然下次過來還有嘗試在發數據,可是消息隊列裏是空的
這裏把以前客戶端註釋掉的內容去掉測試一下收數據。而且已經能夠接入多個客戶端了,開2個試一下就行了。
剩下就是客戶端斷開的問題了。斷開有兩種狀況:
一種是正常斷開,客戶端close(),會發送一個空給服務的,那麼要在 data = r.recv(1024)
以後判斷一下是否是空,就和以前寫的socket服務器同樣。
還有一種是強行關閉客戶端,這時inputs仍然會收到活躍鏈接,可是recv的時候會拋出異常「ConnectionResetError」,這裏大概要把recv放到try裏,若是捕獲到異常,就斷開客戶端。
另外還有一個exceptional異常列表有返回的狀況,這裏也粗暴的斷開客戶端處理了好了。
客戶端斷開就是要清除掉字典和列表中的這個鏈接的信息。
import select import socket import queue server = socket.socket() server.bind(('localhost', 9000)) server.listen() print("監聽已經開啓") server.setblocking(False) # 設置爲非阻塞 inputs = [server, ] outputs = [] data_queue = {} # 存放返回給客戶端消息的隊列,每一個客戶端鏈接一個隊列,就是一個item while True: # select返回活動連接==>for循環處理全部的活動連接,循環往復 # 返回3個數據,監視到活動的3個列表(讀列表,寫列表,異常列表) readable, writeable, exceptional = select.select(inputs, outputs, inputs) # 這裏就是阻塞的,直到select監視到列表中有活動的連接,纔會繼續 print('select返回:\n', readable, writeable, exceptional) for r in readable: # 可能一次返回多個鏈接啊,因此得寫個循環 # 如今inputs裏有server 和 conn了, # 若是readable返回的是server的活動,表示來了一個新連接 # 若是readable返回的是conn的活動,表示收到了conn發來的數據 if r is server: conn, addr = r.accept() # r就是server print('接收到新的客戶端鏈接:\n', conn, addr) inputs.append(conn) # 將新鏈接加入到select監視列表中 data_queue[conn] = queue.Queue() # 建立新鏈接的消息隊列 else: # 這裏有3中狀況,有數據,有空數據(正常斷開),無數據(一旦recv就報錯) try: data = r.recv(1024) except Exception as error: print("recv時捕獲到異常:%s" % error) # 清除鏈接的4個操做,這段代碼重複用了3次,應該專門寫個函數引用 # 1 從讀列表中清除,這裏其實不用判斷,可是後面的for循環裏可能會嘗試重複remove # 2 若是還有沒發出去的消息,把鏈接從寫列表中清除 # 3 關閉鏈接 # 4 若是還有沒發出去的消息,把消息隊列的對象從字典裏清除 if r in inputs: inputs.remove(r) if r in outputs: outputs.remove(r) r.close() if r in data_queue: del data_queue[r] else: # 這裏處理能recv到數據,收到數據就加入outputs列表。爲空就清除鏈接 if data: print(data.decode('utf-8')) outputs.append(r) data_queue[r].put(data.upper()) else: print("客戶端已斷開:\n", r) inputs.remove(r) if r in outputs: outputs.remove(r) if r in data_queue: del data_queue[r] # 雖然是上面的for循環裏添加的,可是要等到在執行一次select後纔會在writeable裏有返回值 for w in writeable: data = data_queue[w].get() # 從隊列裏取出數據。這裏get了以後,這條消息就從隊列裏移除了 w.send(data) # 發數據,注意data的數據類型 outputs.remove(w) # 從outputs裏移除這個活動的鏈接,不然下次過來還有嘗試在發數據,可是消息隊列裏是空的 # 還有一個exceptional沒處理,仍是和上面同樣,再寫一個for循環 # 異常處理這裏仍是簡單粗暴把異常列表中的鏈接清除就行了 for e in exceptional: print("異常列表有返回:", e) if e in inputs: inputs.remove(e) if e in outputs: outputs.remove(e) e.close() if e in data_queue: del data_queue[e]
如今就能夠處理高併發了,寫一個多線程的客戶端,每一個線程循環發數據測試一下。
測試客戶端:
import socket import threading HOST = 'localhost' PORT = 9000 def client(i): client = socket.socket() client.connect((HOST, PORT)) for j in range(100): msg = "hello %s %s" % (i, j) client.send(msg.encode('utf-8')) data = client.recv(1024) print('Received:', data.decode('utf-8')) client.close() if __name__ == '__main__': for i in range(100): t = threading.Thread(target=client, args=(i,)) t.start()
到這裏感受好了,可是還有3地方有問題,下面一個一個說明。上面的代碼經不起強行斷開客戶端的考驗。
非阻塞模式下,若是調用recv()沒有發現任何數據,或send()調用沒法當即發送數據,那麼將引發異常。我這裏看到的是 "ConnectionResetError" 。
產生的緣由是強行斷開客戶端,致使這個鏈接已經失效,可是鏈接還在select返回的列表裏。這時以後的for循環裏還會嘗試去send或recv這個鏈接就會拋出異常。
解決辦法 :send 和 recv 的時候都得用try,而後捕獲到異常後,就把這個鏈接清理掉
data = data_queue[w].get()
這裏有時候會報錯 "KeyError" ,就是字典裏已經沒有這個key了。那就是在另外2個for循環裏已經將這個key清除了。這裏清除的時候沒清writeable列表,因此在清除鏈接的時候加一句:
if r in writeable: writeable.remove(r)
或者是用字典的get方法讀取,這樣在讀取不存在的key的時候,會返回None:
data = data_queue.get(w) if data: data = data.get() else: if w in outputs: outputs.remove(w) continue
上面有3處地方會清除鏈接,這裏能夠另外寫一個函數,須要清除的時候調用函數就行了。另外可能會再兩個for循環裏同時會要清除同一個鏈接,這樣在第二次清除的時候若是不作if判斷,就會報錯沒法刪除列表裏不存在的元素。這裏保險起見仍是在remove以前都加上if判斷。應該只有第一個for循環裏的inputs是必定有的不用判斷,別的地方均可能會報錯。
說好了3個問題,這裏還有一個暫時沒事出問題,就是可能發數據的時候一次發不完。如今的寫法都是默認一次send就是發完的,send以後直接從outputs裏remove掉。
也能夠換一種方法,send以後,不從移出outputs列表。那麼下次while循環還會進來,此時get隊列的時候要用get_nowait無阻塞模式取隊列,若是空會拋出隊列空的異常,那麼這個必定是發完了。此時再把連接移出outputs列表。若是須要能夠把上面字典 "KeyError" 一塊兒處理了,繼續用 data = data_queue[w].get()
取列表,放到try裏,捕獲2種異常分別處理掉。
後面的最終版本沒這麼用,我怕再踩到坑,先把想法記着。
下面給出一個全部遇到的問題都解決了的最終版本:
import select import socket import queue server = socket.socket() server.bind(('localhost', 9000)) server.listen() print("監聽已經開啓") server.setblocking(False) # 設置爲非阻塞 inputs = [server, ] outputs = [] data_queue = {} # 存放返回給客戶端消息的隊列,每一個客戶端鏈接一個隊列,就是一個item while True: # select返回活動連接==>for循環處理全部的活動連接,循環往復 # 返回3個數據,監視到活動的3個列表(讀列表,寫列表,異常列表) readable, writeable, exceptional = select.select(inputs, outputs, inputs) # 這裏就是阻塞的,直到select監視到列表中有活動的連接,纔會繼續 print('select返回:\n', readable, writeable, exceptional) for r in readable: # 可能一次返回多個鏈接啊,因此得寫個循環 # 如今inputs裏有server 和 conn了, # 若是readable返回的是server的活動,表示來了一個新連接 # 若是readable返回的是conn的活動,表示收到了conn發來的數據 if r is server: conn, addr = r.accept() # r就是server print('接收到新的客戶端鏈接:\n', conn, addr) conn.setblocking(False) inputs.append(conn) # 將新鏈接加入到select監視列表中 data_queue[conn] = queue.Queue() # 建立新鏈接的消息隊列 else: # 這裏有3中狀況,有數據,有空數據(正常斷開),無數據(一旦recv就報錯) try: data = r.recv(1024) except Exception as error: print("recv時捕獲到異常:%s" % error) # 清除鏈接的4個操做,這段代碼重複用了3次,應該專門寫個函數引用 # 1 從讀列表中清除,這裏其實不用判斷,可是後面的for循環裏可能會嘗試重複remove # 2 若是還有沒發出去的消息,把鏈接從寫列表中清除 # 3 關閉鏈接 # 4 若是還有沒發出去的消息,把消息隊列的對象從字典裏清除 if r in inputs: inputs.remove(r) if r in outputs: outputs.remove(r) r.close() if r in data_queue: del data_queue[r] else: # 這裏處理能recv到數據,收到數據就加入outputs列表。爲空就清除鏈接 if data: print(data.decode('utf-8')) if r not in outputs: outputs.append(r) data_queue[r].put(data.upper()) else: print("客戶端已斷開:\n", r if r in inputs: inputs.remove(r) if r in outputs: outputs.remove(r) r.close() if r in data_queue: del data_queue[r] # 雖然是上面的for循環裏添加的,可是要等到在執行一次select後纔會在writeable裏有返回值 for w in writeable: data = data_queue.get(w) # 先從字典裏取出這個隊列 if data: # 隊列存在,取數據 data = data.get() # 從隊列裏取出數據。這裏get了以後,這條消息就從隊列裏移除了 else: # 隊列不存在,這個鏈接已經被清除了,remove掉,下一個循環 if w in outputs: outputs.remove(w) continue try: w.send(data) # 發數據,注意data的數據類型 except Exception as error: print("send時捕獲到異常:%s" % error) if r in inputs: inputs.remove(r) if r in outputs: outputs.remove(r) r.close() if r in data_queue: del data_queue[r] else: if w in outputs: outputs.remove(w) # 有可能一次send不完,這裏也能夠不remove,用另一個方法 # 開頭用get_nowait無阻塞模式取隊列,捕獲到隊列爲空的異常,再remove掉這個鏈接 # 還有一個exceptional沒處理,仍是和上面同樣,再寫一個for循環 # 異常處理這裏仍是簡單粗暴把異常列表中的鏈接清除就行了 for e in exceptional: print("異常列表有返回:", e) if e in inputs: inputs.remove(e) if e in outputs: outputs.remove(e) e.close() if e in data_queue: del data_queue[e]
學了那麼多都沒用,應該也用不到,並且可能還會有別的問題。最可怕的是別人都不這麼用,有問題都找不到人解決。主要是經過select的使用來了解IO多路複用。除了select,還有poll和epoll。
epoll 更高效,可是代碼也更復雜。不過這些咱們都用不到,都太底層了。平時用用已經封裝好的模塊就行了。這裏經過學習大概瞭解一下底層是怎麼實現的。剩下的會用模塊就行了,如今至少知道模塊中是怎麼運做的了。
最後就是學一下下面的已經封裝的selectors模塊,底層都清楚了,學習使用下面的模塊已經沒有難度了。
平時直接用已經封裝好的模塊,簡單坑又少。selectors模塊,默認會用epoll,若是系統不支持,就用select,完美。
理解了前面select的機制,在使用這個模塊就簡單了。步驟都同樣。步驟都同樣,可是都封裝好了。
import selectors import socket def accept(sock, mask): """創建新鏈接並註冊""" conn, addr = sock.accept() print('接收到新的客戶端鏈接:\n', conn, addr) conn.setblocking(False) # sock裏已經設置爲False,這裏貌似沒意義,反正沒差 # 上面已經創建好鏈接了,把新鏈接註冊到sel裏,這是第二次註冊了 # 第一次註冊是註冊server接受客戶端鏈接請求的鏈接 # 這裏是鏈接創建後收發數據的鏈接,這個鏈接若是發現是活動的,調用的就是read方法了 sel.register(conn, selectors.EVENT_READ, read) def read(conn, mask): """接收數據""" data = conn.recv(1024) if data: print(data.decode('utf-8')) conn.send(data.upper()) else: print("客戶端已斷開:\n", conn) sel.unregister(conn) # 註銷這個鏈接 conn.close() def server(port): sock = socket.socket() sock.bind(('localhost', port)) sock.listen() print("監聽已經開啓") sock.setblocking(False) # 下面就是select方法,也多是epoll # 註冊你的socket,就是讓select監視,監視到有活動的鏈接,就調用accept函數 sel.register(sock, selectors.EVENT_READ, accept) # 上面尚未開始監視,只是先把select準備好 while True: events = sel.select() # 這裏就仍是監視了 # 這裏會阻塞,一旦有活動的鏈接,就會返回給events列表 for key, mask in events: callback = key.data # key.data就是sel.register裏的accpet這個函數 # 如今callback就是accpet這個函數了,下面加上括號填上參數就執行了 callback(key.fileobj, mask) # key.fileobj就是sel.register裏的sock if __name__ == '__main__': sel = selectors.DefaultSelector() # 老套路,用以前先實例化一個對象 server(10001)
仍是有那個老問題,客戶端強制斷開,服務端會報錯 "ConnectionResetError" 。
SELECT版FTP :