PACKAGE CONTENTS
connection
dummy (package)
forking
heap
managers
pool
process
queues
reduction
sharedctypes
synchronize
util
Process 類用來描述一個進程對象。建立子進程的時候,只須要傳入一個執行函數和函數的參數便可完成 Process 示例的建立。多線程
star() 方法啓動進程,
join() 方法實現進程間的同步,等待全部進程退出。
close() 用來阻止多餘的進程涌入進程池 Pool 形成進程阻塞。app
class Process(__builtin__.object) | Process objects represent activity that is run in a separate process | | The class is analagous to `threading.Thread` | | Methods defined here: | | __init__(self, group=None, target=None, name=None, args=(), kwargs={}) | | __repr__(self) | | is_alive(self) | Return whether process is alive | | join(self, timeout=None) | Wait until child process terminates | | run(self) | Method to be run in sub-process; can be overridden in sub-class | | start(self) | Start child process | | terminate(self) | Terminate process; sends SIGTERM signal or uses TerminateProcess()
#一、設置daemon都爲True import multiprocessing import time,datetime def create_test_function(statement): time.sleep(2) print(statement) if __name__ == "__main__": print("first") create_one_process = multiprocessing.Process(target=create_test_function,args=("good",)) create_one_process.daemon = True create_one_process.start() print("second") create_two_process = multiprocessing.Process(target=create_test_function,args=("nice",)) create_two_process.daemon = True create_two_process.start() print("third") ''' output: first second third ''' #二、設置daemon都爲False import multiprocessing import time,datetime def create_test_function(statement): time.sleep(2) print(statement) if __name__ == "__main__": print("first") create_one_process = multiprocessing.Process(target=create_test_function,args=("good",)) create_one_process.daemon = False create_one_process.start() print("second") create_two_process = multiprocessing.Process(target=create_test_function,args=("nice",)) create_two_process.daemon = False create_two_process.start() print("third") ''' output: first second third good nice ''' #三、設置daemon一個True一個False import multiprocessing import time,datetime def create_test_function(statement): time.sleep(2) print(statement) if __name__ == "__main__": print("first") create_one_process = multiprocessing.Process(target=create_test_function,args=("good",)) create_one_process.daemon = True create_one_process.start() print("second") create_two_process = multiprocessing.Process(target=create_test_function,args=("nice",)) create_two_process.daemon = False create_two_process.start() print("third") ''' output: first second third nice ''' #四、設置daemon一個False一個True import multiprocessing import time,datetime def create_test_function(statement): time.sleep(2) print(statement) if __name__ == "__main__": print("first") create_one_process = multiprocessing.Process(target=create_test_function,args=("good",)) create_one_process.daemon = False create_one_process.start() print("second") create_two_process = multiprocessing.Process(target=create_test_function,args=("nice",)) create_two_process.daemon = True create_two_process.start() print("third") ''' output: first second third good '''
import multiprocessing import time,datetime def create_test_function(statement): time.sleep(2) print(statement) if __name__ == "__main__": print("first") create_one_process = multiprocessing.Process(target=create_test_function,args=("1",)) create_one_process.daemon = True create_one_process.start() create_one_process.join() print("second") create_two_process = multiprocessing.Process(target=create_test_function,args=("2",)) create_two_process.daemon = True create_two_process.start() #create_two_process.join() print("third") '''output: first 1 second third ''' import multiprocessing import time,datetime def create_test_function(statement): time.sleep(2) print(statement) if __name__ == "__main__": print("first") create_one_process = multiprocessing.Process(target=create_test_function,args=("1",)) create_one_process.daemon = True create_one_process.start() create_one_process.join() print("second") create_two_process = multiprocessing.Process(target=create_test_function,args=("2",)) create_two_process.daemon = True create_two_process.start() create_two_process.join() print("third") '''output: first 1 second 2 third '''
import multiprocessing import time,datetime def create_test_function(statement): time.sleep(2) print(statement) if __name__ == "__main__": for i in xrange(4): create_one_process = multiprocessing.Process(target=create_test_function,args=(i,)) create_one_process.daemon = True create_one_process.start() create_one_process.join() print("cheers")
Queue的功能是將每一個核或線程的運算結果放在隊裏中, 等到每一個線程或核運行完畢後再從隊列中取出結果, 繼續加載運算。緣由很簡單, 多線程調用的函數不能有返回值, 因此使用Queue存儲多個線程運算的結果。async
import multiprocessing as mp def job(q): res=0 for i in range(1000): res+=i+i**2+i**3 q.put(res) #queue if __name__=='__main__': q = mp.Queue() p1 = mp.Process(target=job,args=(q,)) p2 = mp.Process(target=job,args=(q,)) p1.start() p2.start() p1.join() p2.join() res1 = q.get() res2 = q.get() print(res1+res2)
進程池就是咱們將所要運行的東西,放到池子裏,Python會自行解決多進程的問題。 Pool和以前的Process的不一樣點是丟向Pool的函數有返回值,而Process的沒有返回值。ide
對於須要使用幾個甚至十幾個進程時,咱們使用Process仍是比較方便的,可是若是要成百上千個進程,用Process顯然太笨了,multiprocessing提供了Pool類,即如今要講的進程池,可以將衆多進程放在一塊兒,設置一個運行進程上限,每次只運行設置的進程數,等有進程結束,再添加新的進程。
Pool(processes =num):設置運行進程數,當一個進程運行完,會添加新的進程進去
apply_async(函數,(參數)):非阻塞,其中參數是tulpe類型,
apply(函數,(參數)):阻塞
close():關閉pool,不能再添加新的任務
terminate():結束運行的進程,再也不處理未完成的任務
join():和Process介紹的做用同樣, 但要在close或terminate以後使用。函數
import multiprocessing as mp def job(x): return x*x def multicore(): pool = mp.Pool() res = pool.map(job, range(10)) print(res) if __name__ == '__main__': multicore()
#coding=utf-8 #Pool默認大小是CPU的核數,咱們也能夠經過在Pool中傳入processes參數便可自定義須要的核數量, import multiprocessing as mp def job(x): return x*x def multicore(): pool = mp.Pool(processes=3) # 定義CPU核數量爲3 res = pool.map(job, range(10)) print(res) if __name__ == '__main__': multicore()
#coding=utf-8 #apply_async()中只能傳遞一個值,它只會放入一個核進行運算,可是傳入值時要注意是可迭代的,因此在傳入值後須要加逗號, 同時須要用get()方法獲取返回值。 import multiprocessing as mp def job(x): return x*x def multicore(): pool = mp.Pool() res = pool.map(job, range(10)) print(res) res = pool.apply_async(job, (2,)) # 用get得到結果 print(res.get()) if __name__ == '__main__': multicore()
#coding=utf-8 #用 apply_async() 輸出多個結果 。 import multiprocessing as mp def job(x): return x*x def multicore(): pool = mp.Pool() res = pool.map(job, range(10)) print(res) res = pool.apply_async(job, (2,)) # 用get得到結果 print(res.get()) # 迭代器,i=0時apply一次,i=1時apply一次等等 multi_res = [pool.apply_async(job, (i,)) for i in range(10)] # 從迭代器中取出 print([res.get() for res in multi_res]) if __name__ == '__main__': multicore()
一、Pool默認調用是CPU的核數,傳入processes參數可自定義CPU核數
二、map() 放入迭代參數,返回多個結果
三、apply_async()只能放入一組參數,並返回一個結果,若是想獲得map()的效果須要經過迭代ui
#咱們能夠經過使用Value數據存儲在一個共享的內存表中。 import multiprocessing as mp value1 = mp.Value('i', 0) value2 = mp.Value('d', 3.14) #其中d和i參數用來設置數據類型的,d表示一個雙精浮點類型,i表示一個帶符號的整型。
#在Python的mutiprocessing中,有還有一個Array類,能夠和共享內存交互,來實如今進程之間共享數據。 array = mp.Array('i', [1, 2, 3, 4]) #這裏的Array和numpy中的不一樣,它只能是一維的,不能是多維的。一樣和Value 同樣,須要定義數據形式,不然會報錯。
各參數表明的數據類型 | Type code | C Type | Python Type | Minimum size in bytes | | --------- | ------------------ | ----------------- | --------------------- | | `'b'` | signed char | int | 1 | | `'B'` | unsigned char | int | 1 | | `'u'` | Py_UNICODE | Unicode character | 2 | | `'h'` | signed short | int | 2 | | `'H'` | unsigned short | int | 2 | | `'i'` | signed int | int | 2 | | `'I'` | unsigned int | int | 2 | | `'l'` | signed long | int | 4 | | `'L'` | unsigned long | int | 4 | | `'q'` | signed long long | int | 8 | | `'Q'` | unsigned long long | int | 8 | | `'f'` | float | float | 4 | | `'d'` | double | float | 8 |
爲了解決不一樣進程搶共享資源的問題,咱們能夠用加進程鎖來解決。spa
#coding=utf-8 import multiprocessing as mp import time def job(v, num, l): l.acquire() # 鎖住 for _ in range(5): time.sleep(0.1) v.value += num # 獲取共享內存 print(v.value) l.release() # 釋放 def multicore(): l = mp.Lock() # 定義一個進程鎖 v = mp.Value('i', 0) # 定義共享內存 p1 = mp.Process(target=job, args=(v,1,l)) # 須要將lock傳入 p2 = mp.Process(target=job, args=(v,3,l)) p1.start() p2.start() p1.join() p2.join() if __name__ == '__main__': multicore()