進程、線程和協程

1、進程

一、多任務原理

  多任務是指操做系統同時能夠運行多個任務。python

  • 單核CPU實現多任務原理:操做系統輪流讓各個任務交替執行;
  • 多核CPU實現多任務原理:真正的執行多任務只能在多核CPU上實現,多出來的任務輪流調度到每一個核心上執行。
  • 併發:看上去一塊兒執行,任務數多於CPU核心數;
  • 並行:真正的一塊兒執行,任務數小於等於CPU核心數。

  實現多任務的方式:
    一、多進程模式
    二、多線程模式
    三、協程模式
    四、多進程+多線程模式多線程

二、進程

  對於操做系統而言,一個任務就是一個進程;併發

  進程是系統中程序執行和資源分配的基本單元,每一個進程都有本身的數據段、代碼段、堆棧段。app


 

  下面是一小段程序,一個單任務的例子。在其中,有兩個輸出語句分別在在兩個不一樣的循環當中,單任務的執行方式,也就是最初學習時,當一個循環沒有結束的時候,沒法執行到下面的程序當中。若是想要讓兩個循環能夠同時在執行,就是在實現多任務,固然不是說同時輸出,而是兩個循環都在執行着。dom

 1 from time import sleep  2 # 只能執行到那一個循環,執行不了run,因此叫單任務
 3 def run():  4     while True:  5         print("&&&&&&&&&&&&&&&")  6         sleep(1.2)  7 
 8 if __name__ == "__main__":  9     while True: 10         print("**********") 11         sleep(1) 12     run()

  接下來啓用多任務,經過進程來實現。async

  multiprocessing庫:跨平臺版本的多進程模塊,提供了一個Process類來表明一個進程對象(fork僅適用於Linux)。ide

  下面的程序是在一個父進程中建立一個子進程,讓父進程和子進程能夠都在執行,建立方式程序中已經很簡潔了。能夠本身把這兩段程序複製下來運行一下,看看輸出的效果。函數

 1 from multiprocessing import Process  2 from time import sleep  3 import os  4 
 5 def run(str):  6     # os.getpid()獲取當前進程id號
 7     # os.getppid()獲取當前進程的父進程id號
 8     while True:  9         print("&&&&&&&&&&&&&&&%s--%s--%s" % (str, os.getpid(), os.getppid())) 10         sleep(0.5) 11 
12 if __name__ == "__main__": 13     print("主(父)進程啓動 %s" % (os.getpid())) 14     # 建立子進程
15     # target說明進程執行的任務
16     p = Process(target=run, args=("nice",)) 17     # 啓動進程
18  p.start() 19 
20     while True: 21         print("**********") 22         sleep(1)

  我想第一個單任務的程序就沒必要說了吧,就是一個死循環,一直沒有執行到下面的run函數。第二段程序是經過多進程實現的多任務,兩個循環都能執行到,我把結果截圖放下面,最好本身去試一下。學習

三、父子進程的前後順序

  上面的多進程的例子中輸出了那麼多,咱們使用的時候到底是先執行哪一個後執行哪一個呢?根據咱們的通常思惟來講,咱們寫的主函數其實就是父進程,在主函數中間,要調用的也就是子進程。大數據

 

 1 from multiprocessing import Process  2 from time import sleep  3 import os  4 
 5 def run():  6     print("啓動子進程")  7     print("子進程結束")  8     sleep(3)  9 
10 if __name__ == "__main__": 11     print("父進程啓動") 12     p = Process(target=run) 13  p.start() 14 
15     # 父進程的結束不能影響子進程,讓進程等待子進程結束再執行父進程
16  p.join() 17 
18     print("父進程結束")

四、全局變量在多個進程中不能共享 

  在多進程的程序當中定義的全局變量在多個進程中是不能共享的,篇幅較長在這裏就不舉例子了,能夠本身試一下。這個也是和稍後要說的線程的一個區別,在線程中,變量是能夠共享的,也所以衍生出一些問題,稍後再說。

