進程間通訊——隊列和管道(multiprocess.Queue、multiprocess.Pipe)

進程:

  以前咱們已經瞭解了操做系統中進程的概念,程序並不能單獨運行,只有將程序裝載到內存中,系統爲它分配資源才能運行,而這種執行的程序就稱之爲進程。程序和進程的區別就在於:程序是指令的集合,它是進程運行的靜態描述文本;進程是程序的一次執行活動,屬於動態概念。在多道編程中,咱們容許多個程序同時加載到內存中,在操做系統的調度下,能夠實現併發地執行。這是這樣的設計,大大提升了CPU的利用率。進程的出現讓每一個用戶感受到本身獨享CPU,所以,進程就是爲了在CPU上實現多道編程而提出的。html

 

進程間通訊  

用Queue模塊:

IPC(Inter-Process Communication)python

隊列 

概念介紹

隊列是先進先出git

必須put放進東西后  才能get來取值github

建立共享的進程隊列,Queue是多進程安全的隊列,可使用Queue實現多進程之間的數據傳遞。 數據庫

Queue([maxsize]) 
建立共享的進程隊列。
參數 :maxsize是隊列中容許的最大項數。若是省略此參數,則無大小限制。
底層隊列使用管道和鎖定實現。

 

Queue([maxsize]) 
建立共享的進程隊列。maxsize是隊列中容許的最大項數。若是省略此參數,則無大小限制。底層隊列使用管道和鎖定實現。另外,還須要運行支持線程以便隊列中的數據傳輸到底層管道中。 
Queue的實例q具備如下方法:

q.get( [ block [ ,timeout ] ] ) 
返回q中的一個項目。若是q爲空,此方法將阻塞,直到隊列中有項目可用爲止。block用於控制阻塞行爲,默認爲True. 若是設置爲False,將引起Queue.Empty異常(定義在Queue模塊中)。timeout是可選超時時間,用在阻塞模式中。若是在制定的時間間隔內沒有項目變爲可用,將引起Queue.Empty異常。

q.get_nowait( ) 
同q.get(False)方法。

q.put(item [, block [,timeout ] ] ) 
將item放入隊列。若是隊列已滿,此方法將阻塞至有空間可用爲止。block控制阻塞行爲,默認爲True。若是設置爲False,將引起Queue.Empty異常(定義在Queue庫模塊中)。timeout指定在阻塞模式中等待可用空間的時間長短。超時後將引起Queue.Full異常。

q.qsize() 
返回隊列中目前項目的正確數量。此函數的結果並不可靠,由於在返回結果和在稍後程序中使用結果之間,隊列中可能添加或刪除了項目。在某些系統上,此方法可能引起NotImplementedError異常。


q.empty() 
若是調用此方法時 q爲空,返回True。若是其餘進程或線程正在往隊列中添加項目,結果是不可靠的。也就是說,在返回和使用結果之間,隊列中可能已經加入新的項目。

q.full() 
若是q已滿,返回爲True. 因爲線程的存在,結果也多是不可靠的(參考q.empty()方法)。。
方法介紹

 

隊列中的進程的內容是共享的  由於不一樣的進程的數據是隔離的  咱們能夠用隊列  讓他們之間的數據進行共享編程

在進程中使用隊列能夠完成雙向通訊json

 

from multiprocessing import Process ,Queue
q = Queue(10)
try:
    q.get_nowwait()  # 若是你用 nowwait的話你的獲取嗯u過沒有就不會阻塞就會報錯
except:
    print('queue.Empty')
q.get()
for i in range(10):
    q.get(i)

print(q.qsize(10))

from  multiprocessing import Process ,Queue
q = Queue(10) # 建立一個能夠存放10個值的隊列
# try:
#     q.get_nowwait()
# except:
#     print('queue.Empty')
#
# q.get()


for  i in range(10):
    q.put(i)
