python(40)- 進程、線程、協程及IO模型

1、操做系統概念

操做系統位於底層硬件與應用軟件之間的一層。工做方式:向下管理硬件,向上提供接口。html

操做系統進行進程切換:1.出現IO操做;2.固定時間。python

固定時間很短,人感覺不到。每個應用層運行起來的程序都是進程。linux

 

2、進程與線程的概念

2.1 進程

程序僅僅只是一堆代碼而已,而進程指的是程序的運行過程。須要強調的是:同一個程序執行兩次,那也是兩個進程。nginx

進程:資源管理單位(容器)。git

線程:最小執行單位,管理線程的是進程。程序員

 

進程定義:github

進程就是一個程序在一個數據集上的一次動態執行過程。進程通常由程序、數據集、進程控制塊三部分組成。咱們編寫的程序web

用來描述進程要完成哪些功能以及如何完成;數據集則是程序在執行過程當中所須要使用的資源;進程控制塊用來記錄進程的外redis

部特徵,描述進程的執行變化過程,系統能夠利用它來控制和管理進程,它是系統感知進程存在的惟一標誌。算法

 

舉一例說明進程:

想象一位有一手好廚藝的計算機科學家正在爲他的女兒烘製生日蛋糕。他有作生日蛋糕的食譜,廚房裏有所需的原料:麪粉、雞蛋、

糖、香草汁等。在這個比喻中,作蛋糕的食譜就是程序(即用適當形式描述的算法)計算機科學家就是處理器(cpu),而作蛋糕的各類原

料就是輸入數據。進程就是廚師閱讀食譜、取來各類原料以及烘製蛋糕等一系列動做的總和。如今假設計算機科學家的兒子哭着跑了

進來,說他的頭被一隻蜜蜂蟄了。計算機科學家就記錄下他照着食譜作到哪兒了(保存進程的當前狀態),而後拿出一本急救手冊,按

照其中的指示處理蟄傷。這裏,咱們看處處理機從一個進程(作蛋糕)切換到另外一個高優先級的進程(實施醫療救治),每一個進程擁有各

自的程序(食譜和急救手冊)。當蜜蜂蟄傷處理完以後,這位計算機科學家又回來作蛋糕,從他離開時的那一步繼續作下去。

 

2.2 線程

線程的出現是爲了下降上下文切換的消耗,提升系統的併發性,並突破一個進程只能幹同樣事的缺陷,使到進程內併發成爲可

能。假設,一個文本程序,須要接受鍵盤輸入,將內容顯示在屏幕上,還須要保存信息到硬盤中。若只有一個進程,勢必形成同一時

間只能幹同樣事的尷尬(當保存時,就不能經過鍵盤輸入內容)。如有多個進程,每一個進程負責一個任務,進程A負責接收鍵盤輸入的

任務,進程B負責將內容顯示在屏幕上的任務,進程C負責保存內容到硬盤中的任務。這裏進程A,B,C間的協做涉及到了進程通訊問

題,並且有共同都須要擁有的東西——-文本內容,不停的切換形成性能上的損失。如有一種機制,可使任務A,B,C共享資源,這

樣上下文切換所須要保存和恢復的內容就少了,同時又能夠減小通訊所帶來的性能損耗,那就行了。是的,這種機制就是線程。

線程也叫輕量級進程,它是一個基本的CPU執行單元,也是程序執行過程當中的最小單元,由線程ID、程序計數器、寄存器集合

和堆棧共同組成。線程的引入減少了程序併發執行時的開銷,提升了操做系統的併發性能。線程沒有本身的系統資源。

 

2.3 進程與線程的關係

在傳統操做系統中,每一個進程有一個地址空間,並且默認就有一個控制線程。

多線程(即多個控制線程)的概念是,在一個進程中存在多個控制線程,控制該進程的地址空間。 

進程只是用來把資源集中到一塊兒(進程只是一個資源單位,或者說資源集合),而線程纔是cpu上的執行單位。

進程是計算機中的程序關於某數據集合上的一次運行活動,是系統進行資源分配和調度的基本單位,是操做系統結構的基礎。或者說

進程是具備必定獨立功能的程序關於某個數據集合上的一次運行活動,進程是系統進行資源分配和調度的一個獨立單位。

線程則是進程的一個實體,是CPU調度和分派的基本單位,它是比進程更小的能獨立運行的基本單位。

進程和線程的關係:

(1)一個線程只能屬於一個進程,而一個進程能夠有多個線程,但至少有一個線程。

(2)資源分配給進程,同一進程的全部線程共享該進程的全部資源。

(3)CPU分給線程,即真正在CPU上運行的是線程。

 

2.4 並行和併發

不管是並行仍是併發,在用戶看來都是'同時'運行的,而一個cpu同一時刻只能執行一個任務。

並行:同時運行,只有具有多個cpu才能實現並行。

併發:是僞並行,即看起來是同時運行,單個cpu+多道技術。

 

全部現代計算機常常會在同一時間作不少件事,一個用戶的PC(不管是單cpu仍是多cpu),均可以同時運行多個任務(一個任務可

以理解爲一個進程)。當啓動系統時,會祕密啓動許多進程:

    啓動一個進程來殺毒(360軟件)

    啓動一個進程來看電影(暴風影音)

    啓動一個進程來聊天(騰訊QQ)

全部的這些進程都需被管理,因而一個支持多進程的多道程序系統是相當重要的。

 

多道技術:內存中同時存入多道(多個)程序,cpu從一個進程快速切換到另一個,使每一個進程各自運行幾十或幾百毫秒,

這樣,雖然在某一個瞬間,一個cpu只能執行一個任務,但在1秒內,cpu卻能夠運行多個進程,這就給人產生了並行的錯覺,

即僞併發,以此來區分多處理器操做系統的真正硬件並行(多個cpu共享同一個物理內存)。

 

 

 

2.5 同步與異步

同步就是指一個進程在執行某個請求的時候,若該請求須要一段時間才能返回信息,那麼這個進程將會一直等待下去,直到收

到返回信息才繼續執行下去;異步是指進程不須要一直等下去,而是繼續執行下面的操做,無論其餘進程的狀態。當有消息返

回時系統會通知進程進行處理,這樣能夠提升執行的效率。

舉個例子,打電話時就是同步通訊,發短息時就是異步通訊。

 

2.6 進程的建立

  但凡是硬件,都須要有操做系統去管理,只要有操做系統,就有進程的概念,就須要有建立進程的方式,一些操做系統只爲一個應用程序設計,好比微波爐中的控制器,一旦啓動微波爐,全部的進程都已經存在。

  而對於通用系統(跑不少應用程序),須要有系統運行過程當中建立或撤銷進程的能力,主要分爲4中形式建立新的進程

  1. 系統初始化(查看進程linux中用ps命令,windows中用任務管理器,前臺進程負責與用戶交互,後臺運行的進程與用戶無關,運行在後臺而且只在須要時才喚醒的進程,稱爲守護進程,如電子郵件、web頁面、新聞、打印)

  2. 一個進程在運行過程當中開啓了子進程(如nginx開啓多進程,os.fork,subprocess.Popen等)

  3. 用戶的交互式請求,而建立一個新進程(如用戶雙擊暴風影音)

  4. 一個批處理做業的初始化(只在大型機的批處理系統中應用)

  

  不管哪種,新進程的建立都是由一個已經存在的進程執行了一個用於建立進程的系統調用而建立的:

  1. 在UNIX中該系統調用是:fork,fork會建立一個與父進程如出一轍的副本,兩者有相同的存儲映像、一樣的環境字符串和一樣的打開文件(在shell解釋器進程中,執行一個命令就會建立一個子進程)

  2. 在windows中該系統調用是:CreateProcess,CreateProcess既處理進程的建立,也負責把正確的程序裝入新進程。

 

  關於建立的子進程,UNIX和windows

  1.相同的是:進程建立後,父進程和子進程有各自不一樣的地址空間,任何一個進程的在其地址空間中的修改都不會影響到另一個進程。

  2.不一樣的是:在UNIX中,子進程的初始地址空間是父進程的一個副本,提示:子進程和父進程是能夠有隻讀的共享內存區的。可是對於windows系統來講,從一開始父進程與子進程的地址空間就是不一樣的。

 

