Python 併發編程之進程、線程續

何時使用多線程

首先要明白:io操做不佔用CPU,計算佔用CPUhtml

Python多線程 不適合CPU密集操做型的任務,適合IO操做密集型的任務。python

 

multiprocessing模塊

python中的多線程沒法利用多核優點,若是想要充分地使用多核CPU的資源(os.cpu_count()查看),在python中大部分狀況須要使用多進程。Python提供了multiprocessing。linux

multiprocessing模塊用來開啓子進程,並在子進程中執行咱們定製的任務(好比函數),該模塊與多線程模塊threading的編程接口相似。shell

multiprocessing模塊的功能衆多:支持子進程、通訊和共享數據、執行不一樣形式的同步,提供了Process、Queue、Pipe、Lock等組件。數據庫

須要再次強調的一點是:與線程不一樣,進程沒有任何共享狀態,進程修改的數據,改動僅限於該進程內。編程

 

Process類的介紹

建立進程的類:json

from multiprocessing import Process
p = Process(target=None, name=None, args=(), kwargs={})    #由該類實例化獲得的對象,表示一個子進程中的任務(還沒有啓動)

##強調:
1. 須要使用關鍵字的方式來指定參數
2. args指定的爲傳給target函數的位置參數,是一個元組形式,必須有逗號

 

參數介紹:windows

1 group參數未使用,值始終爲None
2 target表示調用對象,即子進程要執行的任務
3 args表示調用對象的位置參數元組,args=(1,2,'egon',)
4 kwargs表示調用對象的字典,kwargs={'name':'egon','age':18}
5 name爲子進程的名稱

 

方法介紹:數組

  • p.start():啓動進程,並調用該子進程中的p.run()
  • p.run():進程啓動時運行的方法,正是它去調用target指定的函數,咱們自定義類的類中必定要實現該方法 
  • p.terminate():強制終止進程p,不會進行任何清理操做,若是p建立了子進程,該子進程就成了殭屍進程,使用該方法須要特別當心這種狀況。若是p還保存了一個鎖那麼也將不會被釋放,進而致使死鎖p.is_alive():若是p仍然運行,返回True
  • p.join([timeout]):主線程等待p終止(強調:是主線程處於等的狀態,而p是處於運行的狀態)。timeout是可選的超時時間,須要強調的是,p.join只能join住start開啓的進程,而不能join住run開啓的進程

 

屬性介紹:安全

  • p.daemon:默認值爲False,若是設爲True,表明p爲後臺運行的守護進程,當p的父進程終止時,p也隨之終止,而且設定爲True後,p不能建立本身的新進程,必須在p.start()以前設置
  • p.name:進程的名稱
  • p.pid:進程的pid
  • p.exitcode:進程在運行時爲None、若是爲–N,表示被信號N結束(瞭解便可)
  • p.authkey:進程的身份驗證鍵,默認是由os.urandom()隨機生成的32字符的字符串。這個鍵的用途是爲涉及網絡鏈接的底層進程間通訊提供安全性,這類鏈接只有在具備相同的身份驗證鍵時才能成功(瞭解便可)

 

Process類的使用

 注意:在windows中Process()必須放到# if __name__ == '__main__':下

Since Windows has no fork, the multiprocessing module starts a new Python process and imports the calling module. 
If Process() gets called upon import, then this sets off an infinite succession of new processes (or until your machine runs out of resources). 
This is the reason for hiding calls to Process() inside

if __name__ == "__main__"
since statements inside this if-statement will not get called upon import.
因爲Windows沒有fork,多處理模塊啓動一個新的Python進程並導入調用模塊。 
若是在導入時調用Process(),那麼這將啓動無限繼承的新進程(或直到機器耗盡資源)。 
這是隱藏對Process()內部調用的原,使用if __name__ == 「__main __」,這個if語句中的語句將不會在導入時被調用。
詳細解釋

 

一個簡單的多進程例子

from multiprocessing import Process
import time

def run(name):
    time.sleep(2)
    print('hello',name)

if __name__ == '__main__':
    for i in range(6):
        p = Process(target = run,args = ('dean',))    #必須加逗號
        p.start()

 

進程之間的內存空間是隔離的

from multiprocessing import Process
n=100    #在windows系統中應該把全局變量定義在if __name__ == '__main__'之上就能夠了
def work():
    global n
    n=0
    print('子進程內: ',n)


if __name__ == '__main__':
    p=Process(target=work)
    p.start()
    print('主進程內: ',n)

##運行結果:
主進程內:  100
子進程內:  0
View Code

 

Process對象的join方法

from multiprocessing import Process
import time
import random

class Piao(Process):
    def __init__(self,name):
        self.name=name
        super().__init__()
    def run(self):
        print('%s is piaoing' %self.name)
        time.sleep(random.randrange(1,3))
        print('%s is piao end' %self.name)


p=Piao('egon')
p.start()
p.join(0.0001) #等待p中止,等0.0001秒就再也不等了
print('開始')
join:主進程等,等待子進程結束
from multiprocessing import Process
import time
import random
def piao(name):
    print('%s is piaoing' %name)
    time.sleep(random.randint(1,3))
    print('%s is piao end' %name)

p1=Process(target=piao,args=('egon',))
p2=Process(target=piao,args=('alex',))
p3=Process(target=piao,args=('yuanhao',))
p4=Process(target=piao,args=('wupeiqi',))

p1.start()
p2.start()
p3.start()
p4.start()

#有的同窗會有疑問:既然join是等待進程結束,那麼我像下面這樣寫,進程不就又變成串行的了嗎?
#固然不是了,必須明確:p.join()是讓誰等?
#很明顯p.join()是讓主線程等待p的結束,卡住的是主線程而絕非進程p,

#詳細解析以下:
#進程只要start就會在開始運行了,因此p1-p4.start()時,系統中已經有四個併發的進程了
#而咱們p1.join()是在等p1結束,沒錯p1只要不結束主線程就會一直卡在原地,這也是問題的關鍵
#join是讓主線程等,而p1-p4仍然是併發執行的,p1.join的時候,其他p2,p3,p4仍然在運行,等#p1.join結束,可能p2,p3,p4早已經結束了,這樣p2.join,p3.join.p4.join直接經過檢測,無需等待
# 因此4個join花費的總時間仍然是耗費時間最長的那個進程運行的時間
p1.join()
p2.join()
p3.join()
p4.join()

print('主線程')