print(q.qsize())  # 獲取你的隊列能夠存放的最大值
print(q.full()) # 斷定是否是滿了  返回的值布爾值
# q.put(111)  # 給這個隊列放進值
# print(q.grt())
print('*'*10)
print(q.empty()) # 斷定隊列是否是爲空 返回的也是布爾值
 

 

 

生產者消費者模型
解決數據供需不平衡的狀況
隊列是進程安全的 內置了鎖來保證隊列中的每個數據都不會被多個進程重複取
import time
import random
from multiprocessing import Process,Queue
生產者消費者模型
解決數據供需不平衡的狀況
隊列是進程安全的 內置了鎖來保證隊列中的每個數據都不會被多個進程重複取
def consumer(q,name):
    while True:
        food = q.get()
        if food == 'done':break
        time.sleep(random.random())
        print('%s吃了%s'%(name,food))

def producer(q,name,food):
    for i in range(10):
        time.sleep(random.random())
        print('%s生產了%s%s'%(name,food,i))
        q.put('%s%s'%(food,i))

if __name__ == '__main__':
    q = Queue()
    p1 = Process(target=producer,args=[q,'Egon','泔水'])
    p2 = Process(target=producer,args=[q,'Yuan','骨頭魚刺'])
    p1.start()
    p2.start()
    Process(target=consumer,args=[q,'alex']).start()
    Process(target=consumer,args=[q,'wusir']).start()
    p1.join()
    p2.join()
    q.put('done')
    q.put('done')
生產者消費者來進性隊列的安全

 

import time
import random
from multiprocessing import Process,JoinableQueue
def consumer(q,name):
    while True:
        food = q.get()
        time.sleep(random.random())
        print('%s吃了%s'%(name,food))
        q.task_done()

def producer(q,name,food):
    for i in range(10):
        time.sleep(random.random())
        print('%s生產了%s%s'%(name,food,i))
        q.put('%s%s'%(food,i))
    q.join()   # 等到全部的數據都被taskdone才結束

if __name__ == '__main__':
    q = JoinableQueue()
    p1 = Process(target=producer,args=[q,'Egon','泔水'])
    p2 = Process(target=producer,args=[q,'Yuan','骨頭魚刺'])
    p1.start()
    p2.start()
    c1 = Process(target=consumer,args=[q,'alex'])
    c2 = Process(target=consumer,args=[q,'wusir'])
    c1.daemon = True
    c2.daemon = True
    c1.start()
    c2.start()
    p1.join()
    p2.join()

# producer
    # put
    # 生產徹底部的數據就沒有其餘工做了
    # 在生產數據方 : 容許執行q.join
    # join會發起一個阻塞,直到全部當前隊列中的數據都被消費
# consumer
    # get 獲取到數據
    # 處理數據
    # q.task_done()  告訴q,剛剛從q獲取的數據已經處理完了

# consumer每完成一個任務就會給q發送一個taskdone
# producer在全部的數據都生產完以後會執行q.join()
# producer會等待consumer消費完數據才結束
# 主進程中對producer進程進行join
# 主進程中的代碼會等待producer執行完才結束
# producer結束就意味着主進程代碼的結束
# consumer做爲守護進程結束

# consumer中queue中的全部數據被消費
# producer join結束
# 主進程的代碼結束
# consumer結束
# 主進程結束
例子

 

 

Queue([maxsize]) 
建立共享的進程隊列。maxsize是隊列中容許的最大項數。若是省略此參數,則無大小限制。底層隊列使用管道和鎖定實現。另外,還須要運行支持線程以便隊列中的數據傳輸到底層管道中。 
Queue的實例q具備如下方法:

q.get( [ block [ ,timeout ] ] ) 
返回q中的一個項目。若是q爲空,此方法將阻塞,直到隊列中有項目可用爲止。block用於控制阻塞行爲,默認爲True. 若是設置爲False,將引起Queue.Empty異常(定義在Queue模塊中)。timeout是可選超時時間,用在阻塞模式中。若是在制定的時間間隔內沒有項目變爲可用,將引起Queue.Empty異常。

