須要弄清楚的問題:html
在什麼問題下用什麼鎖node
線程進程協程什麼狀況下用python
select,poll,epoll 各類狀況下怎麼用mysql
本章按照如下內容進行:linux
操做系統優先級----------------併發並行---------------進程------------進程池----------進程通訊-----------線程-------------GIL鎖-------------同步鎖-----------協程----------io驅動模型io複用----------select ---------Twisted------------subprocess補充git
進程存在的標識,在Linux系統中是task_struct,task_struct在內核棧(Linux進程氛圍用戶棧和內核棧)的尾端分配。程序員
從低地址到高地址:github
SP段:堆棧指針web
PC段:程序計數器算法
進程是擁有資源的基本單位,進程的地址空間相互獨立;
線程是獨立調度的基本單位,共享同一個進程內的資源(線程有本身的棧),減小了程序併發時所付出的時空開銷,而且能夠高效的共享數據,有效地利用多處理器和多核計算機,提升os的併發度。
一個進程異常退出不會引發另外的進程運行異常;可是線程若異常退出通常是會引發整個進程奔潰。
建立/撤銷/切換 進程的開銷遠大於線程的(建立線程比建立進程快10~100倍 UNPv2/P406)。
孤兒進程 —— 父進程退出,而它的一個或多個子進程還在運行,那麼那些子進程將成爲孤兒進程。孤兒進程將被init進程(進程號爲1)所收養,並由init進程對它們完成狀態收集工做。
殭屍進程 —— 一個進程使用fork建立子進程,若是子進程退出,而父進程並無調用wait或waitpid獲取子進程的狀態信息,那麼子進程的進程描述符仍然保存在系統中。
守護進程 —— 守護進程的父進程是init進程,由於它真正的父進程在fork出子進程後就先於子進程exit退出了,因此它是一個由init繼承的孤兒進程。不須要用戶輸入就能運行並且提供某種服務,不是對整個系統就是對某個用戶程序提供服務。常見的守護進程包括系統日誌進程syslogd、 web服務器httpd、郵件服務器sendmail和數據庫服務器mysqld等。
最大進程數受如下3方面限制:
UNP 分了如下幾中形式的IPC:
進程間通訊方式:匿名管道,有名管道,消息隊列,共享內存,信號量,套接字,域套接字,信號
線程同步方式:互斥量,條件變量,讀寫鎖
半雙工的(即數據只能在一個方向上流動),具備固定的讀端和寫端;是一種特殊的文件(pipefs,掛載在內核中),有固定大小,只存在於內存中;
實現原理:
管道是由內核管理的一個緩衝區,被設計成爲環形的數據結構,以便管道能夠被循環利用。當管道中沒有信息的話,從管道中讀取的進程會等待,直到另外一端的進程放入信息。當管道被放滿信息的時候,嘗試放入信息的進程會等待,直到另外一端的進程取出信息。當兩個進程都終結的時候,管道也自動消失。
在 Linux 中,管道的實現藉助了文件系統的file結構和VFS的索引節點inode。經過將兩個file結構指向同一個臨時的VFS索引節點,而這個VFS索引節點又指向一個物理頁面而實現的。
內核會利用必定的機制同步對管道的訪問。
半雙工,可在無關進程使用;FIFO有路徑名與之相關聯,以一種特殊設備文件形式存在於文件系統中
實現原理:
Linux中設立了一個專門的特殊文件系統--管道文件,FIFO在文件系統中有對應的路徑。當一個進程以讀(r)的方式打開該文件,而另外一個進程以寫(w)的方式打開該文件,那麼內核就會在這兩個進程之間創建管道,雖然FIFO在VFS的目錄樹下可見,可是它並不對應disk上的文件。
本質上是一個先進先出的隊列數據結構,最先放入的數據被最早讀出來,從而保證信息交流的順序。FIFO只是借用了文件系統來爲管道命名。當刪除FIFO文件時,管道鏈接也隨之消失。當進程終止時,管道內的數據會被刪除。
共享內存是最快的:
一般往管道、FIFO或消息隊列寫入數據時,這些IPC須要將數據從進程複製到內核,一般總共須要複製4次,而共享內存則只拷貝2次數據;如圖:
用於通知接收進程,有某種事件發生。
信號是在軟件層次上對中斷機制的一種模擬,在原理上,一個進程收到一個信號與處理器收到一箇中斷請求能夠說是同樣的。信號是異步的,一個進程沒必要經過任何操做來等待信號的到達。
加鎖原語,排他性訪問共享數據,用於保護臨界區。可細分爲遞歸鎖/非遞歸鎖。
若是存在某個線程依然使用原先的程序
(即不嘗試得到mutex,而直接修改共享變量),互斥鎖不能阻止其修改。因此,互斥鎖機制須要程序員本身來寫出完善的程序來實現互斥鎖的功能(如下鎖 同樣)。
互斥鎖用於上鎖,條件變量用於等待,條件變量的使用是與互斥鎖共通使用的。
條件變量學名叫管程(monitor)【From muduo P40】。
讀寫鎖也叫作 共享-獨佔鎖,容許更高的併發度。
互斥量要麼是鎖住狀態,要麼是不加鎖狀態,並且一次只有一個線程對其加鎖。
讀寫鎖能夠有三種狀態:讀模式下加鎖狀態,寫模式下加鎖狀態,不加鎖狀態。一次只有一個線程能夠佔有寫模式的讀寫鎖,可是多個線程可用同時佔有讀模式的讀寫鎖。
讀寫鎖能夠經過使用互斥鎖和條件變量來實現。
在獲取鎖以前一直處於忙等(自旋)阻塞狀態;經常使用於
鎖被持有的時間短,且線程並不但願在從新調度上花費太多成本。當線程自旋等待鎖變爲可用時,CPU不能作其餘事情。
故而自旋鎖常做爲底層原語,用於實現其餘類型的鎖。
記錄鎖是讀寫鎖的一種擴展類型,可用於親緣關係或無親緣關係的進程之間共享某個文件的讀與寫。被鎖住的文件經過文件描述符進行訪問,執行上鎖的操做函數是fcntl,這種類型的鎖一般在內核中維護。
記錄鎖的功能是:一個進程正在讀或修改文件的某個部分時,能夠阻止其餘進程修改同一文件區,即其鎖定的是文件的一個區域或整個文件。
記錄鎖有兩種類型:共享讀鎖,獨佔寫鎖。基本規則是:多個進程在一個給定的字節上能夠有一把共享的讀鎖,但在一個給定字節上的寫鎖只能有一個進程獨用。即:若是在一個給定的字節上已經有一把讀或多把讀鎖,則不能在該字節上再加寫鎖;若是在一個字節上已經有一把獨佔性的寫鎖,則不能再對它加任何讀鎖。
處理死鎖的策略:
通常來講,打破循環使用資源最容易,即順序加減鎖
銀行家算法(死鎖避免算法)。在資源動態分配過程當中,防止系統進入不安全狀態,以免發生死鎖。
數據庫中會用到等待圖進行死鎖檢測
在Linux下,信號量和線程互斥鎖的實現都是經過futex系統調用。
由fork建立的新進程被稱爲子進程(child process)。該函數被調用一次,但返回兩次。兩次返回的區別是子進程的返回值是0,而父進程的返回值則是新進程(子進程)的進程 id。將子進程id返回給父進程的理由是:由於一個進程的子進程能夠多於一個,沒有一個函數使一個進程能夠得到其全部子進程的進程id。對子進程來講,之因此fork返回0給它,是由於它隨時能夠調用getpid()來獲取本身的pid;也能夠調用getppid()來獲取父進程的id。(進程id 0老是由交換進程使用,因此一個子進程的進程id不可能爲0 )。
fork以後,操做系統會複製一個與父進程徹底相同的子進程,雖然說是父子關係,可是在操做系統看來,他們更像兄弟關係,這2個進程共享代碼空間,可是數據空間是互相獨立的,子進程數據空間中的內容是父進程的完整拷貝,指令指針也徹底相同,子進程擁有父進程當前運行到的位置(兩進程的程序計數器pc值相同,也就是說,子進程是從fork返回處開始執行的),但有一點不一樣,若是fork成功,子進程中fork的返回值是0,父進程中fork的返回值是子進程的進程號,若是fork不成功,父進程會返回錯誤。
由子進程自父進程繼承到:
1.進程的資格(真實(real)/有效(effective)/已保存(saved)用戶號(UIDs)和組號(GIDs))
2.環境(environment)
3.堆棧
4.內存
5.打開文件的描述符(注意對應的文件的位置由父子進程共享,這會引發含糊狀況)
6.執行時關閉(close-on-exec) 標誌 (譯者注:close-on-exec標誌可經過fnctl()對文件描述符設置,POSIX.1要求全部目錄流都必須在exec函數調用時關閉。更詳細說明,參見《UNIX環境高級編程》 W. R. Stevens, 1993, 尤晉元等譯(如下簡稱《高級編程》), 3.13節和8.9節)
7.信號(signal)控制設定
8.nice值 (譯者注:nice值由nice函數設定,該值表示進程的優先級,數值越小,優先級越高)
進程調度類別(scheduler class)(譯者注:進程調度類別指進程在系統中被調度時所屬的類別,不一樣類別有不一樣優先級,根據進程調度類別和nice值,進程調度程序可計算出每一個進程的全局優先級(Global process prority),優先級高的進程優先執行)
8.進程組號
9.對話期ID(Session ID) (譯者注:譯文取自《高級編程》,指:進程所屬的對話期(session)ID, 一個對話期包括一個或多個進程組, 更詳細說明參見《高級編程》9.5節)
10.當前工做目錄
11.根目錄 (譯者注:根目錄不必定是「/」,它可由chroot函數改變)
12.文件方式建立屏蔽字(file mode creation mask (umask))(譯者注:譯文取自《高級編程》,指:建立新文件的缺省屏蔽字)
13.資源限制
14.控制終端
子進程所獨有:
進程號
1.不一樣的父進程號(譯者注:即子進程的父進程號與父進程的父進程號不一樣, 父進程號可由getppid函數獲得)
2.本身的文件描述符和目錄流的拷貝(譯者注:目錄流由opendir函數建立,因其爲順序讀取,顧稱「目錄流」)
3.子進程不繼承父進程的進程,正文(text), 數據和其它鎖定內存(memory locks)(譯者注:鎖定內存指被鎖定的虛擬內存頁,鎖定後,4.不容許內核將其在必要時換出(page out),詳細說明參見《The GNU C Library Reference Manual》 2.2版, 1999, 3.4.2節)
5.在tms結構中的系統時間(譯者注:tms結構可由times函數得到,它保存四個數據用於記錄進程使用中央處理器 (CPU:Central Processing Unit)的時間,包括:用戶時間,系統時間, 用戶各子進程合計時間,系統各子進程合計時間)
6.資源使用(resource utilizations)設定爲0
8.阻塞信號集初始化爲空集(譯者注:原文此處不明確,譯文根據fork函數手冊頁稍作修改)
9.不繼承由timer_create函數建立的計時器
10.不繼承異步輸入和輸出
一個進程在調用exit命令完結本身的性命的時分,其實它並無真正的被燒燬,而是留下一個稱爲殭屍進程(Zombie)的數據結構(體系調用exit,它的做用是使進程退出,但也僅僅限於將一個正常的進程變成一個殭屍進程,並不能將其完整燒燬)。在Linux進程的情況中,殭屍進程是十分特別的一種,它已經廢棄了簡直一切內存空間,沒有任何可履行代碼,也不能被調度,僅僅在進程列表中保存一個地位,記錄該進程的退出情況等信息供其他進程蒐集,除此以外,殭屍進程再也不佔有任何內存空間。它需要它的父進程來爲它收屍,假如他的父進程沒安裝SIGCHLD信號處理函數調用wait或waitpid()期待子進程完結,又沒有顯式疏忽該信號,那麼它就始終維持殭屍情況,假如這時父進程完結了,那麼init進程主動會接手這個子進程,爲它收屍,它仍是能被清除的。然而假如假如父進程是一個循環,不會完結,那麼子進程就會始終維持殭屍情況,這就是爲何體系中有時會有不少的殭屍進程。
建立方式
ret = os.fork() #後面的代碼子程序和主程序都會執行,只不過寫在一個程序裏了 if ret == 0: child_suite # 子進程代碼 else: parent_suite # 父進程代碼
import os import time print "Before fork process pid=%s, ppid=%s" % (os.getpid(), os.getppid()) pid = os.fork() if pid == 0: print "I am child process pid=%s, ppid=%s" % (os.getpid(), os.getppid()) time.sleep(5) else: print "I am parent process pid=%s, ppid=%s" % (os.getpid(), os.getppid()) time.sleep(5) # 下面的內容會被打印兩次,一次是在父進程中,一次是在子進程中。 print "After fork process pid=%s, ppid=%s" % (os.getpid(), os.getppid()) #######輸出以下 Before fork process pid=3507, ppid=2952 i am parent process pid=3507, ppid=2952 i am child process pid=3509, ppid=3507 After fork process pid=3507, ppid=2952 After fork process pid=3509, ppid=3507
import multiprocessing import time import os import psutil TIME =1 def test1(args1): print("這裏是test1,個人pid是%d 個人父進程是%d"%(os.getpid(), os.getppid())) print("這裏是test1,個人num是%d 個人名字是是%s"%(args1, psutil.Process(os.getpid()).name())) args1 -= 1 time.sleep(TIME) def test2(args1): print("這裏是test2,個人pid是%d 個人父進程是%d"%(os.getpid(), os.getppid())) print("這裏是test2,個人num是%d 個人名字是是%s" % (args1, psutil.Process(os.getpid()).name())) args1 -= 1 time.sleep(TIME) def test3(args1): print("這裏是test3,個人pid是%d 個人父進程是%d"%(os.getpid(), os.getppid())) print("這裏是test3,個人num是%d 個人名字是是%s" % (args1, psutil.Process(os.getpid()).name())) args1 -= 1 time.sleep(TIME) if __name__ == '__main__': start1 = time.process_time() start2 = time.time() print("這裏是主進程,個人pid是%d 個人父進程是%d"%(os.getpid(), os.getppid())) num = 10 tlist = [] t1 = multiprocessing.Process(target= test1, args=(num,), name="test1") #name是類名字 tlist.append(t1) t2 = multiprocessing.Process(target= test2, args=(num,), name="test2") tlist.append(t2) t3 = multiprocessing.Process(target= test3, args=(num,), name="test3") tlist.append(t3) for process in tlist: process.start() for process in tlist: process.join() end1 = time.process_time() end2 = time.time() print("總共用時:", end2- start2,"秒,其中CPU計算時間爲",end1-start1,"秒") #####輸出以下 這裏是主進程,個人pid是28120 個人父進程是14632 這裏是test1,個人pid是2496 個人父進程是28120 這裏是test1,個人num是10 個人名字是是python.exe 這裏是test2,個人pid是29128 個人父進程是28120 這裏是test2,個人num是10 個人名字是是python.exe 這裏是test3,個人pid是28588 個人父進程是28120 這裏是test3,個人num是10 個人名字是是python.exe 總共用時: 1.7347896099090576 秒,其中CPU計算時間爲 0.09375 秒
procs=[Process(target=func1,args=(s,))for i in xrange(10)] 這種方式初始化更帥
經常使用方法:
import multiprocessing import time TIME = 2 def test1(args1): print("這裏是test1,個人pid是%d 個人父進程是%d"%(os.getpid(), os.getppid())) print("這裏是test1,個人num是%d 個人名字是是%s"%(args1, psutil.Process(os.getpid()).name())) args1 -= 1 time.sleep(TIME) class MyProcess(multiprocessing.Process): def __init__(self, num, *args , **kwargs): super(MyProcess, self).__init__(*args, **kwargs) self.num = num def run(self): print("開始重寫run了") i = 0 while True: time.sleep(1) i += 1 print("子進程",i) if i == self.num: break if __name__ == '__main__': p = MyProcess(6, target= test1, args= (10, )) #說明target的目標函數被修改了,只執行run方法 p.start() p.join(2) while True: if p.is_alive(): print("父進程") time.sleep(1) elif not p.is_alive(): print("子進程結束了") break
#!/usr/bin/env python3 # encoding: utf-8 """ @Version: ?? @Author: Administrator 周廣露 @Contact: zhouguanglu2012@163.com @Site: http://www.baidu.com @Software: PyCharm @File: multiP @Created_Time: 2019/1/25 18:35 """ import multiprocessing import time import os import psutil TIME =1 def fun_call(): print("我這裏執行完了一個") def test1(args1): print("這裏是test1,個人pid是%d 個人父進程是%d"%(os.getpid(), os.getppid())) print("這裏是test1,個人num是%d 個人名字是是%s"%(args1, psutil.Process(os.getpid()).name())) args1 -= 1 time.sleep(TIME) def test2(args1): print("這裏是test2,個人pid是%d 個人父進程是%d"%(os.getpid(), os.getppid())) print("這裏是test2,個人num是%d 個人名字是是%s" % (args1, psutil.Process(os.getpid()).name())) args1 -= 1 time.sleep(TIME) def test3(args1): print("這裏是test3,個人pid是%d 個人父進程是%d"%(os.getpid(), os.getppid())) print("這裏是test3,個人num是%d 個人名字是是%s" % (args1, psutil.Process(os.getpid()).name())) args1 -= 1 time.sleep(TIME) if __name__ == '__main__': pool = multiprocessing.Pool(2) start1 = time.process_time() start2 = time.time() print("這裏是主進程,個人pid是%d 個人父進程是%d"%(os.getpid(), os.getppid())) num = 10 # pool.apply_async(test1, (num,)) #name是類名字 # pool.apply_async(test2, (num,)) # pool.apply_async(test3, (num,)) #先加進去,而後慢慢執行 pool.apply(test1, (num,)) pool.apply(test2, (num,)) pool.apply(test3, (num,)) #一個個執行 print("----------------------") pool.close() pool.join() end1 = time.process_time() end2 = time.time() print("總共用時:", end2- start2,"秒,其中CPU計算時間爲",end1-start1,"秒") ##########輸出結果 這裏是主進程,個人pid是25568 個人父進程是14632 這裏是test1,個人pid是7612 個人父進程是25568 這裏是test1,個人num是10 個人名字是是python.exe 這裏是test2,個人pid是21636 個人父進程是25568 這裏是test2,個人num是10 個人名字是是python.exe 這裏是test3,個人pid是7612 個人父進程是25568 這裏是test3,個人num是10 個人名字是是python.exe ---------------------- 總共用時: 3.7470672130584717 秒,其中CPU計算時間爲 0.015625 秒
#!/usr/bin/env python3 # encoding: utf-8 """ @Version: ?? @Author: Administrator 周廣露 @Contact: zhouguanglu2012@163.com @Site: http://www.baidu.com @Software: PyCharm @File: multiP @Created_Time: 2019/1/25 18:35 """ import multiprocessing import time import os import psutil TIME =1 def fun_call(): print("我這裏執行完了一個") def test1(args1): print("這裏是test1,個人pid是%d 個人父進程是%d"%(os.getpid(), os.getppid())) print("這裏是test1,個人num是%d 個人名字是是%s"%(args1, psutil.Process(os.getpid()).name())) args1 -= 1 time.sleep(TIME) def test2(args1): print("這裏是test2,個人pid是%d 個人父進程是%d"%(os.getpid(), os.getppid())) print("這裏是test2,個人num是%d 個人名字是是%s" % (args1, psutil.Process(os.getpid()).name())) args1 -= 1 time.sleep(TIME) def test3(args1): print("這裏是test3,個人pid是%d 個人父進程是%d"%(os.getpid(), os.getppid())) print("這裏是test3,個人num是%d 個人名字是是%s" % (args1, psutil.Process(os.getpid()).name())) args1 -= 1 time.sleep(TIME) if __name__ == '__main__': pool = multiprocessing.Pool(2) start1 = time.process_time() start2 = time.time() print("這裏是主進程,個人pid是%d 個人父進程是%d"%(os.getpid(), os.getppid())) num = 10 pool.apply_async(test1, (num,)) #name是類名字 pool.apply_async(test2, (num,)) pool.apply_async(test3, (num,)) #先加進去,而後慢慢執行 # pool.apply(test1, (num,)) # pool.apply(test2, (num,)) # pool.apply(test3, (num,)) #一個個執行 print("----------------------") pool.close() pool.join() end1 = time.process_time() end2 = time.time() print("總共用時:", end2- start2,"秒,其中CPU計算時間爲",end1-start1,"秒") #輸出以下: 這裏是主進程,個人pid是10768 個人父進程是14632 ---------------------- 這裏是test1,個人pid是34692 個人父進程是10768 這裏是test1,個人num是10 個人名字是是python.exe 這裏是test2,個人pid是21788 個人父進程是10768 這裏是test2,個人num是10 個人名字是是python.exe 這裏是test3,個人pid是34692 個人父進程是10768 這裏是test3,個人num是10 個人名字是是python.exe 總共用時: 2.7450788021087646 秒,其中CPU計算時間爲 0.0 秒
pool 經常使用方法:
1.apply()
函數原型:apply(func[, args=()[, kwds={}]])
該函數用於傳遞不定參數,同python中的apply函數一致,主進程會被阻塞直到函數執行結束(不建議使用,而且3.x之後不在出現)。
2.apply_async
函數原型:apply_async(func[, args=()[, kwds={}[, callback=None]]])
與apply用法一致,但它是非阻塞的且支持結果返回後進行回調。
3.map()
函數原型:map(func, iterable[, chunksize=None])
Pool類中的map方法,與內置的map函數用法行爲基本一致,它會使進程阻塞直到結果返回。 會阻塞到這一步,比map要快,多進程map
注意:雖然第二個參數是一個迭代器,但在實際使用中,必須在整個隊列都就緒後,程序纔會運行子進程。
4.map_async()
函數原型:map_async(func, iterable[, chunksize[, callback]])
與map用法一致,可是它是非阻塞的。其有關事項見apply_async。
5.close()
關閉進程池(pool),使其不在接受新的任務。
6.terminal()
結束工做進程,不在處理未處理的任務。即便已經正在運行的也不會運行,且回調函數也不執行
7.join()
主進程阻塞等待子進程的退出, join方法要在close或terminate以後使用。
8.回調函數callback
#!/usr/bin/env python3 # encoding: utf-8 """ @Version: ?? @Author: Administrator 周廣露 @Contact: zhouguanglu2012@163.com @Site: http://www.baidu.com @Software: PyCharm @File: multiP @Created_Time: 2019/1/25 18:35 """ import multiprocessing import time import os import psutil TIME =1 def fun_call(msg): print("我這裏執行完了一個",msg) def test1(args1): print("這裏是test1,個人pid是%d 個人父進程是%d"%(os.getpid(), os.getppid())) print("這裏是test1,個人num是%d 個人名字是是%s"%(args1, psutil.Process(os.getpid()).name())) args1 -= 1 time.sleep(TIME) return "這是test1的返回值" def test2(args1): print("這裏是test2,個人pid是%d 個人父進程是%d"%(os.getpid(), os.getppid())) print("這裏是test2,個人num是%d 個人名字是是%s" % (args1, psutil.Process(os.getpid()).name())) args1 -= 1 time.sleep(TIME) def test3(args1): print("這裏是test3,個人pid是%d 個人父進程是%d"%(os.getpid(), os.getppid())) print("這裏是test3,個人num是%d 個人名字是是%s" % (args1, psutil.Process(os.getpid()).name())) args1 -= 1 time.sleep(TIME) if __name__ == '__main__': pool = multiprocessing.Pool(2) start1 = time.process_time() start2 = time.time() print("這裏是主進程,個人pid是%d 個人父進程是%d"%(os.getpid(), os.getppid())) num = 10 pool.apply_async(test1, (num,),callback = fun_call) #name是類名字 pool.apply_async(test2, (num,)) pool.apply_async(test3, (num,)) #先加進去,而後慢慢執行 # pool.apply(test1, (num,)) # pool.apply(test2, (num,)) # pool.apply(test3, (num,)) #一個個執行 print("----------------------") pool.close() pool.join() end1 = time.process_time() end2 = time.time() print("總共用時:", end2- start2,"秒,其中CPU計算時間爲",end1-start1,"秒") ######輸出以下 這裏是主進程,個人pid是5532 個人父進程是14632 ---------------------- 這裏是test1,個人pid是33372 個人父進程是5532 這裏是test1,個人num是10 個人名字是是python.exe 這裏是test2,個人pid是14904 個人父進程是5532 這裏是test2,個人num是10 個人名字是是python.exe 我這裏執行完了一個 這是test1的返回值 這裏是test3,個人pid是33372 個人父進程是5532 這裏是test3,個人num是10 個人名字是是python.exe 總共用時: 2.742694854736328 秒,其中CPU計算時間爲 0.015625 秒
注意:
from multiprocessing import Pipe, Pool, Manager, Process def consumer(q,i): print("進來了c") num = 1 while True: print("這裏是消費者%d正在吃個人第%d個包子"%(i, num )) print(q[0].recv()) # time.sleep(2) num = num + 1 def producer(q,i): print("進來了p") num = 1 while True: baoziname = "%d廚師%d號包子"%(i, num) print("這裏是生產者%d正在生產個人第%d個包子"%(i, num )) # time.sleep(1) q[1].send(baoziname) num += 1 if __name__ == '__main__': pipe = Pipe(5) print(pipe) C_SIZE = 5 P_SIZE = 3 pool_c = Pool(C_SIZE) pool_p = Pool(P_SIZE) print("-------------------------開始--------------") for i in range(P_SIZE): pool_p.apply_async(producer, (pipe,i)) for i in range(C_SIZE): pool_c.apply_async(consumer,(pipe,i)) print("-------------------------完事--------------") pool_c.close() pool_p.close() pool_c.join() pool_p.join() # C_SIZE = 5 # P_SIZE = 3 # # print("-------------------------開始--------------") # clist = [] # plist = [] # for i in range(C_SIZE): # t = Process(target=producer, args=(pipe, i)) # t.start() # plist.append(t) # for i in range(P_SIZE): # t = Process(target=consumer, args=(pipe, i)) # t.start() # clist.append(t) # # for i in clist: # i.join() # # for i in plist: # i.join() # print("-------------------------完事--------------")
# named pipe Client
#encoding: utf-8
import os
import time
write_path = "/tmp/server_in.pipe"
read_path = "/tmp/server_out.pipe"
counter = 1
f = os.open( write_path, os.O_SYNC | os.O_CREAT | os.O_RDWR )
print "Client open f", f
rf = None
while True:
# Client發送請求
req = "%s "%counter
len_send = os.write( f, req )
print "request", req, len_send
counter += 1
if rf == None:
# *要點1:在這裏第一次打開read_path,實際這裏的open是一個阻塞操做
# 打開的時機很重要。若是在程序剛開始,沒發送請求就打開read_path,確定會阻塞住
rf = os.open( read_path, os.O_RDONLY )
print "client opened rf", rf
# 接收Server迴應
s = os.read( rf, 1024 )
if len(s) == 0:
# 通常來講,是管道被意外關閉了,好比Server退出了
break
print "received", s
# 這個例子裏沒有sleep,客戶端以最高速度發送數據,能夠觀察執行效果
os.close( f )
os.close(rf)
#named pipe Server
#encoding: utf-8
import os, time
read_path = "/tmp/server_in.pipe"
write_path = "/tmp/server_out.pipe"
try:
# 建立命名管道
os.mkfifo( write_path )
os.mkfifo( read_path )
except OSError, e:
# 若是命名管道已經建立過了,那麼無所謂
print "mkfifo error:", e
# 寫入和讀取的文件,正好和Client相反
rf = os.open( read_path, os.O_RDONLY )
f = os.open( write_path, os.O_SYNC | os.O_CREAT | os.O_RDWR )
while True:
# 接收請求
s = os.read( rf, 2 )
if len(s) == 0:
# 沒有收到字符,通常是惟一的發送方被關閉了。
# 這裏能夠休息一下繼續,對後續消息沒有任何影響,也不會丟包。
time.sleep( 1 )
continue
# 若是收到的字符串帶一個s,打印出來
# 用於調試和測試
if "z" in s:
print "received", s
# 在請求前面加一個s字母,返回
os.write( f, "s%s"%s )
os.close( f )
os.close( rf )
這個也沒有嘗試成功,先不弄了,一天了
確實不會了,蛋疼了,只能夠對3寫讀,4就不行,愛了我擦
import io
import os
import time
from multiprocessing import Pipe, Pool, Manager, Process, Lock
def consumer(q,i,l):
l.acquire()
r1 = os.fdopen(q, "r")
l.release()
print("進來了c")
num = 1
while True:
print("這裏是消費者%d正在吃個人第%d個包子"%(i, num ))
# r1 = open("1.txt", "r")
print(r1.read())
time.sleep(2)
num = num + 1
def producer(q,i,l):
print("進來了p")
num = 1
l.acquire()
w1 = os.fdopen(q, "w")
l.release()
while True:
baoziname = "%d廚師%d號包子"%(i, num)
print("這裏是生產者%d正在生產個人第%d個包子"%(i, num ))
w1.write(baoziname)
time.sleep(2)
num += 1
if __name__ == '__main__':
r,w = os.pipe()
l= Lock()
# C_SIZE = 5
# P_SIZE = 3
# pool_c = Pool(C_SIZE)
# pool_p = Pool(P_SIZE)
print("-------------------------開始--------------")
# for i in range(P_SIZE):
# pool_p.apply_async(producer, (w1,i))
#
# for i in range(C_SIZE):
# pool_c.apply_async(consumer,(r1,i))
t = Process(target=producer, args=(r,0,l))
t.start()
t = Process(target=consumer, args=(r,1,l))
t.start()
print("-------------------------完事--------------")
# pool_c.close()
# pool_p.close()
# pool_c.join()
# pool_p.join()
# C_SIZE = 5
# P_SIZE = 3
#
# print("-------------------------開始--------------")
# clist = []
# plist = []
# for i in range(C_SIZE):
# t = Process(target=producer, args=(pipe, i))
# t.start()
# plist.append(t)
# for i in range(P_SIZE):
# t = Process(target=consumer, args=(pipe, i))
# t.start()
# clist.append(t)
#
# for i in clist:
# i.join()
#
# for i in plist:
# i.join()
# print("-------------------------完事--------------")
import time from multiprocessing import Pool, Process, Queue, Manager def consumer(q,i): print("進來了c") num = 1 while True: print("這裏是消費者%d正在吃個人第%d個包子"%(i, num )) print(q.get()) # time.sleep(2) num = num + 1 def producer(q,i): print("進來了p") num = 1 while True: baoziname = "%d廚師%d號包子"%(i, num) print("這裏是生產者%d正在生產個人第%d個包子"%(i, num )) # time.sleep(1) q.put(baoziname) num += 1 if __name__ == '__main__': C_SIZE = 5 P_SIZE = 3 pool_c = Pool(C_SIZE) pool_p = Pool(P_SIZE) Queue q = Manager().Queue(5) #默認最多隻能存5個包子 print("-------------------------開始--------------") for i in range(P_SIZE): pool_p.apply_async(producer, (q,i)) for i in range(C_SIZE): pool_c.apply_async(consumer,(q,i)) print("-------------------------完事--------------") pool_c.close() pool_p.close() pool_c.join() pool_p.join()
import time from multiprocessing import Pool, Process, Queue, Manager def consumer(q,i): print("進來了c") num = 1 while True: print("這裏是消費者%d正在吃個人第%d個包子"%(i, num )) print(q.get()) # time.sleep(2) num = num + 1 def producer(q,i): print("進來了p") num = 1 while True: baoziname = "%d廚師%d號包子"%(i, num) print("這裏是生產者%d正在生產個人第%d個包子"%(i, num )) # time.sleep(1) q.put(baoziname) num += 1 if __name__ == '__main__': C_SIZE = 5 P_SIZE = 3 q = Queue(5) #默認最多隻能存5個包子 print(q.__dict__) print("-------------------------開始--------------") clist= [] plist =[] for i in range(C_SIZE): t = Process(target=producer, args=(q,i)) t.start() plist.append(t) for i in range(P_SIZE): t = Process(target=consumer, args=(q, i)) t.start() clist.append(t) for i in clist: i.join() for i in plist: i.join() print("-------------------------完事--------------")
雖然Queue是線程安全的,可是print不是,還須要加鎖
Queue經常使用方法:
Queue.Queue(maxsize=0) FIFO, 若是maxsize小於1就表示隊列長度無限 Queue.LifoQueue(maxsize=0) LIFO, 若是maxsize小於1就表示隊列長度無限 Queue.qsize() 返回隊列中數據的大小 Queue.empty() 若是隊列爲空,返回True,反之False Queue.full() 若是隊列滿了,返回True,反之False Queue.full 與 maxsize 大小對應 Queue.get([block[, timeout]])獲取隊列,timeout等待時間 Queue.get_nowait() 至關Queue.get(False) 非阻塞 Queue.put(item) 寫入隊列,timeout等待時間 Queue.put_nowait(item) 至關Queue.put(item, False) Queue.task_done() 在完成一項工做以後,Queue.task_done()函數向任務已經完成的隊列發送一個信號 Queue.join() 實際上意味着等到隊列爲空,再執行別的操做 Queue.maxsize 最大能裝在數據,能夠直接修改
共享內存:要否則把變量設置在全局變量,要否則就把變量帶入到進程裏面去
#!/usr/bin/env python3 # encoding: utf-8 """ @Version: ?? @Author: Administrator 周廣露 @Contact: zhouguanglu2012@163.com @Site: http://www.baidu.com @Software: PyCharm @File: 進程管道通訊 @Created_Time: 2019/1/26 16:58 """ import mmap import time from multiprocessing import Pipe, Pool, Manager, Process import pipe # mmap_file = None BUFFSIZE =1024 FILE = "1.txt" mmap_file = mmap.mmap(-1, BUFFSIZE, access = mmap.ACCESS_WRITE, tagname = 'share_mmap') def consumer(i): time.sleep(5) print("進來了c") num = 1 # global mmap_file # mmap_file=mmap.mmap(-1, BUFFSIZE, access = mmap.ACCESS_READ, tagname = 'share_mmap') while True: tell = mmap_file.tell() s = mmap_file.readline().decode("utf8") length = len(s) print("我進入consumer內層循環:") if s : print("這裏是消費者%d正在吃個人第%d個包子"%(i, num )) print("-----",s) num = num + 1 time.sleep(2) else: time.sleep(5) def producer(i): print("進來了p") num = 1 # global mmap_file # mmap_file = mmap.mmap(-1, BUFFSIZE, access=mmap.ACCESS_WRITE, tagname='share_mmap') while True: if num < 25: baoziname = "%d廚師%d號包子"%(i, num) print("這裏是生產者%d正在生產個人第%d個包子"%(i, num )) print("長度:",len(baoziname.encode("utf8") + '\n'.encode("utf8"))) time.sleep(1) mmap_file.write(baoziname.encode("utf8") + '\n'.encode("utf8")) mmap_file.flush() num += 1 if __name__ == '__main__': C_SIZE = 1 P_SIZE = 1 pool_c = Pool(C_SIZE) pool_p = Pool(P_SIZE) print("-------------------------開始--------------") for i in range(P_SIZE): pool_p.apply_async(producer, (i,)) for i in range(C_SIZE): pool_c.apply_async(consumer,(i,)) print("-------------------------完事--------------") pool_c.close() pool_p.close() pool_c.join() pool_p.join() # C_SIZE = 5 # P_SIZE = 3 # print("-------------------------開始--------------") # clist = [] # plist = [] # for i in range(C_SIZE): # t = Process(target=producer, args=(nm, i)) # t.start() # plist.append(t) # for i in range(P_SIZE): # t = Process(target=consumer, args=(nm, i)) # t.start() # clist.append(t) # # for i in clist: # i.join() # # for i in plist: # i.join() # print("-------------------------完事--------------")
建立mmap:
----建立 mmap 對象 mmap(filedesc, length, tagname='') #windows mmap(filedesc, length, flag=MAP_SHARED, prot=PROT_READ|PROT_WRITE) #Unix #第一個爲文件,先寫須要大小的文件,保存 建立並返回一個 mmap 對象,參數 filedesc 一般是由 f.fileno()得到的,這在Python文件系列中已經介紹過。 mmap 建立對象的含義是:將指定 fd 的前 length 字節映射到內存。 Windows中,能夠經過參數tagname爲一段內存映射指定名稱,這樣一個文件上面能夠同時具備多個 mmap。windows中的內存映射都是可讀可寫的,同時在進程之間共享。這時候指定fileno爲-1就能夠,第二個爲字節大小就能夠 Unix平臺上,參數 flags 的可選值包括: mmap.MAP_PRIVATE:這段內存映射只有本進程可用; mmap.MAP_SHARED:將內存映射和其餘進程共享,全部映射了同一文件的進程,都可以看到其中一個所作的更改;默認是共享 參數 prot 對應的取值包括:mmap.PROT_READ, mmap.PROT_WRITE 和 mmap.PROT_WRITE | mmap.PROT_READ。最後一者的含義是同時可讀可寫。 length:要映射文件部分的大小(以字節爲單位),這個值爲0,則映射整個文件,若是大小大於文件當前大小,則擴展這個文件。 flags:MAP_PRIVATE:這段內存映射只有本進程可用;mmap.MAP_SHARED:將內存映射和其餘進程共享,全部映射了同一文件的進程,都可以看到其中一個所作的更改; prot:mmap.PROT_READ, mmap.PROT_WRITE 和 mmap.PROT_WRITE | mmap.PROT_READ。最後一者的含義是同時可讀可寫。 access:在mmap中有可選參數access的值有 ACCESS_READ:讀訪問。 ACCESS_WRITE:寫訪問,默認。 ACCESS_COPY:拷貝訪問,不會把更改寫入到文件,使用flush把更改寫到文件。
m.close() 關閉 m 對應的文件; m.find(str, start=0) 從 start 下標開始,在 m 中從左往右尋找子串 str 最先出現的下標; m.flush([offset, n]) 把 m 中從offset開始的n個字節刷到對應的文件中,參數 offset 要麼同時指定,要麼同時不指定; m.move(dstoff, srcoff, n) 等於 m[dstoff:dstoff+n] = m[srcoff:srcoff+n],把從 srcoff 開始的 n 個字節複製到從 dstoff 開始的n個字節,可能會覆蓋重疊的部分。 m.read(n) 返回一個字符串,從 m 對應的文件中最多讀取 n 個字節,將會把 m 對應文件的位置指針向後移動; m.read_byte() 返回一個1字節長的字符串,從 m 對應的文件中讀1個字節,要是已經到了EOF還調用 read_byte(),則拋出異常 ValueError; m.readline() 返回一個字符串,從 m 對應文件的當前位置到下一個'\n',當調用 readline() 時文件位於 EOF,則返回空字符串; m.resize(n) 把 m 的長度改成 n,m 的長度和 m 對應文件的長度是獨立的; m.seek(pos, how=0) 同 file 對象的 seek 操做,改變 m 對應的文件的當前位置; m.size() 返回 m 對應文件的長度(不是 m 對象的長度len(m)); m.tell() 返回 m 對應文件的當前位置; m.write(str) 把 str 寫到 m 對應文件的當前位置,若是從 m 對應文件的當前位置到 m 結尾剩餘的空間不足len(str),則拋出 ValueError; m.write_byte(byte) 把1個字節(對應一個字符)寫到 m 對應文件的當前位置,實際上 m.write_byte(ch) 等於 m.write(ch)。若是 m 對應文件的當前位置在 m 的結尾,也就是 m 對應文件的當前位置到 m 結尾剩餘的空間不足1個字節,write() 拋出異常ValueError,而 write_byte() 什麼都不作
value和array缺點
優勢:快速在進程間傳遞數據
缺點: 數據安全上存在風險,內存中的內容會被其餘進程覆蓋或 者篡改
value和array用法:
from multiprocessing import Value , Array
Value:將一個值存放在內存中,
Array:將多個數據存放在內存中,但要求數據類型一致
class ctypes.c_byte Represents the C signed char datatype, and interprets the value as small integer. The constructor accepts an optional integer initializer; no overflow checking is done. class ctypes.c_char Represents the C char datatype, and interprets the value as a single character. The constructor accepts an optional string initializer, the length of the string must be exactly one character. class ctypes.c_char_p Represents the C char * datatype when it points to a zero-terminated string. For a general character pointer that may also point to binary data, POINTER(c_char) must be used. The constructor accepts an integer address, or a bytes object. class ctypes.c_double Represents the C double datatype. The constructor accepts an optional float initializer. class ctypes.c_longdouble Represents the C long double datatype. The constructor accepts an optional float initializer. On platforms where sizeof(long double) == sizeof(double) it is an alias to c_double. class ctypes.c_float Represents the C float datatype. The constructor accepts an optional float initializer. class ctypes.c_int Represents the C signed int datatype. The constructor accepts an optional integer initializer; no overflow checking is done. On platforms where sizeof(int) == sizeof(long) it is an alias to c_long. class ctypes.c_int8 Represents the C 8-bit signed int datatype. Usually an alias for c_byte. class ctypes.c_int16 Represents the C 16-bit signed int datatype. Usually an alias for c_short. class ctypes.c_int32 Represents the C 32-bit signed int datatype. Usually an alias for c_int. class ctypes.c_int64 Represents the C 64-bit signed int datatype. Usually an alias for c_longlong. class ctypes.c_long Represents the C signed long datatype. The constructor accepts an optional integer initializer; no overflow checking is done. class ctypes.c_longlong Represents the C signed long long datatype. The constructor accepts an optional integer initializer; no overflow checking is done. class ctypes.c_short Represents the C signed short datatype. The constructor accepts an optional integer initializer; no overflow checking is done. class ctypes.c_size_t Represents the C size_t datatype. class ctypes.c_ssize_t Represents the C ssize_t datatype. New in version 3.2. class ctypes.c_ubyte Represents the C unsigned char datatype, it interprets the value as small integer. The constructor accepts an optional integer initializer; no overflow checking is done. class ctypes.c_uint Represents the C unsigned int datatype. The constructor accepts an optional integer initializer; no overflow checking is done. On platforms where sizeof(int) == sizeof(long) it is an alias for c_ulong. class ctypes.c_uint8 Represents the C 8-bit unsigned int datatype. Usually an alias for c_ubyte. class ctypes.c_uint16 Represents the C 16-bit unsigned int datatype. Usually an alias for c_ushort. class ctypes.c_uint32 Represents the C 32-bit unsigned int datatype. Usually an alias for c_uint. class ctypes.c_uint64 Represents the C 64-bit unsigned int datatype. Usually an alias for c_ulonglong. class ctypes.c_ulong Represents the C unsigned long datatype. The constructor accepts an optional integer initializer; no overflow checking is done. class ctypes.c_ulonglong Represents the C unsigned long long datatype. The constructor accepts an optional integer initializer; no overflow checking is done. class ctypes.c_ushort Represents the C unsigned short datatype. The constructor accepts an optional integer initializer; no overflow checking is done. class ctypes.c_void_p Represents the C void * type. The value is represented as integer. The constructor accepts an optional integer initializer. class ctypes.c_wchar Represents the C wchar_t datatype, and interprets the value as a single character unicode string. The constructor accepts an optional string initializer, the length of the string must be exactly one character. class ctypes.c_wchar_p Represents the C wchar_t * datatype, which must be a pointer to a zero-terminated wide character string. The constructor accepts an integer address, or a string. class ctypes.c_bool Represent the C bool datatype (more accurately, _Bool from C99). Its value can be True or False, and the constructor accepts any object that has a truth value. class ctypes.HRESULT Windows only: Represents a HRESULT value, which contains success or error information for a function or method call. class ctypes.py_object Represents the C PyObject * datatype. Calling this without an argument creates a NULL PyObject * pointer. The ctypes.wintypes module provides quite some other Windows specific data types, for example HWND, WPARAM, or DWORD. Some useful structures like MSG or RECT are also defined. Structured data types class ctypes.Union(*args, **kw) Abstract base class for unions in native byte order. class ctypes.BigEndianStructure(*args, **kw) Abstract base class for structures in big endian byte order. class ctypes.LittleEndianStructure(*args, **kw) Abstract base class for structures in little endian byte order.
Array(typecode_or_type, size_or_initializer, *, lock=True)
使用基本相似於Value,Returns a synchronized shared array
typecode_or_type:定義轉換成C語言的存儲類型;
size_or_initializer:初始化共享內存空間,
from multiprocessing.sharedctypes import Array from multiprocessing import Process from time import sleep def read(data): sleep(0.5) while True: data.append(1) # 每隔一秒讀一次 print(data) sleep(1) def write(data): while True: print(data[0],data[1],data[2]) if __name__ == '__main__': data = Array('i', [1,2,3]) # 定義一個共享內存,數據類型string類型 Process(target=write, args=(data,)).start() while True: sleep(1)
from multiprocessing import Process, Manager def test(data): print(id(data)) data.append(2) if __name__ == '__main__': m = Manager() data = m.list() #dict({"1":"2"}) data.append(1) print(id(data)) p =Process(target=test, args=(data,)) p.start() p.join() print(data,id(data))
Python中進程間共享數據,處理基本的queue,pipe和value
+
array外,還提供了更高層次的封裝。使用multiprocessing.Manager能夠簡單地使用這些高級接口。
Manager()返回的manager對象控制了一個server進程,此進程包含的python對象能夠被其餘的進程經過proxies來訪問。從而達到多進程間數據通訊且安全。
Manager支持的類型有
list
,
dict
,Namespace,Lock,RLock,Semaphore,BoundedSemaphore,Condition,Event,Queue,Value和Array。
from multiprocessing import Process,Value,Lock from multiprocessing.managers import BaseManager class Employee(object): def __init__(self,name,salary): self.name=name self.salary=Value('i',salary) def increase(self): self.salary.value+=100 def getPay(self): return self.name+':'+str(self.salary.value) class MyManager(BaseManager): pass def Manager2(): m=MyManager() m.start() return m MyManager.register('Employee',Employee) def func1(em,lock): with lock: em.increase() if __name__ == '__main__': manager=Manager2() em=manager.Employee('zhangsan',1000) lock=Lock() proces=[Process(target=func1,args=(em,lock))for i in xrange(10)] for p in proces: p.start() for p in proces: p.join() print em.getPay()
import time from multiprocessing import Process from random import randint from multiprocessing.managers import BaseManager from queue import Queue,LifoQueue,PriorityQueue class MyManager(BaseManager): pass MyManager.register("Queue", Queue) MyManager.register("LifoQueue", LifoQueue) MyManager.register("PriorityQueue", PriorityQueue) m = LifoQueue() def producer(q): count = 0 while 1: if count > 5: break pri = randint(0, 100) print(f'put :{pri}') q.put((pri, 222, pri)) # (priority, func, args) count += 1 if __name__ == '__main__': m = MyManager() m.start() q = m.PriorityQueue() # q是個代理(proxy),等因而用manager構造了一個多進程能夠共享的PriorityQueue, 而原本PriorityQueue只能線程間共享, 等因而用manager構造了一個多進程能夠共享的PriorityQueue, 而原本PriorityQueue只能線程間共享 t = Process(target=producer, args=(q,)) t.start() time.sleep(1) t.join()
Semaphore爲信號量機制。當共享的資源擁有多個時,可用Semaphore來實現進程同步。其用法和Lock差很少,s = Semaphore(N),每執行一次s.acquire(),該資源的可用個數將減小1,當資源個數已爲0時,就進入阻塞;每執行一次s.release(),佔用的資源被釋放,該資源的可用個數增長1。
import time import multiprocessing def test1(i): while True: i.acquire() # print("test1得到一個資源",multiprocessing.current_process(),i.__dict__) time.sleep(3) i.release() def test2(i): while True: i.acquire() print("test2得到一個資源") time.sleep(3) i.release() from multiprocessing import Semaphore, Process if __name__ == '__main__': s = Semaphore(5) t = Process(target=test1, args=(s,)) t.start() t = Process(target=test2, args=(s,)) t.start()
信號(signal)-- 進程之間通信的方式。一個進程一旦接收到信號就會打斷原來的程序執行流程來處理信號。
幾個經常使用信號:
SIGINT 終止進程 中斷進程 (control+c)
SIGTERM 終止進程 軟件終止信號
SIGKILL 終止進程 殺死進程
SIGALRM 鬧鐘信號
相對於共享內存,信號更加偏向於系統層面的,linux系統也是經過信號來管理進程,並且系統也規定了某些進程接到某些信號後的行爲。
固然咱們能夠經過綁定信號處理函數來修改進程收到信號之後的行爲
#encoding=utf-8 import os import signal from time import sleep def my_term(a,b): print "收到sigterm信號" signal.signal(signal.SIGTERM,my_term) def my_usr1(a,b): print "收到SIGUSR1信號" signal.signal(signal.SIGUSR1,my_usr1) while 1: print "我是進程id是",os.getpid() sleep(1)
signal.SIGHUP # 鏈接掛斷; signal.SIGILL # 非法指令; signal.SIGINT # 終止進程(ctrl+c); signal.SIGTSTP # 暫停進程(ctrl+z); signal.SIGKILL # 殺死進程(此信號不能被捕獲或忽略); signal.SIGQUIT # 終端退出; signal.SIGTERM # 終止信號,軟件終止信號; signal.SIGALRM # 鬧鐘信號,由signal.alarm()發起; signal.SIGCONT # 繼續執行暫停進程;
signal.signal(signalnum, handler) 設置信號處理的函數
signal.alarm(time) 設置發送SIGALRM信號的定時器
os.kill 這個不屬於signal模塊,但其可使用給某一進程發送信號
子進程或者兄弟進程間的:Queue, manager,匿名管道
全部進程間的:有名管道,文件mmap
Queue: pool不能用multiprocessing,能用manager的
pipe:效率比Queue快
import threading import time def test1(): while True: print("這是test1",threading.current_thread()) time.sleep(5) def test2(): while True: print("這是test2",threading.current_thread()) time.sleep(5) # list = ["test1", "test2"] print( eval("test1")) # tlist = [threading.Thread(target=globals().get(x), name=x+"的名字") for x in list] #高級建立多個 tlist = [threading.Thread(target=eval(x), name=x+"的名字") for x in list] #高級建立多個 # threading.Thread(target=test1, args=(), name="test1的名字") # threading.Thread(target=test2, args=(), name="test2的名字") for thread in tlist: thread.start() for thread in tlist: thread.join() print("=-------------------end--------")
import threading import time class MyThread(threading.Thread): def __init__(self,*args, **kwargs): super().__init__(*args, **kwargs) def run(self): print("這個一個自定義類必須運行的函數") try: if self._target: self._target(*self._args, **self._kwargs) finally: # Avoid a refcycle if the thread is running a function with # an argument that has a member that points to the thread. del self._target, self._args, self._kwargs def test1(): while True: print("這是test1",threading.current_thread()) time.sleep(5) def test2(): while True: print("這是test2",threading.current_thread()) time.sleep(5) # list = ["test1", "test2"] t = [MyThread(target=eval(i)) for i in list] for i in t: i.start() for i in t: i.join()
threading.setDaemon(True) ###設置爲守護進程,若主進程完事,不用等待子進程 threading.join(timeout) ###若設置,則主進程阻塞,知道線程完成或者時間超過,若設置了threading.setDamon, 則仍然會阻塞 #####查詢類 threading.current_thread() threading.currentThread() #當前線程的描述符 threading.active_count() threading.activeCount() #當前活動線程數量 threading.current_thread().name threading.current_thread().getName() t[0].getName() t[0].name #當前進程的名字 threading.current_thread().daemon 是否爲守護進程 threading.current_thread().ident 線程的標識符 threading.main_thread() 線程的主線程 threading.enumerate() ###當前進程中的線程 ####判斷類 threading.current_thread().isAlive() threading.main_thread().is_alive() ##是否激活狀態 ###設置類 threading.currentThread().setName("Main Thread zzzz") #設置名字 threading.currentThread().setDaemon(True) ####設置守護線程,可是必須在start以前設置
threading.settrace(func) ###設置一個跟蹤函數,用於在run()執行以前被調用。
threading.setprofile(func) ###設置一個跟蹤函數,用於在run()執行完畢以後調用。
threading.Timer(time, func) ###幾秒以後執行func函數
import threading import time class MyThread(threading.Thread): def __init__(self,*args, **kwargs): super().__init__(*args, **kwargs) def run(self): print("這個一個自定義類必須運行的函數") try: if self._target: self._target(*self._args, **self._kwargs) finally: # Avoid a refcycle if the thread is running a function with # an argument that has a member that points to the thread. del self._target, self._args, self._kwargs def test1(): while True: print("這是test1",threading.current_thread()) time.sleep(5) def test2(): while True: print("這是test2",threading.current_thread()) time.sleep(5) # list = ["test1", "test2"] t = [MyThread(target=eval(i)) for i in list] t[0].setDaemon(True) t[1].setDaemon(True) for i in t: i.start() print(threading.current_thread(),threading.currentThread().name,threading.active_count(),threading.activeCount()) #####<_MainThread(MainThread, started 27672)> MainThread 3 3 print(threading.current_thread().name,threading.current_thread().getName(), t[0].getName(), t[0].name, threading.current_thread().daemon ) #####MainThread MainThread Thread-1 Thread-1 False print( threading.current_thread().ident, threading.main_thread()) ####14172 <_MainThread(MainThread, started 14172)> print(threading.current_thread().isAlive(),threading.main_thread().is_alive(),threading.enumerate()) ######True True [<_MainThread(MainThread, started 2276)>, <MyThread(Thread-1, started daemon 13176)>, <MyThread(Thread-2, started daemon 14848)>] threading.currentThread().setName("Main Thread zzzz") #######Main Thread zzzz # threading.currentThread().setDaemon(True) ####在start以前設置才能夠 print(threading.currentThread().getName()) for i in t: i.join(2)
import time from threading import Timer def test1(a): a = a+1 Timer(3,test1,(a,)).start() print(f"這是test1的{a}次調用",time.time()) if __name__ == '__main__': test1(0)
一、什麼是全局解釋器鎖
在同一個進程中只要有一個線程獲取了全局解釋器(cpu)的使用權限,那麼其餘的線程就必須等待該線程的全局解釋器(cpu)使 用權消失後才能使用全局解釋器(cpu),即時多個線程直接不會相互影響在同一個進程下也只有一個線程使用cpu,這樣的機制稱爲全局 解釋器鎖(GIL)。
二、全局解釋器鎖的好處
一、避免了大量的加鎖解鎖的好處
二、使數據更加安全,解決多線程間的數據完整性和狀態同步
三、全局解釋器的缺點
多核處理器退化成單核處理器,只能併發不能並行。
四、如圖所示:
同一時刻的某個進程下的某個線程只能被一個cpu所處理,因此在GIL鎖下的線程只能被併發,不能被並行。
import threading def test1(): while True: print("這是test1的名字", threading.currentThread().name) def test2(): while True: print("這是test2的名字", threading.currentThread().name) funclist = [test1, test2] tlist = [threading.Thread(target=x) for x in funclist] for thread in tlist: thread.start()
輸出結果:
import threading def test1(): while True: lock.acquire() print("這是test1的名字", threading.currentThread().name) lock.release() def test2(): while True: lock.acquire() print("這是test2的名字", threading.currentThread().name) lock.release() funclist = [test1, test2] tlist = [threading.Thread(target=x) for x in funclist] lock = threading.Lock() for thread in tlist: thread.start()
import multiprocessing def test1(lock): while True: lock.acquire() print("這是test1的名字", multiprocessing.current_process().name) lock.release() def test2(lock): while True: lock.acquire() print("這是test2的名字", multiprocessing.current_process().name) lock.release() if __name__ == '__main__': funclist = [test1, test2] lock = multiprocessing.Lock() tlist = [multiprocessing.Process(target=x , args=(lock,)) for x in funclist] for process in tlist: process.start()
import threading import time #####構造一個單例阻塞類,只能一我的構造,再有人構造阻塞 class single1(object): lock = threading.Lock() num = 0 def __del__(self): #實例刪除時候,釋放鎖 single1.lock.release() def __new__(cls, *args, **kwargs): print("進入single1的__new__一次") single1.lock.acquire() single1.num += 1 print(f"獲取到single1構造函數,目前裏面有{single1.num}人") return object.__new__(cls,*args, **kwargs) class single2(object): lock = threading.Lock() num = 0 def __del__(self): #實例刪除時候,釋放鎖 single2.lock.release() def __new__(cls, *args, **kwargs): print("進入single2的__new__一次") single2.lock.acquire() single2.num += 1 print(f"獲取到single2構造函數,目前裏面有{single2.num}人") return object.__new__(cls,*args, **kwargs) def test1(): while True: a = single1() print( "-----------------------") time.sleep(2) b = single2() print("這是{0}的內部".format( threading.currentThread().name)) def test2(): while True: a = single2() print("==================") time.sleep(2) b = single1() print("這是{0}的內部".format(threading.currentThread().name)) funclist = [test1, test2] tlist = [threading.Thread(target=x) for x in funclist] for thread in tlist: thread.start()
import threading import time #####構造一個單例阻塞類,只能一我的構造,再有人構造阻塞 class single1(object): lock = threading.Lock() num = 0 def __del__(self): #實例刪除時候,釋放鎖 single1.lock.release() single1.num -= 1 def __new__(cls, *args, **kwargs): print("進入single1的__new__一次") single1.lock.acquire() single1.num += 1 print(f"獲取到single1構造函數,目前裏面有{single1.num}人") return object.__new__(cls,*args, **kwargs) class single2(object): lock = threading.Lock() num = 0 def __del__(self): #實例刪除時候,釋放鎖 single2.lock.release() single2.num -= 1 def __new__(cls, *args, **kwargs): print("進入single2的__new__一次") single2.lock.acquire() single2.num += 1 print(f"獲取到single2構造函數,目前裏面有{single2.num}人") return object.__new__(cls,*args, **kwargs) def test1(): while True: print("-----------------------") rlock.acquire() #獲取single1 a = single1() rlock.acquire() #獲取single2 b = single2() print("這是{0}的內部".format( threading.currentThread().name)) del a,b rlock.release() rlock.release() time.sleep(2) def test2(): while True: print("==================") rlock.acquire() a = single2() rlock.acquire() b = single1() print("這是{0}的內部".format(threading.currentThread().name)) del a, b rlock.release() rlock.release() time.sleep(2) rlock = threading.RLock() funclist = [test1, test2] tlist = [threading.Thread(target=x) for x in funclist] for thread in tlist: thread.start()
import multiprocessing import time #####構造一個單例阻塞類,只能一我的構造,再有人構造阻塞 class single(object): def __init__(self,name): self.lock = multiprocessing.Lock() self.num = 0 self.name = name def get(self): self.lock.acquire() self.num += 1 return self.num def dele(self): self.lock.release() def test1(a,b): while True: print("-----------------------") # rlock.acquire() #獲取single1 print("a:",a,a.get(),a.name) time.sleep(2) # rlock.acquire() #獲取single2 print("b:",b,b.get(),b.name) print("這是{0}的內部".format( multiprocessing.current_process().name)) a.dele() b.dele() # rlock.release() # rlock.release() def test2(a,b): while True: print("==================") # rlock.acquire() print("b:",b,b.get(),b.name) time.sleep(2) print("--------",a.num) # rlock.acquire() print("a:",a,a.get(),a.name) print("這是{0}的內部".format(multiprocessing.current_process().name)) a.dele() b.dele() # rlock.release() # rlock.release() rlock = multiprocessing.RLock() if __name__ == '__main__': a = single("a") b = single("b") print("a:",a,"b:",b) funclist = [test1, test2] tlist = [multiprocessing.Process(target=x,args=(a,b)) for x in funclist] for process in tlist: process.start()
注意:
import multiprocessing import time #####構造一個單例阻塞類,只能一我的構造,再有人構造阻塞 class single(object): def __init__(self,name): self.lock = multiprocessing.Lock() self.num = 0 self.name = name def get(self): self.lock.acquire() self.num += 1 return self.num def dele(self): self.lock.release() def test1(a,b,rlock): while True: rlock.acquire() #獲取single1 print("-----------------------") print("a:",a,a.get(),a.name) time.sleep(2) rlock.acquire() #獲取single2 print("b:",b,b.get(),b.name) print("這是{0}的內部".format( multiprocessing.current_process().name)) a.dele() b.dele() rlock.release() rlock.release() time.sleep(1) def test2(a,b,rlock): while True: rlock.acquire() print("==================") print("b:",b,b.get(),b.name) time.sleep(2) rlock.acquire() print("a:",a,a.get(),a.name) print("這是{0}的內部".format(multiprocessing.current_process().name)) a.dele() b.dele() rlock.release() rlock.release() time.sleep(1) if __name__ == '__main__': rlock = multiprocessing.RLock() a = single("a") b = single("b") print("a:",a,"b:",b) funclist = [test1, test2] tlist = [multiprocessing.Process(target=x,args=(a,b,rlock)) for x in funclist] for process in tlist: process.start()
注意:
Condition(條件變量)一般與一個鎖關聯。須要在多個Contidion中共享一個鎖時,能夠傳遞一個Lock/RLock實例給構造方法,不然它將本身生成一個RLock實例。
能夠認爲,除了Lock帶有的鎖定池外,Condition還包含一個等待池,池中的線程處於等待阻塞狀態,直到另外一個線程調用notify()/notifyAll()通知;獲得通知後線程進入鎖定池等待鎖定。
構造方法:
Condition([lock/rlock]) 默認遞歸鎖
實例方法:
acquire([timeout])/release(): 調用關聯的鎖的相應方法。
wait([timeout]): 調用這個方法將使線程進入Condition的等待池等待通知,並釋放鎖。使用前線程必須已得到鎖定,不然將拋出異常。
notify(): 調用這個方法將從等待池挑選一個線程並通知,收到通知的線程將自動調用acquire()嘗試得到鎖定(進入鎖定池);其餘線程仍然在等待池中。調用這個方法不會釋放鎖定。使用前線程必須已得到鎖定,不然將拋出異常。
notifyAll(): 調用這個方法將通知等待池中全部的線程,這些線程都將進入鎖定池嘗試得到鎖定。調用這個方法不會釋放鎖定。使用前線程必須已得到鎖定,不然將拋出異常。
import threading import queue, time, random class Goods: # 產品類 def __init__(self): self.count = 0 def add(self, num=1): self.count += num def sub(self): if self.count >= 0: self.count -= 1 def empty(self): return self.count <= 0 class Producer(threading.Thread): # 生產者類 def __init__(self, condition, goods, sleeptime=1): # sleeptime=1 threading.Thread.__init__(self) self.cond = condition self.goods = goods self.sleeptime = sleeptime def run(self): cond = self.cond goods = self.goods time.sleep(3) while True: cond.acquire() # 鎖住資源 goods.add() print("產品數量:", goods.count, "生產者線程", time.asctime()) cond.notifyAll() # 喚醒全部等待的線程--》其實就是喚醒消費者進程 cond.release() # 解鎖資源 time.sleep(self.sleeptime) class Consumer(threading.Thread): # 消費者類 def __init__(self, condition, goods, sleeptime=1): # sleeptime=2 threading.Thread.__init__(self) self.cond = condition self.goods = goods self.sleeptime = sleeptime def run(self): cond = self.cond goods = self.goods while True: time.sleep(self.sleeptime) cond.acquire() # 鎖住資源 while goods.empty(): # 如無產品則讓線程等待 cond.wait() goods.sub() print("產品數量:", goods.count, "消費者線程",time.asctime()) cond.release() # 解鎖資源 g = Goods() c = threading.Condition() P_SIZE = 1 C_SIZE = 1 pro = [Producer(c, g)for i in range(P_SIZE)] for p in pro: p.start() con = [Consumer(c, g) for i in range(C_SIZE)] for c in con: c.start()
注意:
線程的conditon不寫了,應該差很少,在multiprocessing中
import threading import time class Semaphore_ac(threading.Semaphore): def acquire(self, num, blocking: bool = True, timeout: float = None): if not blocking and timeout is not None: raise ValueError("can't specify timeout for non-blocking acquire") rc = False endtime = None with self._cond: while self._value < num: if not blocking: break if timeout is not None: if endtime is None: endtime = _time() + timeout else: timeout = endtime - _time() if timeout <= 0: break self._cond.wait(timeout) else: self._value -= num rc = True return rc def release(self,num): """Release a semaphore, incrementing the internal counter by one. When the counter is zero on entry and another thread is waiting for it to become larger than zero again, wake up that thread. """ with self._cond: self._value += num self._cond.notify() def test1(s): s.acquire(2) print("進入到%s線程"%threading.currentThread().name , time.ctime()) time.sleep(2) s.release(2) if __name__ == '__main__': # sema = threading.Semaphore(5) sema = Semaphore_ac(5) T_SIZE = 10 tlist = [threading.Thread(target=test1,args=(sema,))for i in range(T_SIZE)] for i in tlist: i.start()
event.isSet()返回event的狀態值 event.wait()若是event.isSet()==False將阻塞線程 event.set()設置event的狀態值爲True,全部阻塞池的線程激活進入就緒狀態,等待操做系統調度 event.clear() 恢復event的狀態值爲Flase
import random import threading import time def test1(event): if not event.isSet(): event.wait() time.sleep(random.random()) print("--------------",event.isSet()) print("這是%s獲取test1的值" % threading.currentThread().name) event.clear() def test2(s): time.sleep(2) print("這是%s設置test2的值"%threading.currentThread().name) event.set() if __name__ == '__main__': event = threading.Event() T_SIZE = 5 tlist1 = [threading.Thread(target=test1,args=(event,))for i in range(T_SIZE)] tlist2 = [threading.Thread(target=test2,args=(event,))for i in range(T_SIZE)] for i in tlist1: i.start() for i in tlist2: i.start()
注意:
import random import threading import time def test1(lock): with lock: print("這是test1") raise ValueError("我錯了") def test2(lock): time.sleep(2) print("-------------") with lock : print("這是test2") print("==============") if __name__ == '__main__': lock = threading.Lock() T_SIZE = 1 tlist1 = [threading.Thread(target=test1,args=(lock,))for i in range(T_SIZE)] tlist2 = [threading.Thread(target=test2,args=(lock,))for i in range(T_SIZE)] for i in tlist1: i.start() for i in tlist2: i.start()
import contextlib @contextlib.contextmanager def test(): print("1111") yield "2" print("333333") with test() as f: print("======",f)
上文提到若是咱們要實現一個自定義的上下文管理器,須要定義一個實現了__enter__和__exit__兩個方法的類, 這顯示不是很方便。Python的contextlib模塊給咱們提供了更方便的方式來實現一個自定義的上下文管理器。contextlib模塊包含一個裝飾器contextmanager和一些輔助函數,裝飾器contextmanager只須要寫一個生成器函數就能夠代替自定義的上下文管理器,典型用法以下:
例子1:鎖資源自動獲取和釋放的例子
@contextmanager
def locked(lock):
lock.acquire()
try:
yield
finally:
lock.release()
with locked(myLock):
#代碼執行到這裏時,myLock已經自動上鎖
pass
#執行完後會,會自動釋放鎖
例子2:文件打開後自動管理的實現
@contextmanager def myopen(filename, mode="r"): f = open(filename,mode) try: yield f finally: f.close() with myopen("test.txt") as f: for line in f: print(line)
例子3:數據庫事務的處理
@contextmanager def transaction(db): db.begin() try: yield except: db.rollback() raise else: db.commit() with transaction(mydb): mydb.cursor.execute(sql) mydb.cursor.execute(sql) mydb.cursor.execute(sql) mydb.cursor.execute(sql)
contextlib模塊還提供了一個函數給咱們:nested(mgr1,mgr2...mgrn)函數,用來嵌套多個上下文管理器,等同於下面的形式:
with mgr1: with mgr2: ... with mgrn: pass
可是with語句自己已經支持了多個下文管理器的使用,因此nested的意義不是很大。咱們能夠寫一個例子來看下nested函數的使用,以及與直接使用with來嵌套多個上下文管理器的區別,以下所示:
1 from contextlib import contextmanager 2 from contextlib import nested 3 from contextlib import closing 4 5 @contextmanager 6 def my_context(name): 7 print("enter") 8 try: 9 yield name 10 finally: 11 print("exit") 12 13 #使用nested函數來調用多個管理器 14 print("---------使用nested函數調用多個管理器-----------") 15 with nested(my_context("管理器一"), my_context("管理器二"),my_context("管理器三")) as (m1,m2,m3): 16 print(m1) 17 print(m2) 18 print(m3) 19 20 #直接使用with來調用調用多個管理器 21 print("---------使用with調用多個管理器-----------") 22 with my_context("管理器一") as m1, my_context("管理器二") as m2, my_context("管理器三") as m3: 23 print(m1) 24 print(m2) 25 print(m3)
輸出結果爲:
contextlib中還包含一個closing對象,這個對象就是一個上下文管理器,它的__exit__函數僅僅調用傳入參數的close函數,closing對象的源碼以下:
1 class closing(object): 18 def __init__(self, thing): 19 self.thing = thing 20 def __enter__(self): 21 return self.thing 22 def __exit__(self, *exc_info): 23 self.thing.close()
因此closeing上下文管理器僅使用於具備close()方法的資源對象。例如,若是咱們經過urllib.urlopen打開一個網頁,urlopen返回的request有close方法,因此咱們就可使用closing上下文管理器,以下:
import urllib, sys from contextlib import closing with closing(urllib.urlopen('http://www.yahoo.com')) as f: for line in f: sys.stdout.write(line)
Python的select()方法直接調用操做系統的IO接口,它監控sockets,open files, and pipes(全部帶fileno()方法的文件句柄)什麼時候變成readable 和writeable, 或者通訊錯誤,select()使得同時監控多個鏈接變的簡單,而且這比寫一個長循環來等待和監控多客戶端鏈接要高效,由於select直接經過操做系統提供的C的網絡接口進行操做,而不是經過Python的解釋器。
select目前幾乎在全部的平臺上支持,其良好跨平臺支持也是它的一個優勢。select的一 個缺點在於單個進程可以監視的文件描述符的數量存在最大限制,在Linux上通常爲1024,能夠經過修改宏定義甚至從新編譯內核的方式提高這一限制,可是這樣會形成效率的下降
select(rlist, wlist, xlist, timeout=None)
select()方法接收並監控3個通訊列表, 第一個rlist監控全部要進來的輸入數據,第二個wlist是監控全部要發出去的輸出數據,第三個監控異常錯誤數據,第四個設置指定等待時間,若是想當即返回,設爲null便可,最後須要建立2個列表來包含輸入和輸出信息來傳給select(),讓select方法經過內核去監控,而後生成三個實例。
#創建兩個列表,好比想讓內核去檢測50個鏈接,須要傳給它一個列表,就是這個inputs(列表中裏面存放的是須要被內核監控的連接),而後交給select,就至關於交給內核了 inputs = [server,] #輸入列表,監控全部輸入數據 outputs = [] #輸入列表,監控全部輸出數據 #把兩個列表傳給select方法經過內核去監控,生成三個實例 readable,writeable,exceptional = select.select(inputs,outputs,inputs) # 這裏select方法的第三個參數一樣傳入input列表是由於,input列表中存放着全部的連接,好比以前放入的50被監控連接中有5個斷了,出現了異常,就會輸入到exceptional裏面,但這5連接自己是放在inputs列表中
實例:
import select, socket, queue server = socket.socket(socket.AF_INET, socket.SOCK_STREAM) server.bind(("localhost", 9999)) server.listen(1000) server.setblocking(False) # 設置爲非阻塞 msg_dic = dict() # 定義一個隊列字典 inputs = [server, ] # 因爲設置成非阻塞模式,accept和recive都不阻塞了,沒有值就會報錯,所以最開始須要最開始須要監控服務端自己,等待客戶端鏈接 outputs = [] while True: # exceptional表示若是inputs列表中出現異常,會輸出到這個exceptional中 readable, writeable, exceptional = select.select(inputs, outputs, inputs) # 若是沒有任何客戶端鏈接,就會阻塞在這裏 for r in readable: # 沒有個r表明一個socket連接 if r is server: # 若是這個socket是server的話,就說明是是新客戶端鏈接了 conn, addr = r.accept() # 新鏈接進來了,接受這個鏈接,生成這個客戶端實例 print("來了一個新鏈接", addr) inputs.append(conn) # 爲了避免阻塞整個程序,咱們不會馬上在這裏開始接收客戶端發來的數據, 把它放到inputs裏, 下一次loop時,這個新鏈接 # 就會被交給select去監聽 msg_dic[conn] = queue.Queue() # 初始化一個隊列,後面存要返回給這個客戶端的數據 else: # 若是不是server,就說明是以前創建的客戶端來數據了 data = r.recv(1024) print("收到數據:", data) msg_dic[r].put(data) # 收到的數據先放到queue裏,一會返回給客戶端 outputs.append(r) # 爲了避免影響處理與其它客戶端的鏈接 , 這裏不馬上返回數據給客戶端 # r.send(data) # print("send done....") for w in writeable: # 要返回給客戶端的連接列表 print("進入到writeable") data_to_client = msg_dic[w].get() w.send(data_to_client) # 返回給客戶端的源數據 outputs.remove(w) # 確保下次循環的時候writeable,不返回這個已經處理完的這個鏈接了 for e in exceptional: # 處理異常的鏈接 if e in outputs: # 由於e不必定在outputs,因此先要判斷 outputs.remove(e) inputs.remove(e) # 刪除inputs中異常鏈接 del msg_dic[e] # 刪除此鏈接對應的隊列
import select import sys import time import socket sk = socket.socket(socket.AF_INET, socket.SOCK_STREAM) ip_port = ("127.0.0.1", 9999) sk.connect(ip_port) if __name__ == '__main__': while True: sk.send(b"123") print("發送了一遍123") time.sleep(2) try : print("接受到",sk.recv(1024,False)) except Exception as e: print("沒接受到",e)
從以上可知,epoll是對select、poll模型的改進,提升了網絡編程的性能,普遍應用於大規模併發請求的C/S架構中。
一、觸發方式:
邊緣觸發/水平觸發,只適用於Unix/Linux操做系統
二、原理圖
三、通常步驟
事件常量 | 意義 |
---|---|
EPOLLIN | 讀就緒 |
EPOLLOUT | 寫就緒 |
EPOLLPRI | 有數據緊急讀取 |
EPOLLERR | assoc. fd有錯誤狀況發生 |
EPOLLHUP | assoc. fd發生掛起 |
EPOLLET | 設置邊緣觸發(ET)(默認的是水平觸發) |
EPOLLONESHOT | 設置爲 one-short 行爲,一個事件(event)被拉出後,對應的fd在內部被禁用 |
EPOLLRDNORM | 和 EPOLLIN 相等 |
EPOLLRDBAND | 優先讀取的數據帶(data band) |
EPOLLWRNORM | 和 EPOLLOUT 相等 |
EPOLLWRBAND | 優先寫的數據帶(data band) |
EPOLLMSG | 忽視 |
epoll.close()
關閉epoll對象的文件描述符。epoll.fileno
返回control fd的文件描述符number。epoll.fromfd(fd)
用給予的fd來建立一個epoll對象。epoll.register(fd[, eventmask])
在epoll對象中註冊一個文件描述符。(若是文件描述符已經存在,將會引發一個IOError
)epoll.modify(fd, eventmask)
修改一個已經註冊的文件描述符。epoll.unregister(fd)
註銷一個文件描述符。epoll.poll(timeout=-1[, maxevnets=-1])
等待事件,timeout(float)的單位是秒(second)。import socket import select import queue # 建立socket對象 serversocket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) # 設置IP地址複用 serversocket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) # ip地址和端口號 server_address = ("127.0.0.1", 6666) # 綁定IP地址 serversocket.bind(server_address) # 監聽,並設置最大鏈接數 serversocket.listen(10) print("服務器啓動成功,監聽IP:", server_address) # 服務端設置非阻塞 serversocket.setblocking(False) # 超時時間 timeout = 10 # 建立epoll事件對象,後續要監控的事件添加到其中 epoll = select.epoll() # 註冊服務器監聽fd到等待讀事件集合 epoll.register(serversocket.fileno(), select.EPOLLIN) # 保存鏈接客戶端消息的字典,格式爲{} message_queues = {} # 文件句柄到所對應對象的字典,格式爲{句柄:對象} fd_to_socket = {serversocket.fileno(): serversocket, } num = 0 while True: print("等待活動鏈接......") # 輪詢註冊的事件集合,返回值爲[(文件句柄,對應的事件),(...),....] events = epoll.poll(timeout) if not events: print("epoll超時無活動鏈接,從新輪詢......") continue print("有", len(events), "個新事件,開始處理......") for fd, event in events: print('fd and event: ', fd, event) socket = fd_to_socket[fd] # 若是活動socket爲當前服務器socket,表示有新鏈接 print('socket and serversocket: ', socket, serversocket) print('----------event & select.EPOLLIN: ', event & select.EPOLLIN) if socket == serversocket: connection, address = serversocket.accept() # connection新套接字對象 print("新鏈接:", address) # 新鏈接socket設置爲非阻塞 connection.setblocking(False) # 註冊新鏈接fd到待讀事件集合 epoll.register(connection.fileno(), select.EPOLLIN) # 把新鏈接的文件句柄以及對象保存到字典 fd_to_socket[connection.fileno()] = connection # 以新鏈接的對象爲鍵值,值存儲在隊列中,保存每一個鏈接的信息 message_queues[connection] = queue.Queue() # 關閉事件 elif event & select.EPOLLHUP: print('client close') # 在epoll中註銷客戶端的文件句柄 epoll.unregister(fd) # 關閉客戶端的文件句柄 fd_to_socket[fd].close() # 在字典中刪除與已關閉客戶端相關的信息 del fd_to_socket[fd] # 可讀事件 elif event & select.EPOLLIN: # 接收數據 data = socket.recv(1024) if data: print("收到數據:", data, "客戶端:", socket.getpeername()) # 將數據放入對應客戶端的字典 message_queues[socket].put(data) # 修改讀取到消息的鏈接到等待寫事件集合(即對應客戶端收到消息後,再將其fd修改並加入寫事件集合) epoll.modify(fd, select.EPOLLOUT) num = 0 else: print('-------client close connect or send null data -----------') epoll.modify(fd, select.EPOLLHUP) # 客戶端關閉鏈接 # 可寫事件 elif event & select.EPOLLOUT: try: # 從字典中獲取對應客戶端的信息 msg = message_queues[socket].get_nowait() except queue.Empty: print(socket.getpeername(), " queue empty") # 修改文件句柄爲讀事件 epoll.modify(fd, select.EPOLLIN) else: print("發送數據:", data, "客戶端:", socket.getpeername()) # 發送數據 socket.send(msg) # 在epoll中註銷服務端文件句柄 epoll.unregister(serversocket.fileno()) # 關閉epoll epoll.close() # 關閉服務器socket serversocket.close()
#!/usr/bin/env python #-*- coding:utf-8 -*- import socket #建立客戶端socket對象 clientsocket = socket.socket(socket.AF_INET,socket.SOCK_STREAM) #服務端IP地址和端口號元組 server_address = ('127.0.0.1', 6666) #客戶端鏈接指定的IP地址和端口號 clientsocket.connect(server_address) while True: #輸入數據 data = input('please input:') #客戶端發送數據 clientsocket.sendall(data.encode("utf8")) #客戶端接收數據 server_data = clientsocket.recv(1024) print('客戶端收到的數據:', server_data) #關閉客戶端socket # clientsocket.close()
在linux是用epoll 在windows是用select實現的
selector.DefaultSelector()--------->註冊socket sel.register(server, selectors.EVENT_READ, accept)
import selectors, socket sel = selectors.DefaultSelector() def accept(sock, mask): "接收客戶端信息實例" conn, addr = sock.accept() print("accepted", conn, 'from', addr) conn.setblocking(False) sel.register(conn, selectors.EVENT_READ, read) # 新鏈接註冊read回調函數 def read(conn, mask): "接收客戶端的數據" data = conn.recv(1024) if data: print("echoing", repr(data), 'to', conn) conn.send(data) else: print("closing", conn) sel.unregister(conn) conn.close() server = socket.socket() server.bind(('localhost', 9999)) server.listen(500) server.setblocking(False) sel.register(server, selectors.EVENT_READ, accept) # 註冊事件,只要來一個鏈接就調accept這個函數, # sel.register(server,selectors.EVENT_READ,accept) == inputs=[server,] while True: events = sel.select() # 這個select,看起來是select,有可能調用的是epoll,看你操做系統是Windows的仍是Linux的 # 默認阻塞,有活動鏈接就返回活動鏈接列表 print("事件:", events) for key, mask in events: #key是事件組合,fileobj是socket, key.data是回調函數 callback = key.data # 至關於調accept了 callback(key.fileobj, mask) # key.fileobj=文件句柄
import socket, sys messages = [b'This is the message. ', b'It will be sent ', b'in parts.', ] server_address = ('localhost', 9999) # 建立100個 TCP/IP socket實例 socks = [socket.socket(socket.AF_INET, socket.SOCK_STREAM) for i in range(1)] # 鏈接服務端 print('connecting to %s port %s' % server_address) for s in socks: s.connect(server_address) for message in messages: # 發送消息至服務端 for s in socks: print('%s: sending "%s"' % (s.getsockname(), message)) s.send(message) # 從服務端接收消息 for s in socks: data = s.recv(1024) print('%s: received "%s"' % (s.getsockname(), data)) if not data: print(sys.stderr, 'closing socket', s.getsockname())
from greenlet import greenlet def test1(): print(12) gr2.switch() print(34) def test2(): print(56) gr1.switch() print(78) gr1 = greenlet(test1) gr2 = greenlet(test2) gr1.switch()
先gevent.spawn()--------------->gevent.joinall([])
import gevent def test(time): print(1) gevent.sleep(time) print(2) def test2(time): print(3) gevent.sleep(time) print(4) if __name__ == '__main__': gevent.joinall([ gevent.spawn(test, 2), gevent.spawn(test2, 3) ])
# /usr/bin/python3 # -*- coding: utf-8 -*- # Copyright (c) 2017 - walker <cail1844@gmail.com> import gevent import requests import re import timeit import codecs from threading import Thread from gevent import monkey monkey.patch_socket() def get_title(url,title_list=[]): try: r = requests.get(url,timeout=5) r.encoding = 'utf8' html = r.text title = re.search(r'<title>(.*?)</title>',html).group(1) except TimeoutError: title = '' if title: title_list.append(title) return title def get_url(): baseurl = 'http://www.baidu.com/s?cl=3&tn=baidutop10&fr=top1000&wd=%s' f = codecs.open('muci.txt','r','utf8') url_list = [] for key in f.readlines(): url_list.append(baseurl % key.strip()) return url_list url_list = get_url() def run1(): title_list = [] for url in url_list: get_title(url,title_list) # title_list.append(title) print('Sync result length:',len(title_list),title_list) return title_list def run2(): title_list = [] threads = [gevent.spawn(get_title,url,title_list) for url in url_list] gevent.joinall(threads) # title_list = [thread.value for thread in threads] print('gevent result length:',len(title_list),title_list) return title_list def run3(): title_list = [] th = [] for url in url_list: t = Thread(target=get_title,args=(url,title_list)) th.append(t) t.start() for t in th: t.join() print('threading result length:',len(title_list),title_list) return title_list if __name__ == '__main__': t1 = timeit.timeit('run1()',setup="from __main__ import run1",number=1) print('sync time:',t1) t2 = timeit.timeit('run2()',setup="from __main__ import run2",number=1) print('gevent time:',t2) t3 = timeit.timeit('run3()',setup="from __main__ import run3",number=1) print('thread time:',t3)
asyncio的使用上,感受和gevent有殊途同歸之妙
一、基礎概念:
event_loop 事件循環:理解爲一個循環的池,裏面存放一些async關鍵詞定義的協程函數,只有放到循環池裏才能執行
coroutine 協程:協程對象,指一個使用async關鍵字定義的函數,它的調用不會當即執行函數,而是會返回一個協程對象。協程對象須要註冊到事件循環,由事件循環調用。
task 任務:一個協程對象就是一個原生能夠掛起的函數,任務則是對協程進一步封裝,其中包含任務的各類狀態。
future:表明未來執行或沒有執行的任務的結果。它和task上沒有本質的區別
async/await 關鍵字:python3.5 用於定義協程的關鍵字,async定義一個協程,await用於掛起阻塞的異步調用接口。
import time import asyncio now = lambda : time.time() async def do_some_work(x): # 使用async關鍵字定義協程 print('Waiting: ', x) start = now() coroutine = do_some_work(2) # 建立協程對象 loop = asyncio.get_event_loop() # 建立一個事件循環(池) loop.run_until_complete(coroutine) # 將協程對象包裝並註冊協程對象 2 Waiting: 2 TIME: 0.0004658699035644531
二、建立task
協程對象不能直接運行,須要包裝成任務才能運行,上面是經過run_until_complete()方法包裝成task(隱式包裝),還有下面兩種方式進行顯式包裝:
import asyncio import time now = lambda : time.time() async def do_some_work(x): print('Waiting: ', x) start = now() coroutine = do_some_work(2) loop = asyncio.get_event_loop() # task = asyncio.ensure_future(coroutine) # 方式一 task = loop.create_task(coroutine) # 方式二 print(task) loop.run_until_complete(task) print(task) print('TIME: ', now() - start) 2 3 4 <Task pending coro=<do_some_work() running at /Users/ghost/Rsj217/python3.6/async/async-main.py:17>> Waiting: 2 <Task finished coro=<do_some_work() done, defined at /Users/ghost/Rsj217/python3.6/async/async-main.py:17> result=None> TIME: 0.0003490447998046875
建立task後,task在加入事件循環以前是pending狀態,加入loop後運行中是running狀態,loop調用完是Done,運行完是finished狀態,雖然說本質上協程函數和task指的東西都同樣,可是task有了協程函數的狀態。
其中loop.run_until_complete()接受一個future參數,futurn具體指代一個協程函數,而task是future的子類,因此咱們不聲明一個task直接傳入協程函數也能執行。
三、綁定回調函數
經過task的task.add_done_callback(callback)方法綁定回調函數,回調函數接收一個future對象參數如task,在內部經過future.result()得到協程函數的返回值。
import asyncio async def test(x): return x+3 def callback(y): print(y.result()) coroutine = test(5) loop = asyncio.get_event_loop() task = loop.create_task(coroutine) task <Task pending coro=<test() running at <ipython-input-4-61142fef17d8>:1>> task.add_done_callback(callback) loop.run_until_complete(task) Out[10]: 8
或者直接經過下面方式也能獲取反饋結果
task.result()
四、await(掛起耗時操做)
多任務聲明瞭協程函數,也同時在loop中註冊了,他的執行也是順序執行的,由於在異步函數中沒有聲明那些操做是耗時操做,因此會順序執行。await的做用就是告訴控制器這個步驟是耗時的,async能夠定義協程對象,使用await能夠針對耗時的操做進行掛起
import asyncio import time async def test(1): time.sleep(1) print(time.time()) tasks = [asyncio.ensure_future(test()) for _ in range(3)] loop = asyncio.get_event_loop() loop.run_until_complete(asyncio.wait(tasks)) 1547187398.7611663 1547187399.7611988 1547187400.7632194 Out[8]: ({<Task finished coro=<test() done, defined at <ipython-input-5-1534f9ca2d8e>:4> result=None>, <Task finished coro=<test() done, defined at <ipython-input-5-1534f9ca2d8e>:4> result=None>, <Task finished coro=<test() done, defined at <ipython-input-5-1534f9ca2d8e>:4> result=None>}, set())
上面執行並非異步執行,而是順序執行,可是改爲下面形式那就是異步執行:
import asyncio import time async def test(t): await asyncio.sleep(1) print(time.time()) tasks = [asyncio.ensure_future(test()) for _ in range(3)] loop = asyncio.get_event_loop() loop.run_until_complete(asyncio.wait(tasks)) 1547187398.7611663 1547187399.7611988 1547187400.7632194 Out[11]: ({<Task finished coro=<test() done, defined at <ipython-input-9-3a874803716b>:4> result=None>, <Task finished coro=<test() done, defined at <ipython-input-9-3a874803716b>:4> result=None>, <Task finished coro=<test() done, defined at <ipython-input-9-3a874803716b>:4> result=None>}, set())
可見三個任務的間隔時間幾乎忽略不計,這裏要注意可使用await成功掛起的對應應該是下面三種:
原生異步函數(coroutine )
由 types.coroutine() 修飾的生成器,這個生成器能夠返回 coroutine 對象。
包含 __await 方法的對象返回的一個迭代器
因此即便使用saync修飾requests的方法也不支持異步,而是須要專門的異步網絡請求庫aiohttp。
五、aiohttp
aiohttp須要單獨安裝,而後和asyncio庫一塊兒使用,看一下案例
async def get(url): async with aiohttp.ClientSession() as session: async with session.get(url) as response: print(response) print(time.time()) import time async def request(): url = "http://www.baidu.com" resulit = await get(url) tasks = [asyncio.ensure_future(request()) for _ in range(10)] loop = asyncio.get_event_loop() loop.run_until_complete(asyncio.wait(tasks)) <ClientResponse(http://www.baidu.com) [200 OK]> <CIMultiDictProxy('Bdpagetype': '1', 'Bdqid': '0x94343a8f0000d2ac', 'Cache-Control': 'private', 'Connection': 'Keep-Alive', 'Content-Encoding': 'gzip', 'Content-Type': 'text/html', 'Cxy_all': 'baidu+fcb1f5fc4ea50a8475457d9dba4ffb75', 'Date': 'Fri, 11 Jan 2019 07:20:37 GMT', 'Expires': 'Fri, 11 Jan 2019 07:19:54 GMT', 'P3p': 'CP=" OTI DSP COR IVA OUR IND COM "', 'Server': 'BWS/1.1', 'Set-Cookie': 'BAIDUID=76DA9E559DEFD4059F7858332E63E4CA:FG=1; expires=Thu, 31-Dec-37 23:55:55 GMT; max-age=2147483647; path=/; domain=.baidu.com', 'Set-Cookie': 'BIDUPSID=76DA9E559DEFD4059F7858332E63E4CA; expires=Thu, 31-Dec-37 23:55:55 GMT; max-age=2147483647; path=/; domain=.baidu.com', 'Set-Cookie': 'PSTM=1547191237; expires=Thu, 31-Dec-37 23:55:55 GMT; max-age=2147483647; path=/; domain=.baidu.com', 'Set-Cookie': 'delPer=0; path=/; domain=.baidu.com', 'Set-Cookie': 'BDSVRTM=0; path=/', 'Set-Cookie': 'BD_HOME=0; path=/', 'Set-Cookie': 'H_PS_PSSID=26525_1426_21079_28132_28266; path=/; domain=.baidu.com', 'Vary': 'Accept-Encoding', 'X-Ua-Compatible': 'IE=Edge,chrome=1', 'Transfer-Encoding': 'chunked')> 1547191237.4161415 <ClientResponse(http://www.baidu.com) [200 OK]> <CIMultiDictProxy('Bdpagetype': '1', 'Bdqid': '0xb19b30e80000e08d', 'Cache-Control': 'private', 'Connection': 'Keep-Alive', 'Content-Encoding': 'gzip', 'Content-Type': 'text/html', 'Cxy_all': 'baidu+6035b8e98737e4cc11dcc73ec79566cc', 'Date': 'Fri, 11 Jan 2019 07:20:37 GMT', 'Expires': 'Fri, 11 Jan 2019 07:19:48 GMT', 'P3p': 'CP=" OTI DSP COR IVA OUR IND COM "', 'Server': 'BWS/1.1', 'Set-Cookie': 'BAIDUID=76DA9E559DEFD405C594443631339D6D:FG=1; expires=Thu, 31-Dec-37 23:55:55 GMT; max-age=2147483647; path=/; domain=.baidu.com', 'Set-Cookie': 'BIDUPSID=76DA9E559DEFD405C594443631339D6D; expires=Thu, 31-Dec-37 23:55:55 GMT; max-age=2147483647; path=/; domain=.baidu.com', 'Set-Cookie': 'PSTM=1547191237; expires=Thu, 31-Dec-37 23:55:55 GMT; max-age=2147483647; path=/; domain=.baidu.com', 'Set-Cookie': 'delPer=0; path=/; domain=.baidu.com', 'Set-Cookie': 'BDSVRTM=0; path=/', 'Set-Cookie': 'BD_HOME=0; path=/', 'Set-Cookie': 'H_PS_PSSID=26522_1423_21104_28132_28267_22158; path=/; domain=.baidu.com', 'Vary': 'Accept-Encoding', 'X-Ua-Compatible': 'IE=Edge,chrome=1', 'Transfer-Encoding': 'chunked')> 1547191237.417142 <ClientResponse(http://www.baidu.com) [200 OK]> <CIMultiDictProxy('Bdpagetype': '1', 'Bdqid': '0xfdf776e30000dfb4', 'Cache-Control': 'private', 'Connection': 'Keep-Alive', 'Content-Encoding': 'gzip', 'Content-Type': 'text/html', 'Cxy_all': 'baidu+0810232ebbebf660004801978cbc7056', 'Date': 'Fri, 11 Jan 2019 07:20:37 GMT', 'Expires': 'Fri, 11 Jan 2019 07:20:15 GMT', 'P3p': 'CP=" OTI DSP COR IVA OUR IND COM "', 'Server': 'BWS/1.1', 'Set-Cookie': 'BAIDUID=76DA9E559DEFD40584DF85554050AB79:FG=1; expires=Thu, 31-Dec-37 23:55:55 GMT; max-age=2147483647; path=/; domain=.baidu.com', 'Set-Cookie': 'BIDUPSID=76DA9E559DEFD40584DF85554050AB79; expires=Thu, 31-Dec-37 23:55:55 GMT; max-age=2147483647; path=/; domain=.baidu.com', 'Set-Cookie': 'PSTM=1547191237; expires=Thu, 31-Dec-37 23:55:55 GMT; max-age=2147483647; path=/; domain=.baidu.com', 'Set-Cookie': 'delPer=0; path=/; domain=.baidu.com', 'Set-Cookie': 'BDSVRTM=0; path=/', 'Set-Cookie': 'BD_HOME=0; path=/', 'Set-Cookie': 'H_PS_PSSID=1465_21118_28131_28267_20718; path=/; domain=.baidu.com', 'Vary': 'Accept-Encoding', 'X-Ua-Compatible': 'IE=Edge,chrome=1', 'Transfer-Encoding': 'chunked')> 1547191237.4221385 <ClientResponse(http://www.baidu.com) [200 OK]> <CIMultiDictProxy('Bdpagetype': '1', 'Bdqid': '0x879158430000a46a', 'Cache-Control': 'private', 'Connection': 'Keep-Alive', 'Content-Encoding': 'gzip', 'Content-Type': 'text/html', 'Cxy_all': 'baidu+acdef638e6acee7494d7fce1008c87ca', 'Date': 'Fri, 11 Jan 2019 07:20:37 GMT', 'Expires': 'Fri, 11 Jan 2019 07:20:03 GMT', 'P3p': 'CP=" OTI DSP COR IVA OUR IND COM "', 'Server': 'BWS/1.1', 'Set-Cookie': 'BAIDUID=76DA9E559DEFD40593C8E085477DD125:FG=1; expires=Thu, 31-Dec-37 23:55:55 GMT; max-age=2147483647; path=/; domain=.baidu.com', 'Set-Cookie': 'BIDUPSID=76DA9E559DEFD40593C8E085477DD125; expires=Thu, 31-Dec-37 23:55:55 GMT; max-age=2147483647; path=/; domain=.baidu.com', 'Set-Cookie': 'PSTM=1547191237; expires=Thu, 31-Dec-37 23:55:55 GMT; max-age=2147483647; path=/; domain=.baidu.com', 'Set-Cookie': 'delPer=0; path=/; domain=.baidu.com', 'Set-Cookie': 'BDSVRTM=0; path=/', 'Set-Cookie': 'BD_HOME=0; path=/', 'Set-Cookie': 'H_PS_PSSID=1448_21109_28131_28267; path=/; domain=.baidu.com', 'Vary': 'Accept-Encoding', 'X-Ua-Compatible': 'IE=Edge,chrome=1', 'Transfer-Encoding': 'chunked')> 1547191237.424138 <ClientResponse(http://www.baidu.com) [200 OK]> <CIMultiDictProxy('Bdpagetype': '1', 'Bdqid': '0xe5c481900000cd70', 'Cache-Control': 'private', 'Connection': 'Keep-Alive', 'Content-Encoding': 'gzip', 'Content-Type': 'text/html', 'Cxy_all': 'baidu+fb1596a42119b92bcb6a321cfd1bde58', 'Date': 'Fri, 11 Jan 2019 07:20:37 GMT', 'Expires': 'Fri, 11 Jan 2019 07:19:51 GMT', 'P3p': 'CP=" OTI DSP COR IVA OUR IND COM "', 'Server': 'BWS/1.1', 'Set-Cookie': 'BAIDUID=76DA9E559DEFD405BD554041F5821AB7:FG=1; expires=Thu, 31-Dec-37 23:55:55 GMT; max-age=2147483647; path=/; domain=.baidu.com', 'Set-Cookie': 'BIDUPSID=76DA9E559DEFD405BD554041F5821AB7; expires=Thu, 31-Dec-37 23:55:55 GMT; max-age=2147483647; path=/; domain=.baidu.com', 'Set-Cookie': 'PSTM=1547191237; expires=Thu, 31-Dec-37 23:55:55 GMT; max-age=2147483647; path=/; domain=.baidu.com', 'Set-Cookie': 'delPer=0; path=/; domain=.baidu.com', 'Set-Cookie': 'BDSVRTM=0; path=/', 'Set-Cookie': 'BD_HOME=0; path=/', 'Set-Cookie': 'H_PS_PSSID=1448_21105_18560_28132_28266_20719; path=/; domain=.baidu.com', 'Vary': 'Accept-Encoding', 'X-Ua-Compatible': 'IE=Edge,chrome=1', 'Transfer-Encoding': 'chunked')> 1547191237.4261389 <ClientResponse(http://www.baidu.com) [200 OK]> <CIMultiDictProxy('Bdpagetype': '1', 'Bdqid': '0x85ab35690000c4fd', 'Cache-Control': 'private', 'Connection': 'Keep-Alive', 'Content-Encoding': 'gzip', 'Content-Type': 'text/html', 'Cxy_all': 'baidu+15e5fc3bd83c4ffcdf9698e3264f7621', 'Date': 'Fri, 11 Jan 2019 07:20:37 GMT', 'Expires': 'Fri, 11 Jan 2019 07:20:00 GMT', 'P3p': 'CP=" OTI DSP COR IVA OUR IND COM "', 'Server': 'BWS/1.1', 'Set-Cookie': 'BAIDUID=76DA9E559DEFD405C594443631339D6D:FG=1; expires=Thu, 31-Dec-37 23:55:55 GMT; max-age=2147483647; path=/; domain=.baidu.com', 'Set-Cookie': 'BIDUPSID=76DA9E559DEFD405C594443631339D6D; expires=Thu, 31-Dec-37 23:55:55 GMT; max-age=2147483647; path=/; domain=.baidu.com', 'Set-Cookie': 'PSTM=1547191237; expires=Thu, 31-Dec-37 23:55:55 GMT; max-age=2147483647; path=/; domain=.baidu.com', 'Set-Cookie': 'delPer=0; path=/; domain=.baidu.com', 'Set-Cookie': 'BDSVRTM=0; path=/', 'Set-Cookie': 'BD_HOME=0; path=/', 'Set-Cookie': 'H_PS_PSSID=26522_1423_21104_28132_28267_22158; path=/; domain=.baidu.com', 'Vary': 'Accept-Encoding', 'X-Ua-Compatible': 'IE=Edge,chrome=1', 'Transfer-Encoding': 'chunked')> 1547191237.428144 <ClientResponse(http://www.baidu.com) [200 OK]> <CIMultiDictProxy('Bdpagetype': '1', 'Bdqid': '0x9620ed6b0000f26c', 'Cache-Control': 'private', 'Connection': 'Keep-Alive', 'Content-Encoding': 'gzip', 'Content-Type': 'text/html', 'Cxy_all': 'baidu+a2bfd2645e7c3d7514192a060f9644f5', 'Date': 'Fri, 11 Jan 2019 07:20:37 GMT', 'Expires': 'Fri, 11 Jan 2019 07:20:12 GMT', 'P3p': 'CP=" OTI DSP COR IVA OUR IND COM "', 'Server': 'BWS/1.1', 'Set-Cookie': 'BAIDUID=76DA9E559DEFD4055EFEDF62083FAFD3:FG=1; expires=Thu, 31-Dec-37 23:55:55 GMT; max-age=2147483647; path=/; domain=.baidu.com', 'Set-Cookie': 'BIDUPSID=76DA9E559DEFD4055EFEDF62083FAFD3; expires=Thu, 31-Dec-37 23:55:55 GMT; max-age=2147483647; path=/; domain=.baidu.com', 'Set-Cookie': 'PSTM=1547191237; expires=Thu, 31-Dec-37 23:55:55 GMT; max-age=2147483647; path=/; domain=.baidu.com', 'Set-Cookie': 'delPer=0; path=/; domain=.baidu.com', 'Set-Cookie': 'BDSVRTM=0; path=/', 'Set-Cookie': 'BD_HOME=0; path=/', 'Set-Cookie': 'H_PS_PSSID=1427_21127_28132_28267; path=/; domain=.baidu.com', 'Vary': 'Accept-Encoding', 'X-Ua-Compatible': 'IE=Edge,chrome=1', 'Transfer-Encoding': 'chunked')> 1547191237.4291408 <ClientResponse(http://www.baidu.com) [200 OK]> <CIMultiDictProxy('Bdpagetype': '1', 'Bdqid': '0x912a1be40000e841', 'Cache-Control': 'private', 'Connection': 'Keep-Alive', 'Content-Encoding': 'gzip', 'Content-Type': 'text/html', 'Cxy_all': 'baidu+599a770e18be144be77bd13c371daf0a', 'Date': 'Fri, 11 Jan 2019 07:20:37 GMT', 'Expires': 'Fri, 11 Jan 2019 07:20:35 GMT', 'P3p': 'CP=" OTI DSP COR IVA OUR IND COM "', 'Server': 'BWS/1.1', 'Set-Cookie': 'BAIDUID=76DA9E559DEFD405106191D066098188:FG=1; expires=Thu, 31-Dec-37 23:55:55 GMT; max-age=2147483647; path=/; domain=.baidu.com', 'Set-Cookie': 'BIDUPSID=76DA9E559DEFD405106191D066098188; expires=Thu, 31-Dec-37 23:55:55 GMT; max-age=2147483647; path=/; domain=.baidu.com', 'Set-Cookie': 'PSTM=1547191237; expires=Thu, 31-Dec-37 23:55:55 GMT; max-age=2147483647; path=/; domain=.baidu.com', 'Set-Cookie': 'delPer=0; path=/; domain=.baidu.com', 'Set-Cookie': 'BDSVRTM=0; path=/', 'Set-Cookie': 'BD_HOME=0; path=/', 'Set-Cookie': 'H_PS_PSSID=1424_21111_28132_28266; path=/; domain=.baidu.com', 'Vary': 'Accept-Encoding', 'X-Ua-Compatible': 'IE=Edge,chrome=1', 'Transfer-Encoding': 'chunked')> 1547191237.4311435 <ClientResponse(http://www.baidu.com) [200 OK]> <CIMultiDictProxy('Bdpagetype': '1', 'Bdqid': '0x943943940000b92b', 'Cache-Control': 'private', 'Connection': 'Keep-Alive', 'Content-Encoding': 'gzip', 'Content-Type': 'text/html', 'Cxy_all': 'baidu+17014bf10c56f72b235b529f8f9c177b', 'Date': 'Fri, 11 Jan 2019 07:20:37 GMT', 'Expires': 'Fri, 11 Jan 2019 07:20:31 GMT', 'P3p': 'CP=" OTI DSP COR IVA OUR IND COM "', 'Server': 'BWS/1.1', 'Set-Cookie': 'BAIDUID=76DA9E559DEFD40504EF38ED596AEC59:FG=1; expires=Thu, 31-Dec-37 23:55:55 GMT; max-age=2147483647; path=/; domain=.baidu.com', 'Set-Cookie': 'BIDUPSID=76DA9E559DEFD40504EF38ED596AEC59; expires=Thu, 31-Dec-37 23:55:55 GMT; max-age=2147483647; path=/; domain=.baidu.com', 'Set-Cookie': 'PSTM=1547191237; expires=Thu, 31-Dec-37 23:55:55 GMT; max-age=2147483647; path=/; domain=.baidu.com', 'Set-Cookie': 'delPer=0; path=/; domain=.baidu.com', 'Set-Cookie': 'BDSVRTM=0; path=/', 'Set-Cookie': 'BD_HOME=0; path=/', 'Set-Cookie': 'H_PS_PSSID=1446_21118_28131_26350_28267_22158; path=/; domain=.baidu.com', 'Vary': 'Accept-Encoding', 'X-Ua-Compatible': 'IE=Edge,chrome=1', 'Transfer-Encoding': 'chunked')> 1547191237.4331403 <ClientResponse(http://www.baidu.com) [200 OK]> <CIMultiDictProxy('Bdpagetype': '1', 'Bdqid': '0xfd3e1b1f0000d880', 'Cache-Control': 'private', 'Connection': 'Keep-Alive', 'Content-Encoding': 'gzip', 'Content-Type': 'text/html', 'Cxy_all': 'baidu+39d965c50587bb578c5714a0d732b2e4', 'Date': 'Fri, 11 Jan 2019 07:20:37 GMT', 'Expires': 'Fri, 11 Jan 2019 07:20:25 GMT', 'P3p': 'CP=" OTI DSP COR IVA OUR IND COM "', 'Server': 'BWS/1.1', 'Set-Cookie': 'BAIDUID=76DA9E559DEFD4059A93CF4E300A8EEB:FG=1; expires=Thu, 31-Dec-37 23:55:55 GMT; max-age=2147483647; path=/; domain=.baidu.com', 'Set-Cookie': 'BIDUPSID=76DA9E559DEFD4059A93CF4E300A8EEB; expires=Thu, 31-Dec-37 23:55:55 GMT; max-age=2147483647; path=/; domain=.baidu.com', 'Set-Cookie': 'PSTM=1547191237; expires=Thu, 31-Dec-37 23:55:55 GMT; max-age=2147483647; path=/; domain=.baidu.com', 'Set-Cookie': 'delPer=0; path=/; domain=.baidu.com', 'Set-Cookie': 'BDSVRTM=0; path=/', 'Set-Cookie': 'BD_HOME=0; path=/', 'Set-Cookie': 'H_PS_PSSID=1445_21113_28131_28267_22158; path=/; domain=.baidu.com', 'Vary': 'Accept-Encoding', 'X-Ua-Compatible': 'IE=Edge,chrome=1', 'Transfer-Encoding': 'chunked')> 1547191237.4341416
幾個任務的時間之差基本忽略不計,那親測發送一千個請求也就11秒完成,確實很給力。
六、多進程配合使用
asyncio、aiohttp須要配合aiomultiprocess庫使用,版本要求至少3.6,貼上該庫的github上的使用示例,目前還在驗證:
Usage Most of aiomultiprocess mimics the standard multiprocessing module whenever possible, while accounting for places that benefit from async functionality. Executing a coroutine on a child process is as simple as: import asyncio from aiohttp import request from aiomultiprocess import Process async def put(url, params): async with request("PUT", url, params=params) as response: pass async def main(): p = Process(target=put, args=("https://jreese.sh", )) await p asyncio.run(main()) If you want to get results back from that coroutine, Worker makes that available: import asyncio from aiohttp import request from aiomultiprocess import Worker async def get(url): async with request("GET", url) as response: return await response.text("utf-8") async def main(): p = Worker(target=get, args=("https://jreese.sh", )) response = await p asyncio.run(main()) If you want a managed pool of worker processes, then use Pool: import asyncio from aiohttp import request from aiomultiprocess import Pool async def get(url): async with request("GET", url) as response: return await response.text("utf-8") async def main(): urls = ["https://jreese.sh", ...] async with Pool() as pool: result = await pool.map(get, urls) asyncio.run(main())
七、多協程併發
使用loop.run_until_complete(syncio.wait(tasks)) 也可使用 loop.run_until_complete(asyncio.gather(*tasks)) ,前者傳入task列表,會對task進行解包操做。
七、協程嵌套
顧名思義是一個協程中調用另外一個協程,可是涉及到兩個協程函數的結果處理和返回。
async def get(url): async with aiohttp.ClientSession() as session: async with session.get(url) as response: print(response) print(time.time()) import time async def request(): url = "http://www.baidu.com" resulit = await get(url) tasks = [asyncio.ensure_future(request()) for _ in range(10000)] loop = asyncio.get_event_loop() loop.run_until_complete(asyncio.wait(tasks))
被調用協程返回結果有下列三種方式;
async def get(url): async with aiohttp.ClientSession() as session: async with session.get(url) as response: print(response) print(time.time()) async def request(): url = "http://www.baidu.com" tasks = [asyncio.ensure_future(url) for _ in range(1000)] 方式一: dones, pendings = await asyncio.wait(tasks) # 返回future對象,不返回直接結果 for task in dones: print('Task ret: ', task.result()) 方式二: results = await asyncio.gather(*tasks) # 直接返回結果 方式三: for task in asyncio.as_completed(tasks): result = await task print('Task ret: {}'.format(result)) # 迭代方式返回結果 tasks = asyncio.ensure_future(request()) loop = asyncio.get_event_loop() loop.run_until_complete(asyncio.wait(tasks))
八、中止協程任務
實現結束task有兩種方式:關閉單個task、關閉loop,涉及主要函數:
方式一:適用於內嵌協程函數,先取內嵌協程任務 async def get(url): async with aiohttp.ClientSession() as session: async with session.get(url) as response: print(response) print(time.time()) async def request(): url = "http://www.baidu.com" tasks = [asyncio.ensure_future(url) for _ in range(1000)] dones, pendings = await asyncio.wait(tasks) task = asyncio.ensure_future(request()) loop = asyncio.get_event_loop() try: loop.run_until_complete(task) except KeyboardInterrupt as e: asyncio.gather(*asyncio.Task.all_tasks()).cancel() loop.stop() loop.run_forever() finally: loop.close() 方式二:適用於無內嵌函數,直接遍歷協程任務 loop = asyncio.get_event_loop() try: loop.run_until_complete(asyncio.wait(tasks)) except KeyboardInterrupt as e: for task in asyncio.Task.all_tasks(): print(task.cancel()) loop.stop() loop.run_forever() finally: loop.close()
線程進程的區別體如今幾個方面:
第一:由於進程擁有獨立的堆棧空間和數據段,因此每當啓動一個新的進程必須分配給它獨立的地址空間,創建衆多的數據表來維護它的代碼段、堆棧段和數據段,這對於多進程來講十分「奢侈」,系統開銷比較大,而線程不同,線程擁有獨立的堆棧空間,可是共享數據段,它們彼此之間使用相同的地址空間,共享大部分數據,比進程更節儉,開銷比較小,切換速度也比進程快,效率高,可是正因爲進程之間獨立的特色,使得進程安全性比較高,也由於進程有獨立的地址空間,一個進程崩潰後,在保護模式下不會對其它進程產生影響,而線程只是一個進程中的不一樣執行路徑。一個線程死掉就等於整個進程死掉。
第二:體如今通訊機制上面,正由於進程之間互不干擾,相互獨立,進程的通訊機制相對很複雜,譬如管道,信號,消息隊列,共享內存,套接字等通訊機制,而線程因爲共享數據段因此通訊機制很方便。。
3.屬於同一個進程的全部線程共享該進程的全部資源,包括文件描述符。而不一樣過的進程相互獨立。
4.線程又稱爲輕量級進程,進程有進程控制塊,線程有線程控制塊;
5.線程一定也只能屬於一個進程,而進程能夠擁有多個線程並且至少擁有一個線程;
第四:體如今程序結構上,舉一個簡明易懂的列子:當咱們使用進程的時候,咱們不自主的使用if else嵌套來判斷pid,使得程序結構繁瑣,可是當咱們使用線程的時候,基本上能夠甩掉它,固然程序內部執行功能單元須要使用的時候仍是要使用,因此線程對程序結構的改善有很大幫助。
進程與線程的選擇取決如下幾點:
一、須要頻繁建立銷燬的優先使用線程;由於對進程來講建立和銷燬一個進程代價是很大的。
二、線程的切換速度快,因此在須要大量計算,切換頻繁時用線程,還有耗時的操做使用線程可提升應用程序的響應
三、由於對CPU系統的效率使用上線程更佔優,因此可能要發展到多機分佈的用進程,多核分佈用線程;
四、並行操做時使用線程,如C/S架構的服務器端併發線程響應用戶的請求;
五、須要更穩定安全時,適合選擇進程;須要速度時,選擇線程更好。
若是父進程死了,子進程依然會進行,父進程變成pid爲1的進程
子進程會在運行時拷貝當前主進程中的全部內容,這也就意味着當一個新的子進程被建立的時候,該子進程就會複製當前模塊,這樣的寫法可能造成無限遞歸式地建立新的子進程。因此爲了不以上狀況發生,咱們在此引入了 if name == ‘main‘
threading.threading
threading.Thread(參數name表示什麼)
t1.join()
t1.setDemon()
同步鎖 lock.acquire() 還有其餘的麼
遞歸鎖,怎麼獲取一個東西是否被鎖 threading.Lock() ,threading.RLock() 裏面維護一個計數器
GIL的概念 怎麼獲取是幾核機器,python最多用一個核,爲了防止公用問題
能夠用進程+協程
multiprocessing.cpu_count() 查看一下multiprocessing的經常使用屬性