#上述啓動進程與join進程能夠簡寫爲
# p_l=[p1,p2,p3,p4]
# 
# for p in p_l:
#     p.start()
# 
# for p in p_l:
#     p.join()
有了join,程序不就是串行了嗎???

Process對象的其餘方法或屬性(瞭解)

#進程對象的其餘方法一:terminate,is_alive
from multiprocessing import Process
import time
import random

class Piao(Process):
    def __init__(self,name):
        self.name=name
        super().__init__()

    def run(self):
        print('%s is piaoing' %self.name)
        time.sleep(random.randrange(1,5))
        print('%s is piao end' %self.name)


p1=Piao('egon1')
p1.start()

p1.terminate()#關閉進程,不會當即關閉,因此is_alive馬上查看的結果可能仍是存活
print(p1.is_alive()) #結果爲True

print('開始')
print(p1.is_alive()) #結果爲False
terminate與is_alive
from multiprocessing import Process
import time
import random
class Piao(Process):
    def __init__(self,name):
        # self.name=name
        # super().__init__() #Process的__init__方法會執行self.name=Piao-1,
        #                    #因此加到這裏,會覆蓋咱們的self.name=name

        #爲咱們開啓的進程設置名字的作法
        super().__init__()
        self.name=name

    def run(self):
        print('%s is piaoing' %self.name)
        time.sleep(random.randrange(1,3))
        print('%s is piao end' %self.name)

p=Piao('egon')
p.start()
print('開始')
print(p.pid) #查看pid
name與pid

殭屍進程與孤兒進程(瞭解)

參考博客:http://www.cnblogs.com/Anker/p/3271773.html

一:殭屍進程(有害)
  殭屍進程:一個進程使用fork建立子進程,若是子進程退出,而父進程並無調用wait或waitpid獲取子進程的狀態信息,那麼子進程的進程描述符仍然保存在系統中。這種進程稱之爲僵死進程。詳解以下

咱們知道在unix/linux中,正常狀況下子進程是經過父進程建立的,子進程在建立新的進程。子進程的結束和父進程的運行是一個異步過程,即父進程永遠沒法預測子進程到底何時結束,若是子進程一結束就馬上回收其所有資源,那麼在父進程內將沒法獲取子進程的狀態信息。

所以,UNⅨ提供了一種機制能夠保證父進程能夠在任意時刻獲取子進程結束時的狀態信息:
1、在每一個進程退出的時候,內核釋放該進程全部的資源,包括打開的文件,佔用的內存等。可是仍然爲其保留必定的信息(包括進程號the process ID,退出狀態the termination status of the process,運行時間the amount of CPU time taken by the process等)
二、直到父進程經過wait / waitpid來取時才釋放. 但這樣就致使了問題,若是進程不調用wait / waitpid的話,那麼保留的那段信息就不會釋放,其進程號就會一直被佔用,可是系統所能使用的進程號是有限的,若是大量的產生僵死進程,將由於沒有可用的進程號而致使系統不能產生新的進程. 此即爲殭屍進程的危害,應當避免。

  任何一個子進程(init除外)在exit()以後,並不是立刻就消失掉,而是留下一個稱爲殭屍進程(Zombie)的數據結構,等待父進程處理。這是每一個子進程在結束時都要通過的階段。若是子進程在exit()以後,父進程沒有來得及處理,這時用ps命令就能看到子進程的狀態是「Z」。若是父進程能及時 處理,可能用ps命令就來不及看到子進程的殭屍狀態,但這並不等於子進程不通過殭屍狀態。  若是父進程在子進程結束以前退出,則子進程將由init接管。init將會以父進程的身份對殭屍狀態的子進程進行處理。

二:孤兒進程(無害)

  孤兒進程:一個父進程退出,而它的一個或多個子進程還在運行,那麼那些子進程將成爲孤兒進程。孤兒進程將被init進程(進程號爲1)所收養,並由init進程對它們完成狀態收集工做。

  孤兒進程是沒有父進程的進程,孤兒進程這個重任就落到了init進程身上,init進程就好像是一個民政局,專門負責處理孤兒進程的善後工做。每當出現一個孤兒進程的時候,內核就把孤 兒進程的父進程設置爲init,而init進程會循環地wait()它的已經退出的子進程。這樣,當一個孤兒進程淒涼地結束了其生命週期的時候,init進程就會表明黨和政府出面處理它的一切善後工做。所以孤兒進程並不會有什麼危害。

咱們來測試一下(建立完子進程後,主進程所在的這個腳本就退出了,當父進程先於子進程結束時,子進程會被init收養,成爲孤兒進程,而非殭屍進程),文件內容

import os
import sys
import time

pid = os.getpid()
ppid = os.getppid()
print 'im father', 'pid', pid, 'ppid', ppid
pid = os.fork()
#執行pid=os.fork()則會生成一個子進程
#返回值pid有兩種值:
#    若是返回的pid值爲0,表示在子進程當中
#    若是返回的pid值>0,表示在父進程當中
if pid > 0:
    print 'father died..'
    sys.exit(0)

# 保證主線程退出完畢
time.sleep(1)
print 'im child', os.getpid(), os.getppid()

執行文件,輸出結果:
im father pid 32515 ppid 32015
father died..
im child 32516 1

看,子進程已經被pid爲1的init進程接收了,因此殭屍進程在這種狀況下是不存在的,存在只有孤兒進程而已,孤兒進程聲明週期結束天然會被init來銷燬。


三:殭屍進程危害場景:

  例若有個進程,它按期的產 生一個子進程,這個子進程須要作的事情不多,作完它該作的事情以後就退出了,所以這個子進程的生命週期很短,可是,父進程只管生成新的子進程,至於子進程 退出以後的事情,則一律漠不關心,這樣,系統運行上一段時間以後,系統中就會存在不少的僵死進程,假若用ps命令查看的話,就會看到不少狀態爲Z的進程。 嚴格地來講,僵死進程並非問題的根源,罪魁禍首是產生出大量僵死進程的那個父進程。所以,當咱們尋求如何消滅系統中大量的僵死進程時,答案就是把產生大 量僵死進程的那個元兇槍斃掉(也就是經過kill發送SIGTERM或者SIGKILL信號啦)。槍斃了元兇進程以後,它產生的僵死進程就變成了孤兒進 程,這些孤兒進程會被init進程接管,init進程會wait()這些孤兒進程,釋放它們佔用的系統進程表中的資源,這樣,這些已經僵死的孤兒進程 就能瞑目而去了。

四:測試
#一、產生殭屍進程的程序test.py內容以下

#coding:utf-8
from multiprocessing import Process
import time,os