q.get_nowait( ) 
同q.get(False)方法。

q.put(item [, block [,timeout ] ] ) 
將item放入隊列。若是隊列已滿,此方法將阻塞至有空間可用爲止。block控制阻塞行爲,默認爲True。若是設置爲False,將引起Queue.Empty異常(定義在Queue庫模塊中)。timeout指定在阻塞模式中等待可用空間的時間長短。超時後將引起Queue.Full異常。

q.qsize() 
返回隊列中目前項目的正確數量。此函數的結果並不可靠,由於在返回結果和在稍後程序中使用結果之間,隊列中可能添加或刪除了項目。在某些系統上,此方法可能引起NotImplementedError異常。


q.empty() 
若是調用此方法時 q爲空,返回True。若是其餘進程或線程正在往隊列中添加項目,結果是不可靠的。也就是說,在返回和使用結果之間,隊列中可能已經加入新的項目。

q.full() 
若是q已滿,返回爲True. 因爲線程的存在,結果也多是不可靠的(參考q.empty()方法)。

 

JoinableQueue([maxsize]) 
建立可鏈接的共享進程隊列。這就像是一個Queue對象,但隊列容許項目的使用者通知生產者項目已經被成功處理。通知進程是使用共享的信號和條件變量來實現的。 數組

JoinableQueue的實例p除了與Queue對象相同的方法以外,還具備如下方法:

q.task_done() 
使用者使用此方法發出信號,表示q.get()返回的項目已經被處理。若是調用此方法的次數大於從隊列中刪除的項目數量,將引起ValueError異常。

q.join() 
生產者將使用此方法進行阻塞,直到隊列中全部項目均被處理。阻塞將持續到爲隊列中的每一個項目均調用q.task_done()方法爲止。 
下面的例子說明如何創建永遠運行的進程,使用和處理隊列上的項目。生產者將項目放入隊列,並等待它們被處理。
方法介紹

 

from multiprocessing import Process,JoinableQueue
import time,random,os
def consumer(q):
    while True:
        res=q.get()
        time.sleep(random.randint(1,3))
        print('\033[45m%s 吃 %s\033[0m' %(os.getpid(),res))
        q.task_done() #向q.join()發送一次信號,證實一個數據已經被取走了

def producer(name,q):
    for i in range(10):
        time.sleep(random.randint(1,3))
        res='%s%s' %(name,i)
        q.put(res)
        print('\033[44m%s 生產了 %s\033[0m' %(os.getpid(),res))
    q.join() #生產完畢,使用此方法進行阻塞,直到隊列中全部項目均被處理。


if __name__ == '__main__':
    q=JoinableQueue()
    #生產者們:即廚師們
    p1=Process(target=producer,args=('包子',q))
    p2=Process(target=producer,args=('骨頭',q))
    p3=Process(target=producer,args=('泔水',q))

    #消費者們:即吃貨們
    c1=Process(target=consumer,args=(q,))
    c2=Process(target=consumer,args=(q,))
    c1.daemon=True
    c2.daemon=True

    #開始
    p_l=[p1,p2,p3,c1,c2]
    for p in p_l:
        p.start()

    p1.join()
    p2.join()
    p3.join()
    print('') 
    
    #主進程等--->p1,p2,p3等---->c1,c2
    #p1,p2,p3結束了,證實c1,c2確定全都收完了p1,p2,p3發到隊列的數據
    #於是c1,c2也沒有存在的價值了,不須要繼續阻塞在進程中影響主進程了。應該隨着主進程的結束而結束,因此設置成守護進程就能夠了。
JoinableQueue隊列實現消費之生產者模型

 

 

管道:

管道是雙向通訊,數據進程不安全,隊列是管道加鎖來實現的安全