2.7 進程的終止

  1. 正常退出(自願,如用戶點擊交互式頁面的叉號,或程序執行完畢調用發起系統調用正常退出,在linux中用exit,在windows中用ExitProcess)

  2. 出錯退出(自願,程序員主動拋出異常,例如raise)

  3. 嚴重錯誤(非自願,執行非法指令,如引用不存在的內存,1/0等)

  4. 被其餘進程殺死(非自願,如kill -9)

 

2.8 進程的層次結構

  不管UNIX仍是windows,進程只有一個父進程,不一樣的是:

  1. 在UNIX中全部的進程,都是以init進程爲根,組成樹形結構。父子進程共同組成一個進程組,這樣,當從鍵盤發出一個信號時,該信號被送給當前與鍵盤相關的進程組中的全部成員。

  2. 在windows中,沒有進程層次的概念,全部的進程都是地位相同的,惟一相似於進程層次的暗示,是在建立進程時,父進程獲得一個特別的令牌(稱爲句柄),該句柄能夠用來控制子進程,可是父進程有權把該句柄傳給其餘子進程,這樣就沒有層次了。

 

2.9 進程的狀態

  tail -f access.log |grep '404'

  執行程序tail,開啓一個子進程,執行程序grep,開啓另一個子進程,兩個進程之間基於管道'|'通信,將tail的結果做爲grep的輸入。

  進程grep在等待輸入(即I/O)時的狀態稱爲阻塞,此時grep命令都沒法運行

  其實在兩種狀況下會致使一個進程在邏輯上不能運行,

  1. 進程掛起是自身緣由,遇到I/O阻塞,便要讓出CPU讓其餘進程去執行,這樣保證CPU一直在工做

  2. 與進程無關,是操做系統層面,可能會由於一個進程佔用時間過多,或者優先級等緣由,而調用其餘的進程去使用CPU。

  於是一個進程由三種狀態

 

2.8 進程併發的實現

  進程併發的實如今於,硬件中斷一個正在運行的進程,把此時進程運行的全部狀態保存下來,爲此,操做系統維護一張表格,即進程表(process table),每一個進程佔用一個進程表項(這些表項也稱爲進程控制塊)

  該表存放了進程狀態的重要信息:程序計數器、堆棧指針、內存分配情況、全部打開文件的狀態、賬號和調度信息,以及其餘在進程由運行態轉爲就緒態或阻塞態時,必須保存的信息,從而保證該進程在再次啓動時,就像從未被中斷過同樣。

 

 

 

三 threading模塊

python的多線程:因爲GIL,致使同一時刻同一進程中只能有一個線程運行在一個cpu上,而不能有多個線程同時在一個cpu上運行。

實現多線程的併發須要使用threading模塊。

線程對象的建立

Thread類直接建立 

#多線程的併發,只能是交給一個cpu執行,不能多個cpu執行。即多線程不能實現並行。

#多線程並行方式一:

import threading
import time
def tingge():
    print("聽歌")
    time.sleep(3)
    print("聽歌結束")

def xieboke():
    print("寫博客")
    time.sleep(5)
    print("寫博客結束")
    print(time.time()-s)   #計算整個程序運行時間,不能放在函數外,否則要和另外三個進程競爭,致使其輸出的時間不許確。

s=time.time()
t1=threading.Thread(target=tingge)        #建立聽歌線程,多線程的主進程
t2=threading.Thread(target=xieboke)        #建立寫博客線程,多線程的主進程

t1.start()    #運行聽歌線程,多線程的子進程
t2.start()    #運行寫博客線程,多線程的子進程
print("ending")    #多線程的主進程

--->聽歌
    寫博客
    ending  
    聽歌結束
    寫博客結束
    5.000286340713501

#由於三個線程t1.start(),t2.start() 和print("ending")之間競爭的緣由,print("ending")競爭成功,因此先運行print("ending")這個進程。

#當tingge函數中睡眠更改成8s的話,不考慮小數點的狀況下,真實運行時間爲8s

  

 

Thread類繼承式建立

#調用多線程方式二

import threading
import time

class MyThread(threading.Thread):
    def __init__(self,num):
        threading.Thread.__init__(self)
        self.num=num

    def run(self):
        print("running on number:%s" %self.num)
        time.sleep(3)

t1=MyThread(56)
t2=MyThread(78)

t1.start()              #該進程運行run函數緣由,請查看源碼,一系列的調用最終是調用run函數

t2.start()              #該進程運行run函數緣由,請查看源碼,一系列的調用最終是調用run函數

print("ending")   

--->running on number:56  
    running on number:78
    ending

  

Thread類的實例方法

join()

在子線程完成運行以前,這個子線程的父線程將一直被阻塞。 

 
import threading
from time import ctime,sleep
import time

def Music(name):

        print ("Begin listening to {name}. {time}".format(name=name,time=ctime()))
        sleep(3)
        print("end listening {time}".format(time=ctime()))

def Blog(title):

        print ("Begin recording the {title}. {time}".format(title=title,time=ctime()))
        sleep(5)
        print('end recording {time}'.format(time=ctime()))


threads = []


t1 = threading.Thread(target=Music,args=('FILL ME',))
t2 = threading.Thread(target=Blog,args=('',))

threads.append(t1)
threads.append(t2)

if __name__ == '__main__':

    for t in threads:
        t.start()

    print ("all over %s" %ctime())

--->Begin listening to FILL ME. Mon May  8 17:13:50 2017
    Begin recording the python. Mon May  8 17:13:50 2017  #Music函數和Blog函數同時運行
    all over Mon May  8 17:13:50 2017
    end listening Mon May  8 17:13:53 2017  
    end recording Mon May  8 17:13:55 2017  #總程序執行時間爲5s  

 

t.join():線程對象t未執行完,會阻塞你的主線程 ,但不會阻塞子進程,子進程沒有任何影響。

#添加t1.join()
import threading from time import ctime,sleep def Music(name): print ("Begin listening to {name}. {time}".format(name=name,time=ctime())) sleep(3) print("end listening {time}".format(time=ctime())) def Blog(title): print ("Begin recording the {title}. {time}".format(title=title,time=ctime())) sleep(5) print('end recording {time}'.format(time=ctime())) threads = [] t1 = threading.Thread(target=Music,args=('FILL ME',)) t2 = threading.Thread(target=Blog,args=('python',)) threads.append(t1) threads.append(t2) if __name__ == '__main__': for t in threads: t.start()    #子進程 t1.join() #添加堵塞     print ("all over %s" %ctime())  #主進程 --->Begin listening to FILL ME. Mon May 8 17:17:38 2017 Begin recording the python. Mon May 8 17:17:38 2017  #添加t1.join()狀況下函數Music和Blog函數也是同時進行 end listening Mon May 8 17:17:41 2017 #但只有當子進程Music函數運行完才能運行主進程,因此這裏打印結果與上一個程序順序不一樣 all over Mon May 8 17:17:41 2017 end recording Mon May 8 17:17:43 2017  #整個程序執行時間爲5s

 

t1.join()改爲t2.join()一樣證實會阻塞你的主線程 ,但不會阻塞子進程

#t1.join()改爲t2.join()查看運行結果
import threading
from time import ctime,sleep

def Music(name):
        print ("Begin listening to {name}. {time}".format(name=name,time=ctime()))
        sleep(3)
        print("end listening {time}".format(time=ctime()))

def Blog(title):
        print ("Begin recording the {title}. {time}".format(title=title,time=ctime()))
        sleep(5)
        print('end recording {time}'.format(time=ctime()))

threads = []

t1 = threading.Thread(target=Music,args=('FILL ME',))
t2 = threading.Thread(target=Blog,args=('python',))

threads.append(t1)
threads.append(t2)

if __name__ == '__main__':
    for t in threads:
        t.start()

    t2.join()

    print ("all over %s" %ctime())

