A brief look at sharing variables between processes with Python multiprocessing

1. The problem:

Someone in a chat group posted the following code and asked why the list is empty when it is printed at the end:

from multiprocessing import Process, Manager
import os


def testFunc(vip_list, cc):
    # Runs in a child process; vip_list is whatever object the parent passed in.
    vip_list.append(cc)
    print('process id: %d' % os.getpid())


if __name__ == '__main__':
    manager = Manager()  # starts the Manager's server process
    vip_list = []                 # plain list: each child appends to its own copy
    #vip_list = manager.list()    # proxy: appends go to the list held by the Manager

    processes = []
    for ll in range(10):
        p = Process(target=testFunc, args=(vip_list, ll))
        p.daemon = True
        processes.append(p)

    for p in processes:
        p.start()

    for p in processes:
        p.join()

    print("------------------------")
    print('process id: %d' % os.getpid())
    print(vip_list)

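If you understand Python's threading model and the GIL, and how multithreading and multiprocessing work, this question is easy to answer. If you don't, that's fine: just run the code above and the problem becomes obvious.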

python aa.py
process id: 632
process id: 635
process id: 637
process id: 633
process id: 636
process id: 634
process id: 639
process id: 638
process id: 641
process id: 640
------------------------
process id: 619
[]

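Every line above was printed by a different process, and each child appended to its own copy of vip_list in its own memory, so the parent's list is still empty. Now uncomment the line #vip_list = manager.list() and run the script again; the output becomes: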

process id: 32074
process id: 32073
process id: 32072
process id: 32078
process id: 32076
process id: 32071
process id: 32077
process id: 32079
process id: 32075
process id: 32080
------------------------
process id: 32066
[3, 2, 1, 7, 5, 0, 6, 8, 4, 9]
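To see the copy semantics directly, here is a minimal Python 3 sketch (not part of the original question) in which every child sends its own copy of a module-level vip_list back to the parent through a multiprocessing Queue. It assumes a POSIX system and forces the fork start method so the children inherit the global list exactly as in the question; append_and_report and run_demo are illustrative names.

```python
import multiprocessing as mp

# Module-level list, as in the question: every child process gets its own copy.
vip_list = []


def append_and_report(value, queue):
    """Append to this process's copy of vip_list, then send that copy to the parent."""
    vip_list.append(value)
    queue.put(list(vip_list))


def run_demo(n=3):
    ctx = mp.get_context("fork")  # assumption: POSIX system where "fork" is available
    queue = ctx.Queue()
    procs = [ctx.Process(target=append_and_report, args=(i, queue)) for i in range(n)]
    for p in procs:
        p.start()
    # Drain the queue before joining so no child blocks while flushing its data.
    child_views = [queue.get() for _ in procs]
    for p in procs:
        p.join()
    return child_views, list(vip_list)


if __name__ == "__main__":
    child_views, parent_view = run_demo()
    print("children saw:", child_views)  # each child saw only its own append
    print("parent sees:", parent_view)   # the parent's list is still empty
```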

2. Ways to share variables between processes in Python:

(1) Shared memory:

Data can be stored in a shared memory map using Value or Array. For example, the following code:

http://docs.python.org/2/library/multiprocessing.html#sharing-state-between-processes

from multiprocessing import Process, Value, Array


def f(n, a):
    # n and a live in shared memory, so the parent sees these writes.
    n.value = 3.1415927
    for i in range(len(a)):
        a[i] = -a[i]


if __name__ == '__main__':
    num = Value('d', 0.0)        # 'd': double-precision float
    arr = Array('i', range(10))  # 'i': signed int

    p = Process(target=f, args=(num, arr))
    p.start()
    p.join()

    print(num.value)
    print(arr[:])

Output:

3.1415927
[0, -1, -2, -3, -4, -5, -6, -7, -8, -9]

(2) Server process:

A manager object returned by Manager() controls a server process which holds Python objects and allows other processes to manipulate them using proxies.
A manager returned by Manager() will support types list, dict, Namespace, Lock, RLock, Semaphore, BoundedSemaphore, Condition, Event, Queue, Value and Array.
For code, see the example at the beginning of this article.

http://docs.python.org/2/library/multiprocessing.html#managerside
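Here is a minimal Python 3 sketch of the server-process approach using two more of the proxy types listed above, dict and Namespace. It creates the Manager inside a function and passes the proxies to the children as arguments instead of relying on module-level globals. The fork start method is forced only so the sketch runs the same way as a standalone snippet on a POSIX system (an assumption). worker and run are illustrative names.

```python
import multiprocessing as mp


def worker(shared_dict, ns, key):
    # shared_dict and ns are proxies: each operation is sent to the manager's server process.
    shared_dict[key] = key * key
    ns.last_key = key  # attribute assignment on a Namespace proxy is forwarded as well


def run(n=5):
    ctx = mp.get_context("fork")  # assumption: POSIX system where "fork" is available
    with ctx.Manager() as manager:
        shared_dict = manager.dict()
        ns = manager.Namespace()
        ns.last_key = None
        procs = [ctx.Process(target=worker, args=(shared_dict, ns, k)) for k in range(n)]
        for p in procs:
            p.start()
        for p in procs:
            p.join()
        # Copy the data out before the with-block shuts the server process down.
        return shared_dict.copy(), ns.last_key


if __name__ == "__main__":
    squares, last = run()
    print(squares)  # the squares of 0..4, in whatever order the children finished
    print(last)     # whichever worker wrote last
```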

3. That is far from the only problem with multiprocessing: data synchronization

Consider a simple piece of code, a counter:

from multiprocessing import Process, Manager
import os


def testFunc(counter, cc):
    # Not atomic: reads counter.value from the manager, then writes the new value back.
    counter.value += cc


if __name__ == '__main__':
    manager = Manager()
    counter = manager.Value('i', 0)  # 'i': signed int (named counter to avoid shadowing sum)

    processes = []
    for ll in range(100):
        p = Process(target=testFunc, args=(counter, 1))
        p.daemon = True
        processes.append(p)

    for p in processes:
        p.start()

    for p in processes:
        p.join()

    print("------------------------")
    print('process id: %d' % os.getpid())
    print(counter.value)

Output:

------------------------
process id: 17378
97

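You may well ask: WTF? The increment in testFunc is not atomic: it reads the current value from the manager process and then writes the new value back, so when two processes read the same value before either one writes, one increment is lost. This problem already existed with multithreading; multiprocessing simply repeats the same tragedy. The fix is a Lock!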

from multiprocessing import Process, Manager, Lock
import os


def testFunc(counter, cc, lock):
    # Only one process at a time can run the read-modify-write below.
    with lock:
        counter.value += cc


if __name__ == '__main__':
    lock = Lock()
    manager = Manager()
    counter = manager.Value('i', 0)

    processes = []
    for ll in range(100):
        p = Process(target=testFunc, args=(counter, 1, lock))
        p.daemon = True
        processes.append(p)

    for p in processes:
        p.start()

    for p in processes:
        p.join()

    print("------------------------")
    print('process id: %d' % os.getpid())
    print(counter.value)  # now always 100

How well does this code perform? Run it, or try increasing the number of iterations, and see.
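One way to answer that is to time it. The Python 3 sketch below increments a counter from several processes, first through a Manager().Value guarded by a multiprocessing Lock (as above), then through a shared-memory multiprocessing Value whose built-in lock is reached with get_lock(). With the Manager, every .value read and write is a round trip to the server process. The shared-memory version only takes a lock on memory the processes already share, so it should usually be noticeably faster, though the exact numbers depend on the machine. The helper names are illustrative, and the fork start method (POSIX) is assumed.

```python
import multiprocessing as mp
import time

ctx = mp.get_context("fork")  # assumption: POSIX system where "fork" is available


def bump_manager(counter, lock, times):
    # Each += is a read and a write, i.e. two round trips to the manager process.
    for _ in range(times):
        with lock:
            counter.value += 1


def bump_shared(counter, times):
    # counter is a shared-memory Value; get_lock() returns its built-in lock.
    for _ in range(times):
        with counter.get_lock():
            counter.value += 1


def run(target, args, workers):
    procs = [ctx.Process(target=target, args=args) for _ in range(workers)]
    start = time.perf_counter()
    for p in procs:
        p.start()
    for p in procs:
        p.join()
    return time.perf_counter() - start


def compare(workers=4, times=1000):
    with ctx.Manager() as manager:
        m_counter = manager.Value("i", 0)
        m_lock = ctx.Lock()
        m_time = run(bump_manager, (m_counter, m_lock, times), workers)
        m_total = m_counter.value

    s_counter = ctx.Value("i", 0)
    s_time = run(bump_shared, (s_counter, times), workers)
    return (m_total, m_time), (s_counter.value, s_time)


if __name__ == "__main__":
    (m_total, m_time), (s_total, s_time) = compare()
    print("Manager + Lock:      total=%d  %.3fs" % (m_total, m_time))
    print("shared-memory Value: total=%d  %.3fs" % (s_total, s_time))
```

Both totals should equal workers × times. Only the timings differ.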

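Here is another example of sharing a variable between processes: a script that runs an arbitrary command on every machine in a cluster and collects the results.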

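#!/usr/bin/env python
# coding=utf-8
import sys

# Python 2 only: make UTF-8 the default encoding for implicit str/unicode conversions.
reload(sys)
sys.setdefaultencoding('utf-8')

import rpyc
from pyUtil import *  # helper module not shown in the article
from multiprocessing import Pool as ProcessPool
from multiprocessing import Manager

# host -> port of the rpyc service running on each machine
hostDict = {
    '192.168.1.10': 11111
}

# Shared dict held by the Manager's server process: host -> command output
manager = Manager()
localResultDict = manager.dict()


def rpc_client(host_port_cmd):
    # Runs in a pool worker: execute cmd on one host and store its output.
    host, port, cmd = host_port_cmd
    c = rpyc.connect(host, port)
    try:
        result = c.root.exposed_execCmd(cmd)
    finally:
        c.close()
    localResultDict[host] = result


def exec_cmd(cmd_str):
    host_port_list = []
    for (host, port) in hostDict.items():
        host_port_list.append((host, port, cmd_str))

    pool = ProcessPool(len(hostDict))
    pool.map(rpc_client, host_port_list)  # results come back via localResultDict
    pool.close()
    pool.join()
    # Iterate over .keys(): iterating the DictProxy directly fails (see below)
    for ip in localResultDict.keys():
        print ip + ":\t" + localResultDict[ip]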

if __name__ == "__main__":

    if len(sys.argv) == 2 and sys.argv[1] != "-h":
        print "======================"
        print "    Your command is:\t" + sys.argv[1]
        print "======================"
        cmd_str = sys.argv[1]
    else:
        print "Cmd Error!"
        sys.exit(1)

    exec_cmd(cmd_str)

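Note that on Python 2.7 you must iterate over a manager.dict() through its .keys() method. The proxy does not expose __iter__, so a plain for loop falls back to the old sequence protocol and calls __getitem__(0), __getitem__(1), and so on, and the very first call fails: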

Traceback (most recent call last):
  File "test.py", line 83, in <module>
    exec_cmd(cmd_str)
  File "test.py", line 57, in exec_cmd
    for ip in localResultDict:
  File "<string>", line 2, in __getitem__
  File "/opt/soft/python-2.7.10/lib/python2.7/multiprocessing/managers.py", line 774, in _callmethod
    raise convert_to_error(kind, result)
KeyError: 0
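Here is a minimal Python 3 sketch of a pattern that sidesteps the problem. It takes one snapshot of the proxy with its copy() method, which returns an ordinary dict, and iterates over that local dict instead of the proxy. This also replaces one round trip to the manager per key with a single one. fake_exec is a hypothetical stand-in for the rpyc call, collect_results is an illustrative name, and the fork start method (POSIX) is assumed.

```python
import multiprocessing as mp


def fake_exec(host, cmd):
    # Hypothetical stand-in for the rpyc call in the original script.
    return "%s ran %r" % (host, cmd)


def rpc_worker(results, host, cmd):
    results[host] = fake_exec(host, cmd)


def collect_results(hosts, cmd):
    ctx = mp.get_context("fork")  # assumption: POSIX system where "fork" is available
    with ctx.Manager() as manager:
        results = manager.dict()
        procs = [ctx.Process(target=rpc_worker, args=(results, h, cmd)) for h in hosts]
        for p in procs:
            p.start()
        for p in procs:
            p.join()
        # One round trip: the proxy's copy() returns an ordinary dict.
        snapshot = results.copy()
    for ip in sorted(snapshot):  # iterate over the local dict, not the proxy
        print(ip + ":\t" + snapshot[ip])
    return snapshot


if __name__ == "__main__":
    collect_results(["192.168.1.10", "192.168.1.11"], "uptime")
```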

4. A final recommendation:

Note that usually sharing data between processes may not be the best choice, because of all the synchronization issues; an approach involving actors exchanging messages is usually seen as a better choice. See also Python documentation: As mentioned above, when doing concurrent programming it is usually best to avoid using shared state as far as possible. This is particularly true when using multiple processes. However, if you really do need to use some shared data then multiprocessing provides a couple of ways of doing so.
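Here is a minimal Python 3 sketch of that advice applied to the cluster script above. Instead of writing into a shared manager.dict(), each worker returns a (host, result) tuple and Pool.map delivers the results to the parent, so no shared state or lock is needed. run_remote is a hypothetical stand-in for the rpyc call, gather_results is an illustrative name, and the fork start method (POSIX) is assumed.

```python
import multiprocessing as mp


def run_remote(host_port_cmd):
    host, port, cmd = host_port_cmd
    # Hypothetical stand-in for rpyc.connect(host, port).root.exposed_execCmd(cmd).
    result = "%s:%d ran %r" % (host, port, cmd)
    return host, result  # the return value is pickled and sent back to the parent


def gather_results(host_dict, cmd):
    jobs = [(host, port, cmd) for host, port in host_dict.items()]
    ctx = mp.get_context("fork")  # assumption: POSIX system where "fork" is available
    with ctx.Pool(processes=max(1, len(jobs))) as pool:
        pairs = pool.map(run_remote, jobs)  # results arrive in the parent, in job order
    return dict(pairs)


if __name__ == "__main__":
    results = gather_results({"192.168.1.10": 11111, "192.168.1.11": 11111}, "uptime")
    for ip in sorted(results):
        print(ip + ":\t" + results[ip])
```

Because every result travels back as a return value, the parent owns the only copy of the data and there is nothing to synchronize.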