#建立管道的類:
Pipe([duplex]):在進程之間建立一條管道,並返回元組(conn1,conn2),其中conn1,conn2表示管道兩端的鏈接對象,強調一點:必須在產生Process對象以前產生管道
#參數介紹:
dumplex:默認管道是全雙工的,若是將duplex射成False,conn1只能用於接收,conn2只能用於發送。
#主要方法:
    conn1.recv():接收conn2.send(obj)發送的對象。若是沒有消息可接收,recv方法會一直阻塞。若是鏈接的另一端已經關閉,那麼recv方法會拋出EOFError。
    conn1.send(obj):經過鏈接發送對象。obj是與序列化兼容的任意對象
 #其餘方法:
conn1.close():關閉鏈接。若是conn1被垃圾回收,將自動調用此方法
conn1.fileno():返回鏈接使用的整數文件描述符
conn1.poll([timeout]):若是鏈接上的數據可用,返回True。timeout指定等待的最長時限。若是省略此參數,方法將當即返回結果。若是將timeout射成None,操做將無限期地等待數據到達。
 
conn1.recv_bytes([maxlength]):接收c.send_bytes()方法發送的一條完整的字節消息。maxlength指定要接收的最大字節數。若是進入的消息,超過了這個最大值,將引起IOError異常,而且在鏈接上沒法進行進一步讀取。若是鏈接的另一端已經關閉,不再存在任何數據,將引起EOFError異常。
conn.send_bytes(buffer [, offset [, size]]):經過鏈接發送字節數據緩衝區,buffer是支持緩衝區接口的任意對象,offset是緩衝區中的字節偏移量,而size是要發送字節數。結果數據以單條消息的形式發出,而後調用c.recv_bytes()函數進行接收    
 
conn1.recv_bytes_into(buffer [, offset]):接收一條完整的字節消息,並把它保存在buffer對象中,該對象支持可寫入的緩衝區接口(即bytearray對象或相似的對象)。offset指定緩衝區中放置消息處的字節位移。返回值是收到的字節數。若是消息長度大於可用的緩衝區空間,將引起BufferTooShort異常。
介紹
from multiprocessing import Process, Pipe


def f(conn):
    conn.send("Hello The_Third_Wave")
    conn.close()


if __name__ == '__main__':
    parent_conn, child_conn = Pipe()
    p = Process(target=f, args=(child_conn,))
    p.start()
    print(parent_conn.recv())
    p.join()
pipe初介紹

應該特別注意管道端點的正確管理問題。若是是生產者或消費者中都沒有使用管道的某個端點,就應將它關閉。這也說明了爲什麼在生產者中關閉了管道的輸出端,在消費者中關閉管道的輸入端。若是忘記執行這些步驟,程序可能在消費者中的recv()操做上掛起。管道是由操做系統進行引用計數的,必須在全部進程中關閉管道後才能生成EOFError異常。所以,在生產者中關閉管道不會有任何效果,除非消費者也關閉了相同的管道端點。 服務器

 

# 管道
# from multiprocessing import Pipe
# left,right = Pipe()
# left.send('1234')
# print(right.recv())
# left.send('1234')
# print(right.recv())
管道的信息發送接收信息是不須要進行編碼轉碼的
from multiprocessing import Process, Pipe

def f(parent_conn,child_conn):
    parent_conn.close() #不寫close將不會引起EOFError
    while True:
        try:
            print(child_conn.recv())
        except EOFError:
            child_conn.close()
            break

if __name__ == '__main__':
    parent_conn, child_conn = Pipe()
    p = Process(target=f, args=(parent_conn,child_conn,))
    p.start()
    child_conn.close()
    parent_conn.send('hello')
    parent_conn.send('hello')
    parent_conn.send('hello')
    parent_conn.close()
    p.join()

 

進程之間的數據共享

展望將來,基於消息傳遞的併發編程是大勢所趨

即使是使用線程,推薦作法也是將程序設計爲大量獨立的線程集合,經過消息隊列交換數據。