--->Begin listening to FILL ME. Mon May  8 17:31:31 2017
    Begin recording the python. Mon May  8 17:31:31 2017
    end listening Mon May  8 17:31:34 2017
    end recording Mon May  8 17:31:36 2017
    all over Mon May  8 17:31:36 2017    #整個程序執行時間爲5s

 

當t.join()在for循環內就不能實現多線程了,沒有意義。

import threading
from time import ctime,sleep

def Music(name):
        print ("Begin listening to {name}. {time}".format(name=name,time=ctime()))
        sleep(3)
        print("end listening {time}".format(time=ctime()))

def Blog(title):
        print ("Begin recording the {title}. {time}".format(title=title,time=ctime()))
        sleep(5)
        print('end recording {time}'.format(time=ctime()))

threads = []

t1 = threading.Thread(target=Music,args=('FILL ME',))
t2 = threading.Thread(target=Blog,args=('python',))

threads.append(t1)
threads.append(t2)

if __name__ == '__main__':
    for t in threads:
        t.start()
        t.join()    

    print ("all over %s" %ctime())

--->Begin listening to FILL ME. Mon May  8 17:39:20 2017
    end listening Mon May  8 17:39:23 2017
    Begin recording the python. Mon May  8 17:39:23 2017    #函數Music和函數Blog不能同時進行
    end recording Mon May  8 17:39:28 2017
    all over Mon May  8 17:39:28 2017        #運行時間8s

 

 

setDaemon()  

將線程聲明爲守護線程,必須在start() 方法調用以前設置,若是不設置爲守護線程程序會被無限掛起。

當咱們在程序運行中,執行一個主線程,若是主線程又建立一個子線程,主線程和子線程就分兵兩路,分別運行,那麼當主線程完

成。想退出時,會檢驗子線程是否完成。若是子線程未完成,則主線程會等待子線程完成後再退出。

可是有時候咱們須要的是隻要主線程 完成了,無論子線程是否完成,都要和主線程一塊兒退出,這時就能夠 用setDaemon方法了。

#主進程結束但子進程未結束,整個程序一樣結束。
import threading
from time import ctime,sleep

def Music(name):
        print ("Begin listening to {name}. {time}".format(name=name,time=ctime()))
        sleep(3)
        print("end listening {time}".format(time=ctime()))

def Blog(title):
        print ("Begin recording the {title}. {time}".format(title=title,time=ctime()))
        sleep(5)
        print('end recording {time}'.format(time=ctime()))

threads = []

t1 = threading.Thread(target=Music,args=('FILL ME',))
t2 = threading.Thread(target=Blog,args=('python',))

threads.append(t1)
threads.append(t2)

if __name__ == '__main__':
    for t in threads:
        t.setDaemon(True)   # 注意:必定在start以前設置
        t.start()

    print ("all over %s" %ctime())

--->Begin listening to FILL ME. Mon May  8 17:48:16 2017
    Begin recording the python. Mon May  8 17:48:16 2017
    all over Mon May  8 17:48:16 2017

只設置t1爲守護線程

import threading
from time import ctime,sleep

def Music(name):
        print ("Begin listening to {name}. {time}".format(name=name,time=ctime()))
        sleep(3)
        print("end listening {time}".format(time=ctime()))

def Blog(title):
        print ("Begin recording the {title}. {time}".format(title=title,time=ctime()))
        sleep(5)
        print('end recording {time}'.format(time=ctime()))

threads = []

t1 = threading.Thread(target=Music,args=('FILL ME',))
t2 = threading.Thread(target=Blog,args=('python',))

threads.append(t1)
threads.append(t2)

if __name__ == '__main__':
    t1.setDaemon(True)  # 注意:必定在start以前設置
    t1.start()
    t2.start()

    print ("all over %s" %ctime())

--->Begin listening to FILL ME. Mon May  8 17:51:47 2017
    Begin recording the python. Mon May  8 17:51:47 2017
    all over Mon May  8 17:51:47 2017
    end listening Mon May  8 17:51:50 2017
    end recording Mon May  8 17:51:52 2017        #由於t1運行時間比較長,因此t1運行完其餘線程也都運行完畢。

只設置t2爲守護線程

import threading
from time import ctime,sleep

def Music(name):
        print ("Begin listening to {name}. {time}".format(name=name,time=ctime()))
        sleep(3)
        print("end listening {time}".format(time=ctime()))

def Blog(title):
        print ("Begin recording the {title}. {time}".format(title=title,time=ctime()))
        sleep(5)
        print('end recording {time}'.format(time=ctime()))

threads = []

t1 = threading.Thread(target=Music,args=('FILL ME',))
t2 = threading.Thread(target=Blog,args=('python',))

threads.append(t1)
threads.append(t2)

if __name__ == '__main__':
    t2.setDaemon(True)  # 注意:必定在start以前設置
    t1.start()
    t2.start()

    print ("all over %s" %ctime())

--->Begin listening to FILL ME. Mon May  8 17:54:44 2017
    Begin recording the python. Mon May  8 17:54:44 2017
    all over Mon May  8 17:54:44 2017
    end listening Mon May  8 17:54:47 2017   #由於t2進程運行只有3s,而t1進程運行須要5s,因此當t2進程和主進程運行完畢,整個程序就結束,無論t1是否運行完畢。

 

 其它方法

Thread實例對象的方法
  # isAlive(): 返回線程是否活動的。
  # getName(): 返回線程名。
  # setName(): 設置線程名。

threading模塊提供的一些方法:
  # threading.currentThread(): 返回當前的線程變量。
  # threading.enumerate(): 返回一個包含正在運行的線程的list。正在運行指線程啓動後、結束前,不包括啓動前和終止後的線程。
  # threading.activeCount(): 返回正在運行的線程數量,與len(threading.enumerate())有相同的結果。

 

import threading
from time import ctime,sleep

def Music(name):
        print ("Begin listening to {name}. {time}".format(name=name,time=ctime()))
        sleep(3)
        print("線程數:",threading.activeCount())   #threading.activeCount()線程數:3
        print("正在運行的線程:",threading.enumerate())       #正在運行的線程
        print("end listening {time}".format(time=ctime()))

def Blog(title):
        print ("Begin recording the {title}. {time}".format(title=title,time=ctime()))
        sleep(5)
        print('end recording {time}'.format(time=ctime()))

threads = []

t1 = threading.Thread(target=Music,args=('FILL ME',),name="sub_thread")     #name="sub_thread"定義線程名
t2 = threading.Thread(target=Blog,args=('python',))

threads.append(t1)
threads.append(t2)

if __name__ == '__main__':
    t2.setDaemon(True)  # 注意:必定在start以前設置
    t1.start()
    t2.start()

    print ("all over %s" %ctime())

--->Begin listening to FILL ME. Wed May 10 15:13:33 2017
    Begin recording the python. Wed May 10 15:13:33 2017
    all over Wed May 10 15:13:33 2017
    線程數: 3
    正在運行的線程: [<_MainThread(MainThread, stopped 9284)>, <Thread(sub_thread, started 10796)>, <Thread(Thread-1, started daemon 10232)>]
    end listening Wed May 10 15:13:36 2017

  

 

GIL(全局解釋器鎖)

'''

定義:
In CPython, the global interpreter lock, or GIL, is a mutex that prevents multiple 
native threads from executing Python bytecodes at once. This lock is necessary mainly 
because CPython’s memory management is not thread-safe. (However, since the GIL 
exists, other features have grown to depend on the guarantees that it enforces.)

'''

GIL加在cpython解釋器中, 其餘的python解釋器不會有GIL。

  

Python中的線程是操做系統的原生線程,Python虛擬機使用一個全局解釋器鎖(Global Interpreter Lock)來互斥線程對Python

虛擬機的使用。爲了支持多線程機制,一個基本的要求就是須要實現不一樣線程對共享資源訪問的互斥,因此引入了GIL。

GIL:在一個線程擁有了解釋器的訪問權以後,其餘的全部線程都必須等待它釋放解釋器的訪問權,即便這些線程的下一條指令並不

會互相影響。

在調用任何Python C API以前,要先得到GIL