五、啓動多個進程 

  在正常工做使用的時候,固然不止有有個一個兩個進程,畢竟這一兩個也起不到想要的效果。那麼就須要採用更多的進程,這時候須要經過進程池來實現,就是在進程池中放好你要創建的進程,而後執行的時候,把他們都啓動起來,就能夠同時進行了,在必定的環境下能夠大大的提升效率。固然這個也和起初提到的有關,若是你的CPU是單核的,那麼多進程也只是起到了讓幾個任務同時在執行着,並無提升效率,並且啓動進程的時候還要花費一些時間,所以在多核CPU當中更能發揮優點。

  在multiprocessing中有個Pool方法,能夠實現進程池。在利用進程池時能夠設置要啓動幾個進程,通常狀況下,它默認和你電腦的CPU核數一致,也能夠本身設置,若是設置的進程數多於CPU核數,那多出來的進程會輪流調度到每一個核心上執行。下面是啓動多個進程的過程。

 1 from multiprocessing import Pool  2 import os  3 import time  4 import random  5 
 6 
 7 def run(name):  8     print("子進程%s啓動--%s" % (name, os.getpid()))  9     start = time.time() 10     time.sleep(random.choice([1,2,3,4,5])) 11     end = time.time() 12     print("子進程%s結束--%s--耗時%.2f" % (name, os.getpid(), end-start)) 13 
14 if __name__ == "__main__": 15     print("啓動父進程") 16 
17     # 建立多個進程
18     # Pool 進程池 :括號裏的數表示能夠同時執行的進程數量
19     # Pool()默認大小是CPU核心數
20     pp = Pool(4) 21     for i in range(5): 22         # 建立進程,放入進程池,統一管理
23         pp.apply_async(run, args=(i,)) 24 
25     # 在調用join以前必須先調用close,調用close以後就不能再繼續添加新的進程了
26  pp.close() 27     # 進程池對象調用join還等待進程池中全部的子進程結束
28  pp.join() 29 
30     print("結束父進程")

六、文件拷貝(單進程與多進程對比)

(1)單進程實現

 1 from multiprocessing import Pool
 2 import time
 3 import os
 4 
 5 # 實現文件的拷貝
 6 def copyFile(rPath, wPath):
 7     fr = open(rPath, 'rb')
 8     fw = open(wPath, 'wb')
 9     context = fr.read()
10     fw.write(context)
11     fr.close()
12     fw.close()
13 
14 path = r'F:\python_note\線程、協程'
15 toPath = r'F:\python_note\test'
16 
17 # 讀取path下的全部文件
18 filesList = os.listdir(path)
19 
20 # 啓動for循環處理每個文件
21 start = time.time()
22 for fileName in filesList:
23     copyFile(os.path.join(path,fileName), os.path.join(toPath,fileName))
24 
25 end = time.time()
26 print('總耗時:%.2f' % (end-start))
View Code

(2)多進程實現

 1 from multiprocessing import Pool
 2 import time
 3 import os
 4 
 5 # 實現文件的拷貝
 6 def copyFile(rPath, wPath):
 7     fr = open(rPath, 'rb')
 8     fw = open(wPath, 'wb')
 9     context = fr.read()
10     fw.write(context)
11     fr.close()
12     fw.close()
13 
14 path = r'F:\python_note\線程、協程'
15 toPath = r'F:\python_note\test'
16 
17 
18 if __name__ == "__main__":
19     # 讀取path下的全部文件
20     filesList = os.listdir(path)
21 
22     start = time.time()
23     pp = Pool(4)
24     for fileName in filesList:
25         pp.apply_async(copyFile, args=(os.path.join(
26             path, fileName), os.path.join(toPath, fileName)))
27     pp.close()
28     pp.join()
29     end = time.time()
30     print("總耗時:%.2f" % (end - start))
View Code

  上面兩個程序是兩種方法實現同一個目標的程序,能夠將其中的文件路徑更換爲你本身的路徑,能夠看到最後計算出的耗時是多少。也許有人發現並非多進程的效率就高,說的的確沒錯,由於建立進程也要花費時間,沒準啓動進程的時間遠多讓這一個核心運行全部核心用的時間要多。這個例子也只是演示一下如何使用,在大數據的任務下會有更深入的體驗。

 七、進程對象

  咱們知道Python是一個面向對象的語言。並且Python中萬物皆對象,進程也能夠封裝成對象,來方便之後本身使用,只要把他封裝的足夠豐富,提供清晰的接口,之後使用時會快捷不少,這個就根據本身的需求本身能夠試一下,不寫了。

 八、進程間通訊

  上面提到過進程間的變量是不能共享的,那麼若是有須要該怎麼辦?經過隊列的方式進行傳遞。在父進程中建立隊列,而後把隊列傳到每一個子進程當中,他們就能夠共同對其進行操做。 

 1 from multiprocessing import Process, Queue  2 import os  3 import time  4 
 5 
 6 def write(q):  7     print("啓動寫子進程%s" % (os.getpid()))  8     for chr in ['A', 'B', 'C', 'D']:  9  q.put(chr) 10         time.sleep(1) 11     print("結束寫子進程%s" % (os.getpid())) 12 
