python進階筆記【1】--- 多進程

有關於 multiprocessing 中共享變量的問題

如今的cpu都很強大,比方我用的至強2620有24核能夠同時工做,並行執行進程程序。這在計算密集型的程序是很須要的,如沙漠中的綠洲,使人重獲新生。那麼,問題接踵而來,python中多進程可否共享一個變量,由於我須要更新矩陣。python

個人辦法是用list存儲三元組信息,信息包括矩陣位置以及value。那麼首先咱們設定一個全局變量叫result_list就能夠了?安全

答案是NO.app

進程間共享變量就須要獨立開闢一塊內存空間或是文件共享,在python裏很方面,直接用一個模塊能夠解決這個問題,那就是 multiprocessing 裏的 Manager。固然,這是針對咱們須要的是list而言,若是咱們只是共享一個簡單的變量如一個整數,能夠直接用 multiprocessing 裏的 value。async

下面的實例是怎麼去共享變量。函數

from multiprocessing import Process, Manager, Lock
import os

lock = Lock()
manager = Manager()
sum = manager.list()


def testFunc(cc, lock):
    with lock:
        sum.append(1)


if __name__ == '__main__':
    threads = []

    for ll in range(1000):
        t = Process(target=testFunc, args=(1, lock))
        t.daemon = True
        threads.append(t)

    sum = manager.list()
    for i in range(len(threads)):
        threads[i].start()

    for j in range(len(threads)):
        threads[j].join()

    print "------------------------"
    print 'process id:', os.getpid()
    print sum

很簡單,manager這個模塊實現了開闢一塊共享內存空間,就比如c中的 shmget 方法同樣,有興趣的同窗能夠去查閱。 傳送門spa

這樣簡單的處理並不能知足我。
首先,我須要一個線程池,固然,實現線程池也是很是簡單的。可是就會遇到一個問題。線程

lock = multiprocessing.Lock()  
pool = multiprocessing.Pool(processes=3)  
for i in range(0,3):  
    pool.apply_async(child_worker, ((my_parameter, lock),))  
pool.close()  
pool.join()

以上代碼執行時會出錯。code

RuntimeError: Lock objects should only be shared between processes through inheritance

查了下資料,multiprocessing.Manager()返回的manager對象控制了一個server進程,可用於多進程之間的安全通訊,其支持的類型有list,dict,Namespace,Lock,RLock,Semaphore,BoundedSemaphore,Condition,Event,Queue,Value和Array等。 server

因此代碼修改爲這樣後就能夠正常運行了:對象

lock = multiprocessing.Manager().Lock()  
pool = multiprocessing.Pool(processes=3)  
for i in range(0,3):  
    pool.apply_async(child_worker, ((my_parameter, lock),))  
pool.close()  
pool.join()

因此,lock的問題解決了,真是厲害咱們如今能夠充分地(往死裏)用咱們的電腦了。

But,還不夠,我想要屢次執行這個並行化計算sum的函數。也就是說我須要每次去清空result_list的內容,這個但是一個很關鍵的細節,由於這個須要明白一個細節,你不能用sum = [] 這樣的方式去重置,我我的認爲是局部變量的緣由,我後來找到了del sum[:]的方法,解決了個人大問題,so,final version 以下。

from multiprocessing import Process, Manager,Pool
import os

lock = Manager().Lock()
manager = Manager()
sum = manager.list()


def testFunc(cc, lock):
    with lock:
        sum.append(1)

# 配合 multiprocessing pool 對多參數的要求添加的函數
def multi_test(args):
    testFunc(*args)

def testing():
    threads = []
    _pool = Pool(24)

    del sum[:]
    lst_vars = []
    for shot in range(1000):
        lst_vars.append((1,lock))
    _pool.map(multi_test, lst_vars)
    _pool.close()
    _pool.join()
    
    print "------------------------"
    print 'process id:', os.getpid()
    print sum
    
if __name__ == '__main__':
    testing()
    testing()

這些實例是我方便寫博客想的,其實我是在寫一個大工程遇到了這些個問題,忙的我焦頭爛額,可是總結出了人生經驗,但願幫到你,讓你多活幾年~~

相關文章
相關標籤/搜索