GIL缺點:多處理器退化爲單處理器;優勢:避免大量的加鎖解鎖操做

GIL的早期設計

Python支持多線程,而解決多線程之間數據完整性和狀態同步的最簡單方法天然就是加鎖。 因而有了GIL這把超級大鎖,而當愈來愈

多的代碼庫開發者接受了這種設定後,他們開始大量依賴這種特性(即默認python內部對象是thread-safe的,無需在實現時考慮額

外的內存鎖和同步操做)。慢慢的這種實現方式被發現是蛋疼且低效的。但當你們試圖去拆分和去除GIL的時候,發現大量庫代碼開

發者已經重度依賴GIL而很是難以去除了。有多難?作個類比,像MySQL這樣的「小項目」爲了把Buffer Pool Mutex這把大鎖拆分紅

各個小鎖也花了從5.5到5.6再到5.7多個大版爲期近5年的時間,而且仍在繼續。MySQL這個背後有公司支持且有固定開發團隊的產

品走的如此艱難,那又更況且Python這樣核心開發和代碼貢獻者高度社區化的團隊呢?

GIL的影響

不管你啓多少個線程,你有多少個cpu, Python在執行一個進程的時候會淡定的在同一時刻只容許一個線程運行。

因此,python是沒法利用多核CPU實現多線程的。

這樣,python對於計算密集型的任務開多線程的效率甚至不如串行(沒有大量切換),可是,對於IO密集型的任務效率仍是有顯著提高

的。

 

計算密集型:一直在使用CPU。

IO密集型:存在大量IO操做。

對於IO密集型任務,python的多線程可以節省時間。

對於計算密集型任務,python的多線程並無用。

 

如下程序爲計算密集型任務:

#單線程即cpu串行狀況下,查看運行時間
#coding:utf8
import time
def cal(n):
    sum=0
    for i in range(n):
       sum+=i

s=time.time()


cal(50000000)
cal(50000000)

print("time",time.time()-s)
--->time 7.72044500541687            #python3串行運行結果
    ('time', 12.600000143051147)      #python2串行運行結果

多線程狀況下運行程序:

import time

def cal(n):
    sum=0
    for i in range(n):
       sum+=i

s=time.time()

import threading

t1=threading.Thread(target=cal,args=(50000000,))
t2=threading.Thread(target=cal,args=(50000000,))

t1.start()
t2.start()
t1.join()
t2.join()


print("time",time.time()-s)
--->time 7.737437728881836          #python3中多線程運行時間
    ('time', 20.12600016593933)        #python2中多線程運行時間

從上述單線程和多線程運行結果來看,無論在python2或者3中運行結果均顯示多線程比單線程運行時間更長。

由於GIL鎖限制你只有一個線程執行,切換進程浪費時間,致使多線程話費時間更多。

python3中時間差不明顯的緣由是由於python3改進了GIL鎖,但根本沒有解決問題。

  

解決方案

1.python使用多核,即開多個進程。

方法一:協程+多進程。使用方法簡單,效率還能夠,通常使用該方法。

    協程yield是你本身寫的,是本身定義何時切換進程。  

方法二:IO多路複用。使用複雜,但效率很高。不經常使用。

 

2.終極思路:換C模塊實現多線程,即換一個python解釋器,或者換門編程語言避免GIL鎖。

 

多進程:

用multiprocessing替代Thread multiprocessing庫的出現很大程度上是爲了彌補thread庫由於GIL而低效的缺陷。它完整的複製了

一套thread所提供的接口方便遷移。惟一的不一樣就是它使用了多進程而不是多線程。每一個進程有本身的獨立的GIL,所以也不會出現

進程之間的GIL爭搶。

#coding:utf8
from multiprocessing import Process
import time

def counter():
    i = 0
    for _ in range(40000000):
        i = i + 1

    return True

def main():

    l=[]
    start_time = time.time()

    for _ in range(2):
        t=Process(target=counter)
        t.start()
        l.append(t)
        #t.join()

    for t in l:
       t.join()

    end_time = time.time()
    print("Total time: {}".format(end_time - start_time))

if __name__ == '__main__':
    main()


'''

py2.7:
     串行:6.1565990448 s
     並行:3.1639978885 s

py3.5:
     串行:6.556925058364868 s
     併發:3.5378448963165283 s

'''

  

固然multiprocessing也不是萬能良藥。它的引入會增長程序實現時線程間數據通信和同步的困難。就拿計數器來舉例子,若是咱們

要多個線程累加同一個變量,對於thread來講,申明一個global變量,用thread.Lock的context包裹住三行就搞定了。而

multiprocessing因爲進程之間沒法看到對方的數據,只能經過在主線程申明一個Queue,put再get或者用share memory的方法。

這個額外的實現成本使得原本就很是痛苦的多線程程序編碼,變得更加痛苦了。

總結:由於GIL的存在,只有IO Bound場景下得多線程會獲得較好的性能 - 若是對並行計算性能較高的程序能夠考慮把核心部分也成

C模塊,或者索性用其餘語言實現 - GIL在較長一段時間內將會繼續存在,可是會不斷對其進行改進。

因此對於GIL,既然不能反抗,那就學會去享受它吧!

 

 

同步鎖 (Lock)

import time
import threading

def addNum():
    global num #在每一個線程中都獲取這個全局變量
    num-=1

num = 100  #設定一個共享變量

thread_list = []

for i in range(100):            
    t = threading.Thread(target=addNum)
    t.start()
    thread_list.append(t)

for t in thread_list: #等待全部線程執行完畢
    t.join()
print('Result: ', num)

--->Result:0    #for循環循環進行了100次addNum函數,由於速度很快,100個線程同時競爭運行addNum函數,num減去100次1,因此結果爲0

修改addNum中的代碼:

#睡眠時間較長時

import time
import threading

def addNum():
    global num #在每一個線程中都獲取這個全局變量
    temp=num
    time.sleep(1)
    num =temp-1  # 對此公共變量進行-1操做

num = 100  #設定一個共享變量
thread_list = []
for i in range(100):            #循環進行了100次addNum函數
    t = threading.Thread(target=addNum)
    t.start()
    thread_list.append(t)
for t in thread_list: #等待全部線程執行完畢
    t.join()
print('Result: ', num)

--->99

#100個線程同時競爭運行函數,睡眠1s確定夠100個進程運行到同時處於睡眠的狀態,第一個競爭到的確定率先醒來速度極快計算完,num=99,線程2醒來從上面攜帶的global num=100一樣計算num=99

 

#睡眠時間較短時
import time
import threading

def addNum():
    global num #在每一個線程中都獲取這個全局變量
    temp=num
    time.sleep(0.001)
    num =temp-1  # 對此公共變量進行-1操做

num = 100  #設定一個共享變量
thread_list = []

for i in range(100):            #循環進行了100次addNum函數
    t = threading.Thread(target=addNum)
    t.start()
    thread_list.append(t)

for t in thread_list: #等待全部線程執行完畢
    t.join()

print('Result: ', num)

--->85或者86或者87...

#每次執行程序結果都不一樣。100個線程由於GIL大鎖的緣由競爭運行函數,for循環第一次時線程1率先運行函數,線程1最快運行到time.sleep(0.001)睡眠時,GIL釋放,線程1還未運行完addNum函數。for循環了2,3...次,線程2,3...競爭到運行函數,假設線程1 醒來時不知道有多少個線程在同時運行函數,當線程1計算num值,num值改變了,改變後的num值對在函數中的線程2,3...計算時num已經不是100的初始值了,num值因爲一直不停的有線程進入一直在改變。並且線程1睡眠時不知道有多少個線程同時在睡眠,最後的結果確定也不一樣。

 

上述就是線程安全問題,數據不可控,不安全,解決方法就是再建立一把鎖。

 

鎖一般被用來實現對共享資源的同步訪問。爲每個共享資源建立一個Lock對象,當你須要訪問該資源時,調用acquire方法

來獲取鎖對象(若是其它線程已經得到了該鎖,則當前線程需等待其被釋放),待資源訪問完後,再調用release方法釋放

鎖:

