Multiprocessing (multiprocessing)

PACKAGE CONTENTS
    connection
    dummy (package)
    forking
    heap
    managers
    pool
    process
    queues
    reduction
    sharedctypes
    synchronize
    util
module package

Frequently used classes in the module:

1. Process

The Process class describes a process object. To create a child process, you only need to pass a target function and its arguments when constructing the Process instance.

start() starts the process.
join() synchronizes processes: it waits for the child process to exit.
close() (a method of Pool, not Process) stops further tasks from being submitted to a pool, preventing extra processes from piling up and blocking.

class Process(__builtin__.object)
     |  Process objects represent activity that is run in a separate process
     |  
     |  The class is analogous to `threading.Thread`
     |  
     |  Methods defined here:
     |  
     |  __init__(self, group=None, target=None, name=None, args=(), kwargs={})
     |  
     |  __repr__(self)
     |  
     |  is_alive(self)
     |      Return whether process is alive
     |  
     |  join(self, timeout=None)
     |      Wait until child process terminates
     |  
     |  run(self)
     |      Method to be run in sub-process; can be overridden in sub-class
     |  
     |  start(self)
     |      Start child process
     |  
     |  terminate(self)
     |      Terminate process; sends SIGTERM signal or uses TerminateProcess()
Process Class
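The methods listed above can be sketched with a minimal example (a sketch; the `greet` function and its 0.5 s sleep are just illustrative): start() launches the child, is_alive() reports its state, and join() waits for it to exit.

```python
import multiprocessing
import time

def greet(name):
    time.sleep(0.5)           # keep the child alive briefly
    print("hello,", name)

if __name__ == "__main__":
    p = multiprocessing.Process(target=greet, args=("world",))
    p.start()
    print(p.is_alive())       # the child is still sleeping here
    p.join()                  # wait until the child terminates
    print(p.is_alive())       # False: the child has exited
```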
# 1. Both daemon flags set to True
import multiprocessing
import time,datetime
def create_test_function(statement):
    time.sleep(2)
    print(statement)
if __name__ == "__main__":
    print("first")
    create_one_process = multiprocessing.Process(target=create_test_function,args=("good",))
    create_one_process.daemon = True
    create_one_process.start()
    print("second")
    create_two_process = multiprocessing.Process(target=create_test_function,args=("nice",))
    create_two_process.daemon = True
    create_two_process.start()
    print("third")
'''
output:
           first
           second
           third
'''

# 2. Both daemon flags set to False
import multiprocessing
import time,datetime
def create_test_function(statement):
    time.sleep(2)
    print(statement)
if __name__ == "__main__":
    print("first")
    create_one_process = multiprocessing.Process(target=create_test_function,args=("good",))
    create_one_process.daemon = False
    create_one_process.start()
    print("second")
    create_two_process = multiprocessing.Process(target=create_test_function,args=("nice",))
    create_two_process.daemon = False
    create_two_process.start()
    print("third")
'''
output:
           first
           second
           third
           good
           nice
'''

# 3. daemon=True for the first process, daemon=False for the second
import multiprocessing
import time,datetime
def create_test_function(statement):
    time.sleep(2)
    print(statement)
if __name__ == "__main__":
    print("first")
    create_one_process = multiprocessing.Process(target=create_test_function,args=("good",))
    create_one_process.daemon = True
    create_one_process.start()
    print("second")
    create_two_process = multiprocessing.Process(target=create_test_function,args=("nice",))
    create_two_process.daemon = False
    create_two_process.start()
    print("third")
'''
output:
           first
           second
           third
           nice
'''

# 4. daemon=False for the first process, daemon=True for the second
import multiprocessing
import time,datetime

def create_test_function(statement):
    time.sleep(2)
    print(statement)
    
if __name__ == "__main__":
    print("first")
    create_one_process = multiprocessing.Process(target=create_test_function,args=("good",))
    create_one_process.daemon = False
    create_one_process.start()
    print("second")
    create_two_process = multiprocessing.Process(target=create_test_function,args=("nice",))
    create_two_process.daemon = True
    create_two_process.start()
    print("third")
'''
output:
        first
        second
        third
        good
'''    
Setting daemon
import multiprocessing
import time,datetime
def create_test_function(statement):
    time.sleep(2)
    print(statement)   
if __name__ == "__main__":
    print("first")
    create_one_process = multiprocessing.Process(target=create_test_function,args=("1",))
    create_one_process.daemon = True
    create_one_process.start()
    create_one_process.join()
    print("second")
    create_two_process = multiprocessing.Process(target=create_test_function,args=("2",))
    create_two_process.daemon = True
    create_two_process.start()
    #create_two_process.join()
    print("third")
'''output:
first
1
second
third
'''

import multiprocessing
import time,datetime
def create_test_function(statement):
    time.sleep(2)
    print(statement)   
if __name__ == "__main__":
    print("first")
    create_one_process = multiprocessing.Process(target=create_test_function,args=("1",))
    create_one_process.daemon = True
    create_one_process.start()
    create_one_process.join()
    print("second")
    create_two_process = multiprocessing.Process(target=create_test_function,args=("2",))
    create_two_process.daemon = True
    create_two_process.start()
    create_two_process.join()
    print("third")
'''output:
first
1
second
2
third
'''
Setting join
import multiprocessing
import time,datetime
def create_test_function(statement):
    time.sleep(2)
    print(statement)  
if __name__ == "__main__":
    for i in range(4):
        create_one_process = multiprocessing.Process(target=create_test_function,args=(i,))
        create_one_process.daemon = True
        create_one_process.start()
        create_one_process.join()
    print("cheers")

2. Queue

Queue collects the result computed on each core or process into a queue; once every process has finished, the results are taken back out of the queue for further processing. The reason is simple: a function run in another process cannot return a value to the caller directly, so a Queue is used to store the results of the multiple processes.

import multiprocessing as mp

def job(q):
    res=0
    for i in range(1000):
        res+=i+i**2+i**3
    q.put(res)    #queue

if __name__=='__main__':
    q = mp.Queue()
    p1 = mp.Process(target=job,args=(q,))
    p2 = mp.Process(target=job,args=(q,))
    p1.start()
    p2.start()
    p1.join()
    p2.join()
    res1 = q.get()
    res2 = q.get()
    print(res1+res2)
put and get

3. Pool

A process pool: we put the work to be run into the pool, and Python takes care of the multiprocessing for us. The difference between Pool and the earlier Process is that a function handed to a Pool has a return value, while one run with Process does not.

When you need just a few or a dozen processes, Process is convenient enough, but for hundreds or thousands of processes it is clearly too clumsy. For this, multiprocessing provides the Pool class, the process pool discussed here: it gathers many tasks together, sets an upper limit on how many processes run at once, and starts a new process whenever a running one finishes.
Pool(processes=num): set the number of worker processes; when one finishes, a new one is started
apply_async(func, (args,)): non-blocking; the arguments are passed as a tuple
apply(func, (args,)): blocking
close(): close the pool; no new tasks may be submitted
terminate(): stop the running processes immediately and abandon unfinished tasks
join(): same role as described for Process, but it must be called after close() or terminate()
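The close()/join() rule above can be sketched as follows (a minimal sketch; `square` is just an illustrative worker): tasks are submitted asynchronously, the pool is closed so no more can arrive, and only then is join() legal.

```python
import multiprocessing as mp

def square(x):
    return x * x

if __name__ == '__main__':
    pool = mp.Pool(processes=2)                                   # at most 2 workers
    results = [pool.apply_async(square, (i,)) for i in range(5)]  # non-blocking submits
    pool.close()          # no new tasks may be submitted
    pool.join()           # join() must come after close() or terminate()
    print([r.get() for r in results])
```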

import multiprocessing as mp

def job(x):
    return x*x
def multicore():
    pool = mp.Pool()
    res = pool.map(job, range(10))
    print(res)
    
if __name__ == '__main__':
    multicore()
map
# Pool defaults to one worker per CPU core; pass the processes argument to customize the number.
import multiprocessing as mp
def job(x):
    return x*x
def multicore():
    pool = mp.Pool(processes=3) # limit the pool to 3 worker processes
    res = pool.map(job, range(10))
    print(res)
    
if __name__ == '__main__':
    multicore()
Pool
# apply_async() takes one set of arguments and runs the job on a single worker. The arguments must be iterable, i.e. passed as a tuple (note the trailing comma for a single value), and the return value is fetched with get().
import multiprocessing as mp
def job(x):
    return x*x
def multicore():
    pool = mp.Pool() 
    res = pool.map(job, range(10))
    print(res)
    res = pool.apply_async(job, (2,))
    # fetch the result with get()
    print(res.get())
if __name__ == '__main__':
    multicore()
apply_async
# Use apply_async() to produce multiple results.
import multiprocessing as mp
def job(x):
    return x*x
def multicore():
    pool = mp.Pool() 
    res = pool.map(job, range(10))
    print(res)
    res = pool.apply_async(job, (2,))
    # fetch the result with get()
    print(res.get())
    # one apply_async call per value of i (i=0, 1, ...)
    multi_res = [pool.apply_async(job, (i,)) for i in range(10)]
    # collect the results from the list of AsyncResult objects
    print([res.get() for res in multi_res])
if __name__ == '__main__':
    multicore()
apply_async (2)

1. Pool defaults to the number of CPU cores; pass the processes argument to customize it
2. map() takes an iterable of arguments and returns multiple results
3. apply_async() takes only one set of arguments and returns a single result; to get the effect of map(), iterate over apply_async() calls
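On point 3 above: although apply_async() takes only one set of arguments, that set is an ordinary tuple, so a function with several parameters works too (a sketch; `power` is just an illustrative function):

```python
import multiprocessing as mp

def power(base, exp):
    return base ** exp

if __name__ == '__main__':
    pool = mp.Pool()
    res = pool.apply_async(power, (2, 10))   # one tuple, two positional arguments
    print(res.get())                         # -> 1024
    pool.close()
    pool.join()
```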

4. Shared Memory

# Value stores data in a shared memory map.
import multiprocessing as mp
value1 = mp.Value('i', 0) 
value2 = mp.Value('d', 3.14)
# The 'd' and 'i' arguments set the data type: 'd' is a double-precision float, 'i' a signed integer.
Value
# multiprocessing also provides an Array class backed by shared memory, used to share data between processes.

array = mp.Array('i', [1, 2, 3, 4])
# Unlike numpy's, this Array can only be one-dimensional, not multi-dimensional. As with Value, the data type must be declared, or an error is raised.
Array
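A small sketch of what "shared" means here (the `double_all` function is just an illustration): a child process writes into the Array, and the parent sees the change after joining.

```python
import multiprocessing as mp

def double_all(arr):
    for i in range(len(arr)):
        arr[i] *= 2          # writes land in shared memory, visible to the parent

if __name__ == '__main__':
    shared = mp.Array('i', [1, 2, 3, 4])   # 'i': signed int
    p = mp.Process(target=double_all, args=(shared,))
    p.start()
    p.join()
    print(list(shared))                    # -> [2, 4, 6, 8]
```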
The data type each type code represents:
| Type code | C Type             | Python Type       | Minimum size in bytes |
| --------- | ------------------ | ----------------- | --------------------- |
| `'b'`     | signed char        | int               | 1                     |
| `'B'`     | unsigned char      | int               | 1                     |
| `'u'`     | Py_UNICODE         | Unicode character | 2                     |
| `'h'`     | signed short       | int               | 2                     |
| `'H'`     | unsigned short     | int               | 2                     |
| `'i'`     | signed int         | int               | 2                     |
| `'I'`     | unsigned int       | int               | 2                     |
| `'l'`     | signed long        | int               | 4                     |
| `'L'`     | unsigned long      | int               | 4                     |
| `'q'`     | signed long long   | int               | 8                     |
| `'Q'`     | unsigned long long | int               | 8                     |
| `'f'`     | float              | float             | 4                     |
| `'d'`     | double             | float             | 8                     |

5. Process Lock

To keep different processes from contending for shared resources, we can add a process lock.

#coding=utf-8
import multiprocessing as mp
import time
def job(v, num, l):
    l.acquire() # acquire the lock
    for _ in range(5):
        time.sleep(0.1) 
        v.value += num # update shared memory
        print(v.value)
    l.release() # release the lock

def multicore():
    l = mp.Lock() # create a process lock
    v = mp.Value('i', 0) # create shared memory
    p1 = mp.Process(target=job, args=(v,1,l)) # the lock must be passed in
    p2 = mp.Process(target=job, args=(v,3,l)) 
    p1.start()
    p2.start()
    p1.join()
    p2.join()

if __name__ == '__main__':
    multicore()
Lock