python多進程控制學習

前言:html

python多進程,常常在使用,卻沒有怎麼系統的學習過,官網上面講得比較細,結合本身的學習,整理記錄下
官網:https://docs.python.org/3/library/multiprocessing.html

multiprocessing簡介

multiprocessing是python自帶的多進程模塊,能夠大批量的生成進程,在服務器爲多核CPU時效果更好,相似於threading模塊。相對於多線程,多進程因爲獨享內存空間,更穩定安全,在運維裏面作些批量操做時,多進程有更多適用的場景python

multiprocessing包提供了本地和遠程兩種併發操做,有效的避開了使用子進程而不是全局解釋鎖的線程,所以,multiprocessing能夠有效利用到多核處理編程

Process類

在multiporcessing中,經過Process類對象來批量產生進程,使用start()方法來啓動這個進程安全

1.語法

multiprocessing.Process(group=None,target=None,name=None,args=(),kwargs={},*)

group: 這個參數通常爲空,它只是爲了兼容threading.Tread
target: 這個參數就是經過run()可調用對象的方法,默認爲空,表示沒有方法被調用
name: 表示進程名
args: 傳給target調用方法的tuple(元組)參數
kwargs: 傳給target調用方法的dict(字典)參數

2.Process類的方法及對象

run()
該方法是進程的運行過程,能夠在子類中重寫此方法,通常也不多去重構服務器

start()
啓動進程,每一個進程對象都必須被該方法調用多線程

join([timeout])
等待進程終止,再往下執行,能夠設置超時時間併發

name
能夠獲取進程名字,多個進程也能夠是相同的名字app

is_alive()
返回進程是否還存活,True or False,進程存活是指start()開始到子進程終止運維

daemon
守護進程的標記,一個布爾值,在start()以後設置該值,表示是否後臺運行
注意:若是設置了後臺運行,那麼後臺程序不運行再建立子進程函數

pid
能夠獲取進程ID

exitcode
子進程退出時的值,若是進程尚未終止,值將是None,若是是負值,表示子進程被終止

terminate()
終止進程,若是是Windows,則使用terminateprocess(),該方法對已經退出和結束的進程,將不會執行

如下爲一個簡單的例子:

#-*- coding:utf8 -*- 
import multiprocessing
import time

def work(x):
   time.sleep(1)
   print time.ctime(),'這是子進程[{0}]...'.format(x)

if __name__ == '__main__':
    for i in range(5):
        p = multiprocessing.Process(target=work,args=(i,))
        print '啓動進程數:{0}'.format(i)
        p.start()
        p.deamon = True

clipboard.png

固然也能夠顯示每一個進程的ID

#-*- coding:utf8 -*- 
import multiprocessing
import time
import os

def work(x):
   time.sleep(1)
   ppid = os.getppid()
   pid  = os.getpid()
   print time.ctime(),'這是子進程[{0},父進程:{1},子進程:{2}]...'.format(x,ppid,pid)

if __name__ == '__main__':
    for i in range(5):
        p = multiprocessing.Process(target=work,args=(i,))
        print '啓動進程數:{0}'.format(i)
        p.start()
        p.deamon = True

clipboard.png

但在實際使用的過程當中,並不僅是併發完就能夠了,好比,有30個任務,因爲服務器資源有限,每次併發5個任務,這裏還涉及到30個任務怎麼獲取的問題,另外併發的進程任務執行時間很難保證一致,尤爲是須要時間的任務,可能併發5個任務,有3個已經執行完了,2個還須要很長時間執行,總不能等到這兩個進程執行完了,再繼續執行後面的任務,所以進程控制就在此有了使用場景,能夠利用Process的方法和一些multiprocessing的包,類等結合使用

進程控制及通訊經常使用類

1、Queue類

相似於python自帶的Queue.Queue,主要用在比較小的隊列上面
語法:

multiprocessing.Queue([maxsize])

類方法:
qsize()
返回隊列的大體大小,由於多進程或者多線程一直在消耗隊列,所以該數據不必定正確

empty()
判斷隊列是否爲空,若是是,則返回True,不然False

full()
判斷隊列是否已滿,若是是,則返回True,不然False

put(obj[, block[, timeout]])
將對象放入隊列,可選參數block爲True,timeout爲None

get()
從隊列取出對象

#-*- coding:utf8 -*-
from multiprocessing import Process, Queue

def f(q):
    q.put([42,None,'hi'])