#注意獲取鎖和釋放鎖的位置
import time
import threading

def addNum():
    global num
    lock.acquire()      #獲取這把鎖
    temp=num
    time.sleep(0.01)
    num =temp-1
    lock.release()      #釋放這把鎖

num = 100
thread_list = []

lock=threading.Lock()       #建立一把鎖

for i in range(100):
    t = threading.Thread(target=addNum)
    t.start()
    thread_list.append(t)
for t in thread_list:
    t.join()
print('Result: ', num)

--->Result:  0

上鎖的做用是這個線程未結束其餘線程沒法競爭,只能等,是一個串行,運行時間爲0.001s*100次。

但與join()不一樣的是:join()是整個程序是串行的,上鎖的話只有公共數據部分加鎖,是串行的,程序其餘內容仍是並行的。

 

但上鎖後的程序頗有可能會出現死鎖的狀況。

 

 

死鎖與遞歸鎖

所謂死鎖: 是指兩個或兩個以上的進程或線程在執行過程當中,因爭奪資源而形成的一種互相等待的現象,若無外力做用,它們都將沒法推動下去。此時稱系統處於死鎖狀態或系統產生了死鎖,這些永遠在互相等待的進程稱爲死鎖進程。

import threading
import time

mutexA = threading.Lock()
mutexB = threading.Lock()

class MyThread(threading.Thread):

    def __init__(self):
        threading.Thread.__init__(self)

    def run(self):
        self.fun1()
        self.fun2()

    def fun1(self):

        mutexA.acquire()  # 若是鎖被佔用,則阻塞在這裏,等待鎖的釋放

        print ("I am %s , get res: %s---%s" %(self.name, "ResA",time.time()))

        mutexB.acquire()
        print ("I am %s , get res: %s---%s" %(self.name, "ResB",time.time()))
        mutexB.release()
        mutexA.release()


    def fun2(self):

        mutexB.acquire()
        print ("I am %s , get res: %s---%s" %(self.name, "ResB",time.time()))
        time.sleep(0.2)

        mutexA.acquire()
        print ("I am %s , get res: %s---%s" %(self.name, "ResA",time.time()))
        mutexA.release()

        mutexB.release()

if __name__ == "__main__":

    print("start---------------------------%s"%time.time())

    for i in range(0, 10):
        my_thread = MyThread()
        my_thread.start()

出現死鎖不停競爭,程序卡住。

 

解決方法:

在Python中爲了支持在同一線程中屢次請求同一資源,python提供了可重入鎖RLock。這個RLock內部維護着一個Lock和一個counter變量,counter記錄了acquire的次數,從而使得資源能夠被屢次require。直到一個線程全部的acquire都被release,其餘的線程才能得到資源。上面的例子若是使用RLock代替Lock,則不會發生死鎖:

rl = threading.RLock()  # 遞歸鎖

rl.acquire()  # 上鎖 計數+1 counter=1
rl.acquire()  # 上鎖 計數+1 counter=2
...
rl.release()  # 解鎖 計數-1 counter=1
rl.release()  # 解鎖 計數-1 counter=0

counter記錄了acquire的次數,直到一個線程全部的acquire都被release,即count爲0時,其餘線程才能夠訪問該資源。

import threading
import time

Rlock = threading.RLock()


class MyThread(threading.Thread):
    def __init__(self):
        threading.Thread.__init__(self)

    def run(self):
        self.func1()
        self.func2()

    def func1(self):
        Rlock.acquire()  # 若是鎖被佔用,則阻塞在這裏,等待鎖的釋放 counter = 1
        print('I am %s ,get res: %s --- %s ' % (self.name, 'ResA', time.time()))

        Rlock.acquire()  # counter = 2
        print('I am %s ,get res: %s --- %s ' % (self.name, 'ResB', time.time()))
        Rlock.release()  # counter = 1

        Rlock.release()  # counter = 0

    def func2(self):
        Rlock.acquire()  # counter = 1
        print('I am %s ,get res: %s --- %s ' % (self.name, 'ResB', time.time()))
        time.sleep(0.2)

        Rlock.acquire()  # counter = 2
        print('I am %s ,get res: %s --- %s ' % (self.name, 'ResA', time.time()))
        Rlock.release()  # counter = 1

        Rlock.release()  # counter = 0

if __name__ == '__main__':
    print('start ----------- %s'%time.time())

    for i in range(0,10):

        mt = MyThread()
        mt.start()

 

 

同步條件 Event對象

線程之間的通訊做用

線程的一個關鍵特性是每一個線程都是獨立運行且狀態不可預測。若是程序中的其 他線程須要經過判斷某個線程的狀態來肯定本身下一

步的操做,這時線程同步問題就 會變得很是棘手。爲了解決這些問題,咱們須要使用threading庫中的Event對象。 對象包含一個可由

線程設置的信號標誌,它容許線程等待某些事件的發生。在 初始狀況下,Event對象中的信號標誌被設置爲假。若是有線程等待一個

Event對象, 而這個Event對象的標誌爲假,那麼這個線程將會被一直阻塞直至該標誌爲真。一個線程若是將一個Event對象的信號標誌

設置爲真,它將喚醒全部等待這個Event對象的線程。若是一個線程等待一個已經被設置爲真的Event對象,那麼它將忽略這個事件, 繼

續執行。

event.isSet():返回event的狀態值,False或True;

event.wait():若是 event.isSet()==False將阻塞線程,能夠加參數,表示等待秒數;

event.set(): 設置event的狀態值將爲True,全部阻塞池的線程激活進入就緒狀態, 等待操做系統調度;

event.clear():恢復event的狀態值爲False。

    

 

能夠考慮一種應用場景(僅僅做爲說明),例如,咱們有多個線程從Redis隊列中讀取數據來處理,這些線程都要嘗試去鏈接Redis的服務,通常狀況下,若是Redis鏈接不成功,在各個線程的代碼中,都會去嘗試從新鏈接。若是咱們想要在啓動時確保Redis服務正常,才讓那些工做線程去鏈接Redis服務器,那麼咱們就能夠採用threading.Event機制來協調各個工做線程的鏈接操做:主線程中會去嘗試鏈接Redis服務,若是正常的話,觸發事件,各工做線程會嘗試鏈接Redis服務。

import threading
import time
import logging

logging.basicConfig(level=logging.DEBUG, format='(%(threadName)-10s) %(message)s',)

def worker(event):
    logging.debug('Waiting for redis ready...')
    event.wait()
    logging.debug('redis ready, and connect to redis server and do some work [%s]', time.ctime())
    time.sleep(1)

def main():
    readis_ready = threading.Event()
    t1 = threading.Thread(target=worker, args=(readis_ready,), name='t1')
    t1.start()

    t2 = threading.Thread(target=worker, args=(readis_ready,), name='t2')
    t2.start()

    logging.debug('first of all, check redis server, make sure it is OK, and then trigger the redis ready event')
    time.sleep(3) # simulate the check progress
    readis_ready.set()

if __name__=="__main__":
    main()



---------------------------------

(t1        ) Waiting for redis ready...
(t2        ) Waiting for redis ready...
(MainThread) first of all, check redis server, make sure it is OK, and then trigger the redis ready event
(t1        ) redis ready, and connect to redis server and do some work [Mon Oct 23 18:20:53 2017]
(t2        ) redis ready, and connect to redis server and do some work [Mon Oct 23 18:20:53 2017]

  

threading.Event的wait方法還接受一個超時參數,默認狀況下若是事件一致沒有發生,wait方法會一直阻塞下去,而加入這個超時參數以後,若是阻塞時間超過這個參數設定的值以後,wait方法會返回。對應於上面的應用場景,若是Redis服務器一致沒有啓動,咱們但願子線程可以打印一些日誌來不斷地提醒咱們當前沒有一個能夠鏈接的Redis服務,咱們就能夠經過設置這個超時參數來達成這樣的目的:

import logging
import time
import threading

logging.basicConfig(level=logging.DEBUG, format='(%(threadName)-10s) %(message)s', )

