咱們都知道windows是支持多任務的操做系統。html
什麼叫「多任務」呢?簡單地說,就是操做系統能夠同時運行多個任務。打個比方,你一邊在用瀏覽器上網,一邊在聽MP3,一邊在用Word趕做業,這就是多任務,至少同時有3個任務正在運行。還有不少任務悄悄地在後臺同時運行着,只是桌面上沒有顯示而已。python
如今,多核CPU已經很是普及了,可是,即便過去的單核CPU,也能夠執行多任務。因爲CPU執行代碼都是順序執行的,那麼,單核CPU是怎麼執行多任務的呢?linux
答案就是操做系統輪流讓各個任務交替執行,任務1執行0.01秒,切換到任務2,任務2執行0.01秒,再切換到任務3,執行0.01秒……這樣反覆執行下去。表面上看,每一個任務都是交替執行的,可是,因爲CPU的執行速度實在是太快了,咱們感受就像全部任務都在同時執行同樣。nginx
真正的並行執行多任務只能在多核CPU上實現,可是,因爲任務數量遠遠多於CPU的核心數量,因此,操做系統也會自動把不少任務輪流調度到每一個核心上執行。web
對於操做系統來講,一個任務就是一個進程(Process),好比打開一個瀏覽器就是啓動一個瀏覽器進程,打開一個記事本就啓動了一個記事本進程,打開兩個記事本就啓動了兩個記事本進程,打開一個Word就啓動了一個Word進程。編程
有些進程還不止同時幹一件事,好比Word,它能夠同時進行打字、拼寫檢查、打印等事情。在一個進程內部,要同時幹多件事,就須要同時運行多個「子任務」,咱們把進程內的這些「子任務」稱爲線程(Thread)。json
因爲每一個進程至少要幹一件事,因此,一個進程至少有一個線程。固然,像Word這種複雜的進程能夠有多個線程,多個線程能夠同時執行,多線程的執行方式和多進程是同樣的,也是由操做系統在多個線程之間快速切換,讓每一個線程都短暫地交替運行,看起來就像同時執行同樣。固然,真正地同時執行多線程須要多核CPU纔可能實現。windows
咱們前面編寫的全部的Python程序,都是執行單任務的進程,也就是隻有一個線程。若是咱們要同時執行多個任務怎麼辦?有兩種解決方案:api
固然還有第三種方法,就是啓動多個進程,每一個進程再啓動多個線程,這樣同時執行的任務就更多了,固然這種模型更復雜,實際不多采用。瀏覽器
總結一下就是,多任務的實現有3種方式:
同時執行多個任務一般各個任務之間並非沒有關聯的,而是須要相互通訊和協調,有時,任務1必須暫停等待任務2完成後才能繼續執行,有時,任務3和任務4又不能同時執行,因此,多進程和多線程的程序的複雜度要遠遠高於咱們前面寫的單進程單線程的程序。
Python既支持多進程,又支持多線程。
正在進行的一個過程或者說一個任務。而負責執行任務的則是CPU
因爲如今計算計算機都是多任務同時進行的,好比:打開了QQ,而後聽着音樂,後面下載者片兒,那麼這些都是怎麼完成的呢?答案是經過多進程。操做系統會對CPU的時間進行規劃,每一個進程執行一個任務(功能),CPU會快速的在這些進行之間進行切換已達到同時進行的目的(單核CPU的狀況)
程序:一堆代碼的集合體。 進程:指的是程序運行的過程。 注意的是:一個程序執行兩次,那麼會產生兩個互相隔離的進程。
並行:同時運行,只有具有多個CPU才能實現並行 併發:是僞並行,即看起來是同時運行。單個CPU+多道技術就能夠實現併發。(並行也屬於併發)
同步指一個進程在執行某個請求的時候,若該請求須要一段時間才能返回信息,那麼這個進程將會一直等待下去,直到返回西南喜才繼續執行下去。 異步是指進程不須要一直等下去,而是繼續執行下面的操做,無論其餘進程的狀態。當有消息返回時系統會通知進程處理,這樣能夠提升執行的效率。 例子:打電話就是同步,發短信就是異步
主要分爲4種: 一、系統初始化:(查看進程Linux中用ps命令,windows中用任務管理器,前臺進程負責與用戶交互,後臺運行的進程與用戶無關,運行在後臺而且只有在須要時才喚醒的進程,成爲守護進程,如電子郵件,web頁面,新聞,打印等) 二、一個進程在運行過程當中開啓了子進程(如nginx開啓多線程,操做系統os.fork(),subprocess.Popen等) 三、用戶的交互請求,而建立一個新的進程(如用戶雙擊QQ) 四、一個批處理做業的開始(只在大型批處理系統中應用)
以上四種其實都是由一個已經存在了的進程執行了一個用於建立進程的系統調用而建立的。
fork()
調用一次,返回兩次,由於操做系統自動把當前進程(稱爲父進程)複製了一份(稱爲子進程),而後,分別在父進程和子進程內返回。子進程返回0,父進程返回子進程的PID。注意:
進程之間共享終端,共享一個文件系統
進程的狀態主要分爲三種:進行、阻塞、就緒
在傳統的操做系統中,每一個進程有一個地址空間,並且默認就有一個控制線程,多線程(及多個控制線程)的概念是,在一個進程中存在多個控制線程,多個控制線程共享該進程的地址空間,進程只是用來把資源集中到一塊兒(進程只是一個資源單位,或者說資源集合),而線程纔是CPU的執行單位。
多線程指的是,在一個進程中開啓多個線程,簡單來講:若是多個任務公用一塊地址空間,那麼必須在一個進程內開啓多個線程。 一、多線程共享一個進程的地址空間 二、線程比進程更輕量級,線程比進程更容易建立和撤銷,在許多操做系統中,建立一個線程比建立一個進程要快10-100倍 三、對於CPU密集型的應用,多線程並不能提高性能,但對於I/O密集型,使用多線程會明顯的提高速度(I/O密集型,根本用不上多核優點) 四、在多CPU系統中,爲了最大限度的利用多核,能夠開啓多個線程(比開進程開銷要小的多) --> 針對其餘語言 注意: Python中的線程比較特殊,其餘語言,1個進程內4個線程,若是有4個CPU的時候,是能夠同時運行的,而Python在同一時間1個進程內,只有一個線程能夠工做。(就算你有再多的CPU,對Python來講用不上)
一、線程共享建立它的進程的地址空間,進程擁有本身的地址空間 二、線程能夠直接訪問進程的數據,進程擁有它父進程內存空間的拷貝 三、線程能夠和同一進程內其餘的線程直接通訊,進程必須interprocess communicateion(IPC機制)進行通訊 四、線程能夠被很容易的建立,而進程依賴於父進程內存空間的拷貝 五、線程能夠直接控制同一進程內的其餘線程,進程只能控制本身的子進程 六、改變主線程(控制)可能會影響其餘線程,改變主進程不會影響它的子進程
python中的多線程沒法利用多核優點,若是想要充分地使用多核CPU的資源(os.cpu_count()查看),在python中大部分狀況須要使用多進程。Python提供了multiprocessing,該模塊用來開啓子進程,並在子進程中執行咱們定製的任務(好比函數),該模塊與多線程模塊threading的編程接口相似。
multiprocessing模塊的功能衆多:支持子進程、通訊和共享數據、執行不一樣形式的同步,提供了Process、Queue、Pipe、Lock等組件。
須要再次強調的一點是:與線程不一樣,進程沒有任何共享狀態,進程修改的數據,改動僅限於該進程內。
注意:在windows中Process()必須放到# if __name__ == '__main__':下
利用Process建立進程的類:
Process([group [, target [, name [, args [, kwargs]]]]]),由該類實例化獲得的對象,表示一個子進程中的任務(還沒有啓動) 強調: 1. 須要使用關鍵字的方式來指定參數 2. args指定的爲傳給target函數的位置參數,是一個元組形式,必須有逗號
參數:
p.start(): # 啓動進程,並調用該子進程中的p.run() --> 和直接調用run方法是不一樣的,由於它會初始化部分其餘參數。 p.run(): # 進程啓動時運行的方法,正是它去調用target指定的函數,咱們自定義類的類中必定要實現該方法 p.terminate(): # 強制終止進程p,不會進行任何清理操做,若是p建立了子進程,該子進程就成了殭屍進程,使用該方法須要特別當心這種狀況。若是p還保存了一個鎖那麼也將不會被釋放,進而致使死鎖 p.is_alive(): # 若是p仍然運行,返回True p.join([timeout]): # 主線程等待p終止(強調:是主線程處於等的狀態,而p是處於運行的狀態)。timeout是可選的超時時間,須要強調的是,p.join只能join住start開啓的進程,而不能join住run開啓的進程
p.daemon: # 默認值爲False,若是設爲True,表明p爲後臺運行的守護進程,當p的父進程終止時,p也隨之終止,而且設定爲True後,p不能建立本身的新進程,必須在p.start()以前設置 p.name: # 進程的名稱 p.pid: # 進程的pid p.exitcode: # 進程在運行時爲None、若是爲–N,表示被信號N結束(瞭解便可) p.authkey: # 進程的身份驗證鍵,默認是由os.urandom()隨機生成的32字符的字符串。這個鍵的用途是爲涉及網絡鏈接的底層進程間通訊提供安全性,這類鏈接只有在具備相同的身份驗證鍵時才能成功
特別強調:設置 p.daemon=True 是會隨着主進程執行完畢而被回收,無論子進程是否完成任務。
使用Process建立進程的類有兩種方法:
一、經過實例化Process類完成進程的建立
二、繼承Process類,定製本身須要的功能後實例化建立進程類
# --------------------------- 方法1 --------------------------- import random import time from multiprocessing import Process def hello(name): print('Welcome to my Home') time.sleep(random.randint(1,3)) print('Bye Bye') p = Process(target=hello,args=('daxin',)) # 建立子進程p p.start() # 啓動子進程 print('主進程結束') # --------------------------- 方法2 --------------------------- import random import time from multiprocessing import Process class MyProcess(Process): def __init__(self,name): super(MyProcess, self).__init__() # 必須繼承父類的構造函數 self.name = name def run(self): # 必須叫run方法,由於start,就是執行的run方法。 print('Welcome to {0} Home'.format(self.name)) time.sleep(random.randint(1,3)) print('Bye Bye') p = MyProcess('daxin') p.start() print('主進程結束')
上一節咱們利用socket完成了socket server的編寫,這裏咱們使用multiprocessing對server端進行改寫,完成併發接受請求的功能。
若是服務端接受上萬個請求,那麼豈不是要建立1萬個進程去分別對應?這樣是不行的,那麼咱們可使用進程池的概念來解決這個問題,進程池的問題,在後續小節中詳細說明
進程之間數據不共享,可是共享同一套文件系統,因此訪問同一個文件,或同一個打印終端,是沒有問題的,競爭帶來的結果就是錯亂,如何控制,就是加鎖處理。
鎖的目的就是:當程序1在使用的時候,申請鎖,而且鎖住共享資源,待使用完畢後,釋放鎖資源,其餘程序獲取鎖後,重複這個過程。
Multiprocessing模塊提供了Lock對象用來完成進程同步鎖的功能
from multiprocessing import Lock lock = Lock() # 對象沒有參數 # 經過使用lock對象的acquire/release方法來進行 鎖/釋放 的需求。
利用進程同步鎖模擬搶票軟件的需求:
import random import time import json from multiprocessing import Process,Lock def gettickles(filename,str,lock): lock.acquire() # 對要修改的部分加鎖 with open(filename,encoding='utf-8') as f: dic = json.loads(f.read()) if dic['count'] > 0 : dic['count'] -= 1 time.sleep(random.random()) with open(filename,'w',encoding='utf-8') as f: f.write(json.dumps(dic)) print('\033[33m{0}搶票成功\033[0m'.format(str)) else: print('\033[35m{0}搶票失敗\033[0m'.format(str)) lock.release() # 修改完畢後解鎖 if __name__ == '__main__': lock = Lock() # 建立一個鎖文件 p_l = [] for i in range(1000): p = Process(target=gettickles,args=('a.txt','用戶%s' % i,lock)) p_l.append(p) p.start()
加鎖能夠保證多個進程修改同一塊數據時,同一時間只能有一個任務能夠進行修改,即串行的修改,沒錯,速度是慢了,但犧牲了速度卻保證了數據安全。
在利用Python進行系統管理的時候,特別是同時操做多個文件目錄,或者遠程控制多臺主機,並行操做能夠節約大量的時間。多進程是實現併發的手段之一,須要注意的問題是:
例如當被操做對象數目不大時,能夠直接利用multiprocessing中的Process動態成生多個進程,十幾個還好,但若是是上百個,上千個。。。手動的去限制進程數量卻又太過繁瑣,此時能夠發揮進程池的功效。
咱們就能夠經過維護一個進程池來控制進程數目,好比httpd的進程模式,規定最小進程數和最大進程數...
ps:對於遠程過程調用的高級應用程序而言,應該使用進程池,Pool能夠提供指定數量的進程,供用戶調用,當有新的請求提交到pool中時,若是池尚未滿,那麼就會建立一個新的進程用來執行該請求;但若是池中的進程數已經達到規定最大值,那麼該請求就會等待,直到池中有進程結束,就重用進程池中的進程。
建立進程池的類:若是指定numprocess爲3,則進程池會從無到有建立三個進程,而後自始至終使用這三個進程去執行全部任務,不會開啓其餘進程
from multiprocessing import Pool pool = Pool(processes=None, initializer=None, initargs=())
參數:
p.apply(func [, args [, kwargs]]) # 調用進程池中的一個進程執行函數func,args/kwargs爲傳遞的參數,注意apply是阻塞式的,既串行執行。 p.apply_async(func [, args [, kwargs]]) # 功能同apply,區別是非阻塞的,既異步執行。 ———> 經常使用 p.close() # 關閉進程池,防止進一步操做。若是全部操做持續掛起,它們將在工做進程終止前完成 P.join() # 等待全部工做進程退出。此方法只能在close()或teminate()以後調用
注意:
apply_async 會返回AsyncResul對象,這個AsyncResul對象有有一下方法:
利用進程池改寫socket server:
import os import socket import multiprocessing server = socket.socket(socket.AF_INET,socket.SOCK_STREAM) server.setsockopt(socket.SOL_SOCKET,socket.SO_REUSEADDR,1) server.bind(('127.0.0.1',8100)) server.listen(5) def talk(conn): print('個人進程號是: %s' % os.getpid() ) while True: msg = conn.recv(1024) if not msg:break data = msg.decode('utf-8') msg = data.upper() conn.send(msg.encode('utf-8')) if __name__ == '__main__': pool = multiprocessing.Pool(1) while True: conn,addr = server.accept() print(addr) pool.apply_async(talk,args=(conn,)) pool.close() pool.join()
這裏指定了進程池的數量爲1,那麼併發兩個鏈接的話,第二個會hold住,只有第一個斷開後,纔會鏈接,注意:進程的Pid號,仍是相同的。
須要回調函數的場景:進程池中任何一個任務一旦處理完了,就當即告知主進程:我好了額,你能夠處理個人結果了。主進程則調用一個函數去處理該結果,該函數即回調函數。咱們能夠把耗時間(阻塞)的任務放到進程池中,而後指定回調函數(主進程負責執行),這樣主進程在執行回調函數時就省去了I/O的過程,直接拿到的是任務的結果。
apply_async(self, func, args=(), kwds={}, callback=None) # func的結果會交給指定的callback函數處理
一個爬蟲的小例子:
from multiprocessing import Pool import requests import os def geturl(url): print('個人進程號爲: %s' % os.getpid()) print('我處理的url爲: %s ' % url ) response = requests.get(url) # 請求網頁 return response.text # 返回網頁源碼 def urlparser(htmlcode): print('個人進程號是: %s ' % os.getpid()) datalength = len(htmlcode) # 計算源碼的長度 print('解析到的html大小爲: %s' % datalength) if __name__ == '__main__': pool = Pool() url = [ 'http://www.baidu.com', 'http://www.sina.com', 'http://www.qq.com', 'http://www.163.com' ] res_l = [] for i in url: res = pool.apply_async(geturl,args=(i,),callback=urlparser) # res 是 geturl執行的結果,由於已經交給urlparser處理了,因此這裏不用拿 res_l.append(res) pool.close() pool.join() for res in res_l: print(res.get()) # 這裏拿到的就是網頁的源碼
進程彼此之間互相隔離,要實現進程間通訊(IPC),multiprocessing模塊提供的兩種形式:隊列和管道,這兩種方式都是使用消息傳遞的。可是還有一種基於共享數據的方式,如今已經不推薦使用,建議使用隊列的方式進行進程間通信。
展望將來,基於消息傳遞的併發編程是大勢所趨,即使是使用線程,推薦作法也是將程序設計爲大量獨立的線程集合,經過消息隊列交換數據。這樣極大地減小了對使用鎖定和其餘同步手段的需求,還能夠擴展到分佈式系統中。
底層就是以管道和鎖定的方式實現。
建立隊列的類:
Queue([maxsize]):建立共享的進程隊列,Queue是多進程安全的隊列,可使用Queue實現多進程之間的數據傳遞。 # 參數 maxsize: 隊列能承載的最大數量,省略的話則不限制隊列大小
基本使用:
from multiprocessing import Queue q = Queue(3) q.put('a') # 數據存入Queue print(q.get()) # 從Queue中取出數據
注意:隊列(Queue)是FIFO模式,既先進先出。
q.put() 用於插入數據到隊列中。
q.put(obj, block=True, timeout=None) # 參數: # blocked,timeout:若是blocked爲True(默認值),而且timeout爲正值,該方法會阻塞timeout指定的時間,直到該隊列有剩餘的空間。若是超時,會拋出Queue.Full異常。若是blocked爲False,但該Queue已滿,會當即拋出Queue.Full異常。
PS:q.put_nowait() 等同於 q.put(block=False)
q.get() 用於從隊列中獲取數據。
q.get(block=True,timeout=None) # 參數: # blocked和timeout。若是blocked爲True(默認值),而且timeout爲正值,那麼在等待時間內沒有取到任何元素,會拋出Queue.Empty異常。若是blocked爲False,有兩種狀況存在,若是Queue有一個值可用,則當即返回該值,不然,若是隊列爲空,則當即拋出Queue.Empty異常.
PS:q.get_nowait() 等同於 q.get(block=False)
在併發編程中使用生產者和消費者模式可以解決絕大多數併發問題。該模式經過平衡生產線程和消費線程的工做能力來提升程序的總體處理數據的速度。
在線程世界裏,生產者就是生產數據的線程,消費者就是消費數據的線程。在多線程開發當中,若是生產者處理速度很快,而消費者處理速度很慢,那麼生產者就必須等待消費者處理完,才能繼續生產數據。一樣的道理,若是消費者的處理能力大於生產者,那麼消費者就必須等待生產者。爲了解決這個問題因而引入了生產者和消費者模式。
生產者消費者模式是經過一個容器來解決生產者和消費者的強耦合問題。生產者和消費者彼此之間不直接通信,而經過阻塞隊列來進行通信,因此生產者生產完數據以後不用等待消費者處理,直接扔給阻塞隊列,消費者不找生產者要數據,而是直接從阻塞隊列裏取,阻塞隊列就至關於一個緩衝區,平衡了生產者和消費者的處理能力。
基於隊列實現生產者消費者模型:
上面的例子很完美,可是生產者生產完畢,消費者也消費完畢了,那麼咱們的主程序就應該退出了,但是並無,由於消費者還在等待從隊列中獲取(q.get),這裏咱們考慮能夠發送一個作完/吃完的信號,抓取到信號後退出便可。
其中:
進程間數據是獨立的,能夠藉助於隊列或管道實現通訊,兩者都是基於消息傳遞的,雖然進程間數據獨立,但也能夠經過Manager實現數據共享,事實上Manager的功能遠不止於此。
Manager() # 沒有參數 # 使用Manager對象建立共享數據類型
利用Manager建立數據,完成進程共享
import os from multiprocessing import Manager,Process def worker(d,l): d[os.getpid()]=os.getpid() # 對共享數據進行修改 l.append(os.getpid()) if __name__ == '__main__': m = Manager() d = m.dict() # 建立共享字典 l = m.list() # 建立共享列表 p_l = [] for i in range(10): p= Process(target=worker,args=(d,l)) p_l.append(p) p.start() for p in p_l: p.join() print(d) print(l)
Python 標準庫提供了 thread 和 threading 兩個模塊來對多線程進行支持。其中, thread 模塊以低級、原始的方式來處理和控制線程,而 threading 模塊經過對 thread 進行二次封裝,提供了更方便的 api 來處理線程。
PS:multiprocessing徹底模仿了threading模塊的接口,兩者在使用層面,有很大的類似性,因此不少用法都是相同的,因此可能看起來會比較眼熟。
Thread 是threading模塊中最重要的類之一,可使用它來建立線程。
有兩種方式來建立線程:
# -----------------------實例化對象-------------------------- import threading def work(name): print('hello,{0}'.format(name)) if __name__ == '__main__': t = threading.Thread(target=work,args=('daxin',)) t.start() print('主進程') # -----------------------本身建立類-------------------------- import threading class Work(threading.Thread): def __init__(self,name): super(Work, self).__init__() self.name = name def run(self): print('hello,{0}'.format(self.name)) if __name__ == '__main__': t = Work(name='daxin') t.start() print('主進程')
PS:執行的時候,咱們能夠看到會先打印"hello,daxin",而後纔會打印"主進程",因此這也同時說明了,建立線程比建立進程消耗資源少的多,線程會被很快的建立出來並執行。若是咱們在target執行的函數和主函數中,同時打印os.getpid,你會發現,進程號是相同的,這也說明了這裏開啓的是自線程。