這樣極大地減小了對使用鎖定和其餘同步手段的需求,還能夠擴展到分佈式系統中。

但進程間應該儘可能避免通訊,即使須要通訊,也應該選擇進程安全的工具來避免加鎖帶來的問題。

之後咱們會嘗試使用數據庫來解決如今進程之間的數據共享問題。

進程間數據是獨立的,能夠藉助於隊列或管道實現通訊,兩者都是基於消息傳遞的
雖然進程間數據獨立,但能夠經過Manager實現數據共享,事實上Manager的功能遠不止於此

A manager object returned by Manager() controls a server process which holds Python objects and allows other processes to manipulate them using proxies.

A manager returned by Manager() will support types list, dict, Namespace, Lock, RLock, Semaphore, BoundedSemaphore, Condition, Event, Barrier, Queue, Value and Array.
Manager模塊介紹
from multiprocessing import Manager,Process,Lock
def work(d,lock):
    with lock: #不加鎖而操做共享的數據,確定會出現數據錯亂
        d['count']-=1

if __name__ == '__main__':
    lock=Lock()
    with Manager() as m:
        dic=m.dict({'count':100})
        p_l=[]
        for i in range(100):
            p=Process(target=work,args=(dic,lock))
            p_l.append(p)
            p.start()
        for p in p_l:
            p.join()
        print(dic)
Manager例子
from multiprocessing import Manager,Process,Lock
def func(dic,lock):
    # lock.acquire()
    # dic['count'] = dic['count']-1
    # lock.release()
    with lock:    # 上下文管理 :必須有一個開始動做 和 一個結束動做的時候
        dic['count'] = dic['count'] - 1

if __name__ == '__main__':
    m = Manager()
    lock = Lock()
    dic = m.dict({'count':100})
    p_lst = []
    for i in range(100):
        p = Process(target=func,args=[dic,lock])
        p_lst.append(p)
        p.start()
    for p in p_lst:p.join()
    print(dic)

# 同一臺機器上 : Queue
# 在不一樣臺機器上 :消息中間件

 

 

進程池和multiprocess.Pool模塊

進程池

 

爲何要有進程池?進程池的概念。

 

在程序實際處理問題過程當中,忙時會有成千上萬的任務須要被執行,閒時可能只有零星任務。那麼在成千上萬個任務須要被執行的時候,咱們就須要去建立成千上萬個進程麼?首先,建立進程須要消耗時間,銷燬進程也須要消耗時間。第二即使開啓了成千上萬的進程,操做系統也不能讓他們同時執行,這樣反而會影響程序的效率。所以咱們不能無限制的根據任務開啓或者結束進程。那麼咱們要怎麼作呢?

 

在這裏,要給你們介紹一個進程池的概念,定義一個池子,在裏面放上固定數量的進程,有需求來了,就拿一個池中的進程來處理任務,等處處理完畢,進程並不關閉,而是將進程再放回進程池中繼續等待任務。若是有不少任務須要執行,池中的進程數量不夠,任務就要等待以前的進程執行任務完畢歸來,拿到空閒進程才能繼續執行。也就是說,池中進程的數量是固定的,那麼同一時間最多有固定數量的進程在運行。這樣不會增長操做系統的調度難度,還節省了開閉進程的時間,也必定程度上可以實現併發效果。

multiprocess.Pool模塊

Pool([numprocess  [,initializer [, initargs]]]):建立進程池

概念介紹