def worker(event):
    logging.debug('等待redis準備…')

    while not event.isSet():
        logging.debug('等待鏈接...')
        event.wait(3)  # if flag = False阻塞,等待flag = True 繼續執行

    logging.debug('redis準備好,並鏈接到redis服務器和作一些工做 %s', time.ctime())
    time.sleep(1)


def main():
    r = threading.Event()  # flag = False

    t1 = threading.Thread(target=worker, args=(r,), name='t1')
    t1.start()

    t2 = threading.Thread(target=worker, args=(r,), name='t2')
    t2.start()

    logging.debug('首先,檢查redis服務器,確保它是OK,而後觸發複述事件作好準備')

    time.sleep(6)
    r.set()  # flag = True


if __name__ == '__main__':
    main()


-------------------------------------------------------

(t1        ) 等待redis準備…
(t1        ) 等待鏈接...
(t2        ) 等待redis準備…
(MainThread) 首先,檢查redis服務器,確保它是OK,而後觸發複述事件作好準備
(t2        ) 等待鏈接...
(t1        ) 等待鏈接...
(t2        ) 等待鏈接...
(t1        ) redis準備好,並鏈接到redis服務器和作一些工做 Mon Oct 23 18:23:20 2017
(t2        ) redis準備好,並鏈接到redis服務器和作一些工做 Mon Oct 23 18:23:20 2017

這樣,咱們就能夠在等待Redis服務啓動的同時,看到工做線程里正在等待的狀況。

 

 

Semaphore(信號量)

同時只有n個線程能夠得到semaphore,便可以限制最大鏈接數爲n)

Semaphore管理一個內置的計數器,
每當調用acquire()時內置計數器-1;
調用release() 時內置計數器+1;
計數器不能小於0;當計數器爲0時,acquire()將阻塞線程直到其餘線程調用release()。

實例:(同時只有5個線程能夠得到semaphore,便可以限制最大鏈接數爲5):

import threading
import time

semaphore = threading.Semaphore(5)


def func():
    if semaphore.acquire():
        print(threading.currentThread().getName() + ' get semaphore',time.ctime())
        time.sleep(2)
        semaphore.release()


for i in range(20):
    t1 = threading.Thread(target=func)
    t1.start()


------------------------------------------------------------------------------------------

Thread-1 get semaphore Mon Oct 23 18:44:40 2017
Thread-2 get semaphore Mon Oct 23 18:44:40 2017
Thread-3 get semaphore Mon Oct 23 18:44:40 2017
Thread-4 get semaphore Mon Oct 23 18:44:40 2017
Thread-5 get semaphore Mon Oct 23 18:44:40 2017
Thread-8 get semaphore Mon Oct 23 18:44:42 2017
Thread-9 get semaphore Mon Oct 23 18:44:42 2017
Thread-7 get semaphore Mon Oct 23 18:44:42 2017
Thread-6 get semaphore Mon Oct 23 18:44:42 2017
Thread-10 get semaphore Mon Oct 23 18:44:42 2017
Thread-13 get semaphore Mon Oct 23 18:44:44 2017
Thread-11 get semaphore Mon Oct 23 18:44:44 2017
Thread-12 get semaphore Mon Oct 23 18:44:44 2017
Thread-15 get semaphore Mon Oct 23 18:44:44 2017
Thread-14 get semaphore Mon Oct 23 18:44:44 2017
Thread-16 get semaphore Mon Oct 23 18:44:46 2017
Thread-18 get semaphore Mon Oct 23 18:44:46 2017
Thread-17 get semaphore Mon Oct 23 18:44:46 2017
Thread-19 get semaphore Mon Oct 23 18:44:46 2017
Thread-20 get semaphore Mon Oct 23 18:44:46 2017 

20個線程同時獲取,但每次只能運行5個線程,因此運行程序顯示的結果是5個5個的打印出來。

 

 

multiprocessing模塊

因爲GIL的存在,Python不存在多線程,要充分利用多核資源,就須要使用多進程。

multiprocessing模塊是Python中的多進程管理包。

經過multiprocessing.Process對象來建立一個進程,Process對象與Thread對象的用法相同,也有start(), run(), join()的方法。

multiprocessing與threading同樣,調用同一套API。

 

python的進程調用

# Process類調用
 
from multiprocessing import Process
import time
 
def f(name):
    print('hello', name, time.ctime())
    time.sleep(1)
 
if __name__ == '__main__':
    p_list = []
    for i in range(3):
        p = Process(target=f, args=('xuyaping:%s' % i,))
        p_list.append(p)
        p.start()
    for i in p_list:
        p.join()
    print('end')

------------------------------------------------------------------------

hello xuyaping:0 Mon Oct 23 18:57:11 2017
hello xuyaping:2 Mon Oct 23 18:57:11 2017
hello xuyaping:1 Mon Oct 23 18:57:11 2017
end

 

# 繼承Process類調用
from multiprocessing import Process
import time

class MyProcess(Process):
    def __init__(self):
        super(MyProcess, self).__init__()
        # self.name = name

    def run(self):
        print('hello', self.name, time.ctime())
        time.sleep(1)

if __name__ == '__main__':
    p_list = []
    for i in range(3):
        p = MyProcess()
        p.start()
        p_list.append(p)

    for p in p_list:
        p.join()
    print('end')

-----------------------------------------------------

hello MyProcess-1 Mon Oct 23 18:59:08 2017
hello MyProcess-2 Mon Oct 23 18:59:08 2017
hello MyProcess-3 Mon Oct 23 18:59:08 2017
end

  

  

 

process類

構造方法:

Process([group [, target [, name [, args [, kwargs]]]]])

  group: 線程組,目前尚未實現,庫引用中提示必須是None; 
  target: 要執行的方法; 
  name: 進程名; 
  args/kwargs: 要傳入方法的參數。

實例方法:

  is_alive():返回進程是否在運行。

  join([timeout]):阻塞當前上下文環境的進程程,直到調用此方法的進程終止或到達指定的timeout(可選參數)。

  start():進程準備就緒,等待CPU調度

  run():strat()調用run方法,若是實例進程時未制定傳入target,這star執行t默認run()方法。

  terminate():無論任務是否完成,當即中止工做進程

屬性:

  daemon:和線程的setDeamon功能同樣

  name:進程名字。

  pid:進程號。

from multiprocessing import Process
import os
import time
def info(name):


    print("name:",name)
    print('parent process:', os.getppid())
    print('process id:', os.getpid())
    print("------------------")
    time.sleep(1)

def foo(name):

    info(name)

if __name__ == '__main__':

    info('main process line')


    p1 = Process(target=info, args=('xuyaping',))
    p2 = Process(target=foo, args=('egon',))
    p1.start()
    p2.start()

    p1.join()
    p2.join()

    print("ending")



# 輸出:
name: main process line
parent process: 9900
process id: 13264
------------------
name: xuyaping
name: egon
parent process: 13264
process id: 13720
------------------
parent process: 13264
process id: 20128
------------------
ending

經過tasklist(Win)或者ps -elf |grep(linux)命令檢測每個進程號(PID)對應的進程名

 

 

 

進程間通信 

進程隊列Queue

from multiprocessing import Process, Queue
import queue

def f(q,n):
    #q.put([123, 456, 'hello'])
    q.put(n*n+1)
    print("son process",id(q))

if __name__ == '__main__':
    q = Queue()  #try: q=queue.Queue()
    print("main process",id(q))

    for i in range(3):
        p = Process(target=f, args=(q,i))
        p.start()

    print(q.get())
    print(q.get())
    print(q.get())

--------------------------------------------

main process 43659448
son process 44854072
son process 45116216
5
2
son process 44985144
1

  

管道(pipe)

The Pipe() function returns a pair of connection objects connected by a pipe which by default is duplex (two-way). For example:

from multiprocessing import Process, Pipe

def f(conn):
    conn.send([12, {"name":"xyp"}, 'hello'])
    response=conn.recv()
    print("response",response)
    conn.close()
    print("q_ID2:",id(child_conn))