13 def read(q): 14     print("啓動讀子進程%s" % (os.getpid())) 15     while True: 16         value = q.get() 17         print("value = "+value) 18     print("結束讀子進程%s" % (os.getpid())) 19 
20 if __name__ == "__main__": 21     # 父進程建立隊列,並傳遞給子進程
22     q = Queue() 23     pw = Process(target=write, args=(q,)) 24     pr = Process(target=read, args=(q,)) 25 
26  pw.start() 27  pr.start() 28     # 寫進程結束
29  pw.join() 30     # pr進程裏是個死循環,沒法等待期結束,只能強行結束
31  pr.terminate() 32     print("父進程結束")

 2、線程

一、線程

  • 在一個進程內部,要同時幹多件事,就須要運行多個"子任務",咱們把進程內的多個"子任務"叫作線程
  • 線程一般叫作輕型的進程,線程是共享內存空間,併發執行的多任務,每個線程都共享一個進程的資源
  • 線程是最小的執行單元而進程由至少一個線程組成。如何調度進程和線程,徹底由操做系統來決定,程序本身不能決定何時執行,執行多長時間

模塊:

一、_thread模塊 低級模塊(更接近底層)

二、threading模塊 高級模塊,對_thread進行了封裝

二、啓動一個線程

  一樣,先給一個多線程的例子,其中,仍然使用run函數做爲其中的一個子線程,主函數爲父線程。經過threading的Thread方法建立線程並開啓,join來等待子線程。

 1 import threading  2 import time  3 
 4 
 5 def run():  6     print("子線程(%s)啓動" % (threading.current_thread().name))  7 
 8     # 實現線程的功能
 9     time.sleep(1) 10     print("打印") 11     time.sleep(2) 12 
13     print("子線程(%s)結束" % (threading.current_thread().name)) 14 
15 
16 if __name__ == "__main__": 17     # 任何進程都默認會啓動一個線程,稱爲主線程,主線程能夠啓動新的子線程
18     # current_thread():返回線程的實例
19     print("主線程(%s)啓動" % (threading.current_thread().name)) 20 
21     # 建立子線程
22     t = threading.Thread(target=run, name="runThread") 23  t.start() 24 
25     # 等待線程結束
26  t.join() 27 
28     print("主線程(%s)結束" % (threading.current_thread().name))

三、線程間數據共享

  多線程和多進程最大的不一樣在於,多進程中,同一個變量,各自有一份拷貝存在每一個進程中,互不影響。

  而多線程全部變量都由全部線程共享。因此任何一個變量均可以被任何一個線程修改,所以,線程之間共享數據最大的危險在於多個線程同時修改一個變量,容易把內容改亂了。

 1 import threading  2 
 3 
 4 num = 10
 5 
 6 def run(n):  7     global num  8     for i in range(10000000):  9         num = num + n 10         num = num - n 11 
12 if __name__ == "__main__": 13     t1 = threading.Thread(target=run, args=(6,)) 14     t2 = threading.Thread(target=run, args=(9,)) 15 
16  t1.start() 17  t2.start() 18  t1.join() 19  t2.join() 20 
21     print("num = ",num)

四、線程鎖

  在第三小點中已經提到了,多線程的一個缺點就是數據是共享的,若是有兩個線程正同時在修改這個數據,就會出現混亂,它本身也不知道該聽誰的了,尤爲是在運算比較複雜,次數較多的時候,這種錯誤的機會會更大。

  固然,解決辦法也是有的,那就是利用線程鎖。加鎖的意思就是在其中一個線程正在對數據進行操做時,讓其餘線程不得介入。這個加鎖和釋放鎖是由人來肯定的。

  • 確保了這段代碼只能由一個線程從頭至尾的完整執行
  • 阻止了多線程的併發執行,要比不加鎖時候效率低。包含鎖的代碼段只能以單線程模式執行
  • 因爲能夠存在多個鎖,不一樣線程持有不一樣的鎖,並試圖獲取其餘的鎖,可能形成死鎖致使多個線程掛起,只能靠操做系統強制終止
 1 def run(n):  2     global num  3     for i in range(10000000):  4  lock.acquire()  5         try:  6             num = num + n  7             num = num - n  8         finally:  9             # 修改完釋放鎖