1 numprocess:要建立的進程數,若是省略,將默認使用cpu_count()的值
2 initializer:是每一個工做進程啓動時要執行的可調用對象,默認爲None
3 initargs:是要傳給initializer的參數組
參數介紹
1 p.apply(func [, args [, kwargs]]):在一個池工做進程中執行func(*args,**kwargs),而後返回結果。
2 '''須要強調的是:此操做並不會在全部池工做進程中並執行func函數。若是要經過不一樣參數併發地執行func函數,必須從不一樣線程調用p.apply()函數或者使用p.apply_async()'''
3 
4 p.apply_async(func [, args [, kwargs]]):在一個池工做進程中執行func(*args,**kwargs),而後返回結果。
5 '''此方法的結果是AsyncResult類的實例,callback是可調用對象,接收輸入參數。當func的結果變爲可用時,將理解傳遞給callback。callback禁止執行任何阻塞操做,不然將接收其餘異步操做中的結果。'''
6    
7 p.close():關閉進程池,防止進一步操做。若是全部操做持續掛起,它們將在工做進程終止前完成
8 
9 P.jion():等待全部工做進程退出。此方法只能在close()或teminate()以後調用
主要方法
1 方法apply_async()和map_async()的返回值是AsyncResul的實例obj。實例具備如下方法
2 obj.get():返回結果,若是有必要則等待結果到達。timeout是可選的。若是在指定時間內尚未到達,將引起一場。若是遠程操做中引起了異常,它將在調用此方法時再次被引起。
3 obj.ready():若是調用完成,返回True
4 obj.successful():若是調用完成且沒有引起異常,返回True,若是在結果就緒以前調用此方法,引起異常
5 obj.wait([timeout]):等待結果變爲可用。
6 obj.terminate():當即終止全部工做進程,同時不執行任何清理或結束任何掛起工做。若是p被垃圾回收,將自動調用此函數
其餘方法

 

 '''
一 進程池與線程池
在剛開始學多進程或多線程時,咱們火燒眉毛地基於多進程或多線程實現併發的套接字通訊,
然而這種實現方式的致命缺陷是:服務的開啓的進程數或線程數都會隨着併發的客戶端數目地增多而增多,
這會對服務端主機帶來巨大的壓力,甚至於不堪重負而癱瘓,因而咱們必須對服務端開啓的進程數或線程數加以控制,
讓機器在一個本身能夠承受的範圍內運行,這就是進程池或線程池的用途,
例如進程池,就是用來存放進程的池子,本質仍是基於多進程,只不過是對開啓進程的數目加上了限制
''' 
進程池通常就是運用在服務端口的,好比12306官網就是這樣的,若是不進行限定的話 同時無限個進程一塊兒來訪問的話就會引發服務器的崩潰的


同步和異步
import os,time
from multiprocessing import Pool

def work(n):
    print('%s run' %os.getpid())
    time.sleep(3)
    return n**2

if __name__ == '__main__':
    p=Pool(3) #進程池中從無到有建立三個進程,之後一直是這三個進程在執行任務
    res_l=[]
    for i in range(10):
        res=p.apply(work,args=(i,)) # 同步調用,直到本次任務執行完畢拿到res,等待任務work執行的過程當中可能有阻塞也可能沒有阻塞
                                    # 但無論該任務是否存在阻塞,同步調用都會在原地等着
    print(res_l)
進程池的同步調用

 

import os
import time
import random
from multiprocessing import Pool

def work(n):
    print('%s run' %os.getpid())
    time.sleep(random.random())
    return n**2

if __name__ == '__main__':
    p=Pool(3) #進程池中從無到有建立三個進程,之後一直是這三個進程在執行任務
    res_l=[]
    for i in range(10):
        res=p.apply_async(work,args=(i,)) # 異步運行,根據進程池中有的進程數,每次最多3個子進程在異步執行
                                          # 返回結果以後,將結果放入列表,歸還進程,以後再執行新的任務
                                          # 須要注意的是,進程池中的三個進程不會同時開啓或者同時結束
                                          # 而是執行完一個就釋放一個進程,這個進程就去接收新的任務。  
        res_l.append(res)

    # 異步apply_async用法:若是使用異步提交的任務,主進程須要使用jion,等待進程池內任務都處理完,而後能夠用get收集結果
    # 不然,主進程結束,進程池可能還沒來得及執行,也就跟着一塊兒結束了
    p.close()
    p.join()
    for res in res_l:
        print(res.get()) #使用get來獲取apply_aync的結果,若是是apply,則沒有get方法,由於apply是同步執行,馬上獲取結果,也根本無需get