if __name__ == '__main__':

    parent_conn, child_conn = Pipe()
    print("q_ID1:",id(child_conn))
    p = Process(target=f, args=(child_conn,))
    p.start()
    print(parent_conn.recv())   # prints "[42, None, 'hello']"
    parent_conn.send("兒子你好!")
    p.join()

  

Pipe()返回的兩個鏈接對象表明管道的兩端。 每一個鏈接對象都有send()和recv()方法(等等)。 請注意,若是兩個進程(或線程)嘗試同時讀取或寫入管道的同一端,管道中的數據可能會損壞。

 

 

manager

Queue和pipe只是實現了數據交互,並沒實現數據共享,即一個進程去更改另外一個進程的數據

A manager object returned by Manager() controls a server process which holds Python objects and allows other processes to manipulate them using proxies.

from multiprocessing import Process, Manager

def f(d, l,n):

    d[n] = n
    d["name"] ="xuyaping"
    l.append(n)

    #print("l",l)

if __name__ == '__main__':

    with Manager() as manager:

        d = manager.dict()

        l = manager.list(range(5))
        p_list = []

        for i in range(10):
            p = Process(target=f, args=(d,l,i))
            p.start()
            p_list.append(p)

        for res in p_list:
            res.join()

        print(d)
        print(l)


------------------------------------------------

{1: 1, 'name': 'xuyaping', 0: 0, 4: 4, 3: 3, 6: 6, 5: 5, 2: 2, 8: 8, 7: 7, 9: 9}
[0, 1, 2, 3, 4, 1, 0, 4, 3, 6, 5, 2, 8, 7, 9]

  

進程池

進程池內部維護一個進程序列,當使用時,則去進程池中獲取一個進程,若是進程池序列中沒有可供使用的進進程,那麼程序就會等待,直到進程池中有可用進程爲止。

from multiprocessing import Pool
import time

def foo(args):
 time.sleep(1)
 print(args)

if __name__ == '__main__':
 p = Pool(5)
 for i in range(30):
     p.apply_async(func=foo, args= (i,))

 p.close()   # 等子進程執行完畢後關閉線程池
 # time.sleep(2)
 # p.terminate()  # 馬上關閉線程池
 p.join()

進程池內部維護一個進程序列,當使用時,去進程池中獲取一個進程,若是進程池序列中沒有可供使用的進程,那麼程序就會等待,直到進程池中有可用進程爲止。

進程池中有如下幾個主要方法:

  1. apply:從進程池裏取一個進程並執行
  2. apply_async:apply的異步版本
  3. terminate:馬上關閉線程池
  4. join:主進程等待全部子進程執行完畢,必須在close或terminate以後
  5. close:等待全部進程結束後,才關閉線程池

 

 

yield和協程

1.因爲是單線程,不能再切換
2.再也不有任何鎖的概念

import time

def consumer():
    r = ''
    while True:
        n = yield r
        if not n:
            return
        print('[CONSUMER] ←← Consuming %s...' % n)
        time.sleep(1)
        r = '200 OK'

def produce(c):
    next(c)
    n = 0
    while n < 5:
        n = n + 1
        print('[PRODUCER] →→ Producing %s...' % n)
        cr = c.send(n)
        print('[PRODUCER] Consumer return: %s' % cr)

    c.close()

if __name__ == '__main__':
    c = consumer()
    produce(c)


---------------------------------------------------------------------

[PRODUCER] →→ Producing 1...
[CONSUMER] ←← Consuming 1...
[PRODUCER] Consumer return: 200 OK
[PRODUCER] →→ Producing 2...
[CONSUMER] ←← Consuming 2...
[PRODUCER] Consumer return: 200 OK
[PRODUCER] →→ Producing 3...
[CONSUMER] ←← Consuming 3...
[PRODUCER] Consumer return: 200 OK
[PRODUCER] →→ Producing 4...
[CONSUMER] ←← Consuming 4...
[PRODUCER] Consumer return: 200 OK
[PRODUCER] →→ Producing 5...
[CONSUMER] ←← Consuming 5...
[PRODUCER] Consumer return: 200 OK

  

 

1.greenlet

方便手動切換

greenlet機制的主要思想是:生成器函數或者協程函數中的yield語句掛起函數的執行,直到稍後使用next()或send()操做進行恢復爲止。可使用一個調度器循環在一組生成器函數之間協做多個任務。greenlet是python中實現咱們所謂的"Coroutine(協程)"的一個基礎庫.

from greenlet import greenlet
 
def test1():
    print (12)
    gr2.switch()
    print (34)
    gr2.switch()
 
def test2():
    print (56)
    gr1.switch()
    print (78)
 
gr1 = greenlet(test1)
gr2 = greenlet(test2)
gr1.switch()

------------------------------

12
56
34
78

  

 

2.gevent

自動切換

gevent是第三方庫,經過greenlet實現協程

當一個greenlet遇到IO操做時,好比訪問網絡,就自動切換到其餘的greenlet,等到IO操做完成,再在適當的時候切換回來繼續執行。因爲IO操做很是耗時,常常使程序處於等待狀態,有了gevent爲咱們自動切換協程,就保證總有greenlet在運行,而不是等待IO。

import gevent
import time

def foo():
    print("running in foo")
    gevent.sleep(2)
    print("switch to foo again")

def bar():
    print("switch to bar")
    gevent.sleep(5)
    print("switch to bar again")

start=time.time()

gevent.joinall(
    [gevent.spawn(foo),
    gevent.spawn(bar)]
)

print(time.time()-start)

--------------------------------------------

running in foo
switch to bar
switch to foo again
switch to bar again
5.010286569595337

  

 

因爲切換是在IO操做時自動完成,因此gevent須要修改Python自帶的一些標準庫,這一過程在啓動時經過monkey patch完成:

from gevent import monkey
monkey.patch_all()
import gevent
from urllib import request
import time

def f(url):
    print('GET: %s' % url)
    resp = request.urlopen(url)
    data = resp.read()
    print('%d bytes received from %s.' % (len(data), url))

start=time.time()

gevent.joinall([
        gevent.spawn(f, 'https://itk.org/'),
        gevent.spawn(f, 'https://www.github.com/'),
        gevent.spawn(f, 'https://zhihu.com/'),
])

# f('https://itk.org/')
# f('https://www.github.com/')
# f('https://zhihu.com/')

print(time.time()-start)

------------------------------------------------------------------------

GET: https://itk.org/
GET: https://www.github.com/
GET: https://zhihu.com/
11785 bytes received from https://zhihu.com/.
12221 bytes received from https://itk.org/.
51166 bytes received from https://www.github.com/.
4.193239688873291

  

 

 

IO模型


經常使用模型分爲4種:
1.阻塞IO
2.非阻塞IO
3.IO多路複用
4.異步IO

不經常使用
驅動信號

阻塞IO和非阻塞IO:
阻塞IO:進程不能作其餘的事情
非阻塞IO:等待數據無阻塞

同步IO和異步IO:
有阻塞就是同步IO,因此,
阻塞IO、非阻塞IO、IO多路複用是同步IO
異步IO是異步IO

阻塞IO

全程阻塞,無論是等待數據或者是從內核態拷貝數據到用戶態

系統調用兩個階段:
wait for data 阻塞
copy data 阻塞

非阻塞IO

setblocking(False) 設置阻塞狀態爲非阻塞
固定時間循環發起系統調用,請求不到作本身的事情,等待下次請求,內核態拷貝數據到用戶態須要等待

系統調用兩個階段:
wait for data 非阻塞
copy data 阻塞

優勢:等待數據無阻塞
缺點:系統調用發送太多;數據不是即時接收的

IO多路複用

全程阻塞,監聽多個連接
系統調用select完成wait for data工做

系統調用兩個階段:
wait for data 阻塞
copy data 阻塞

特色:監聽多個文件描述符,實現併發

r,w,e = select.select([sock,],[],[]) #等待連接 
for obj in r:
    conn,addr = obj.accept()

 

inputs = [sock,] 
r,w,e = select.select(inputs,[],[]) #inputs監聽有變化的套接字 inputs=[sock,conn1,conn2,...]
for obj in r: #第一次[cock,] 第二次[conn1,]
    if obj == sock
        conn,addr = obj.accept()
        inputs.append(conn) #inputs=[sock,conn1,conn2]
    else:
        data = obj.recv(1024)

    