10  lock.release() 11 
12 if __name__ == "__main__": 13     t1 = threading.Thread(target=run, args=(6,)) 14     t2 = threading.Thread(target=run, args=(9,)) 15 
16  t1.start() 17  t2.start() 18  t1.join() 19  t2.join() 20 
21     print("num = ",num)

  上面這段程序是循環屢次num+n-n+n-n的過程,變量n分別設爲6和9是在兩個不一樣的線程當中,程序中已經加了鎖,你能夠先去掉試一下,當循環次數較小的時候也許還能正確,但次數一旦取的較高就會出現混亂。

  加鎖是在循環體當中,依次執行加減法,定義中說到確保一個線程從頭至尾的完整執行,也就是在計算途中,不會有其餘的線程打擾。你能夠想一下,若是一個線程執行完加法,正在執行減法,另外一個線程進來了,它要先進行加法時的初始sum值該是多少呢,線程二不必定在線程一的何時進來,萬一剛進來時候,線程一剛好給sum賦值了,而線程二仍然用的是正準備進來時候的sum值,那從這裏開始豈不已經分道揚鑣了。因此,運算的次數越多,結果會越離譜。

  這個說完了,還有一個小小的改進。你是否記得讀寫文件時候書寫的一種簡便形式,經過with來實現,能夠避免咱們忘記關閉文件,自動幫咱們關閉。固然還有一些其餘地方也用到了這個方法。這裏也一樣適用。

1 # 與上面代碼功能相同,with lock能夠自動上鎖與解鎖
2 with lock: 3     num = num + n 4     num = num - n

五、ThreadLocal

  • 建立一個全局的ThreadLocal對象
  • 每一個線程有獨立的存儲空間
  • 每一個線程對ThreadLocal對象均可以讀寫,可是互不影響

  根據名字也能夠看出,也就是在本地建個鏈接,全部的操做在本地進行,每一個線程之間沒有數據的影響。

 1 import threading  2 
 3 
 4 num = 0  5 local = threading.local()  6 
 7 def run(x, n):  8     x = x + n  9     x = x - n 10 
11 def func(n): 12     # 每一個線程都有local.x
13     local.x = num 14     for i in range(10000000): 15  run(local.x, n) 16     print("%s-%d" % (threading.current_thread().name, local.x)) 17 
18 
19 if __name__ == "__main__": 20     t1 = threading.Thread(target=func, args=(6,)) 21     t2 = threading.Thread(target=func, args=(9,)) 22 
23  t1.start() 24  t2.start() 25  t1.join() 26  t2.join() 27 
28     print("num = ",num)

六、控制線程數量

 1 '''
 2 控制線程數量是指控制線程同時觸發的數量,能夠拿下來這段代碼運行一下,下面啓動了5個線程,可是他們會兩個兩個的進行  3 '''
 4 import threading  5 import time  6 
 7 # 控制併發執行線程的數量
 8 sem = threading.Semaphore(2)  9 
10 def run(): 11  with sem: 12         for i in range(10): 13             print("%s---%d" % (threading.current_thread().name, i)) 14             time.sleep(1) 15 
16 
17 if __name__ == "__main__": 18     for i in range(5): 19         threading.Thread(target=run).start()

  上面的程序是有多個線程,可是每次限制同時執行的線程,通俗點說就是限制併發線程的上限;除此以外,也能夠限制線程數量的下限,也就是至少達到多少個線程才能觸發。

 1 import threading  2 import time  3 
 4 
 5 # 湊夠必定數量的線程纔會執行,不然一直等着
 6 bar = threading.Barrier(4)  7 
 8 def run():  9     print("%s--start" % (threading.current_thread().name)) 10     time.sleep(1) 11  bar.wait() 12     print("%s--end" % (threading.current_thread().name)) 13 
14 
15 if __name__ == "__main__": 16     for i in range(5): 17         threading.Thread(target=run).start()

七、定時線程

 1 import threading  2 
 3 
 4 def run():  5     print("***********************")  6 
 7 # 延時執行線程
 8 t = threading.Timer(5, run)  9 t.start() 10 
11 t.join() 12 print("父線程結束")

八、線程通訊

 1 import threading  2 import time  3 
 4 
 5 def func():  6     # 事件對象
 7     event = threading.Event()  8     def run():  9         for i in range(5): 10             # 阻塞,等待事件的觸發