def run():
    print('',os.getpid())

if __name__ == '__main__':
    p=Process(target=run)
    p.start()
    
    print('',os.getpid())
    time.sleep(1000)


#二、在unix或linux系統上執行
[root@vm172-31-0-19 ~]# python3  test.py &
[1] 18652
[root@vm172-31-0-19 ~]# 主 18652
子 18653

[root@vm172-31-0-19 ~]# ps aux |grep Z
USER       PID %CPU %MEM    VSZ   RSS TTY      STAT START   TIME COMMAND
root     18653  0.0  0.0      0     0 pts/0    Z    20:02   0:00 [python3] <defunct> #出現殭屍進程
root     18656  0.0  0.0 112648   952 pts/0    S+   20:02   0:00 grep --color=auto Z

[root@vm172-31-0-19 ~]# top #執行top命令發現1zombie
top - 20:03:42 up 31 min,  3 users,  load average: 0.01, 0.06, 0.12
Tasks:  93 total,   2 running,  90 sleeping,   0 stopped,   1 zombie
%Cpu(s):  0.0 us,  0.3 sy,  0.0 ni, 99.7 id,  0.0 wa,  0.0 hi,  0.0 si,  0.0 st
KiB Mem :  1016884 total,    97184 free,    70848 used,   848852 buff/cache
KiB Swap:        0 total,        0 free,        0 used.   782540 avail Mem 

  PID USER      PR  NI    VIRT    RES    SHR S %CPU %MEM     TIME+ COMMAND                                                                                                                                        
root      20   0   29788   1256    988 S  0.3  0.1   0:01.50 elfin                                                                                                                      


#三、
等待父進程正常結束後會調用wait/waitpid去回收殭屍進程
但若是父進程是一個死循環,永遠不會結束,那麼該殭屍進程就會一直存在,殭屍進程過多,就是有害的
解決方法一:殺死父進程
解決方法二:對開啓的子進程應該記得使用join,join會回收殭屍進程
參考python2源碼註釋
class Process(object):
    def join(self, timeout=None):
        '''
        Wait until child process terminates
        '''
        assert self._parent_pid == os.getpid(), 'can only join a child process'
        assert self._popen is not None, 'can only join a started process'
        res = self._popen.wait(timeout)
        if res is not None:
            _current_process._children.discard(self)

join方法中調用了wait,告訴系統釋放殭屍進程。discard爲從本身的children中剔除

解決方法三:http://blog.csdn.net/u010571844/article/details/50419798
View Code

 

和以前學習的多線程結合在一塊兒使用,代碼以下:

import multiprocessing
import time,threading

def thread_run():
    print(threading.get_ident())    #這裏表示獲取線程id

def run(name):
    time.sleep(2)
    print('hello',name)
    t = threading.Thread(target = thread_run,)
    t.start()

if __name__ == '__main__':
    for i in range(6):
        p = multiprocessing.Process(target = run,args = ('dean',))
        p.start()

運行結果以下:

 

接着查看下面代碼:

from multiprocessing import Process
import os


def info(title):
    print(title)
    print('module name:', __name__)
    print('parent process:', os.getppid())  #獲取父進程id
    print('process id:', os.getpid())       #獲取子進程id
    print("\n\n")


def f(name):
    info('\033[31;1mcalled from child process function f\033[0m')
    print('hello', name)

if __name__ == '__main__':
    info('\033[32;1mmain process line\033[0m')

運行結果以下:

 

咱們這裏能夠看到父進程id:7968,而且會發現不管程序運行多少次都是這個,而後咱們在windows任務管理器查看發現這個是pycharm的進程id,以下圖:

這裏要記住:每個子進程都是由父進程啓動的

咱們將上面代碼中if __name__=」__main__」進行修改,以下:

if __name__ == '__main__':
    info('\033[32;1mmain process line\033[0m')
    p = Process(target=f, args=('bob',))
    p.start()

運行結果以下:

 

守護進程

主進程建立守護進程

  其一:守護進程會在主進程代碼執行結束後就終止

  其二:守護進程內沒法再開啓子進程,不然拋出異常:AssertionError: daemonic processes are not allowed to have children

注意:進程之間是互相獨立的,主進程代碼運行結束,守護進程隨即終止

from multiprocessing import Process
import time
import random

class Piao(Process):
    def __init__(self,name):
        self.name=name
        super().__init__()
    def run(self):
        print('%s is piaoing' %self.name)
        time.sleep(random.randrange(1,3))
        print('%s is piao end' %self.name)


p=Piao('egon')
p.daemon=True #必定要在p.start()前設置,設置p爲守護進程,禁止p建立子進程,而且父進程代碼執行結束,p即終止運行
p.start()
print('')
View Code

 

進程同步鎖

進程之間數據不共享,可是共享同一套文件系統,因此訪問同一個文件,或同一個打印終端,是沒有問題的,

而共享帶來的是競爭,競爭帶來的結果就是錯亂,如何控制,就是加鎖處理

part1:多個進程共享同一打印終端

#併發運行,效率高,但競爭同一打印終端,帶來了打印錯亂
from multiprocessing import Process
import os,time
def work():
    print('%s is running' %os.getpid())
    time.sleep(2)
    print('%s is done' %os.getpid())

if __name__ == '__main__':
    for i in range(3):
        p=Process(target=work)
        p.start()

#執行結果:
6649 is running
6650 is running
6648 is running
6649 is done
6650 is done
6648 is done
併發運行,效率高,但競爭同一打印終端,帶來了打印錯亂
#由併發變成了串行,犧牲了運行效率,但避免了競爭
from multiprocessing import Process,Lock
import os,time
def work(lock):
    lock.acquire()
    print('%s is running' %os.getpid())
    time.sleep(2)
    print('%s is done' %os.getpid())
    lock.release()
if __name__ == '__main__':
    lock=Lock()
    for i in range(3):
        p=Process(target=work,args=(lock,))
        p.start()

#執行結果:
6684 is running
6684 is done
6685 is running
6685 is done
6686 is running
6686 is done
加鎖:由併發變成了串行,犧牲了運行效率,但避免了競爭

 

part2:多個進程共享同一文件

文件當數據庫,模擬搶票

#文件db的內容爲:{"count":1}
#注意必定要用雙引號,否則json沒法識別
from multiprocessing import Process,Lock
import time,json,random
def search():
    dic=json.load(open('db.txt'))
    print('\033[43m剩餘票數%s\033[0m' %dic['count'])