對於文件描述符(socket套接字):
1.每個套接字對象的本質就是一個非零整數,不會變(fb=4)

<socket.socket fd=4, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=0, laddr=('127.0.0.1', 8080), raddr=('127.0.0.1', 51963)>

   

2.收發數據的時候,對於接收端而言,數據先到內核空間,而後copy到用戶空間,同時內核空間的數據被清空

3.根據TCP協議,當接發送端接收到接收端的確認信息後,清空內核空間的數據,不然不清空

異步IO

全程無阻塞,實現複雜

 

 

 

 

 

 

  • IO模型 - IO多路複用實現機制
  • IO模型 - selectors模塊
  • threading模塊 - 隊列queue

IO多路複用實現機制


IO多路複用機制:就是單個process能夠同時處理多個網絡鏈接的IO,基本原理就是經過select/epoll函數不斷輪詢所負責的全部socket,當某個socket有數據到達,就通知用戶進程。

不一樣的操做系統提供的函數不一樣:
    windows系統: select
    linux系統: select、poll、epoll

  

簡單介紹select、poll、epoll三者的特色:

select的缺點有如下三點,會致使效率降低:
    1.每次調用select都要將全部的fd(文件描述符),copy到你的內核空間
    2.遍歷全部的fd,是否有數據訪問
    3.最大鏈接數(1024),超出連接再也不監聽
    
poll:
    與select同樣,只是最大鏈接數沒有限制

epoll不一樣於select和poll只有一個函數,epoll經過三個函數實現實現輪詢socket:
    1.第一個函數:建立epoll句柄:將全部的fd(文件描述符),copy到你的內核空間,只copy一次
    2.回調函數:爲全部fd綁定一個回調函數,一旦有數據訪問,觸發回調函數,回調函數將fd放入一個鏈表中(回調函數:某一個函數或者某一個動做,成功完成以後,會觸發的函數)
    3.第三個函數:判斷鏈表是否爲空
    epoll最大鏈接數沒有上線

  

selectors模塊


selectors基於select模塊實現IO多路複用,調用語句selectors.DefaultSelector(),特色是根據平臺自動選擇最佳IO多路複用機制,調用順序:epoll > poll > select

import selectors
import socket


def accept(sock, mask):
    conn, addr = sock.accept()
    sel.register(conn, selectors.EVENT_READ, read)  # 將conn和read函數註冊到一塊兒,當conn有變化時執行read函數


def read(conn, mask):
    try:
        data = conn.recv(1000)
        print(data.decode('utf8'))
        inputs = input('>>:').strip()
        conn.send(inputs.encode('utf8'))
    except Exception:
        sel.unregister(conn)
        conn.close()


sock = socket.socket()
sock.bind(('127.0.0.1', 8080))
sock.listen(100)
sock.setblocking(False)  # 設置爲非阻塞IO

sel = selectors.DefaultSelector()  # 根據平臺自動選擇最佳IO多路複用機制
sel.register(sock, selectors.EVENT_READ, accept)  # 將sock和accept函數註冊到一塊兒,當sock有變化時執行accept函數

while True:
    events = sel.select()  # 監聽  [(key1,mask1),(key2),(mask2)]
    for key, mask in events:
        func = key.data  # 1 key.data就是accept   # 2 key.data就是read
        obj = key.fileobj  # 1 key.fileobj就是sock   # 2 key.fileobj就是conn

        func(obj, mask)  # 1 accept(sock,mask)   # 2read(conn,mask)

  

 

隊列queue


隊列與線程(和進程)有關,保證多線程信息交換的安全。
隊列是一種數據類型(數據結構),可用於存放數據建立隊列語法queue.Queue(),默認是先進先出(FIFO)。

隊列的優勢:保證線程安全

get與put方法

import queue

q = queue.Queue() #建立隊列對象q

q.put(123) #將123放入隊列中
q.put('hello')

q.get() #將第一個值從隊列中取出

  

join和task_done方法

join()阻塞進程,直到全部任務都完成,須要配合另外一個方法task_done()
task_done() 表示某個任務完成。每一條get語句後須要一條task_done。

import queue

q = queue.Queue(5)
q.put(10)
q.put(20)
print(q.get())
q.task_done()
print(q.get())
q.task_done()

q.join()

print("ending!")

  

其餘模式

先進後出:queue.LifoQueue()後進先出(LIFO)

優先級:queue.PriorityQueue()優先級高先出

q.put([1,‘123’]) #1爲有限等級,越小越優先

  

生產者消費者模型

在線程世界裏,生產者就是生產數據的線程,消費者就是消費數據的線程。在多線程開發當中,若是生產者處理速度很快,而消費者處理速度很慢,那麼生產者就必須等待消費者處理完,才能繼續生產數據。一樣的道理,若是消費者的處理能力大於生產者,那麼消費者就必須等待生產者。爲了解決這個問題因而引入了生產者和消費者模式。

生產者消費者模式是經過一個容器來解決生產者和消費者的強耦合問題。生產者和消費者彼此之間不直接通信,而經過阻塞隊列來進行通信,因此生產者生產完數據以後不用等待消費者處理,直接扔給阻塞隊列,消費者不找生產者要數據,而是直接從阻塞隊列裏取,阻塞隊列就至關於一個緩衝區,平衡了生產者和消費者的處理能力。

這就像,在餐廳,廚師作好菜,不須要直接和客戶交流,而是交給前臺,而客戶去飯菜也不須要不找廚師,直接去前臺領取便可,這也是一個結耦的過程。

import time,random
import queue,threading

q = queue.Queue()

def Producer(name):
  count = 0
  while count <10:
    print("making........")
    time.sleep(random.randrange(3))
    q.put(count)
    print('Producer %s has produced %s baozi..' %(name, count))
    count +=1
    #q.task_done()
    #q.join()
    print("ok......")
def Consumer(name):
  count = 0
  while count <10:
    time.sleep(random.randrange(4))
    if not q.empty():
        data = q.get()
        #q.task_done()
        #q.join()
        print(data)
        print('\033[32;1mConsumer %s has eat %s baozi...\033[0m' %(name, data))
    else:
        print("-----no baozi anymore----")
    count +=1

p1 = threading.Thread(target=Producer, args=('A',))
c1 = threading.Thread(target=Consumer, args=('B',))
# c2 = threading.Thread(target=Consumer, args=('C',))
# c3 = threading.Thread(target=Consumer, args=('D',))
p1.start()
c1.start()
# c2.start()
# c3.start()

-----------------------------------------------------------------------------------

making........
Producer A has produced 0 baozi..
ok......
making........
Producer A has produced 1 baozi..
ok......
making........
Producer A has produced 2 baozi..
ok......
making........
0
Consumer B has eat 0 baozi...
Producer A has produced 3 baozi..
ok......
making........
Producer A has produced 4 baozi..
ok......
making........
Producer A has produced 5 baozi..
1
Consumer B has eat 1 baozi...
ok......
making........
2
Consumer B has eat 2 baozi...
3
Consumer B has eat 3 baozi...
4
Consumer B has eat 4 baozi...
5
Consumer B has eat 5 baozi...
Producer A has produced 6 baozi..
ok......
making........
6
Consumer B has eat 6 baozi...
Producer A has produced 7 baozi..
ok......
making........
Producer A has produced 8 baozi..
ok......
making........
Producer A has produced 9 baozi..
ok......
7
Consumer B has eat 7 baozi...
8
Consumer B has eat 8 baozi...
9
Consumer B has eat 9 baozi...

  

 

總結:

進程:最小的資源管理單位(盛放線程的容器)

線程:最小的執行單位

串行、並行、併發

cpython由於存在GIL致使,同一時刻,同一進程只能有一個線程執行

關於daemon:程序直到不存在非守護線程時退出

同步鎖:因爲多線程處理公共數據

遞歸鎖

event:一個對象,讓多個進程間通訊

相關文章
相關標籤/搜索