11  event.wait() 12             # 重置阻塞,使後面繼續阻塞
13  event.clear() 14             print("**************") 15     t = threading.Thread(target=run).start() 16     return event 17 
18 e = func() 19 
20 # 觸發事件
21 for i in range(5): 22     time.sleep(2) 23     e.set()

九、一個小栗子

  這個例子是用了生產者和消費者來模擬,要進行數據通訊,還引入了隊列。先來理解一下。

 1 import threading  2 import queue  3 import time  4 import random  5 
 6 
 7 # 生產者
 8 def product(id, q):  9     while True: 10         num = random.randint(0, 10000) 11  q.put(num) 12         print("生產者%d生產了%d數據放入了隊列" % (id, num)) 13         time.sleep(3) 14     # 任務完成
15  q.task_done() 16 
17 # 消費者
18 def customer(id, q): 19     while True: 20         item = q.get() 21         if item is None: 22             break
23         print("消費者%d消費了%d數據" % (id, item)) 24         time.sleep(2) 25     # 任務完成
26  q.task_done() 27 
28 
29 if __name__ == "__main__": 30     # 消息隊列
31     q = queue.Queue() 32 
33     # 啓動生產者
34     for i in range(4): 35         threading.Thread(target=product, args=(i, q)).start() 36 
37     # 啓動消費者
38     for i in range(3): 39         threading.Thread(target=customer, args=(i, q)).start()

十、線程調度

 1 import threading  2 import time  3 
 4 
 5 # 線程條件變量
 6 cond = threading.Condition()  7 
 8 
 9 def run(): 10  with cond: 11         for i in range(0, 10, 2): 12             print(threading.current_thread().name, i) 13             time.sleep(1) 14             cond.wait()  # 阻塞
15             cond.notify()  # 告訴另外一個線程能夠執行
16 
17 
18 def run2(): 19  with cond: 20         for i in range(1, 10, 2): 21             print(threading.current_thread().name, i) 22             time.sleep(1) 23  cond.notify() 24  cond.wait() 25 
26 
27 threading.Thread(target=run).start() 28 threading.Thread(target=run2).start()

3、協程

一、協程

  • 子程序/子函數:在全部語言中都是層級調用,好比A調用B,在B執行的工程中又能夠調用C,C執行完畢返回,B執行完畢返回最後是A執行完畢。是經過棧實現的,一個線程就是一個子程序,子程序調用老是一個入口,一次返回,調用的順序是明確的
  • 協程:看上去也是子程序,但執行過程當中,在子程序的內部可中斷,而後轉而執行別的子程序,不是函數調用,有點相似CPU中斷
 1 # 這是一個子程序的調用
 2 def C():  3     print("C--start")  4     print("C--end")  5 
 6 def B():  7     print("B--start")  8  C()  9     print("B--end") 10 
11 def A(): 12     print("A--start") 13  B() 14     print("A--end") 15 
16 A()
  • 協程與子程序調用的結果相似,但不是經過在函數中調用另外一個函數
  • 協程執行起來有點像線程,但協程的特色在因而一個線程
  • 與線程相比的優勢:協程的執行效率極高,由於只有一個線程,也不存在同時寫變量的衝突,在協程中共享資源不加鎖,只須要判斷狀態

二、協程的原理

 1 # python對協程的支持是經過generator實現的
 2 def run():  3     print(1)  4     yield 10
 5     print(2)  6     yield 20
 7     print(3)  8     yield 30
 9 
10 # 協程的最簡單風格,控制函數的階段執行,節約線程或者進程的切換
11 # 返回值是一個生成器
12 m = run() 13 print(next(m)) 14 print(next(m)) 15 print(next(m))

三、數據傳輸

 1 # python對協程的支持是經過generator實現的
 2 def run():  3     print(1)  4     yield 10
 5     print(2)  6     yield 20
 7     print(3)  8     yield 30
 9 
10 # 協程的最簡單風格,控制函數的階段執行,節約線程或者進程的切換
11 # 返回值是一個生成器
12 m = run() 13 print(next(m)) 14 print(next(m)) 15 print(next(m))

四、小栗子

 1 def product(c):  2  c.send(None)  3     for i in range(5):  4         print("生產者產生數據%d" % (i))  5         r = c.send(str(i))  6         print("消費者消費了數據%s" % (r))  7  c.close()  8 
 9 
10 def customer(): 11     data = ""
12     while True: 13         n = yield data 14         if not n: 15             return
16         print("消費者消費了%s" % (n)) 17         data = "200"
18 
19 
20 c = customer() 21 product(c)
相關文章
相關標籤/搜索