def get():
    dic=json.load(open('db.txt'))
    time.sleep(0.1) #模擬讀數據的網絡延遲
    if dic['count'] >0:
        dic['count']-=1
        time.sleep(0.2) #模擬寫數據的網絡延遲
        json.dump(dic,open('db.txt','w'))
        print('\033[43m購票成功\033[0m')

def task(lock):
    search()
    get()
if __name__ == '__main__':
    lock=Lock()
    for i in range(100): #模擬併發100個客戶端搶票
        p=Process(target=task,args=(lock,))
        p.start()
併發運行,效率高,但競爭寫同一文件,數據寫入錯亂
#文件db的內容爲:{"count":1}
#注意必定要用雙引號,否則json沒法識別
from multiprocessing import Process,Lock
import time,json,random
def search():
    dic=json.load(open('db.txt'))
    print('\033[43m剩餘票數%s\033[0m' %dic['count'])

def get():
    dic=json.load(open('db.txt'))
    time.sleep(0.1) #模擬讀數據的網絡延遲
    if dic['count'] >0:
        dic['count']-=1
        time.sleep(0.2) #模擬寫數據的網絡延遲
        json.dump(dic,open('db.txt','w'))
        print('\033[43m購票成功\033[0m')

def task(lock):
    search()
    lock.acquire()
    get()
    lock.release()
if __name__ == '__main__':
    lock=Lock()
    for i in range(100): #模擬併發100個客戶端搶票
        p=Process(target=task,args=(lock,))
        p.start()
加鎖:購票行爲由併發變成了串行,犧牲了運行效率,但保證了數據安全

 

總結:

#加鎖能夠保證多個進程修改同一塊數據時,同一時間只能有一個任務能夠進行修改,即串行的修改,沒錯,速度是慢了,但犧牲了速度卻保證了數據安全。
雖然能夠用文件共享數據實現進程間通訊,但問題是:
1.效率低(共享數據基於文件,而文件是硬盤上的數據)
2.須要本身加鎖處理



#所以咱們最好找尋一種解決方案可以兼顧:一、效率高(多個進程共享一塊內存的數據)二、幫咱們處理好鎖問題。這就是mutiprocessing模塊爲咱們提供的基於消息的IPC通訊機制:隊列和管道。
1 隊列和管道都是將數據存放於內存中
2 隊列又是基於(管道+鎖)實現的,可讓咱們從複雜的鎖問題中解脫出來,
咱們應該儘可能避免使用共享數據,儘量使用消息傳遞和隊列,避免處理複雜的同步和鎖問題,並且在進程數目增多時,每每能夠得到更好的可獲展性。

 

進程間數據的交互,實現方法

進程彼此之間互相隔離,要實現進程間通訊(IPC),multiprocessing模塊支持兩種形式:隊列(Queues)和管道(Pipe)。

注意:經過Queues和Pipe能夠實現進程間數據的傳遞,可是不能實現數據的共享

 

Queues(隊列)

建立隊列的類(底層就是以管道和鎖定的方式實現):

from multiprocessing import Queue
q = Queue(maxsize=0)    #建立共享的進程隊列,Queue是多進程安全的隊列,可使用Queue實現多進程之間的數據傳遞。maxsize是隊列中容許最大項數,省略則無大小限制。

 

主要方法:

  • q.put( obj, block=True, timeout=None )
  • q.get( block=True, timeout=None )
  • q.get_nowait( )
  • q.put_nowait( obj )
  • q.empty( )
  • q.full( )
  • q.qsize( )
>>> from multiprocessing import Queue
>>> q = Queue(2)    #隊列中最多容許放2項
>>> q.put(1)    #插入數據到隊列中
>>> q.put(2)
>>> q.qsize()    #返回隊列的大小(元素的個數)。注:該結果不可靠,好比在返回大小的過程當中,若是隊列中又加入或取走了項目。
2
>>> q.full()    #若是隊列滿了,返回True,反之則返回False。注:該結果不可靠,好比在返回True的過程當中,若是隊列中的項目被取走。
True
>>> q.put(3,timeout=2)    #若是隊列已滿,且block爲True(默認值),而且timeout爲正值,該方法會阻塞timeout指定的時間,直到該隊列有剩餘的空間。若是超時,會拋出queue.Full異常。
Traceback (most recent call last):
  File "<pyshell#20>", line 1, in <module>
    q.put(3,timeout=2)
  File "C:\Users\Administrator\AppData\Local\Programs\Python\Python36\lib\multiprocessing\queues.py", line 83, in put
    raise Full
queue.Full
>>> q.put(3,timeout=0)    #若是隊列已滿,且block爲True(默認值),而且timeout指定爲等於或小於0,則當即拋出queue.Full異常。
Traceback (most recent call last):
  File "<pyshell#21>", line 1, in <module>
    q.put(3,timeout=0)
  File "C:\Users\Administrator\AppData\Local\Programs\Python\Python36\lib\multiprocessing\queues.py", line 83, in put
    raise Full
queue.Full
>>> q.put(3,block=False)    #若是隊列已滿,且block爲False,則無論timeout爲什麼值,會當即拋出queue.Full異常。等同於上一個
Traceback (most recent call last):
  File "<pyshell#22>", line 1, in <module>
    q.put(3,block=False)
  File "C:\Users\Administrator\AppData\Local\Programs\Python\Python36\lib\multiprocessing\queues.py", line 83, in put
    raise Full
queue.Full
>>> q.put_nowait(3)    #同q.put(3,block=False)
Traceback (most recent call last):
  File "<pyshell#23>", line 1, in <module>
    q.put_nowait(3)
  File "C:\Users\Administrator\AppData\Local\Programs\Python\Python36\lib\multiprocessing\queues.py", line 129, in put_nowait
    return self.put(obj, False)
  File "C:\Users\Administrator\AppData\Local\Programs\Python\Python36\lib\multiprocessing\queues.py", line 83, in put
    raise Full
queue.Full
>>> q.put(3)    #若是隊列已滿,則會一直阻塞


>>> q.get()    #從隊列讀取而且刪除一個元素
1
>>> q.get()
2
>>> q.qsize()    #返回隊列的大小(元素的個數)
0
>>> q.empty()    #若是隊列爲空,返回True,反之則返回False。注:該結果不可靠,好比在返回True的過程當中,若是隊列中又加入了項目。
True
>>> q.get(timeout=2)    #若是隊列爲空,且blocked爲True(默認值),而且timeout爲正值,那麼在等待時間內沒有取到任何元素,會拋出queue.Empty異常。
Traceback (most recent call last):
  File "<pyshell#28>", line 1, in <module>
    q.get(timeout=2)
  File "C:\Users\Administrator\AppData\Local\Programs\Python\Python36\lib\multiprocessing\queues.py", line 105, in get
    raise Empty
