Python全棧【進程、線程、IO多路複用】 |
本節內容:javascript
進程 |
一、進程就是一個程序在一個數據集上的一次動態執行過程,進程是資源分配的最小單元。html
二、進程通常由程序、數據集、進程控制塊三部分組成。java
編寫的程序用來描述進程要完成哪些功能以及如何完成;
數據集則是程序在執行過程當中所須要使用的資源;
進程控制塊用來記錄進程的外部特徵,描述進程的執行變化過程,系統能夠利用它來控制和管理進程,它是系統感知進程存在的惟一標誌。
三、線程的上一級就是進程,進程可包含不少線程,進程和線程的區別是進程間的數據不共享,多進程也能夠用來處理多任務,不過多進程很消耗資源,python
計算型的任務最好交給多進程來處理,IO密集型最好交給多線程來處理,此外進程的數量應該和cpu的核數保持一致。linux
線程 |
一、線程也叫輕量級進程,它是一個基本的CPU執行單元,也是程序執行過程當中的最小單元,由線程ID、程序計數器、寄存器集合和堆棧共同組成。
二、線程的引入減少了程序併發執行時的開銷,提升了操做系統的併發性能。
三、線程沒有本身的系統資源。
四、多任務能夠由多進程完成,也能夠由一個進程內的多線程完成,一個進程內的全部線程,共享同一塊內存python中建立線程比較簡單,導入threading模塊,下面來看一下代碼中如何建立多線程。web
建立一個新線程:編程
import time import threading def f1(i): time.sleep(1) print(i) if __name__ == '__main__': for i in range(5): t = threading.Thread(target=f1, args=(i,)) t.start() print('start') # 主線程等待子線程完成,子線程併發執行 # 每次數字順序不一 # start # 0 # 1 # 2 # 4 # 3
主線程從上到下執行,建立5個子線程,打印出'start',而後等待子線程執行完結束,若是想讓線程要一個個依次執行完,而不是併發操做,那麼就要使用join方法。windows
import time import threading def f1(i): time.sleep(1) print(i) if __name__ == '__main__': li = [] for i in range(5): t = threading.Thread(target=f1, args=(i,)) t.start() li.append(t) for t in li: t.join() print('start') # 每次數字順序不一 # 0 # 4 # 3 # 2 # 1 # start
上面的代碼不適用join的話,主線程會默認等待子線程結束,纔會結束,還有一種守護線程,即子線程守護主線程,主線程結束守護線程也結束。數組
import time import threading def f1(i): time.sleep(1) print(i) if __name__ == '__main__': for i in range(5): t = threading.Thread(target=f1, args=(i,)) t.setDaemon(True) #守護線程設置在start以前 t.start() print('start') # start
除此以外,本身還能夠爲線程自定義名字,經過 t = threading.Thread(target=f1, args=(i,), name='mythread{}'.format(i)) 中的name參數,除此以外,Thread還有一下一些方法緩存
t.getName() : 獲取線程的名稱
t.setName() : 設置線程的名稱
t.name : 獲取或設置線程的名稱
t.is_alive() : 判斷線程是否爲激活狀態
t.isAlive() :判斷線程是否爲激活狀態
t.isDaemon() : 判斷是否爲守護線程
線程鎖 |
死鎖示例:
import threading,time class MyThread(threading.Thread): def funcA(self): A.acquire() print(self.name,'got A',time.ctime()) time.sleep(2) B.acquire() print(self.name,'got B',time.ctime()) B.release() A.release() def funcB(self): B.acquire() print(self.name,'got A',time.ctime()) time.sleep(1) A.acquire() print(self.name,'got B',time.ctime()) A.release() B.release() def run(self): self.funcA() self.funcB() li = [] A = threading.Lock() B = threading.Lock() for i in range(5): t = MyThread() li.append(t) for j in li: j.start() for k in li: k.join() # Thread-1 got A # Thread-1 got B # Thread-1 got A # Thread-2 got A #線程阻塞,1與2都不先釋放鎖,就出現了死鎖 # 在線程間共享多個資源的時候,若是兩個線程分別佔有一部分資源而且同時等待對方的資源, # 就會形成死鎖,由於系統判斷這部分資源都正在使用,全部這兩個線程在無外力做用下將一直等待下去。
解決方法:遞歸鎖
import threading,time class MyThread(threading.Thread): def funcA(self): r_lock.acquire() print(self.name,'got A',time.ctime()) time.sleep(2) r_lock.acquire() print(self.name,'got B',time.ctime()) r_lock.release() r_lock.release() def funcB(self): r_lock.acquire() print(self.name,'got A',time.ctime()) time.sleep(1) r_lock.acquire() print(self.name,'got B',time.ctime()) r_lock.release() r_lock.release() def run(self): self.funcA() self.funcB() li = [] r_lock = threading.RLock() for i in range(5): t = MyThread() li.append(t) for j in li: j.start() for k in li: k.join()
Lock若是屢次獲取鎖的時候會出錯,而RLock容許在同一線程中被屢次acquire,可是須要用n次的release才能真正釋放所佔用的瑣,一個線程獲取了鎖在釋放以前,其餘線程只有等待。
爲了支持在同一線程中屢次請求同一資源,python提供了「可重入鎖」:threading.RLock。RLock內部維護着一個Lock和一個counter變量,counter記錄了acquire的次數,從而使得資源能夠被屢次acquire。
線程間通信Event |
Event是線程間通訊最多見的機制之一,主要用於主線程控制其餘線程的執行,主要用過wait,clear,set,這三個方法來實現的
紅綠燈示例:
import time import threading def lighter(): count = 0 while 1: if count<30: if not event.is_set(): event.set() print('\33[32;1m綠燈\33[1m') elif count<34: print('\33[33;1m黃燈\33[1m') elif count<60: event.clear() print('\33[31;1m紅燈\33[1m') else: count = 0 count+=1 time.sleep(0.2) def car(n): count =0 while 1: event.wait() print('汽車【%s】經過'%n) count+=1 time.sleep(1) event = threading.Event() l1 =threading.Thread(target=lighter) l1.start() c1 = threading.Thread(target=car,args=('奔馳',)) c1.start() # 綠燈 # 汽車【奔馳】經過 # 綠燈 # 綠燈 # 綠燈 # 綠燈 # 黃燈 # 汽車【奔馳】經過 # 黃燈 # 黃燈 # 黃燈 # 紅燈 # 紅燈 # 紅燈
import threading,time class Boss(threading.Thread): def run(self): print("BOSS:今晚你們都要加班到22:00。") print(event.isSet()) event.set() time.sleep(5) print("BOSS:<22:00>能夠下班了。") print(event.isSet()) event.set() class Worker(threading.Thread): def run(self): event.wait() print("Worker:哎……命苦啊!") time.sleep(1) event.clear() event.wait() print("Worker:OhYeah!") if __name__=="__main__": event=threading.Event() threads=[] for i in range(5): threads.append(Worker()) threads.append(Boss()) for t in threads: t.start() for t in threads: t.join()
信號量(semaphore) |
信號量用來控制線程併發數的,BoundedSemaphore或Semaphore管理一個內置的計數 器,每當調用acquire()時-1,調用release()時+1。
計數器不能小於0,當計數器爲 0時,acquire()將阻塞線程至同步鎖定狀態,直到其餘線程調用release()。(相似於停車位的概念)
BoundedSemaphore與Semaphore的惟一區別在於前者將在調用release()時檢查計數 器的值是否超過了計數器的初始值,若是超過了將拋出一個異常。
import threading,time class myThread(threading.Thread): def run(self): if semaphore.acquire(): print(self.name) time.sleep(5) semaphore.release() if __name__=="__main__": semaphore=threading.Semaphore(5) thrs=[] for i in range(100): thrs.append(myThread()) for t in thrs: t.start()
隊列 |
隊列的方法:
q = queue.Queue(maxsize=0) # 構造一個先進顯出隊列,maxsize指定隊列長度,爲0時,表示隊列長度無限制。 q.join() # 等到隊列爲kong的時候,在執行別的操做 q.qsize() # 返回隊列的大小 (不可靠) q.empty() # 當隊列爲空的時候,返回True 不然返回False (不可靠) q.full() # 當隊列滿的時候,返回True,不然返回False (不可靠) q.put(item, block=True, timeout=None) # 將item放入Queue尾部,item必須存在,參數block默認爲True,表示當隊列滿時,會等待 # 爲False時爲非阻塞,此時若是隊列已滿,會引起queue.Full 異常。 可選參數timeout,表示會阻塞設置的時間, # 若是在阻塞時間裏 隊列仍是沒法放入,則引起 queue.Full 異常 q.get(block=True, timeout=None) # 移除並返回隊列頭部的一個值,可選參數block默認爲True,表示獲取值的時候,若是隊列爲空,則阻塞 # 阻塞的話若此時隊列爲空,則引起queue.Empty異常。 可選參數timeout,表示會阻塞設置的時間, q.get_nowait() # 等效於 get(item,block=False)
隊列的三種進出模式:
import queue q= queue.Queue(3) q.put(12) q.put('hello') q.put({'name':'alex'}) while 1: data = q.get() print(data) print('================') ###################################################### import queue q= queue.LifoQueue(3) q.put(12) q.put('hello') q.put({'name':'alex'}) q.qsize() while 1: data = q.get() print(data) print('================') ####################################################### import queue q= queue.PriorityQueue() q.put([2,12]) q.put([1,'hello']) q.put([3,{'name':'alex'}]) q.qsize() while 1: data = q.get() print(data[1]) print('================')
生產者消費者模型:
def producer(num): for i in range(num): q.put(i) print('將{}添加到隊列中'.format(i)) time.sleep(1) def consumer(num): count = 0 while count < num: i = q.get() print('將{}從隊列取出'.format(i)) time.sleep(2) count += 1 q = queue.Queue(10) t1 = threading.Thread(target=producer, args=(10,)) t1.start() t2 = threading.Thread(target=consumer, args=(10,)) t2.start()
進程與線程的區別 |
一個程序至少有一個進程,一個進程至少有一個線程.(進程能夠理解成線程的容器)
進程在執行過程當中擁有獨立的內存單元,而多個線程共享內存,從而極大地提升了程序的運行效率。
線程在執行過程當中與進程仍是有區別的。每一個獨立的線程有一個程序運行的入口、順序執行序列和程序的出口。
可是線程不可以獨立執行,必須依存在應用程序中,由應用程序提供多個線程執行控制。
進程是具備必定獨立功能的程序關於某個數據集合上的一次運行活動,進程是系統進行資源分配和調度的一個獨立單位.
線程是進程的一個實體,是CPU調度和分派的基本單位,它是比進程更小的能獨立運行的基本單位.線程本身基本上不擁有系統資源,只擁有一點在運行中必不可少的資源(如程序計數器,一組寄存器和棧)可是它可與同屬一個進程的其餘的線程共享進程所擁有的所有資源.
一個線程能夠建立和撤銷另外一個線程;同一個進程中的多個線程之間能夠併發執行.
協程 |
協程,又稱微線程,協程執行看起來有點像多線程,可是事實上協程就是隻有一個線程,所以,沒有線程切換的開銷,和多線程比,線程數量越多,協程的性能優點就越明顯,此外由於只有一個線程,不須要多線程的鎖機制,也不存在同時寫變量衝突。協程的適用場景:當程序中存在大量不須要CPU的操做時(IO)下面來看一個利用協程例子
import time import queue def consumer(name): print("--->ready to eat baozi...") while True: new_baozi = yield print("[%s] is eating baozi %s" % (name,new_baozi)) #time.sleep(1) def producer(): r = con.__next__() r = con2.__next__() n = 0 while 1: time.sleep(1) print("\033[32;1m[producer]\033[0m is making baozi %s and %s" %(n,n+1) ) con.send(n) con2.send(n+1) n +=2 if __name__ == '__main__': con = consumer("c1") con2 = consumer("c2") p = producer()
import gevent import requests,time start=time.time() def f(url): print('GET: %s' % url) resp =requests.get(url) data = resp.text print('%d bytes received from %s.' % (len(data), url)) gevent.joinall([ gevent.spawn(f, 'https://www.python.org/'), gevent.spawn(f, 'https://www.yahoo.com/'), gevent.spawn(f, 'https://www.baidu.com/'), gevent.spawn(f, 'https://www.sina.com.cn/'), ]) # f('https://www.python.org/') # # f('https://www.yahoo.com/') # # f('https://baidu.com/') # # f('https://www.sina.com.cn/') print("cost time:",time.time()-start)
事件驅動 |
傳統的編程是以下線性模式的:
開始--->代碼塊A--->代碼塊B--->代碼塊C--->代碼塊D--->......--->結束
每個代碼塊裏是完成各類各樣事情的代碼,但編程者知道代碼塊A,B,C,D...的執行順序,惟一可以改變這個流程的是數據。輸入不一樣的數據,根據條件語句判斷,流程或許就改成A--->C--->E...--->結束。每一次程序運行順序或許都不一樣,但它的控制流程是由輸入數據和你編寫的程序決定的。若是你知道這個程序當前的運行狀態(包括輸入數據和程序自己),那你就知道接下來甚至一直到結束它的運行流程。
對於事件驅動型程序模型,它的流程大體以下:
開始--->初始化--->等待
與上面傳統編程模式不一樣,事件驅動程序在啓動以後,就在那等待,等待什麼呢?等待被事件觸發。傳統編程下也有「等待」的時候,好比在代碼塊D中,你定義了一個input(),須要用戶輸入數據。但這與下面的等待不一樣,傳統編程的「等待」,好比input(),你做爲程序編寫者是知道或者強制用戶輸入某個東西的,或許是數字,或許是文件名稱,若是用戶輸入錯誤,你還須要提醒他,並請他從新輸入。事件驅動程序的等待則是徹底不知道,也不強制用戶輸入或者幹什麼。只要某一事件發生,那程序就會作出相應的「反應」。這些事件包括:輸入信息、鼠標、敲擊鍵盤上某個鍵還有系統內部定時器觸發。
一般,咱們寫服務器處理模型的程序時,有如下幾種模型:
(1)每收到一個請求,建立一個新的進程,來處理該請求;
(2)每收到一個請求,建立一個新的線程,來處理該請求;
(3)每收到一個請求,放入一個事件列表,讓主進程經過非阻塞I/O方式來處理請求
第三種就是協程、事件驅動的方式,通常廣泛認爲第(3)種方式是大多數網絡服務器採用的方式
示例:
<!DOCTYPE html> <html lang="en"> <head> <meta charset="UTF-8"> <title>Title</title> </head> <body> <p onclick="fun()">點我呀</p> <script type="text/javascript"> function fun() { alert('約嗎?') } </script> </body> </html>
在UI編程中,經常要對鼠標點擊進行相應,首先如何得到鼠標點擊呢? 兩種方式:
那麼這個方式有如下幾個缺點:
目前大部分的UI編程都是事件驅動模型,如不少UI平臺都會提供onClick()事件,這個事件就表明鼠標按下事件。事件驅動模型大致思路以下:
事件驅動編程是一種編程範式,這裏程序的執行流由外部事件來決定。它的特色是包含一個事件循環,當外部事件發生時使用回調機制來觸發相應的處理。
另外兩種常見的編程範式是(單線程)同步以及多線程編程。
最初的問題:怎麼肯定IO操做完了切回去呢?經過回調函數
1.要理解事件驅動和程序,就須要與非事件驅動的程序進行比較。實際上,現代的程序大可能是事件驅動的,好比多線程的程序,確定是事件驅動的。早期則存在許多非事件驅動的程序,這樣的程序,在須要等待某個條件觸發時,會不斷地檢查這個條件,直到條件知足,這是很浪費cpu時間的。而事件驅動的程序,則有機會釋放cpu從而進入睡眠態(注意是有機會,固然程序也可自行決定不釋放cpu),當事件觸發時被操做系統喚醒,這樣就能更加有效地使用cpu. 2.再說什麼是事件驅動的程序。一個典型的事件驅動的程序,就是一個死循環,並以一個線程的形式存在,這個死循環包括兩個部分,第一個部分是按照必定的條件接收並選擇一個要處理的事件,第二個部分就是事件的處理過程。程序的執行過程就是選擇事件和處理事件,而當沒有任何事件觸發時,程序會因查詢事件隊列失敗而進入睡眠狀態,從而釋放cpu。 3.事件驅動的程序,一定會直接或者間接擁有一個事件隊列,用於存儲未能及時處理的事件。 4.事件驅動的程序的行爲,徹底受外部輸入的事件控制,因此,事件驅動的系統中,存在大量這種程序,並以事件做爲主要的通訊方式。 5.事件驅動的程序,還有一個最大的好處,就是能夠按照必定的順序處理隊列中的事件,而這個順序則是由事件的觸發順序決定的,這一特性每每被用於保證某些過程的原子化。 6.目前windows,linux,nucleus,vxworks都是事件驅動的,只有一些單片機多是非事件驅動的。
注意,事件驅動的監聽事件是由操做系統調用的cpu來完成的
IO多路複用 |
基礎:
1.用戶空間和內核空間(用戶態和內核態)
內核態:就是系統的最高指令集,控制權限由cpu控制
用戶態:最高指令爲系統,用戶操做控制系統,再又系統去執行操做內核態。
2. 進程切換
爲了控制進程的執行,內核必須有能力掛起正在CPU上運行的進程,並恢復之前掛起的某個進程的執行。這種行爲被稱爲進程切換,這種切換是由操做系統來完成的。所以能夠說,任何進程都是在操做系統內核的支持下運行的,是與內核緊密相關的。
從一個進程的運行轉到另外一個進程上運行,這個過程當中通過下面這些變化:
保存處理機上下文,包括程序計數器和其餘寄存器。
更新PCB信息。
把進程的PCB移入相應的隊列,如就緒、在某事件阻塞等隊列。
選擇另外一個進程執行,並更新其PCB。
更新內存管理的數據結構。
恢復處理機上下文。
注:總而言之就是很耗資源的
3.進程的阻塞
正在執行的進程,因爲期待的某些事件未發生,如請求系統資源失敗、等待某種操做的完成、新數據還沒有到達或無新工做作等,則由系統自動執行阻塞原語(Block),使本身由運行狀態變爲阻塞狀態。可見,進程的阻塞是進程自身的一種主動行爲,也所以只有處於運行態的進程(得到CPU),纔可能將其轉爲阻塞狀態。當進程進入阻塞狀態,是不佔用CPU資源的。
4.文件描述符
文件描述符(File descriptor)是計算機科學中的一個術語,是一個用於表述指向文件的引用的抽象化概念。
文件描述符在形式上是一個非負整數。實際上,它是一個索引值,指向內核爲每個進程所維護的該進程打開文件的記錄表。當程序打開一個現有文件或者建立一個新文件時,內核向進程返回一個文件描述符。在程序設計中,一些涉及底層的程序編寫每每會圍繞着文件描述符展開。可是文件描述符這一律念每每只適用於UNIX、Linux這樣的操做系統。
5. 緩存 I/O
緩存 I/O 又被稱做標準 I/O,大多數文件系統的默認 I/O 操做都是緩存 I/O。在 Linux 的緩存 I/O 機制中,操做系統會將 I/O 的數據緩存在文件系統的頁緩存( page cache )中,也就是說,數據會先被拷貝到操做系統內核的緩衝區中,而後纔會從操做系統內核的緩衝區拷貝到應用程序的地址空間。用戶空間無法直接訪問內核空間的,內核態到用戶態的數據拷貝
思考:爲何數據必定要先到內核區,直接到用戶內存不是更直接嗎?
緩存 I/O 的缺點:
數據在傳輸過程當中須要在應用程序地址空間和內核進行屢次數據拷貝操做,這些數據拷貝操做所帶來的 CPU 以及內存開銷是很是大的。
I/O
對於一個network IO (這裏咱們以read舉例),它會涉及到兩個系統對象,一個是調用這個IO的process (or thread),另外一個就是系統內核(kernel)。當一個read操做發生時,它會經歷兩個階段:
一、 等待數據準備 (Waiting for the data to be ready)
二、 將數據從內核拷貝到進程中 (Copying the data from the kernel to the process)
記住這兩點很重要,由於這些IO Model的區別就是在兩個階段上各有不一樣的狀況。
當用戶進程調用了recvfrom這個系統調用,kernel就開始了IO的第一個階段:準備數據。對於network io來講,不少時候數據在一開始尚未到達(好比,尚未收到一個完整的UDP包),這個時候kernel就要等待足夠的數據到來。而在用戶進程這邊,整個進程會被阻塞。當kernel一直等到數據準備好了,它就會將數據從kernel中拷貝到用戶內存,而後kernel返回結果,用戶進程才解除block的狀態,從新運行起來。
因此,blocking IO的特色就是在IO執行的兩個階段都被block了。
從圖中能夠看出,當用戶進程發出read操做時,若是kernel中的數據尚未準備好,那麼它並不會block用戶進程,而是馬上返回一個error。從用戶進程角度講 ,它發起一個read操做後,並不須要等待,而是立刻就獲得了一個結果。用戶進程判斷結果是一個error時,它就知道數據尚未準備好,因而它能夠再次發送read操做。一旦kernel中的數據準備好了,而且又再次收到了用戶進程的system call,那麼它立刻就將數據拷貝到了用戶內存,而後返回。
因此,用戶進程實際上是須要不斷的主動詢問kernel數據好了沒有。
注意:
在網絡IO時候,非阻塞IO也會進行recvform系統調用,檢查數據是否準備好,與阻塞IO不同,」非阻塞將大的整片時間的阻塞分紅N多的小的阻塞, 因此進程不斷地有機會 ‘被’ CPU光顧」。
即每次recvform系統調用之間,cpu的權限還在進程手中,這段時間是能夠作其餘事情的。
也就是說非阻塞的recvform系統調用調用以後,進程並無被阻塞,內核立刻返回給進程,若是數據還沒準備好,此時會返回一個error。進程在返回以後,能夠乾點別的事情,而後再發起recvform系統調用。重複上面的過程,循環往復的進行recvform系統調用。這個過程一般被稱之爲輪詢。輪詢檢查內核數據,直到數據準備好,再拷貝數據到進程,進行數據處理。
須要注意,拷貝數據整個過程,進程仍然是屬於阻塞的狀態。
優勢:可以在等待任務完成的時間裏幹其餘活了(包括提交其餘任務,也就是 「後臺」 能夠有多個任務在同時執行)。
缺點:任務完成的響應延遲增大了,由於每過一段時間纔去輪詢一次read操做,而任務可能在兩次輪詢之間的任意時間完成。這會致使總體數據吞吐量的下降。
當用戶進程調用了select,那麼整個進程會被block,而同時,kernel會「監視」全部select負責的socket,當任何一個socket中的數據準備好了,select就會返回。
這個時候用戶進程再調用read操做,將數據從kernel拷貝到用戶進程。
這個圖和blocking IO的圖其實並無太大的不一樣,事實上,還更差一些。由於這裏須要使用兩個system call (select 和 recvfrom),而blocking IO只調用了一個system call (recvfrom)。
可是,用select的優點在於它能夠同時處理多個connection。(多說一句。因此,若是處理的鏈接數不是很高的話,使用select/epoll的web server不必定比使用multi-threading + blocking IO的web server性能更好,可能延遲還更大。select/epoll的優點並非對於單個鏈接能處理得更快,而是在於能處理更多的鏈接。)
在IO multiplexing Model中,實際中,對於每個socket,通常都設置成爲non-blocking,可是,如上圖所示,整個用戶的process實際上是一直被block的。只不過process是被select這個函數block,而不是被socket IO給block。
注意1:select函數返回結果中若是有文件可讀了,那麼進程就能夠經過調用accept()或recv()來讓kernel將位於內核中準備到的數據copy到用戶區。
注意2: select的優點在於能夠處理多個鏈接,不適用於單個鏈接
I/O多路複用指經過一種機制,能夠監視多個描述符,一旦某個描述符就緒(通常是讀就緒或者寫就緒),可以通知程序進行相應的讀寫操做。下面看一下 select,poll,epoll的介紹
select select最先於1983年出如今4.2BSD中,它經過一個select()系統調用來監視多個文件描述符的數組,當select()返回後,該數組中就緒的文件描述符便會被內核修改標誌位,使得進程能夠得到這些文件描述符從而進行後續的讀寫操做。 select目前幾乎在全部的平臺上支持,其良好跨平臺支持也是它的一個優勢,事實上從如今看來,這也是它所剩很少的優勢之一。 select的一個缺點在於單個進程可以監視的文件描述符的數量存在最大限制,在Linux上通常爲1024,不過能夠經過修改宏定義甚至從新編譯內核的方式提高這一限制。 另外,select()所維護的存儲大量文件描述符的數據結構,隨着文件描述符數量的增大,其複製的開銷也線性增加。同時,因爲網絡響應時間的延遲使得大量TCP鏈接處於非活躍狀態,但調用select()會對全部socket進行一次線性掃描,因此這也浪費了必定的開銷。 poll poll在1986年誕生於System V Release 3,它和select在本質上沒有多大差異,可是poll沒有最大文件描述符數量的限制。 poll和select一樣存在一個缺點就是,包含大量文件描述符的數組被總體複製於用戶態和內核的地址空間之間,而不論這些文件描述符是否就緒,它的開銷隨着文件描述符數量的增長而線性增大。 另外,select()和poll()將就緒的文件描述符告訴進程後,若是進程沒有對其進行IO操做,那麼下次調用select()和poll()的時候將再次報告這些文件描述符,因此它們通常不會丟失就緒的消息,這種方式稱爲水平觸發(Level Triggered)。 epoll 直到Linux2.6纔出現了由內核直接支持的實現方法,那就是epoll,它幾乎具有了以前所說的一切優勢,被公認爲Linux2.6下性能最好的多路I/O就緒通知方法。 epoll能夠同時支持水平觸發和邊緣觸發(Edge Triggered,只告訴進程哪些文件描述符剛剛變爲就緒狀態,它只說一遍,若是咱們沒有采起行動,那麼它將不會再次告知,這種方式稱爲邊緣觸發),理論上邊緣觸發的性能要更高一些,可是代碼實現至關複雜。 epoll一樣只告知那些就緒的文件描述符,並且當咱們調用epoll_wait()得到就緒文件描述符時,返回的不是實際的描述符,而是一個表明就緒描述符數量的值,你只須要去epoll指定的一個數組中依次取得相應數量的文件描述符便可,這裏也使用了內存映射(mmap)技術,這樣便完全省掉了這些文件描述符在系統調用時複製的開銷。 另外一個本質的改進在於epoll採用基於事件的就緒通知方式。在select/poll中,進程只有在調用必定的方法後,內核纔對全部監視的文件描述符進行掃描,而epoll事先經過epoll_ctl()來註冊一個文件描述符,一旦基於某個文件描述符就緒時,內核會採用相似callback的回調機制,迅速激活這個文件描述符,當進程調用epoll_wait()時便獲得通知。
windows下只支持select,示例:
#*************************server.py import socket import select sk=socket.socket() sk.bind(("127.0.0.1",9904)) sk.listen(5) while True: r,w,e=select.select([sk,],[],[],5) for i in r: # conn,add=i.accept() #print(conn) print("hello") print('>>>>>>') #*************************client.py import socket sk=socket.socket() sk.connect(("127.0.0.1",9904)) while 1: inp=input(">>").strip() sk.send(inp.encode("utf8")) data=sk.recv(1024) print(data.decode("utf8"))
請思考:爲何不調用accept,會反覆print?
Answer:由於select是水平觸發
select實現併發聊天示例:
#***********************server.py import socket import select sk=socket.socket() sk.bind(("127.0.0.1",8801)) sk.listen(5) inputs=[sk,] while True: r,w,e=select.select(inputs,[],[],5) print(len(r)) for obj in r: if obj==sk: conn,add=obj.accept() print(conn) inputs.append(conn) else: data_byte=obj.recv(1024) print(str(data_byte,'utf8')) inp=input('回答%s號客戶>>>'%inputs.index(obj)) obj.sendall(bytes(inp,'utf8')) print('>>',r) #***********************client.py import socket sk=socket.socket() sk.connect(('127.0.0.1',8801)) while True: inp=input(">>>>") sk.sendall(bytes(inp,"utf8")) data=sk.recv(1024) print(str(data,'utf8'))
此處的Socket服務端相比與原生的Socket,他支持當某一個請求再也不發送數據時,服務器端不會等待而是能夠去處理其餘請求的數據。可是,若是每一個請求的耗時比較長時,select版本的服務器端也沒法完成同時操做。並且select,實現的是一個僞併發。
selectors |
selectors經過單線程實現併發,示例:
import selectors import socket sel = selectors.DefaultSelector() def accept(sock, mask): conn, addr = sock.accept() # 接收連接 print('accepted', conn, 'from', addr) conn.setblocking(False)#設置爲非阻塞 sel.register(conn, selectors.EVENT_READ, read)#註冊連接 def read(conn, mask): data = conn.recv(1000) # 接受消息 if data: print('echoing', repr(data), 'to', conn) conn.send(data) # 返回消息 else: print('closing', conn) sel.unregister(conn) conn.close() sock = socket.socket() sock.bind(('localhost', 1234)) sock.listen(100) sock.setblocking(False) sel.register(sock, selectors.EVENT_READ, accept) while True: events = sel.select() for key, mask in events: callback = key.data callback(key.fileobj, mask)
selectors實現簡單FTP文件上傳下載(多用戶同時上傳下載):
import os,socket,selectors BASE_DIR = os.path.dirname(os.path.abspath(__file__)) class FtpServer: def __init__(self): self.dic = {} # 建立狀態字典 self.sel = selectors.DefaultSelector() #建立selectors對象 self.sock() self.main() def sock(self): '''建立socket對象''' s = socket.socket() s.bind(('127.0.0.1',8090)) s.listen(10) s.setblocking(False) self.sel.register(s, selectors.EVENT_READ, self.accept) print('服務端已開啓') def main(self): '''監聽主函數''' while True: events = self.sel.select() for key, mask in events: callback = key.data callback(key.fileobj, mask) def accept(self,sock, mask): '''接收函數''' conn, addr = sock.accept() conn.setblocking(False) self.sel.register(conn, selectors.EVENT_READ, self.read) self.dic[conn] = {} def read(self, conn, mask): try: if not self.dic[conn] : data = conn.recv(1024).decode() cmd,filename,filesize = data.split() self.dic[conn]={'cmd': cmd, 'filename': filename,'filesize': int(filesize)} if cmd == 'put': conn.send('100'.encode()) if cmd == 'get': file = os.path.join(BASE_DIR,'download',filename) if os.path.exists(file): fileSize = os.path.getsize(file) info = '%s %s'%('200',fileSize) #文件存在 conn.send(info.encode()) else: info = '%s %s'%('201',0) #文件不存在 conn.send(info.encode()) else: if self.dic[conn].get('cmd',None): cmd=self.dic[conn].get('cmd') if hasattr(self, cmd): func = getattr(self,cmd) func(conn) else: print('指令錯誤') conn.close() else: print('指令錯誤') conn.close() except Exception as e: print(e) self.sel.unregister(conn) conn.close() def put(self, conn): '''上傳''' self.have_rec = 0 fileName = self.dic[conn]['filename'] fileSize = self.dic[conn]['filesize'] file = os.path.join(BASE_DIR,'upload',fileName) recv_data = conn.recv(1024) self.have_rec += len(recv_data) with open(file, 'ab') as f: f.write(recv_data) if fileSize == self.have_rec: if conn in self.dic.keys(): self.dic[conn] = {} #置空字典 def get(self,conn): '''下載''' filename = self.dic[conn]['filename'] path = os.path.join(BASE_DIR,'download',filename) if conn.recv(1024).decode() == '300': with open(path, 'rb') as f: for line in f: conn.send(line) self.dic[conn] = {} #置空字典 if __name__ == '__main__': FtpServer()
import os,sys,socket,time BASE_DIR = os.path.dirname(os.path.abspath(__file__)) class FtpClient: def __init__(self): self.port=('127.0.0.1',8090) self.sock() self.interact() def sock(self): '''建立socket對象函數''' try: self.sk = socket.socket() self.sk.connect(self.port) print('鏈接FTP服務器成功!') except Exception as e: print(e) def interact(self): '''客戶端與服務端交互函數''' while True: cmd = input('>>>').strip() if cmd == 'q': break cmd,file = cmd.split() if hasattr(self, cmd): func = getattr(self, cmd) func(cmd,file) else: print('輸入命令錯誤!') def put(self,cmd,file): if os.path.isfile(file): fileName= os.path.basename(file) fileSize = os.path.getsize(file) fileInfo ='%s %s %s'%(cmd,fileName,fileSize) self.sk.send(fileInfo.encode()) recv_mes = self.sk.recv(1024).decode() # print('recvMes', recv_mes) have_send = 0 if recv_mes == '100': with open(file, 'rb') as f: while fileSize > have_send : data = f.read(1024) self.sk.send(data) have_send += len(data) self.show_process(have_send, fileSize) sys.stdout.write('\n') print('%s文件上傳成功!' % fileName) else: print('文件不存在') def get(self, cmd,file): info = '%s %s %s'%(cmd,file,'0') self.sk.send(info.encode()) fileInfo = self.sk.recv(1024).decode() fileMes, fileSize = fileInfo.split() fileSize=int(fileSize) if fileMes == '200': #文件存在 self.sk.send('300'.encode()) path = os.path.join(BASE_DIR,file) have_recv = 0 with open(path, 'wb') as f: while have_recv < int(fileSize): data = self.sk.recv(1024) have_recv += len(data) f.write(data) self.show_process(have_recv, fileSize) sys.stdout.write('\n') print('%s下載完成!' % file) else: print("文件不存在!") def show_process(self,have_send,file_size): '''顯示進度條''' k = int((have_send / file_size * 100)) space = (100 - k) * ' ' flag = k * '>' sys.stdout.write('\r|%s| [%s%%]' % ((flag + space), k)) sys.stdout.flush() time.sleep(0.2) if __name__ == '__main__': FtpClient()
socketserver |
ThreadingTCPServer實現的Soket服務器內部會爲每一個client建立一個 「線程」,該線程用來和客戶端進行交互。首先來看一下繼承關係圖
socketserver搭建:
import SocketServer class MyServer(SocketServer.BaseRequestHandler): def handle(self): pass if __name__ == '__main__': server = SocketServer.ThreadingTCPServer(('127.0.0.1',8090), MyServer) server.serve_forever()
上述代碼的內部調用流程爲:
實戰:socketserver搭建實現FTP(實現用戶註冊登陸、斷點續傳、簡單命令)
部分主要代碼:
import os import pickle,configparser,time import subprocess import socketserver from socket import * from FtpServer.conf import settings class MyServer(socketserver.BaseRequestHandler): # def __init__(self): # pass def handle(self): while True: try: data = self.request.recv(1024).decode() if '|' in data: cmd,argv= data.split('|') else: cmd = data argv = None self.process(cmd, argv) # process處理接收的命令 except Exception as e: print(e) break def post(self, argv=None): argv = eval(argv) file_info = pickle.loads(argv) # 獲取客戶端傳來的消息 file_name = file_info['file_name'] file_size = int(file_info['file_size']) file_path = os.path.join(settings.USER_HOME, self.user, 'upload', file_name) have_down = 0 # 已經上傳的位置 if os.path.isfile(file_path): self.request.sendall('exist'.encode()) ret = self.request.recv(1024).decode() if ret == 'Y': # 續傳 have_send = os.stat(file_path).st_size # 獲取已經上傳文件的大小 self.request.sendall(str(have_send).encode()) if have_send ==file_size: return else: f = open(file_path, 'ab') # 續傳以a模式打開文件, else: f = open(file_path, 'wb') # 不續傳以w模式打開, else: self.request.sendall('N'.encode()) # 直接上傳 f = open(file_path, 'wb') while True: if have_down == file_size: # 一旦接受到的內容等於文件大小,直接退出循環 break try: ret = self.request.recv(1024) except Exception as e: break f.write(ret) have_down += len(ret) def process(self, cmd, argv=None): '''使用反射處理客戶端傳過來的命令''' if hasattr(self, cmd): func = getattr(self, cmd) func(argv) else: if cmd.startswith('cd'): # 處理cd命令 argv = cmd.split(' ')[1] if argv =='..': self.request.sendall((os.path.join(settings.USER_HOME)).encode()) os.chdir(os.path.join(settings.USER_HOME)) return else: self.request.sendall(argv.encode()) os.chdir(argv) return elif cmd.startswith('ls'): #處理ls命令 i = pickle.dumps(os.listdir()) self.request.sendall(i) return elif cmd.startswith('login'): user,pwd = argv.split(':') if self.checklogin(user,pwd): self.user =user return elif cmd.startswith('register'): user,pwd = argv.split(':') if self.checkregister(user, pwd): self.request.sendall('ok'.encode()) else: try: data = subprocess.getoutput(cmd) # subprocess處理其餘命令 self.request.sendall(data.encode()) return except Exception as e: print(e) def checkregister(self,username,password): '''校驗註冊信息''' while True: config = configparser.ConfigParser() config.read(settings.USER_INFO) if username in config.sections(): self.request.sendall('exist'.encode) else: config.add_section(username) config.set(username,'password',password) config.set(username, 'space_size', '5000000') config.set(username, 'use_size', '0') config.write(open(settings.USER_INFO,'w')) os.mkdir(os.path.join(settings.USER_HOME, username)) os.mkdir(os.path.join(settings.USER_HOME, username,'download')) os.mkdir(os.path.join(settings.USER_HOME, username, 'upload')) return 1 def checklogin(self,username,password): '''校驗登陸數據是否一致''' while True: config = configparser.ConfigParser() config.read(settings.USER_INFO) if username in config.sections(): pwd = config[username]['password'] if pwd == password: self.request.sendall('100'.encode()) print(username,'客戶端驗證經過!') self.request.sendall((os.path.join(settings.USER_HOME,username)).encode()) #家目錄路徑返回給客戶端 os.chdir(os.path.join(settings.USER_HOME,username)) #cd到客戶家目錄下 return 1 else: self.request.sendall('101'.encode()) break else: self.request.sendall('101'.encode()) def get(self,argv =None): '''接收客戶端發來的數據''' file_name = argv file_path = os.path.join(settings.USER_HOME, self.user, 'download', file_name) if os.path.exists(file_path): file_size = os.stat(file_path).st_size self.request.sendall('ok'.encode()) file_info = { 'file_name': argv, 'file_size': file_size, } info = pickle.dumps(file_info) self.request.sendall(info) ret = self.request.recv(1024).decode() have_down = 0 if ret == 'exist': # 返回值 # res = (self.request.recv(1024)).encode() have_down = int((self.request.recv(1024)).decode()) # 接收服務端已下載文件大小 if have_down ==file_size: return with open(file_path, 'rb') as f: f.seek(have_down) while True: if have_down ==file_size: break else: data =f.read(1024) self.request.sendall(data) have_down+=len(data) return else: self.request.sendall('no'.encode()) return @classmethod def start(cls): '''啓動服務器''' print('\33[32;1m服務器已啓動!\33[1m') server = socketserver.ThreadingTCPServer((settings.IP, settings.PORT), MyServer) server.serve_forever() if __name__=='__main__': MyServer.start()
import sys,os,hashlib,pickle,time from socket import * from FtpClient.conf import settings class MyClient: def __init__(self): self.addr = (settings.IP,settings.PORT) self.start() self.c_d = '' #用戶當前路徑 def post(self,argv = None): #傳入文件名稱 '''上傳文件''' if len(argv) ==0: print('請輸入上傳文件名稱!') return print('上傳前請確保文件在用戶的upload文件夾下!') file_path = os.path.join(settings.USER_HOME,self.user,'upload',argv) if os.path.exists(file_path): #判斷文件是否存在 file_size = os.stat(file_path).st_size file_info = { 'file_name': argv, 'file_size': file_size, } info = pickle.dumps(file_info) self.socket.sendall(('post|%s'%info).encode()) # 將上傳的文件信息做爲參數發給服務端 ret = self.socket.recv(1024).decode() have_send = 0 if ret =='exist': choice = input('文件已存在,是否續傳?(Y/N)').strip() if choice.upper() =='Y': self.socket.sendall('Y'.encode()) current_size= int(self.socket.recv(1024).decode()) #接收服務端已存文件大小 if current_size ==file_size: print('文件完整不需從新上傳') return else: have_send =current_size else: self.socket.sendall('N'.encode()) with open(file_path,'rb') as f: f.seek(have_send) for line in f: self.socket.sendall(line) have_send += len(line) self.show_process(have_send,file_size) sys.stdout.write('\n') def show_process(self,have_send,file_size): '''顯示進度條''' k = int((have_send / file_size * 100)) space = (100 - k) * ' ' flag = k * '>' sys.stdout.write('\r|%s| [%s%%]' % ((flag + space), k)) sys.stdout.flush() time.sleep(0.2) def get(self,argv =None): '''下載文件''' if len(argv) == 0: print('請輸入下載文件名稱!') return print('下載前請確保服務端download下有該文件!') self.socket.sendall(('get|%s' % argv).encode()) # 將下載的文件名做爲參數發給服務端 ret = self.socket.recv(1024).decode() have_down = 0 if ret=='ok': #服務端存在文件,可下載 file_info = pickle.loads(self.socket.recv(1024)) # 獲取服務端傳來的消息 file_name = file_info['file_name'] file_size = int(file_info['file_size']) file_path = os.path.join(settings.USER_HOME, self.user, 'download', file_name) if os.path.exists(file_path): # 客戶端存在文件 self.socket.sendall('exist'.encode()) choice = input('文件已存在,是否繼續下載?(Y/N)').strip() if choice.upper() == 'Y': have_down = os.stat(file_path).st_size # 獲取已經下載文件的大小 self.socket.sendall((str(have_down).encode())) if have_down ==file_size: print('文件完整不需從新下載!') return f = open(file_path, 'ab') # 續傳以a模式打開文件 else: self.socket.sendall((str(0).encode())) f = open(file_path, 'wb') # 不續傳以w模式打開 else: self.socket.sendall('reload'.encode()) # 從新下載 f = open(file_path, 'wb') while True: if have_down == file_size: break try: ret = self.socket.recv(1024) except Exception as e: break f.write(ret) have_down += len(ret) self.show_process(have_down, file_size) sys.stdout.write('\n') else: print('所下載文件不存在') return def register(self): '''註冊用戶''' while True: user = input('請輸入用戶名:').strip() if len(user) ==0:continue password = input('請輸入密碼:').strip() if len(password) ==0:continue pd =hashlib.md5() pd.update(password.encode()) pwd = pd.hexdigest() #加密後的信息 # pwd = password self.socket.sendall(('register|%s:%s'%(user, pwd)).encode())# 發送加密後的帳戶信息 ret = self.socket.recv(1024).decode() if ret =='ok': print('註冊成功,請登陸!') os.mkdir(os.path.join(settings.USER_HOME, user)) os.mkdir(os.path.join(settings.USER_HOME, user,'download')) os.mkdir(os.path.join(settings.USER_HOME, user, 'upload')) return 1 else: print('註冊用戶名已存在!') def login(self): '''客戶端用戶登陸''' try_times = 0 while try_times < 3: user = input('請輸入用戶名:') self.user = user if len(user) == 0: continue password = input('請輸入用密碼:') if len(password) == 0: continue pd =hashlib.md5() pd.update(password.encode()) pwd = pd.hexdigest() #加密後的信息 # pwd = password self.socket.sendall('login|{}:{}'.format(user, pwd).encode()) # 發送加密後的帳戶信息 ret = self.socket.recv(1024).decode() if ret == '100': print('登錄成功!') self.c_d = self.socket.recv(1024).decode() return 1 else: print('用戶或密碼錯誤!') try_times += 1 sys.exit('嘗試太屢次!') def interact(self): '''客戶端與服務端交互''' print('服務器鏈接成功!') while True: choice = input("[%s]:"%self.c_d).strip() if len(choice) == 0:continue if '|' in choice: cmd,argv = choice.split('|') else: cmd = choice argv = None if hasattr(self,'%s'%cmd): func = getattr(self,'%s'%cmd) func(argv) elif cmd.startswith('cd'): self.socket.sendall(choice.encode()) self.c_d = self.socket.recv(1024).decode() continue elif cmd.startswith('ls'): self.socket.sendall(choice.encode()) ret = pickle.loads(self.socket.recv(1024)) for i in ret: print(i) continue else: self.socket.sendall(choice.encode()) ret = self.socket.recv(1024) print(ret.decode()) def start(self): '''啓動主函數''' self.socket = socket(AF_INET,SOCK_STREAM) try: self.socket.connect(self.addr) except Exception as e: sys.exit("鏈接服務端錯誤:%s" % e) while True: msg = '''1.註冊FTP\n2.登陸FTP\n3.退出''' print('\33[32;1m歡迎登陸MyFtpClient!\33[1m') print(msg) choice = input('>>>:').strip() if choice =='1': if self.register(): if self.login(): self.interact() elif choice =='2': if self.login(): self.interact() elif choice =='3': sys.exit() else:print('選項錯誤,請從新輸入!') if __name__=='__main__': client = MyClient() # client.interact()