if __name__ == '__main__':
    q = Queue()
    p = Process(target=f, args=(q,))
    p.start()
    print q.get()  #打印內容: [42,None,'hi']
    p.join()

2、Pipe類

pipe()函數返回一對對象的鏈接,能夠爲進程間傳輸消息,在打印一些日誌、進程控制上面有一些用處,Pip()對象返回兩個對象connection,表明兩個通道,每一個connection對象都有send()和recv()方法,須要注意的是兩個或以上的進程同時讀取或者寫入同一管道,可能會致使數據混亂,測試了下,是直接覆蓋了。另外,返回的兩個connection,若是一個是send()數據,那麼另一個就只能recv()接收數據了

#-*- coding:utf8 -*-
from multiprocessing import Process, Pipe
import time
def f(conn,i):
    print '[{0}]已經執行到子進程:{1}'.format(time.ctime(),i)
    time.sleep(1)
    w = "[{0}]hi,this is :{1}".format(time.ctime(),i)
    conn.send(w)
    conn.close()

if __name__ == '__main__':
    reader = []
    parent_conn, child_conn = Pipe()
    for i in range(4):
        p = Process(target=f, args=(child_conn,i))
        p.start()
        reader.append(parent_conn)
        p.deamon=True

    # 等待全部子進程跑完
    time.sleep(3)
    print '\n[{0}]下面打印child_conn向parent_conn傳輸的信息:'.format(time.ctime())
    for i in reader:
        print i.recv()

輸出爲:
clipboard.png

3、Value,Array

在進行併發編程時,應儘可能避免使用共享狀態,由於多進程同時修改數據會致使數據破壞。但若是確實須要在多進程間共享數據,multiprocessing也提供了方法Value、Array

from multiprocessing import Process, Value, Array

def f(n, a):
    n.value = 3.1415927
    for i in range(len(a)):
        a[i] = -a[i]

if __name__ == '__main__':
    num = Value('d',0.0)
    arr = Array('i', range(10))

    p = Process(target=f, args=(num, arr))
    p.start()
    p.join()

    print num.value
    print arr[:]

*print
3.1415927
[0, -1, -2, -3, -4, -5, -6, -7, -8, -9]*

4、Manager進程管理模塊

Manager類管理進程使用得較多,它返回對象能夠操控子進程,而且支持不少類型的操做,如: list, dict, Namespace、lock, RLock, Semaphore, BoundedSemaphore, Condition, Event, Barrier, Queue, Value, Array,所以使用Manager基本上就夠了

from multiprocessing import Process, Manager

def f(d, l):
    d[1] = '1'
    d['2'] = 2
    d[0.25] = None
    l.reverse()

if __name__ == '__main__':
    with Manager() as manager:
        d = manager.dict()
        l = manager.list(range(10))

        p = Process(target=f, args=(d, l))
        p.start()
        p.join() #等待進程結束後往下執行
        print d,'\n',l

輸出:
{0.25: None, 1: '1', '2': 2}
[9, 8, 7, 6, 5, 4, 3, 2, 1, 0]
能夠看到,跟共享數據同樣的效果,大部分管理進程的方法都集成到了Manager()模塊了

5、對多進程控制的應用實例

#-*- coding:utf8 -*-
    from multiprocessing import Process, Queue
    import time
    
    def work(pname,q):
        time.sleep(1)
        print_some = "{0}|this is process: {1}".format(time.ctime(),pname)
        print print_some
        q.put(pname)
    
    if __name__ == '__main__':
        p_manag_num = 2  # 進程併發控制數量2
        # 併發的進程名
        q_process = ['process_1','process_2','process_3','process_4','process_5']
        q_a = Queue() # 將進程名放入隊列
        q_b = Queue() # 將q_a的進程名放往q_b進程,由子進程完成
    
        for i in q_process:
            q_a.put(i)
    
        p_list = [] # 完成的進程隊列
        while not q_a.empty():
            if len(p_list) <= 2:
                pname=q_a.get()
                p = Process(target=work, args=(pname,q_b))
                p.start()
                p_list.append(p)
                print pname
    
            for p in p_list:
                if not p.is_alive():
                    p_list.remove(p)
    
        # 等待5秒,預估執行完後看隊列通訊信息
        # 固然也能夠循環判斷隊列裏面的進程是否執行完成
        time.sleep(5)
        print '打印p_b隊列:'
        while not q_b.empty():
            print q_b.get()

執行結果:

clipboard.png

相關文章
相關標籤/搜索