queue.Empty
>>> q.get(timeout=0)    #若是隊列爲空,且blocked爲True(默認值),而且timeout指定爲等於或小於0,則當即拋出queue.Empty異常。
Traceback (most recent call last):
  File "<pyshell#29>", line 1, in <module>
    q.get(timeout=0)
  File "C:\Users\Administrator\AppData\Local\Programs\Python\Python36\lib\multiprocessing\queues.py", line 105, in get
    raise Empty
queue.Empty
>>> q.get(block=False)    #若是隊列爲空,且block爲False,則無論timeout爲什麼值,會當即拋出queue.Empty異常。等同於上一個
Traceback (most recent call last):
  File "<pyshell#30>", line 1, in <module>
    q.get(block=False)
  File "C:\Users\Administrator\AppData\Local\Programs\Python\Python36\lib\multiprocessing\queues.py", line 107, in get
    raise Empty
queue.Empty
>>> q.get_nowait()    #同q.get(block=False)
Traceback (most recent call last):
  File "<pyshell#31>", line 1, in <module>
    q.get_nowait()
  File "C:\Users\Administrator\AppData\Local\Programs\Python\Python36\lib\multiprocessing\queues.py", line 126, in get_nowait
    return self.get(False)
  File "C:\Users\Administrator\AppData\Local\Programs\Python\Python36\lib\multiprocessing\queues.py", line 107, in get
    raise Empty
queue.Empty
>>> q.get()    #若是隊列爲空,則會一直阻塞
View Code

 

其餘方法(瞭解)

  • q.cancel_join_thread():不會在進程退出時自動鏈接後臺線程。能夠防止join_thread()方法阻塞
  • q.close():關閉隊列,防止隊列中加入更多數據。調用此方法,後臺線程將繼續寫入那些已經入隊列但還沒有寫入的數據,但將在此方法完成時立刻關閉。若是q被垃圾收集,將調用此方法。關閉隊列不會在隊列使用者中產生任何類型的數據結束信號或異常。例如,若是某個使用者正在被阻塞在get()操做上,關閉生產者中的隊列不會致使get()方法返回錯誤。
  • q.join_thread():鏈接隊列的後臺線程。此方法用於在調用q.close()方法以後,等待全部隊列項被消耗。默認狀況下,此方法由不是q的原始建立者的全部進程調用。調用q.cancel_join_thread方法能夠禁止這種行爲

 

使用方法和threading裏的queue差很少

先來看下線程之間的數據共享,代碼以下

import threading
import queue

def func():
    q.put([22,"dean",'hello'])

if __name__ == '__main__':
    q = queue.Queue()
    t = threading.Thread(target = func,)
    t.start()
    print(q.get(q))


#運行結果:
[22, 'dean', 'hello']

 

從上述代碼能夠看出線程之間的數據是共享的:主線程能夠訪問子線程放入的數據。

接下來看多進程之間,讓子進程調用父進程數據,以下所示:

from multiprocessing import Process
import queue

def func():
    q.put([22,"dean",'hello'])

if __name__ == '__main__':
    q = queue.Queue()    #注意這裏是一個線程q
    p = Process(target = func,)
    p.start()
    print(q.get())

在windows上運行結果:

注意:在linux上運行結果會卡住,這是爲何呢?

這是由於Windows和linux中父進程建立子進程的方式不同致使的!!!

在linux上,子進程是父進程的一個拷貝,子進程得到同父進程相同的數據,可是同父進程使用不一樣的數據段和堆棧段。而Windows上,子進程是一塊新的內存空間,裏面不包含父進程原有的數據。

 

咱們再次將代碼進行修改,寫f方法的時候直接將q給線程傳入,也就是,只有啓動線程,就自動傳入線程q,代碼以下:

from multiprocessing import Process
import queue

def func(q):
    q.put([22,"dean",'hello'])

if __name__ == '__main__':
    q = queue.Queue()    #注意這裏仍是線程q
    p = Process(target = func,args = (q,))
    p.start()
    print(q.get())

在Windows上運行結果:

注:在linux上運行結果仍是會卡住

 

從上述例子中咱們須要知道:進程不能訪問線程q

因此須要改爲進程q,代碼以下:

from multiprocessing import Process,Queue

def func(q):
    q.put([22,"dean",'hello'])

if __name__ == '__main__':
    q = Queue()    #注意這裏的q是進程q
    p = Process(target = func,args = (q,))
    p.start()
    print(q.get())

#運行結果:
[22, 'dean', 'hello']

 

此次咱們就發如今父進程裏就能夠調用到子進程放入的數據

這裏咱們須要明白:這裏的q實際上是被克隆了一個q,而後將子線程序列化的內容傳入到克隆q,而後再反序列化給q,從而實現了進程之間數據的傳遞

 

生產者消費者模型

在併發編程中使用生產者和消費者模式可以解決絕大多數併發問題。該模式經過平衡生產線程和消費線程的工做能力來提升程序的總體處理數據的速度。

 

爲何要使用生產者和消費者模式

在線程世界裏,生產者就是生產數據的線程,消費者就是消費數據的線程。在多線程開發當中,若是生產者處理速度很快,而消費者處理速度很慢,那麼生產者就必須等待消費者處理完,才能繼續生產數據。一樣的道理,若是消費者的處理能力大於生產者,那麼消費者就必須等待生產者。爲了解決這個問題因而引入了生產者和消費者模式。

 

什麼是生產者消費者模式

生產者消費者模式是經過一個容器來解決生產者和消費者的強耦合問題。生產者和消費者彼此之間不直接通信,而經過阻塞隊列來進行通信,因此生產者生產完數據以後不用等待消費者處理,直接扔給阻塞隊列,消費者不找生產者要數據,而是直接從阻塞隊列裏取,阻塞隊列就至關於一個緩衝區,平衡了生產者和消費者的處理能力。

 

基於隊列實現生產者消費者模型

from multiprocessing import Process,Queue
import time,random,os

def consumer(q):
    while True:
        res = q.get()
        time.sleep(random.randint(1,3))
        print('\033[45m%s 吃 %s\033[0m' %(os.getpid(),res))

def producer(q):
    for i in range(10):
        time.sleep(random.randint(1,3))
        res = '包子%s' %i
        q.put(res)
        print('\033[44m%s 生產了 %s\033[0m' % (os.getpid(), res))