進程池的異步調用

 

import time
import random
from multiprocessing import Pool
def func(i):
    print('func%s' % i)
    time.sleep(random.randint(1,3))
    return i**2
if __name__ == '__main__':
    p = Pool(5)
    ret_l = []
    for i in range(15):
        # p.apply(func=func,args=(i,))    # 同步調用
        ret = p.apply_async(func=func,args=(i,))# 異步調用
        ret_l.append(ret)
    for ret in ret_l : print(ret.get())
    # 主進程和全部的子進程異步了

 

 

回調函數:

須要回調函數的場景:進程池中任何一個任務一旦處理完了,就當即告知主進程:我好了額,你能夠處理個人結果了。主進程則調用一個函數去處理該結果,該函數即回調函數

咱們能夠把耗時間(阻塞)的任務放到進程池中,而後指定回調函數(主進程負責執行),這樣主進程在執行回調函數時就省去了I/O的過程,直接拿到的是任務的結果。
from multiprocessing import Pool
import requests
import json
import os

def get_page(url):
    print('<進程%s> get %s' %(os.getpid(),url))
    respone=requests.get(url)
    if respone.status_code == 200:
        return {'url':url,'text':respone.text}

def pasrse_page(res):
    print('<進程%s> parse %s' %(os.getpid(),res['url']))
    parse_res='url:<%s> size:[%s]\n' %(res['url'],len(res['text']))
    with open('db.txt','a') as f:
        f.write(parse_res)


if __name__ == '__main__':
    urls=[
        'https://www.baidu.com',
        'https://www.python.org',
        'https://www.openstack.org',
        'https://help.github.com/',
        'http://www.sina.com.cn/'
    ]

    p=Pool(3)
    res_l=[]
    for url in urls:
        res=p.apply_async(get_page,args=(url,),callback=pasrse_page)
        res_l.append(res)

    p.close()
    p.join()
    print([res.get() for res in res_l]) #拿到的是get_page的結果,其實徹底不必拿該結果,該結果已經傳給回調函數處理了

'''
打印結果:
<進程3388> get https://www.baidu.com
<進程3389> get https://www.python.org
<進程3390> get https://www.openstack.org
<進程3388> get https://help.github.com/
<進程3387> parse https://www.baidu.com
<進程3389> get http://www.sina.com.cn/
<進程3387> parse https://www.python.org
<進程3387> parse https://help.github.com/
<進程3387> parse http://www.sina.com.cn/
<進程3387> parse https://www.openstack.org
[{'url': 'https://www.baidu.com', 'text': '<!DOCTYPE html>\r\n...',...}]
'''
使用多進程請求多個url來減小網絡等待浪費的時間
import os
from urllib.request import urlopen
from multiprocessing import Pool
def get_url(url):
    print('-->',url,os.getpid())
    ret = urlopen(url)
    content = ret.read()
    return url

def call(url):
    # 分析
    print(url,os.getpid())

if __name__ == '__main__':
    print(os.getpid())
    l = [
        'http://www.baidu.com',  # 5
        'http://www.sina.com',
        'http://www.sohu.com',
        'http://www.sogou.com',
        'http://www.qq.com',
        'http://www.bilibili.com',  #0.1
    ]
    p = Pool(5)   # count(cpu)+1
    ret_l = []
    for url in l:
        ret = p.apply_async(func = get_url,args=[url,],callback=call)
        ret_l.append(ret)
    for ret in ret_l : ret.get()


# 回調函數
# 在進程池中,起了一個任務,這個任務對應的函數在執行完畢以後
# 的返回值會自動做爲參數返回給回調函數
# 回調函數就根據返回值再進行相應的處理

# 回調函數 是在主進程執行的
相關文章
相關標籤/搜索