Reading time: about 10 minutes.
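Processes
Implementation
fork()
Example:

```python
import os

# flush=True so the child does not inherit (and re-print) buffered output
print('current_pid: %d' % os.getpid(), flush=True)

# os.fork() is only available on Unix-like systems.
# It returns twice: 0 in the child, and the child's pid in the parent.
res = os.fork()

if res == 0:
    # The child process gets 0
    print('res: %d' % res)
    print('sub_pid: %d' % os.getpid())
else:
    # The parent (main) process gets the child's pid
    print('main_pid: %d' % os.getpid())
    print('res: %d' % res)

# Output:
# current_pid: 12775
# main_pid: 12775
# res: 12776
# res: 0
# sub_pid: 12776
```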
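In the example above the parent does not wait for its child. A minimal sketch of making it wait with os.waitpid(), assuming a Unix system (os.fork() does not exist on Windows); the function name fork_and_wait is hypothetical:

```python
import os
import sys


def fork_and_wait(exit_code=3):
    """Fork a child that exits with exit_code; the parent blocks until it finishes."""
    sys.stdout.flush()  # avoid the child inheriting unflushed output
    pid = os.fork()
    if pid == 0:
        # Child: do its work, then leave immediately with os._exit()
        # so it never falls through into the parent's remaining code.
        print('child %d working' % os.getpid())
        sys.stdout.flush()  # os._exit() does not flush stdio buffers
        os._exit(exit_code)
    # Parent: block until this specific child has terminated.
    _, status = os.waitpid(pid, 0)
    return os.waitstatus_to_exitcode(status)  # Python 3.9+


if __name__ == '__main__':
    print('child exit code:', fork_and_wait())
```

Calling os._exit() in the child is the important detail: without it, the child would return from fork_and_wait() and keep running the parent's code.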
multiprocessing.Process
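Example:

```python
import os
import time
from multiprocessing import Process


class NewProcess(Process):
    def __init__(self):
        super().__init__()

    def run(self):
        # run() executes in the child process after start() is called
        time.sleep(3)
        print('%d process was running' % os.getpid())


# The guard is required when the start method is "spawn" (Windows, macOS)
if __name__ == '__main__':
    print('main_process pid: %d' % os.getpid())
    np = NewProcess()
    np.start()

# Output:
# main_process pid: 7846
# 7847 process was running
```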
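Instead of subclassing Process, you can also pass a function through target=. This sketch (work and run_and_join are hypothetical names) uses join() to make the main process wait for the child and then reads its exit code:

```python
import os
import time
from multiprocessing import Process


def work(seconds):
    # Runs in the child process
    time.sleep(seconds)
    print('%d process was running' % os.getpid())


def run_and_join(seconds=0.1):
    p = Process(target=work, args=(seconds,))
    p.start()
    p.join()           # the main process blocks here until the child exits
    return p.exitcode  # 0 means the child finished normally


if __name__ == '__main__':
    print('child exit code:', run_and_join())
```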
multiprocessing.Pool
Synchronous (apply)
Example:

```python
import os
import random
import time
from multiprocessing import Pool


def run():
    # random.random() returns a random float in [0.0, 1.0)
    time.sleep(random.random())
    print('%d process was running' % os.getpid())


if __name__ == '__main__':
    print('main_process pid: %d' % os.getpid())
    p = Pool(3)
    for i in range(4):
        # apply() blocks until this task has finished
        p.apply(run, args=())
    p.close()
    print('waiting for sub_process')
    while True:
        # Number of tasks still pending in the pool. _cache is an
        # undocumented internal attribute, used here only for demonstration.
        count = len(p._cache)
        if count != 0:
            print('there was %d sub_process' % count)
            time.sleep(random.random())
        else:
            break
    p.join()
    print('sub_process has done')

# Output:
# main_process pid: 4295
# 4297 process was running
# 4296 process was running
# 4298 process was running
# 4297 process was running
# waiting for sub_process
# sub_process has done
```
Asynchronous (apply_async)
Example:

```python
import os
import random
import time
from multiprocessing import Pool


def run():
    # random.random() returns a random float in [0.0, 1.0)
    time.sleep(random.random())
    print('%d process was running' % os.getpid())


if __name__ == '__main__':
    print('main_process pid: %d' % os.getpid())
    p = Pool(3)
    for i in range(4):
        # apply_async() returns immediately without waiting for the task
        p.apply_async(run, args=())
    p.close()
    print('waiting for sub_process..')
    while True:
        # Number of tasks still pending (undocumented internal attribute)
        count = len(p._cache)
        if count != 0:
            print('there was %d sub_process' % count)
            time.sleep(random.random())
        else:
            break
    # Block until all worker processes have exited
    p.join()
    print('sub_process has done')

# Output:
# main_process pid: 4342
# waiting for sub_process..
# there was 4 sub_process
# 4344 process was running
# there was 3 sub_process
# 4345 process was running
# 4344 process was running
# 4343 process was running
# sub_process has done
```
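Polling the undocumented _cache attribute works for these demos, but the documented way to wait for asynchronous tasks is to keep the AsyncResult objects that apply_async() returns and call get() on them, or to use map(). A sketch with a hypothetical square() task:

```python
from multiprocessing import Pool


def square(x):
    return x * x


def squares_async(numbers, workers=3):
    with Pool(workers) as p:
        # apply_async() returns an AsyncResult immediately
        results = [p.apply_async(square, args=(n,)) for n in numbers]
        # get() blocks until that particular task has finished
        return [r.get(timeout=10) for r in results]


def squares_map(numbers, workers=3):
    with Pool(workers) as p:
        # map() blocks until every task is done and keeps the input order
        return p.map(square, numbers)


if __name__ == '__main__':
    print(squares_async([1, 2, 3, 4]))  # [1, 4, 9, 16]
    print(squares_map([1, 2, 3, 4]))    # [1, 4, 9, 16]
```

Leaving the with block calls terminate() on the pool, which is safe here because every result has already been collected inside the block.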
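Pros and cons

fork() is the lowest-level way to create a process (it is a Unix system call, so os.fork() is not available on Windows). A single call to fork() returns twice and leaves two processes: the parent (main) process and the child. The parent does not wait for a child created with fork(); it keeps running unless it explicitly waits, for example with os.wait() or os.waitpid().

The multiprocessing module wraps process creation in the Process class (on Unix it can use fork under the hood). Calling a Process object's start() method launches a new process, which then calls run(). For processes created with Process, join() blocks the main process and forces it to wait for the child.

The other way to start processes with multiprocessing is a Pool. A pool keeps several worker processes that execute multiple tasks. By default the number of workers matches the number of CPUs available, but you can pass a different number to Pool(). As with fork(), the main process does not wait for tasks submitted to a Pool asynchronously; you can make it wait with close() followed by join(), or submit tasks synchronously with apply(), which blocks until each task finishes.

Inter-process communication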
Processes can communicate through a queue (Queue): some processes write data into the queue while others read from it.
Example:

```python
import os
import random
import time
from multiprocessing import Process, Queue


def write(q):
    if not q.full():
        for i in range(4):
            q.put(i)
            print('%d was writing data[%d] to queue' % (os.getpid(), i))
            time.sleep(random.random())
    else:
        print('queue is full')


def read(q):
    # Wait a moment so the writer can put some data first
    time.sleep(random.random())
    while True:
        # q.empty() is only a snapshot; the writer may still add data later
        if not q.empty():
            data = q.get()
            print('%d was reading data{%d} from queue' % (os.getpid(), data))
        else:
            print('queue is empty')
            break


if __name__ == '__main__':
    # Processes do not share global variables, so they communicate via a Queue
    q = Queue()
    pw = Process(target=write, args=(q,))
    pr = Process(target=read, args=(q,))
    pw.start()
    pr.start()
    pw.join()
    pr.join()
    print('end')

# Output:
# 4640 was writing data[0] to queue
# 4640 was writing data[1] to queue
# 4640 was writing data[2] to queue
# 4641 was reading data{0} from queue
# 4641 was reading data{1} from queue
# 4641 was reading data{2} from queue
# queue is empty
# 4640 was writing data[3] to queue
# end
```
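Because the two processes run at unpredictable speeds, pr drained the queue faster than pw filled it. When pr checked, the queue was momentarily empty, so it printed "queue is empty" and its task ended. pw wrote data[3] afterwards, but no reader was left to consume it. The main process printed end only after both join() calls returned.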
Improved inter-process communication
Example:

```python
import os
import random
import time
from multiprocessing import Process, Queue


def write(q):
    if not q.full():
        for i in range(4):
            q.put(i)
            print('%d was writing data[%d] to queue' % (os.getpid(), i))
            time.sleep(random.random())
    else:
        print('queue is full')


def read(q):
    # Give the writer a head start
    time.sleep(random.random())
    while True:
        # get() blocks until data is available, so the reader no longer
        # gives up when the queue is momentarily empty
        data = q.get()
        print('%d was reading data{%d} from queue' % (os.getpid(), data))


if __name__ == '__main__':
    # Create the queue; processes have no shared global variables
    q = Queue()
    pw = Process(target=write, args=(q,))
    pr = Process(target=read, args=(q,))
    pw.start()
    pr.start()
    pw.join()
    # pr never leaves its loop, so stop it as soon as pw is done.
    # If pr has not read the last item yet, that item is lost.
    pr.terminate()
    print('end')

# Output:
# 12898 was writing data[0] to queue
# 12898 was writing data[1] to queue
# 12898 was writing data[2] to queue
# 12899 was reading data{0} from queue
# 12899 was reading data{1} from queue
# 12899 was reading data{2} from queue
# 12898 was writing data[3] to queue
# 12899 was reading data{3} from queue
# end
```
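Calling terminate() can kill the reader before it has read the last item. A common alternative is for the writer to send a stop marker (a sentinel) when it is done, so the reader exits on its own. A minimal sketch; the names writer, reader and run_pipeline, and the choice of None as the sentinel, are assumptions:

```python
from multiprocessing import Process, Queue

SENTINEL = None  # stop marker; must be a value the writer never sends as data


def writer(q, items):
    for item in items:
        q.put(item)
    q.put(SENTINEL)  # tell the reader there is nothing more


def reader(q, out):
    while True:
        item = q.get()  # blocks until data arrives
        if item is SENTINEL:
            break
        out.put(item * 10)  # "process" the item and send the result back
    out.put(SENTINEL)


def run_pipeline(items):
    q, out = Queue(), Queue()
    pw = Process(target=writer, args=(q, items))
    pr = Process(target=reader, args=(q, out))
    pw.start()
    pr.start()
    results = []
    # Drain the result queue before join(): joining a process that still
    # has buffered items in a queue can deadlock.
    while (item := out.get()) is not SENTINEL:
        results.append(item)
    pw.join()
    pr.join()
    return results


if __name__ == '__main__':
    print(run_pipeline([1, 2, 3]))  # [10, 20, 30]
```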
Threads
Python's threading module is loosely based on Java's threading model; if you are interested, it is worth studying.
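Implementation
Example:

```python
import threading
import time


def run():
    time.sleep(1)
    # current_thread() returns the Thread object of the calling thread
    # (currentThread() and getName() are deprecated aliases)
    print('%s was running' % threading.current_thread())
    print("current thread's name: %s" % threading.current_thread().name)


# Create a thread
t = threading.Thread(target=run, args=())
# Start the thread
t.start()

# get_ident() returns a nonzero integer identifying the current thread;
# it is unique among live threads but may be reused after a thread exits.
# current_thread().name is a simpler way to tell threads apart.
print("current thread's name: %s" % threading.current_thread().name)
print('main_thread tid: %s' % threading.get_ident())

# Output (Python 3.10+ names the thread "Thread-1 (run)" instead of "Thread-1"):
# current thread's name: MainThread
# main_thread tid: 140427132020480
# <Thread(Thread-1, started 140427100555008)> was running
# current thread's name: Thread-1
```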
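Arguments reach the target through args=, and join() waits for a thread to finish. A sketch with hypothetical worker/run_threads functions that also gives each thread an explicit name via name= and collects results only after every thread has been joined:

```python
import threading


def worker(n, results):
    # Each thread writes to its own slot, so no lock is needed here
    results[n] = n * n


def run_threads(count=4):
    results = [None] * count
    threads = [
        threading.Thread(target=worker, args=(i, results), name='worker-%d' % i)
        for i in range(count)
    ]
    for t in threads:
        t.start()
    for t in threads:
        t.join()  # wait for every thread before reading results
    return results


if __name__ == '__main__':
    print(run_threads())  # [0, 1, 4, 9]
```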
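Thread communication
Queues
A queue is, relatively speaking, the safest way for threads to communicate: queue.Queue implements all the locking it needs internally, so objects placed in it can be shared safely between threads.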
The classic producer-consumer model illustrates this.
Example:

```python
import queue
import random
import threading
import time

# A unique object used as the termination signal
flag = object()


def producer(q):
    for i in range(4):
        q.put(i)
        print('%s put data{%d} in queue' % (threading.current_thread().name, i))
        time.sleep(random.random())
    q.put(flag)


def consumer(q):
    time.sleep(random.random())
    while True:
        res = q.get()
        if res is flag:
            # Put the signal back so other consumers also see it
            q.put(flag)
            break
        print('%s get data{%d} from queue' % (threading.current_thread().name, res))


# Create the queue
q = queue.Queue()
# Create the threads
pro = threading.Thread(target=producer, args=(q,))
con = threading.Thread(target=consumer, args=(q,))
pro.start()
con.start()
pro.join()
con.join()
print('end')

# Output (Python 3.10+ shows names such as "Thread-1 (producer)"):
# Thread-1 put data{0} in queue
# Thread-1 put data{1} in queue
# Thread-2 get data{0} from queue
# Thread-2 get data{1} from queue
# Thread-1 put data{2} in queue
# Thread-2 get data{2} from queue
# Thread-1 put data{3} in queue
# Thread-2 get data{3} from queue
# end
```
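One detail deserves attention. With multiple threads, once the producer has finished, it puts a special object, flag (the termination signal), into queue. When a consumer takes flag out of queue, it knows that every task has been taken, and it puts flag back into queue so that consumers in other threads receive the same signal and also learn that the producer has published all of its tasks.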
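With several consumers, putting flag back is what lets every one of them stop, even though the producer sends the signal only once. A sketch with one producer and three consumer threads; the thread count and the helper names are assumptions:

```python
import queue
import threading

flag = object()  # termination signal


def producer(q, items):
    for item in items:
        q.put(item)
    q.put(flag)  # a single signal, however many consumers there are


def consumer(q, consumed, lock):
    while True:
        item = q.get()
        if item is flag:
            q.put(flag)  # hand the signal on to the next consumer
            break
        with lock:
            consumed.append(item)


def run_consumers(items, n_consumers=3):
    q = queue.Queue()
    consumed = []
    lock = threading.Lock()
    threads = [threading.Thread(target=producer, args=(q, items))]
    threads += [
        threading.Thread(target=consumer, args=(q, consumed, lock))
        for _ in range(n_consumers)
    ]
    for t in threads:
        t.start()
    for t in threads:
        t.join()  # returns only because every consumer saw flag
    return sorted(consumed)


if __name__ == '__main__':
    print(run_consumers(range(10)))
```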
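Polling
Wrapping a data operation in a while loop that checks a condition forces the thread to wait until the condition holds. (To keep the wait short, put the check as close as possible to the operation that depends on it.) Note that the waiting thread keeps spinning on the CPU the whole time.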
Example:

```python
import threading


class NewThread(threading.Thread):
    flag = 0    # set to 1 once Thread-1 has finished
    g_num = 0   # shared counter (class attribute)

    def __init__(self):
        super().__init__()

    def run(self):
        print('%s was running' % self.name)
        # Relies on default naming: the first thread created is 'Thread-1'
        if self.name == 'Thread-1':
            self.add_num()
            NewThread.flag = 1
        else:
            # Polling: Thread-2 spins here until Thread-1 has finished
            while True:
                if NewThread.flag:
                    self.add_num()
                    break

    @classmethod
    def add_num(cls):
        for i in range(1000000):
            cls.g_num += 1
        print('on the %s, g_num: %d' % (threading.current_thread().name, cls.g_num))


t1 = NewThread()
t2 = NewThread()
t1.start()
t2.start()

# Output:
# Thread-1 was running
# Thread-2 was running
# on the Thread-1, g_num: 1000000
# on the Thread-2, g_num: 2000000
```
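Polling keeps Thread-2 busy on the CPU while it waits. threading.Event is a different technique from the polling shown above: it lets a thread sleep until another thread signals it. A sketch of the same "second thread runs after the first" ordering, with hypothetical names:

```python
import threading


class Counter:
    def __init__(self):
        self.g_num = 0

    def add_num(self, n):
        for _ in range(n):
            self.g_num += 1


def run_in_order(n=100000):
    counter = Counter()
    first_done = threading.Event()

    def first():
        counter.add_num(n)
        first_done.set()   # wake up any thread waiting on the event

    def second():
        first_done.wait()  # sleeps without spinning until set() is called
        counter.add_num(n)

    t1 = threading.Thread(target=first)
    t2 = threading.Thread(target=second)
    t2.start()  # started first on purpose: it still waits for t1
    t1.start()
    t1.join()
    t2.join()
    return counter.g_num


if __name__ == '__main__':
    print(run_in_order())  # 200000
```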
Optimizing with a mutex (Lock)
Example:

```python
import threading


class NewThread(threading.Thread):
    g_num = 0
    # One lock object shared by all instances
    lock = threading.Lock()

    def __init__(self):
        super().__init__()

    def run(self):
        # Blocks until the lock is acquired; the with statement
        # releases it automatically when the block exits
        with NewThread.lock:
            print('%s was running' % self.name)
            self.add_num()

    @classmethod
    def add_num(cls):
        for i in range(1000000):
            cls.g_num += 1
        print('on the %s g_num: %d' % (threading.current_thread().name, cls.g_num))


t1 = NewThread()
t2 = NewThread()
t1.start()
t2.start()

# Output:
# Thread-1 was running
# on the Thread-1 g_num: 1000000
# Thread-2 was running
# on the Thread-2 g_num: 2000000
```
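In the example above each thread holds the lock for its entire run(), so the threads effectively run one after the other. A sketch that locks only the shared += update lets the threads interleave while keeping the total correct, at the cost of an acquire/release per increment (the names are hypothetical):

```python
import threading


class SharedCounter:
    def __init__(self):
        self.value = 0
        self.lock = threading.Lock()

    def add(self, n):
        for _ in range(n):
            # Hold the lock only for the read-modify-write of the counter
            with self.lock:
                self.value += 1


def run_concurrently(n=100000, n_threads=2):
    counter = SharedCounter()
    threads = [threading.Thread(target=counter.add, args=(n,)) for _ in range(n_threads)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return counter.value


if __name__ == '__main__':
    print(run_concurrently())  # 200000
```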
Resolving deadlocks
```python
lock = threading.Lock()
acquired = lock.acquire(timeout=3)  # False if not acquired within 3 seconds
```
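A fuller sketch of the timeout approach with two hypothetical locks, lock_a and lock_b: if the second lock cannot be acquired in time, the function releases the first one and reports failure so the caller can retry later, instead of blocking forever:

```python
import threading

lock_a = threading.Lock()
lock_b = threading.Lock()


def with_both_locks(timeout=3):
    """Take lock_a then lock_b; back off if either can't be acquired in time."""
    if not lock_a.acquire(timeout=timeout):
        return False
    try:
        if not lock_b.acquire(timeout=timeout):
            # Could not get the second lock: give up instead of waiting forever.
            # The finally clause below releases lock_a so other threads can proceed.
            return False
        try:
            # ... critical section that needs both locks ...
            return True
        finally:
            lock_b.release()
    finally:
        lock_a.release()
```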
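By passing timeout= when acquiring a lock, a thread stops waiting once the timeout expires: acquire() returns False instead of blocking forever. The thread must check that return value, because it does not hold the lock and must not enter the critical section; typically it releases any locks it already holds and tries again later. This approach is risky, though, because it can add a lot of waiting time.
Differences between processes and threads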
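join() blocks the main program until the child process finishes. For threads, after the main thread's work is done, the program still waits for all non-daemon threads to finish before exiting, so you do not need join() merely to keep the main thread from ending early. (Daemon threads, created with daemon=True, are stopped abruptly when the program exits.) You still need join() whenever the main thread must wait at a specific point, for example before using a thread's results.
Coroutines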
Implementation
Generators (yield)
Generators are covered in question 13.
Here a simple producer-consumer model shows how to implement a coroutine with a generator.
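Example:

```python
import threading


def producer(c):
    # Prime the generator: run consumer() up to its first yield
    next(c)
    n = 4
    print('%s was running' % threading.current_thread().name)
    while n:
        print('product data: %d' % n)
        # Resume the consumer with n and receive its reply
        res = c.send(n)
        print(res)
        n -= 1
    print('sold out')


def consumer():
    res = ''
    print('%s was running' % threading.current_thread().name)
    while True:
        # Pause here until producer() calls send()
        n = yield res
        print('consume data: %d' % n)
        res = '200 OK'


print('%s was running' % threading.current_thread().name)
c = consumer()
producer(c)

# Output:
# MainThread was running
# MainThread was running
# MainThread was running
# product data: 4
# consume data: 4
# 200 OK
# product data: 3
# consume data: 3
# 200 OK
# product data: 2
# consume data: 2
# 200 OK
# product data: 1
# consume data: 1
# 200 OK
# sold out
```

As you can see, the producer does not know in advance how much data the consumer will take; it simply keeps producing. The consumer relies on a generator's ability to pause: in the consumer function, each loop iteration stops at the yield keyword and waits for the producer function to resume the generator with send(), then continues with the next iteration.

Only one thread runs the whole time, and the programmer decides when to switch between tasks. This avoids the overhead of switching between threads, and that is all it takes to build a simple coroutine.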
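The same idea extends to several tasks: each generator gives up control at yield, and a small scheduler decides which one runs next. A sketch of a round-robin scheduler (task and round_robin are hypothetical names), still running in a single thread:

```python
from collections import deque


def task(name, steps, log):
    for i in range(steps):
        log.append('%s-%d' % (name, i))
        yield  # hand control back to the scheduler


def round_robin(tasks):
    """Run generator-based tasks in turn until all of them are exhausted."""
    ready = deque(tasks)
    while ready:
        t = ready.popleft()
        try:
            next(t)          # run the task up to its next yield
        except StopIteration:
            continue         # task finished; drop it
        ready.append(t)      # not finished; send it to the back of the line


log = []
round_robin([task('A', 2, log), task('B', 3, log)])
print(log)  # ['A-0', 'B-0', 'A-1', 'B-1', 'B-2']
```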
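asyncio
The asyncio library is designed for highly concurrent (mostly I/O-bound) code. I haven't learned it yet, so I won't speculate here; see the official documentation for details.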
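For reference only, a minimal sketch (not from the original post) of the same producer-consumer model with asyncio. asyncio.Queue, asyncio.gather and asyncio.run are standard-library APIs; the function names and the use of None as the stop signal are assumptions:

```python
import asyncio


async def producer(q, n):
    for i in range(n):
        await q.put(i)
        await asyncio.sleep(0)  # give control back to the event loop
    await q.put(None)  # None is used here as the stop signal


async def consumer(q, received):
    while True:
        item = await q.get()  # suspends until an item is available
        if item is None:
            break
        received.append(item)


async def main(n=4):
    q = asyncio.Queue()
    received = []
    # Run both coroutines concurrently on a single thread
    await asyncio.gather(producer(q, n), consumer(q, received))
    return received


if __name__ == '__main__':
    print(asyncio.run(main()))  # [0, 1, 2, 3]
```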
Unfinished; to be completed in the next update.