if __name__ == '__main__':
    q = Queue()
    #生產者們:即廚師們
    p1 = Process(target = producer,args = (q,))

    #消費者們:即吃貨們
    c1 = Process(target = consumer,args = (q,))

    #開始
    p1.start()
    c1.start()
    print('')

 

運行結果:

此時的問題是主進程永遠不會結束,緣由是:生產者p在生產完後就結束了,可是消費者c在取空了q以後,則一直處於死循環中且卡在q.get()這一步。

解決方式無非是讓生產者在生產完畢後,往隊列中再發一個結束信號,這樣消費者在接收到結束信號後就能夠break出死循環

from multiprocessing import Process,Queue
import time,random,os

def consumer(q):
    while True:
        res = q.get()
        if res is None: break   # 收到結束信號則結束
        time.sleep(random.randint(1,3))
        print('\033[45m%s 吃 %s\033[0m' %(os.getpid(),res))

def producer(q):
    for i in range(10):
        time.sleep(random.randint(1,3))
        res = '包子%s' %i
        q.put(res)
        print('\033[44m%s 生產了 %s\033[0m' % (os.getpid(), res))
    q.put(None)  # 發送結束信號

if __name__ == '__main__':
    q = Queue()
    # 生產者們:即廚師們
    p1 = Process(target = producer,args = (q,))

    # 消費者們:即吃貨們
    c1 = Process(target = consumer,args = (q,))

    #開始
    p1.start()
    c1.start()
    print('')
生產者在生產完畢後發送結束信號None

注意:結束信號None,不必定要由生產者發,主進程裏一樣能夠發,但主進程須要等生產者結束後才應該發送該信號

from multiprocessing import Process,Queue
import time,random,os

def consumer(q):
    while True:
        res = q.get()
        if res is None: break   # 收到結束信號則結束
        time.sleep(random.randint(1,3))
        print('\033[45m%s 吃 %s\033[0m' %(os.getpid(),res))

def producer(q):
    for i in range(10):
        time.sleep(random.randint(1,3))
        res = '包子%s' %i
        q.put(res)
        print('\033[44m%s 生產了 %s\033[0m' % (os.getpid(), res))

if __name__ == '__main__':
    q = Queue()
    # 生產者們:即廚師們
    p1 = Process(target = producer,args = (q,))

    # 消費者們:即吃貨們
    c1 = Process(target = consumer,args = (q,))

    #開始
    p1.start()
    c1.start()

    p1.join()
    q.put(None)
    print('')
主進程在生產者生產完畢後發送結束信號None

但上述解決方式,在有多個生產者和多個消費者時,咱們則須要用一個很low的方式去解決

from multiprocessing import Process,Queue
import time,random,os

def consumer(q):
    while True:
        res = q.get()
        if res is None: break   # 收到結束信號則結束
        time.sleep(random.randint(1,3))
        print('\033[45m%s 吃 %s\033[0m' %(os.getpid(),res))

def producer(name,q):
    for i in range(10):
        time.sleep(random.randint(1,3))
        res = '%s%s' %(name,i)
        q.put(res)
        print('\033[44m%s 生產了 %s\033[0m' % (os.getpid(), res))

if __name__ == '__main__':
    q = Queue()
    # 生產者們:即廚師們
    p1 = Process(target = producer,args = ('包子',q))
    p2 = Process(target = producer,args = ('骨頭',q))
    p3 = Process(target = producer,args = ('泔水',q))

    # 消費者們:即吃貨們
    c1 = Process(target = consumer,args = (q,))
    c2 = Process(target = consumer,args = (q,))

    #開始
    p1.start()
    p2.start()
    p3.start()
    c1.start()
    c2.start()

    p1.join()   #必須保證生產者所有生產完畢,才應該發送結束信號
    p2.join()
    p3.join()
    q.put(None)     #有幾個消費者就應該發送幾回結束信號None
    q.put(None)     #發送結束信號
    print('')
有幾個消費者就須要發送幾回結束信號:至關low

 其實咱們的思路無非是發送結束信號而已,有另一種隊列提供了這種機制:

JoinableQueue([maxsize]):這就像是一個Queue對象,但隊列容許項目的使用者通知生成者項目已經被成功處理。通知進程是使用共享的信號和條件變量來實現的。

方法介紹:

JoinableQueue的實例p除了與Queue對象相同的方法以外還具備:

  • q.task_done():使用者使用此方法發出信號,表示q.get()的返回項目已經被處理。若是調用此方法的次數大於從隊列中刪除項目的數量,將引起ValueError異常
  • q.join():生產者調用此方法進行阻塞,直到隊列中全部的項目均被處理。阻塞將持續到隊列中的每一個項目均調用q.task_done()方法爲止
from multiprocessing import Process,JoinableQueue
import time,random,os

def consumer(q):
    while True:
        res = q.get()
        time.sleep(random.randint(1,3))
        print('\033[45m%s 吃 %s\033[0m' %(os.getpid(),res))

        q.task_done()  # 向q.join()發送一次信號,證實一個數據已經被取走了

def producer(name,q):
    for i in range(10):
        time.sleep(random.randint(1,3))
        res = '%s%s' %(name,i)
        q.put(res)
        print('\033[44m%s 生產了 %s\033[0m' % (os.getpid(), res))
    q.join()

if __name__ == '__main__':
    q = JoinableQueue()
    # 生產者們:即廚師們
    p1 = Process(target = producer,args = ('包子',q))
    p2 = Process(target = producer,args = ('骨頭',q))
    p3 = Process(target = producer,args = ('泔水',q))

    # 消費者們:即吃貨們
    c1 = Process(target = consumer,args = (q,))
    c2 = Process(target = consumer,args = (q,))
    c1.daemon = True    #設置成守護進程
    c2.daemon = True

    #開始
    p_l = [p1,p2,p3,c1,c2]
    for p in p_l:
        p.start()

    p1.join()
    p2.join()
    p3.join()
    print('')

    # 主進程等--->p1,p2,p3等---->c1,c2
    # p1,p2,p3結束了,證實c1,c2確定全都收完了p1,p2,p3發到隊列的數據
    # 於是c1,c2也沒有存在的價值了,應該隨着主進程的結束而結束,因此設置成守護進程
View Code

 

Pipe(管道)

進程間通訊(IPC)方式二:管道(不推薦使用,瞭解便可)

#建立管道的類:
Pipe([duplex]):在進程之間建立一條管道,並返回元組(conn1,conn2),其中conn1,conn2表示管道兩端的鏈接對象,強調一點:必須在產生Process對象以前產生管道
#參數介紹:
dumplex:默認管道是全雙工的,若是將duplex射成False,conn1只能用於接收,conn2只能用於發送。
#主要方法:
    conn1.recv():接收conn2.send(obj)發送的對象。若是沒有消息可接收,recv方法會一直阻塞。若是鏈接的另一端已經關閉,那麼recv方法會拋出EOFError。
    conn1.send(obj):經過鏈接發送對象。obj是與序列化兼容的任意對象
 #其餘方法:
conn1.close():關閉鏈接。若是conn1被垃圾回收,將自動調用此方法
conn1.fileno():返回鏈接使用的整數文件描述符
conn1.poll([timeout]):若是鏈接上的數據可用,返回True。timeout指定等待的最長時限。若是省略此參數,方法將當即返回結果。若是將timeout射成None,操做將無限期地等待數據到達。
 
conn1.recv_bytes([maxlength]):接收c.send_bytes()方法發送的一條完整的字節消息。maxlength指定要接收的最大字節數。若是進入的消息,超過了這個最大值,將引起IOError異常,而且在鏈接上沒法進行進一步讀取。若是鏈接的另一端已經關閉,不再存在任何數據,將引起EOFError異常。
conn.send_bytes(buffer [, offset [, size]]):經過鏈接發送字節數據緩衝區,buffer是支持緩衝區接口的任意對象,offset是緩衝區中的字節偏移量,而size是要發送字節數。結果數據以單條消息的形式發出,而後調用c.recv_bytes()函數進行接收    
 
conn1.recv_bytes_into(buffer [, offset]):接收一條完整的字節消息,並把它保存在buffer對象中,該對象支持可寫入的緩衝區接口(即bytearray對象或相似的對象)。offset指定緩衝區中放置消息處的字節位移。返回值是收到的字節數。若是消息長度大於可用的緩衝區空間,將引起BufferTooShort異常。
介紹
from multiprocessing import Process,Pipe

def f(conn ):
    conn.send([22,None,"hello from child"])
    conn.send([22,None,"hello from child2"])
    print(conn.recv())
    conn.close()

if __name__ == "__main__":
    left_conn,right_conn = Pipe()
    p = Process(target = f,args = (right_conn,))
    p.start()
    print(left_conn.recv())
    print(left_conn.recv())
    left_conn.send("我是left_conn")

#執行結果:
[22, None, 'hello from child']
[22, None, 'hello from child2']
我是left_conn

對上面代碼分析:pip()會生成兩個值,上面的left_conn和right_conn,這就如同一條網線的兩頭,兩頭均可以發送和接收數據

 

共享數據

即使是使用線程,推薦作法也是將程序設計爲大量獨立的線程集合

經過消息隊列交換數據。這樣極大地減小了對使用鎖定和其餘同步手段的需求,還能夠擴展到分佈式系統中

進程間通訊應該儘可能避免使用本節所講的共享數據的方式

from multiprocessing import Manager,Process
import os

def f(d,l):
    d[1]="1"
    d["2"] = 2
    d[0.25] = None
    l.append(os.getpid())
    print(l)

if __name__ == "__main__":
    with Manager() as manager:  #這種方式和直接manager=Manager()同樣
        d = manager.dict()  #生成一個字典,能夠在多個進程間共享
        l = manager.list(range(5))  #生成一個列表,能夠在多個進程間共享
        p_list = []
        for i in range(10):
            p = Process(target=f,args=(d,l))
            p.start()
            p_list.append(p)
        for res in p_list:
            res.join()

        print(d)
        print(l)

運行結果以下:

 

進程池

在利用Python進行系統管理的時候,特別是同時操做多個文件目錄,或者遠程控制多臺主機,並行操做能夠節約大量的時間。多進程是實現併發的手段之一,須要注意的問題是:

  1. 很明顯須要併發執行的任務一般要遠大於核數
  2. 一個操做系統不可能無限開啓進程,一般有幾個核就開幾個進程
  3. 進程開啓過多,效率反而會降低(開啓進程是須要佔用系統資源的,並且開啓多餘核數目的進程也沒法作到並行)

  例如當被操做對象數目不大時,能夠直接利用multiprocessing中的Process動態成生多個進程,十幾個還好,但若是是上百個,上千個。。。手動的去限制進程數量卻又太過繁瑣,此時能夠發揮進程池的功效。

進程池內部維護一個進程序列,當使用時,則去進程池中獲取一個進程,若是進程池序列中沒有可供使用的進進程,那麼程序就會等待,直到進程池中有可用進程爲止。

建立進程池的類:若是指定numprocess爲3,則進程池會從無到有建立三個進程,而後自始至終使用這三個進程去執行全部任務,不會開啓其餘進程

from multiprocessing import Pool
p = Pool(processes=None, initializer=None, initargs=(),
                 maxtasksperchild=None, context=None)    #建立進程池

參數介紹:

numprocess:要建立的進程數,若是省略,將默認使用cpu_count()的值
initializer:是每一個工做進程啓動時要執行的可調用對象,默認爲None
initargs:是要傳給initializer的參數組

 

主要方法:

p.apply(func [, args [, kwargs]]):在一個池工做進程中執行func(*args,**kwargs),而後返回結果。須要強調的是:此操做並不會在全部池工做進程中並執行func函數。若是要經過不一樣參數併發地執行func函數,必須從不一樣線程調用p.apply()函數或者使用p.apply_async()
p.apply_async(func [, args [, kwargs]]):在一個池工做進程中執行func(*args,**kwargs),而後返回結果。此方法的結果是AsyncResult類的實例,callback是可調用對象,接收輸入參數。當func的結果變爲可用時,將理解傳遞給callback。callback禁止執行任何阻塞操做,不然將接收其餘異步操做中的結果。
p.close():關閉進程池,防止進一步操做。若是全部操做持續掛起,它們將在工做進程終止前完成
P.jion():等待全部工做進程退出。此方法只能在close()或teminate()以後調用

其餘方法(瞭解)

方法apply_async()和map_async()的返回值是AsyncResul的實例obj。實例具備如下方法
obj.get():返回結果,若是有必要則等待結果到達。timeout是可選的。若是在指定時間內尚未到達,將引起一場。若是遠程操做中引起了異常,它將在調用此方法時再次被引起。
obj.ready():若是調用完成,返回True
obj.successful():若是調用完成且沒有引起異常,返回True,若是在結果就緒以前調用此方法,引起異常
obj.wait([timeout]):等待結果變爲可用。
obj.terminate():當即終止全部工做進程,同時不執行任何清理或結束任何掛起工做。若是p被垃圾回收,將自動調用此函數
View Code

應用:

from multiprocessing import Process,Pool
import time
import os

def Foo(i):
    time.sleep(2)
    print("in the process",os.getpid())
    return i + 100

def Bar(arg):
    print("--> exec done:",arg)


if __name__ == "__main__":
    pool = Pool(5)  #進程池中從無到有建立五個進程,之後一直是這五個進程在執行任務
    res_l = []
    for i in range(10):
        res = pool.apply(func=Foo, args=(i,))    #表示同步調用
        res_l.append(res)
    print(res_l)
    pool.close()
同步調用

如上運行結果發現程序變串行了

from multiprocessing import Process,Pool
import time
import os

def Foo(i):
    time.sleep(2)
    print("in the process",os.getpid())
    return i + 100

def Bar(arg):
    print("--> exec done:",arg)


if __name__ == "__main__":
    pool = Pool(5)  #進程池中從無到有建立五個進程,之後一直是這五個進程在執行任務
    res_l = []
    for i in range(10):
        res = pool.apply_async(func=Foo, args=(i,))   #表示異步調用,這時主進程須要使用jion,等待進程池內任務都處理完,而後能夠用get收集結果
        res_l.append(res)
    for res in res_l:   #使用get來獲取apply_aync的結果,若是是apply,則沒有get方法,由於apply是同步執行,馬上獲取結果,也根本無需get
        print(res.get())
    pool.close()
    pool.join()     #調用join以前,先調用close函數,不然會出錯,close()執行後不會有新的進程加入到pool,join函數等待素有子進程結束
異步調用
#一:使用進程池(異步調用,apply_async)
#coding: utf-8
from multiprocessing import Process,Pool
import time

def func(msg):
    print( "msg:", msg)
    time.sleep(1)
    return msg

if __name__ == "__main__":
    pool = Pool(processes = 3)
    res_l=[]
    for i in range(10):
        msg = "hello %d" %(i)
        res=pool.apply_async(func, (msg, ))   #維持執行的進程總數爲processes,當一個進程執行完畢後會添加新的進程進去
        res_l.append(res)
    print("==============================>") #沒有後面的join,或get,則程序總體結束,進程池中的任務還沒來得及所有執行完也都跟着主進程一塊兒結束了

    pool.close() #關閉進程池,防止進一步操做。若是全部操做持續掛起,它們將在工做進程終止前完成
    pool.join()   #調用join以前,先調用close函數,不然會出錯。執行完close後不會有新的進程加入到pool,join函數等待全部子進程結束

    print(res_l) #看到的是<multiprocessing.pool.ApplyResult object at 0x10357c4e0>對象組成的列表,而非最終的結果,但這一步是在join後執行的,證實結果已經計算完畢,剩下的事情就是調用每一個對象下的get方法去獲取結果
    for i in res_l:
        print(i.get()) #使用get來獲取apply_aync的結果,若是是apply,則沒有get方法,由於apply是同步執行,馬上獲取結果,也根本無需get

#二:使用進程池(同步調用,apply)
#coding: utf-8
from multiprocessing import Process,Pool
import time

def func(msg):
    print( "msg:", msg)
    time.sleep(0.1)
    return msg

if __name__ == "__main__":
    pool = Pool(processes = 3)
    res_l=[]
    for i in range(10):
        msg = "hello %d" %(i)
        res=pool.apply(func, (msg, ))   #維持執行的進程總數爲processes,當一個進程執行完畢後會添加新的進程進去
        res_l.append(res) #同步執行,即執行完一個拿到結果,再去執行另一個
    print("==============================>")
    pool.close()
    pool.join()   #調用join以前,先調用close函數,不然會出錯。執行完close後不會有新的進程加入到pool,join函數等待全部子進程結束

    print(res_l) #看到的就是最終的結果組成的列表
    for i in res_l: #apply是同步的,因此直接獲得結果,沒有get()方法
        print(i)
詳解:apply_async與apply

 

回調函數

須要回調函數的場景:進程池中任何一個任務一旦處理完了,就當即告知主進程:我好了額,你能夠處理個人結果了。主進程則調用一個函數去處理該結果,該函數即回調函數

from multiprocessing import Process,Pool
import time
import os

def Foo(i):
    time.sleep(2)
    print("in the process",os.getpid())
    return i + 100

def Bar(arg):
    print("--> exec done:",arg)


if __name__ == "__main__":
    pool = Pool(5)  #進程池中從無到有建立五個進程,之後一直是這五個進程在執行任務
    res_l = []
    for i in range(10):
        res = pool.apply_async(func=Foo, args=(i,),callback = Bar)   #表示異步調用,這時主進程須要使用jion,等待進程池內任務都處理完,而後能夠用get收集結果
        res_l.append(res)
    for res in res_l:   #使用get來獲取apply_aync的結果,若是是apply,則沒有get方法,由於apply是同步執行,馬上獲取結果,也根本無需get
        print(res.get())
    pool.close()
    pool.join()     #調用join以前,先調用close函數,不然會出錯,close()執行後不會有新的進程加入到pool,join函數等待素有子進程結束

運行結果以下:

 

 下面將代碼進行修改,肯定回調函數是由子進程仍是主進程調用

from multiprocessing import Process,Pool
import time
import os

def Foo(i):
    time.sleep(2)
    print("in the process",os.getpid())
    return i + 100

def Bar(arg):
    print("--> exec done:",arg,os.getpid())


if __name__ == "__main__":
    pool = Pool(5)  #進程池中從無到有建立五個進程,之後一直是這五個進程在執行任務
    print('主進程:',os.pid())
    res_l = []
    for i in range(10):
        res = pool.apply_async(func=Foo, args=(i,),callback = Bar)   #表示異步調用,這時主進程須要使用jion,等待進程池內任務都處理完,而後能夠用get收集結果
        res_l.append(res)
    for res in res_l:   #使用get來獲取apply_aync的結果,若是是apply,則沒有get方法,由於apply是同步執行,馬上獲取結果,也根本無需get
        print(res.get())
    pool.close()
    pool.join()     #調用join以前,先調用close函數,不然會出錯,close()執行後不會有新的進程加入到pool,join函數等待素有子進程結束

運行結果以下,能夠看出回調函數的pid和主進程是同樣的

相關文章
相關標籤/搜索