【Python3之多進程】

1、進程和線程的簡單解釋

進程(process)和線程(thread)是操做系統的基本概念,可是它們比較抽象,不容易掌握。python

用生活舉例:數據庫

(轉自阮一峯網絡日誌)編程

1.計算機的核心是CPU,它承擔了全部的計算任務。它就像一座工廠,時刻在運行。
2.假定工廠的電力有限,一次只能供給一個車間使用。也就是說,一個車間開工的時候,其餘車間都必須停工。背後的含義就是,單個CPU一次只能運行一個任務。
3.進程就比如工廠的車間,它表明CPU所能處理的單個任務。任一時刻,CPU老是運行一個進程,其餘進程處於非運行狀態。
4.一個車間裏,能夠有不少工人。他們協同完成一個任務。
5.線程就比如車間裏的工人。一個進程能夠包括多個線程。
6.車間的空間是工人們共享的,好比許多房間是每一個工人均可以進出的。這象徵一個進程的內存空間是共享的,每一個線程均可以使用這些共享內存。
7.但是,每間房間的大小不一樣,有些房間最多隻能容納一我的,好比廁所。裏面有人的時候,其餘人就不能進去了。這表明一個線程使用某些共享內存時,其餘線程必須等它結束,才能使用這一塊內存。
8.一個防止他人進入的簡單方法,就是門口加一把鎖。先到的人鎖上門,後到的人看到上鎖,就在門口排隊,等鎖打開再進去。這就叫互斥鎖(Mutual exclusion,縮寫 Mutex),防止多個線程同時讀寫某一塊內存區域。
9.還有些房間,能夠同時容納n我的,好比廚房。也就是說,若是人數大於n,多出來的人只能在外面等着。這比如某些內存區域,只能供給固定數目的線程使用。
10.這時的解決方法,就是在門口掛n把鑰匙。進去的人就取一把鑰匙,出來時再把鑰匙掛回原處。後到的人發現鑰匙架空了,就知道必須在門口排隊等着了。這種作法叫作信號量(Semaphore),用來保證多個線程不會互相沖突。
  不難看出,mutex是semaphore的一種特殊狀況(n=1時)。也就是說,徹底能夠用後者替代前者。可是,由於mutex較爲簡單,且效率高,因此在必須保證資源獨佔的狀況下,仍是採用這種設計。

11.操做系統的設計,所以能夠歸結爲三點:
(1)以多進程形式,容許多個任務同時運行;
(2)以多線程形式,容許單個任務分紅不一樣的部分運行;
(3)提供協調機制,一方面防止進程之間和線程之間產生衝突,另外一方面容許進程之間和線程之間共享資源。

 

2、python併發編程之多進程

一、multiprocessing模塊介紹

python中的多線程沒法利用多核優點,若是想要充分地使用多核CPU的資源(os.cpu_count()查看),在python中大部分狀況須要使用多進程。Python提供了很是好用的多進程包multiprocessing。json

multiprocessing模塊用來開啓子進程,並在子進程中執行咱們定製的任務(好比函數),該模塊與多線程模塊threading的編程接口相似。windows

multiprocessing模塊的功能衆多:支持子進程、通訊和共享數據、執行不一樣形式的同步,提供了Process、Queue、Pipe、Lock等組件。數組

須要再次強調的一點是:與線程不一樣,進程沒有任何共享狀態,進程修改的數據,改動僅限於該進程內。安全

 

二、Process類的介紹

  • 建立進程的類
Process([group [, target [, name [, args [, kwargs]]]]]),由該類實例化獲得的對象,表示一個子進程中的任務(還沒有啓動)

強調:
1. 須要使用關鍵字的方式來指定參數
2. args指定的爲傳給target函數的位置參數,是一個元組形式,必須有逗號
  • 參數介紹
group參數未使用,值始終爲None

target表示調用對象,即子進程要執行的任務

args表示調用對象的位置參數元組,args=(1,2,'hexin',)

kwargs表示調用對象的字典,kwargs={'name':'hexin','age':18}

name爲子進程的名稱
  • 方法介紹
p.start():啓動進程,並調用該子進程中的p.run() 
p.run():進程啓動時運行的方法,正是它去調用target指定的函數,咱們自定義類的類中必定要實現該方法  

p.terminate():強制終止進程p,不會進行任何清理操做,若是p建立了子進程,該子進程就成了殭屍進程,使用該方法須要特別當心這種狀況。若是p還保存了一個鎖那麼也將不會被釋放,進而致使死鎖
p.is_alive():若是p仍然運行,返回True

p.join([timeout]):主線程等待p終止(強調:是主線程處於等的狀態,而p是處於運行的狀態)。timeout是可選的超時時間,須要強調的是,p.join只能join住start開啓的進程,而不能join住run開啓的進程
  • 屬性介紹
p.daemon:默認值爲False,若是設爲True,表明p爲後臺運行的守護進程,當p的父進程終止時,p也隨之終止,而且設定爲True後,p不能建立本身的新進程,必須在p.start()以前設置

p.name:進程的名稱

p.pid:進程的pid

p.exitcode:進程在運行時爲None、若是爲–N,表示被信號N結束(瞭解便可)

p.authkey:進程的身份驗證鍵,默認是由os.urandom()隨機生成的32字符的字符串。這個鍵的用途是爲涉及網絡鏈接的底層進程間通訊提供安全性,這類鏈接只有在具備相同的身份驗證鍵時才能成功(瞭解便可)

 

三、Process類的使用

  •  建立並開啓子進程的兩種方式

方法1服務器

import time
import random
from multiprocessing import Process
def piao(name):
    print('%s piao' %name)
    time.sleep(random.randrange(1,5))
    print('%s piao end' %name)



p1=Process(target=piao,args=('e',)) #必須加,號
p2=Process(target=piao,args=('a',))
p3=Process(target=piao,args=('w',))
p4=Process(target=piao,args=('y',))

p1.start()
p2.start()
p3.start()
p4.start()
print('主線程')

輸出網絡

e piao
主線程
a piao
w piao
y piao
e piao end
y piao end
a piao end
w piao end

 

方法2多線程

import time
import random
from multiprocessing import Process


class Piao(Process):
    def __init__(self,name):
        super().__init__()
        self.name=name
    def run(self):
        print('%s piaoing' %self.name)

        time.sleep(random.randrange(1,5))
        print('%s piao end' %self.name)

p1=Piao('e')
p2=Piao('a')
p3=Piao('w')
p4=Piao('y')

p1.start() #start會自動調用run
p2.start()
p3.start()
p4.start()
print('主線程')

輸出

e piaoing
主線程
a piaoing
w piaoing
y piaoing
e piao end
a piao end
y piao end
w piao end

注意:在windows中Process()必須放到# if __name__ == '__main__':下

 

  • Process對象的其餘方法或屬性
#進程對象的其餘方法一:terminate,is_alive
from multiprocessing import Process
import time
import random

class Piao(Process):
    def __init__(self,name):
        self.name=name
        super().__init__()

    def run(self):
        print('%s is piaoing' %self.name)
        time.sleep(random.randrange(1,5))
        print('%s is piao end' %self.name)


p1=Piao('e1')
p1.start()

p1.terminate()#關閉進程,不會當即關閉,因此is_alive馬上查看的結果可能仍是存活
print(p1.is_alive()) #結果爲True

print('開始')
print(p1.is_alive()) #結果爲False

輸出

True
開始
False

 

#進程對象的其餘方法二:p.daemon=True,p.join
from multiprocessing import Process
import time
import random

class Piao(Process):
    def __init__(self,name):
        self.name=name
        super().__init__()
    def run(self):
        print('%s is piaoing' %self.name)
        time.sleep(random.randrange(1,3))
        print('%s is piao end' %self.name)


p=Piao('e')
p.daemon=True #必定要在p.start()前設置,設置p爲守護進程,禁止p建立子進程,而且父進程死,p跟着一塊兒死
p.start()
p.join(0.0001) #等待p中止,等0.0001秒就再也不等了
print('開始')

輸出

Piao-1 is piaoing
開始

注意:p.join(),是父進程在等p的結束,是父進程阻塞在原地,而p仍然在後臺運行

 

  • 進程對象的其餘屬性:name,pid
from multiprocessing import Process
import time
import random
class Piao(Process):
    def __init__(self,name):
        # self.name=name
        # super().__init__() #Process的__init__方法會執行self.name=Piao-1,
        #                    #因此加到這裏,會覆蓋咱們的self.name=name

        #爲咱們開啓的進程設置名字的作法
        super().__init__()
        self.name=name

    def run(self):
        print('%s is piaoing' %self.name)
        time.sleep(random.randrange(1,3))
        print('%s is piao end' %self.name)

p=Piao('e')
p.start()
print('開始')
print(p.pid) #查看pid

 

4.進程同步(鎖)

進程之間數據不共享,可是共享同一套文件系統,因此訪問同一個文件,或同一個打印終端,是沒有問題的。

共享同一打印終端,發現會有多行內容打印到一行的現象(多個進程共享並搶佔同一個打印終端,亂了)

既然能夠用文件共享數據,那麼進程間通訊用文件做爲數據傳輸介質就能夠了啊,能夠,可是有問題:1.效率 2.須要本身加鎖處理

 

加鎖的目的是爲了保證多個進程修改同一塊數據時,同一時間只能有一個修改,即串行的修改,沒錯,速度是慢了,犧牲了速度而保證了數據安全。

 

文件當作數據庫,模擬搶票(Lock互斥鎖)

#!/usr/bin/env python
# -*- coding:utf-8 -*-

#文件db的內容爲:{"count":2}
#注意必定要用雙引號,否則json沒法識別
from multiprocessing import Process,Lock
import json
import time
import random
import os

def work(filename,lock): #買票
    # lock.acquire()
    with lock:
        with open(filename,encoding='utf-8') as f:
            dic=json.loads(f.read())
            # print('剩餘票數: %s' % dic['count'])
        if dic['count'] > 0:
            dic['count']-=1
            time.sleep(random.randint(1,3)) #模擬網絡延遲
            with open(filename,'w',encoding='utf-8') as f:
                f.write(json.dumps(dic))
            print('%s 購票成功' %os.getpid())
        else:
            print('%s 購票失敗' %os.getpid())
    # lock.release()

if __name__ == '__main__':
    lock=Lock()
    p_l=[]
    for i in range(10):
        p=Process(target=work,args=('db',lock))
        p_l.append(p)
        p.start()
    for p in p_l:
        p.join()

    print('主線程')

輸出

7932 購票成功
7933 購票成功
7934 購票失敗
7935 購票失敗
7936 購票失敗
7937 購票失敗
7938 購票失敗
7939 購票失敗
7940 購票失敗
7941 購票失敗
主線程

 

 

3、進程間的通訊

進程彼此之間互相隔離,要實現進程間通訊(IPC),multiprocessing模塊支持兩種形式:隊列和管道,這兩種方式都是使用消息傳遞的。

 

1.進程間通訊(IPC)方式一:隊列(推薦使用)

隊列先進先出,棧後進先出

建立隊列的類(底層就是以管道和鎖定的方式實現):

Queue([maxsize]):建立共享的進程隊列,Queue是多進程安全的隊列,可使用Queue實現多進程之間的數據傳遞。

參數介紹

maxsize是隊列中容許最大項數,省略則無大小限制。

方法介紹:

q.put方法用以插入數據到隊列中
put方法還有兩個可選參數:blocked和timeout。
若是blocked爲True(默認值),而且timeout爲正值,該方法會阻塞timeout指定的時間,直到該隊列有剩餘的空間。
若是超時,會拋出Queue.Full異常。若是blocked爲False,但該Queue已滿,會當即拋出Queue.Full異常。

q.get方法能夠從隊列讀取而且刪除一個元素。
get方法有兩個可選參數:blocked和timeout。
若是blocked爲True(默認值),而且timeout爲正值,那麼在等待時間內沒有取到任何元素,會拋出Queue.Empty異常。
若是blocked爲False,有兩種狀況存在,若是Queue有一個值可用,則當即返回該值,不然,若是隊列爲空,則當即拋出Queue.Empty異常.
 
q.get_nowait():同q.get(False)
q.put_nowait():同q.put(False)

q.empty():調用此方法時q爲空則返回True,該結果不可靠,好比在返回True的過程當中,若是隊列中又加入了項目。
q.full():調用此方法時q已滿則返回True,該結果不可靠,好比在返回True的過程當中,若是隊列中的項目被取走。
q.qsize():返回隊列中目前項目的正確數量,結果也不可靠,理由同q.empty()和q.full()同樣

 

'''
multiprocessing模塊支持進程間通訊的兩種主要形式:管道和隊列
都是基於消息傳遞實現的,可是隊列接口
'''

from multiprocessing import Process,Queue
import time
q=Queue(3)


#put ,get ,put_nowait,get_nowait,full,empty
q.put(3)
q.put(3)
q.put(3)
print(q.full()) #滿了

print(q.get())
print(q.get())
print(q.get())
print(q.empty()) #空了

輸出

True
3
3
3
True

 

2.生產者消費者模型

  • 什麼是生產者消費者模式?

生產者消費者模式是經過一個容器來解決生產者和消費者的強耦合問題。生產者和消費者彼此之間不直接通信,而經過阻塞隊列來進行通信,因此生產者生產完數據以後不用等待消費者處理,直接扔給阻塞隊列,消費者不找生產者要數據,而是直接從阻塞隊列裏取,阻塞隊列就至關於一個緩衝區,平衡了生產者和消費者的處理能力。

  • 爲何要使用生產者和消費者模式

在線程世界裏,生產者就是生產數據的線程,消費者就是消費數據的線程。在多線程開發當中,若是生產者處理速度很快,而消費者處理速度很慢,那麼生產者就必須等待消費者處理完,才能繼續生產數據。一樣的道理,若是消費者的處理能力大於生產者,那麼消費者就必須等待生產者。爲了解決這個問題因而引入了生產者和消費者模式。

在併發編程中使用生產者和消費者模式可以解決絕大多數併發問題。該模式經過平衡生產線程和消費線程的工做能力來提升程序的總體處理數據的速度。

 

  • 基於隊列實現生產者消費者模型
from multiprocessing import Process,Queue
import time,random,os


def consumer(q):
    while True:
        time.sleep(random.randint(1,3))
        res=q.get()
        if res is None:break
        print('\033[45m消費者拿到了:%s\033[0m' %res)

def producer(seq,q):
    for item in seq:
        time.sleep(random.randint(1,3))
        print('\033[46m生產者生產了:%s\033[0m' %item)

        q.put(item)

if __name__ == '__main__':
    q=Queue()

    c=Process(target=consumer,args=(q,))
    c.start()

    producer(('包子%s' %i for i in range(5)),q)
    q.put(None)
    c.join()
    print('主線程')

輸出

生產者生產了:包子0
消費者拿到了:包子0
生產者生產了:包子1
消費者拿到了:包子1
生產者生產了:包子2
消費者拿到了:包子2
生產者生產了:包子3
消費者拿到了:包子3
生產者生產了:包子4
消費者拿到了:包子4
主線程

 

  • 建立隊列的另一個類

JoinableQueue([maxsize]):這就像是一個Queue對象,但隊列容許項目的使用者通知生成者項目已經被成功處理。通知進程是使用共享的信號和條件變量來實現的。

maxsize是隊列中容許最大項數,省略則無大小限制。

 

JoinableQueue的實例p除了與Queue對象相同的方法以外還具備:

    q.task_done():使用者使用此方法發出信號,表示q.get()的返回項目已經被處理。若是調用此方法的次數大於從隊列中刪除項目的數量,將引起ValueError異常
    q.join():生產者調用此方法進行阻塞,直到隊列中全部的項目均被處理。阻塞將持續到隊列中的每一個項目均調用q.task_done()方法爲止

 

from multiprocessing import Process,JoinableQueue
import time,random
def consumer(q):
    while True:
        # time.sleep(random.randint(1,2))
        res=q.get()
        print('消費者拿到了 %s' %res)
        q.task_done()


def producer(seq,q):
    for item in seq:
        # time.sleep(random.randrange(1,2))
        q.put(item)
        print('生產者作好了 %s' %item)
    q.join()

if __name__ == '__main__':
    q=JoinableQueue()
    seq=('包子%s' %i for i in range(5))

    p=Process(target=consumer,args=(q,))
    p.daemon=True #設置爲守護進程,在主線程中止時p也中止,可是不用擔憂,producer內調用q.join保證了consumer已經處理完隊列中的全部元素
    p.start()

    producer(seq,q)

    print('主線程')

輸出

生產者作好了 包子0
生產者作好了 包子1
生產者作好了 包子2
生產者作好了 包子3
生產者作好了 包子4
消費者拿到了 包子0
消費者拿到了 包子1
消費者拿到了 包子2
消費者拿到了 包子3
消費者拿到了 包子4
主線程

 

 3.進程間通訊(IPC)方式二:管道

  • 建立管道的類:
Pipe([duplex]):在進程之間建立一條管道,並返回元組(conn1,conn2),其中conn1,conn2表示管道兩端的鏈接對象,強調一點:必須在產生Process對象以前產生管道
  •    參數介紹:
dumplex:默認管道是全雙工的,若是將duplex射成False,conn1只能用於接收,conn2只能用於發送。
  • 方法介紹:
    主要方法:
    conn1.recv():接收conn2.send(obj)發送的對象。若是沒有消息可接收,recv方法會一直阻塞。若是鏈接的另一端已經關閉,那麼recv方法會拋出EOFError。
    conn1.send(obj):經過鏈接發送對象。obj是與序列化兼容的任意對象
 
conn1.close():關閉鏈接。若是conn1被垃圾回收,將自動調用此方法
conn1.fileno():返回鏈接使用的整數文件描述符
conn1.poll([timeout]):若是鏈接上的數據可用,返回True。timeout指定等待的最長時限。若是省略此參數,方法將當即返回結果。若是將timeout射成None,操做將無限期地等待數據到達。
 
conn1.recv_bytes([maxlength]):接收c.send_bytes()方法發送的一條完整的字節消息。maxlength指定要接收的最大字節數。若是進入的消息,超過了這個最大值,將引起IOError異常,而且在鏈接上沒法進行進一步讀取。若是鏈接的另一端已經關閉,不再存在任何數據,將引起EOFError異常。
conn.send_bytes(buffer [, offset [, size]]):經過鏈接發送字節數據緩衝區,buffer是支持緩衝區接口的任意對象,offset是緩衝區中的字節偏移量,而size是要發送字節數。結果數據以單條消息的形式發出,而後調用c.recv_bytes()函數進行接收    
 
conn1.recv_bytes_into(buffer [, offset]):接收一條完整的字節消息,並把它保存在buffer對象中,該對象支持可寫入的緩衝區接口(即bytearray對象或相似的對象)。offset指定緩衝區中放置消息處的字節位移。返回值是收到的字節數。若是消息長度大於可用的緩衝區空間,將引起BufferTooShort異常。

 

  • 基於管道實現進程間通訊(與隊列的方式是相似的,隊列就是管道加鎖實現的): 

 

from multiprocessing import Process,Pipe

import time,os
def consumer(p,name):
    left,right=p
    left.close()
    while True:
        try:
            baozi=right.recv()
            print('%s 收到包子:%s' %(name,baozi))
        except EOFError:
            right.close()
            break
def producer(seq,p):
    left,right=p
    right.close()
    for i in seq:
        left.send(i)
        # time.sleep(1)
    else:
        left.close()
if __name__ == '__main__':
    left,right=Pipe()

    c1=Process(target=consumer,args=((left,right),'c1'))
    c1.start()


    seq=(i for i in range(10))
    producer(seq,(left,right))

    right.close()
    left.close()

    c1.join()
    print('主進程')

輸出

c1 收到包子:0
c1 收到包子:1
c1 收到包子:2
c1 收到包子:3
c1 收到包子:4
c1 收到包子:5
c1 收到包子:6
c1 收到包子:7
c1 收到包子:8
c1 收到包子:9
主進程

 

注意:生產者和消費者都沒有使用管道的某個端點,就應該將其關閉,如在生產者中關閉管道的右端,在消費者中關閉管道的左端。若是忘記執行這些步驟,程序可能再消費者中的recv()操做上掛起。管道是由操做系統進行引用計數的,必須在全部進程中關閉管道後才能生產EOFError異常。所以在生產者中關閉管道不會有任何效果,付費消費者中也關閉了相同的管道端點。

 

管道能夠用於雙向通訊,利用一般在客戶端/服務器中使用的請求/響應模型或遠程過程調用,就可使用管道編寫與進程交互的程序,以下

from multiprocessing import Process,Pipe

import time,os
def adder(p,name):
    server,client=p
    client.close()
    while True:
        try:
            x,y=server.recv()
        except EOFError:
            server.close()
            break
        res=x+y
        server.send(res)
    print('server done')
if __name__ == '__main__':
    server,client=Pipe()

    c1=Process(target=adder,args=((server,client),'c1'))
    c1.start()

    server.close()

    client.send((10,20))
    print(client.recv())
    client.close()

    c1.join()
    print('主進程')

輸出

30
server done
主進程

注意:send()和recv()方法使用pickle模塊對對象進行序列化。

 

4、進程池 

開多進程的目的是爲了併發,若是有多核,一般有幾個核就開幾個進程,進程開啓過多,效率反而會降低(開啓進程是須要佔用系統資源的,並且開啓多餘核數目的進程也沒法作到並行),但很明顯須要併發執行的任務要遠大於核數,這時咱們就能夠經過維護一個進程池來控制進程數目,好比httpd的進程模式,規定最小進程數和最大進程數...    

當被操做對象數目不大時,能夠直接利用multiprocessing中的Process動態成生多個進程,十幾個還好,但若是是上百個,上千個目標,手動的去限制進程數量卻又太過繁瑣,此時能夠發揮進程池的功效。並且對於遠程過程調用的高級應用程序而言,應該使用進程池,Pool能夠提供指定數量的進程,供用戶調用,當有新的請求提交到pool中時,若是池尚未滿,那麼就會建立一個新的進程用來執行該請求;但若是池中的進程數已經達到規定最大值,那麼該請求就會等待,直到池中有進程結束,就重用進程池中的進程。

在利用Python進行系統管理的時候,特別是同時操做多個文件目錄,或者遠程控制多臺主機,並行操做能夠節約大量的時間。

  • 建立進程池的類:
Pool([numprocess  [,initializer [, initargs]]]):建立進程池
  •     參數介紹:
numprocess:要建立的進程數,若是省略,將默認使用cpu_count()的值
initializer:是每一個工做進程啓動時要執行的可調用對象,默認爲None
initargs:是要傳給initializer的參數組
  •  方法介紹:
p.apply(func [, args [, kwargs]]):在一個池工做進程中執行func(*args,**kwargs),而後返回結果。須要強調的是:此操做並不會在全部池工做進程中並執行func函數。若是要經過不一樣參數併發地執行func函數,必須從不一樣線程調用p.apply()函數或者使用p.apply_async()
p.apply_async(func [, args [, kwargs]]):在一個池工做進程中執行func(*args,**kwargs),而後返回結果。此方法的結果是AsyncResult類的實例,callback是可調用對象,接收輸入參數。當func的結果變爲可用時,將理解傳遞給callback。callback禁止執行任何阻塞操做,不然將接收其餘異步操做中的結果。
   
p.close():關閉進程池,防止進一步操做。若是全部操做持續掛起,它們將在工做進程終止前完成5 P.jion():等待全部工做進程退出。此方法只能在close()或teminate()以後調用
方法apply_async()和map_async()的返回值是AsyncResul的實例obj。實例具備如下方法
obj.get():返回結果,若是有必要則等待結果到達。timeout是可選的。若是在指定時間內尚未到達,將引起一場。若是遠程操做中引起了異常,它將在調用此方法時再次被引起。
obj.ready():若是調用完成,返回True
obj.successful():若是調用完成且沒有引起異常,返回True,若是在結果就緒以前調用此方法,引起異常
obj.wait([timeout]):等待結果變爲可用。
obj.terminate():當即終止全部工做進程,同時不執行任何清理或結束任何掛起工做。若是p被垃圾回收,將自動調用此函數

 

 

  •  應用

   提交任務,並在主進程中拿到結果(以前的Process是執行任務,結果放到隊列裏,如今能夠在主進程中直接拿到結果)

from multiprocessing import Pool
import time
def work(n):
    print('開工啦...')
    time.sleep(3)
    return n**2

if __name__ == '__main__':
    q=Pool()

    #異步apply_async用法:若是使用異步提交的任務,主進程須要使用jion,等待進程池內任務都處理完,而後能夠用get收集結果,不然,主進程結束,進程池可能還沒來得及執行,也就跟着一塊兒結束了
    res=q.apply_async(work,args=(2,))
    q.close()
    q.join() #join在close以後調用
    print(res.get())

    #同步apply用法:主進程一直等apply提交的任務結束後才繼續執行後續代碼
    # res=q.apply(work,args=(2,))
    # print(res)

輸出

開工啦...
4

 

  • 詳解:apply_async與apply
#一:使用進程池(非阻塞,apply_async)
#coding: utf-8
from multiprocessing import Process,Pool
import time

def func(msg):
    print( "msg:", msg)
    time.sleep(1)
    return msg

if __name__ == "__main__":
    pool = Pool(processes = 3)
    res_l=[]
    for i in range(10):
        msg = "hello %d" %(i)
        res=pool.apply_async(func, (msg, ))   #維持執行的進程總數爲processes,當一個進程執行完畢後會添加新的進程進去
        res_l.append(res)
    print("==============================>") #沒有後面的join,或get,則程序總體結束,進程池中的任務還沒來得及所有執行完也都跟着主進程一塊兒結束了

    pool.close() #關閉進程池,防止進一步操做。若是全部操做持續掛起,它們將在工做進程終止前完成
    pool.join()   #調用join以前,先調用close函數,不然會出錯。執行完close後不會有新的進程加入到pool,join函數等待全部子進程結束

    print(res_l) #看到的是<multiprocessing.pool.ApplyResult object at 0x10357c4e0>對象組成的列表,而非最終的結果,但這一步是在join後執行的,證實結果已經計算完畢,剩下的事情就是調用每一個對象下的get方法去獲取結果
    for i in res_l:
        print(i.get()) #使用get來獲取apply_aync的結果,若是是apply,則沒有get方法,由於apply是同步執行,馬上獲取結果,也根本無需get

#二:使用進程池(阻塞,apply)
#coding: utf-8
from multiprocessing import Process,Pool
import time

def func(msg):
    print( "msg:", msg)
    time.sleep(0.1)
    return msg

if __name__ == "__main__":
    pool = Pool(processes = 3)
    res_l=[]
    for i in range(10):
        msg = "hello %d" %(i)
        res=pool.apply(func, (msg, ))   #維持執行的進程總數爲processes,當一個進程執行完畢後會添加新的進程進去
        res_l.append(res) #同步執行,即執行完一個拿到結果,再去執行另一個
    print("==============================>")
    pool.close()
    pool.join()   #調用join以前,先調用close函數,不然會出錯。執行完close後不會有新的進程加入到pool,join函數等待全部子進程結束

    print(res_l) #看到的就是最終的結果組成的列表
    for i in res_l: #apply是同步的,因此直接獲得結果,沒有get()方法
        print(i)

 

  • 使用進程池維護固定數目的進程
#Pool內的進程數默認是cpu核數,假設爲4(查看方法os.cpu_count())
#開啓6個客戶端,會發現2個客戶端處於等待狀態
#在每一個進程內查看pid,會發現pid使用爲4個,即多個客戶端公用4個進程
from socket import *
from multiprocessing import Pool
import os

server=socket(AF_INET,SOCK_STREAM)
server.setsockopt(SOL_SOCKET,SO_REUSEADDR,1)
server.bind(('127.0.0.1',8080))
server.listen(5)

def talk(conn,client_addr):
    print('進程pid: %s' %os.getpid())
    while True:
        try:
            msg=conn.recv(1024)
            if not msg:break
            conn.send(msg.upper())
        except Exception:
            break

if __name__ == '__main__':
    p=Pool()
    while True:
        conn,client_addr=server.accept()
        p.apply_async(talk,args=(conn,client_addr))
        # p.apply(talk,args=(conn,client_addr)) #同步的話,則同一時間只有一個客戶端能訪問

server端
server端
from socket import *

client=socket(AF_INET,SOCK_STREAM)
client.connect(('127.0.0.1',8080))


while True:
    msg=input('>>: ').strip()
    if not msg:continue

    client.send(msg.encode('utf-8'))
    msg=client.recv(1024)
    print(msg.decode('utf-8'))
客戶端

 

  •    回調函數(apply_async的擴展用法)

不須要回調函數的場景:若是在主進程中等待進程池中全部任務都執行完畢後,再統一處理結果,則無需回調函數

 

from multiprocessing import Pool
import time,random,os

def work(n):
    time.sleep(1)
    return n**2
if __name__ == '__main__':
    p=Pool()

    res_l=[]
    for i in range(10):
        res=p.apply_async(work,args=(i,))
        res_l.append(res)

    p.close()
    p.join() #等待進程池中全部進程執行完畢

    nums=[]
    for res in res_l:
        nums.append(res.get()) #拿到全部結果
    print(nums) #主進程拿到全部的處理結果,能夠在主進程中進行統一進行處理

 

須要回調函數的場景:進程池中任何一個任務一旦處理完了,就當即告知主進程:我好了額,你能夠處理個人結果了。主進程則調用一個函數去處理該結果,該函數即回調函數

咱們能夠把耗時間(阻塞)的任務放到進程池中,而後指定回調函數(主進程負責執行),這樣主進程在執行回調函數時就省去了I/O的過程,直接拿到的是任務的結果。

from multiprocessing import Pool
import time,random,os

def get_page(url):
    print('(進程 %s) 正在下載頁面 %s' %(os.getpid(),url))
    time.sleep(random.randint(1,3))
    return url #用url充當下載後的結果

def parse_page(page_content):
    print('<進程 %s> 正在解析頁面: %s' %(os.getpid(),page_content))
    time.sleep(1)
    return '{%s 回調函數處理結果:%s}' %(os.getpid(),page_content)


if __name__ == '__main__':
    urls=[
        'http://maoyan.com/board/1',
        'http://maoyan.com/board/2',
        'http://maoyan.com/board/3',
        'http://maoyan.com/board/4',
        'http://maoyan.com/board/5',
        'http://maoyan.com/board/7',

    ]
    p=Pool()
    res_l=[]

    #異步的方式提交任務,而後把任務的結果交給callback處理
    #注意:會專門開啓一個進程來處理callback指定的任務(單獨的一個進程,並且只有一個)
    for url in urls:
        res=p.apply_async(get_page,args=(url,),callback=parse_page)
        res_l.append(res)

    #異步提交完任務後,主進程先關閉p(必須先關閉),而後再用p.join()等待全部任務結束(包括callback)
    p.close()
    p.join()
    print('{主進程 %s}' %os.getpid())

    #收集結果,發現收集的是get_page的結果
    #因此須要注意了:
    #1. 當咱們想要在將get_page的結果傳給parse_page處理,那麼就不須要i.get(),經過指定callback,就能夠將i.get()的結果傳給callback執行的任務
    #2. 當咱們想要在主進程中處理get_page的結果,那就須要使用i.get()獲取後,再進一步處理
    for i in res_l: #本例中,下面這兩步是多餘的
        callback_res=i.get()
        print(callback_res)

'''
打印結果:
(進程 52346) 正在下載頁面 http://maoyan.com/board/1
(進程 52347) 正在下載頁面 http://maoyan.com/board/2
(進程 52348) 正在下載頁面 http://maoyan.com/board/3
(進程 52349) 正在下載頁面 http://maoyan.com/board/4
(進程 52348) 正在下載頁面 http://maoyan.com/board/5
<進程 52345> 正在解析頁面: http://maoyan.com/board/3
(進程 52346) 正在下載頁面 http://maoyan.com/board/7
<進程 52345> 正在解析頁面: http://maoyan.com/board/1
<進程 52345> 正在解析頁面: http://maoyan.com/board/2
<進程 52345> 正在解析頁面: http://maoyan.com/board/4
<進程 52345> 正在解析頁面: http://maoyan.com/board/5
<進程 52345> 正在解析頁面: http://maoyan.com/board/7
{主進程 52345}
http://maoyan.com/board/1
http://maoyan.com/board/2
http://maoyan.com/board/3
http://maoyan.com/board/4
http://maoyan.com/board/5
http://maoyan.com/board/7
'''

 

爬蟲實例

from multiprocessing import Pool
import time,random
import requests
import re

def get_page(url,pattern):
    response=requests.get(url)
    if response.status_code == 200:
        return (response.text,pattern)

def parse_page(info):
    page_content,pattern=info
    res=re.findall(pattern,page_content)
    for item in res:
        dic={
            'index':item[0],
            'title':item[1],
            'actor':item[2].strip()[3:],
            'time':item[3][5:],
            'score':item[4]+item[5]

        }
        print(dic)
if __name__ == '__main__':
    pattern1=re.compile(r'<dd>.*?board-index.*?>(\d+)<.*?title="(.*?)".*?star.*?>(.*?)<.*?releasetime.*?>(.*?)<.*?integer.*?>(.*?)<.*?fraction.*?>(.*?)<',re.S)

    url_dic={
        'http://maoyan.com/board/7':pattern1,
    }

    p=Pool()
    res_l=[]
    for url,pattern in url_dic.items():
        res=p.apply_async(get_page,args=(url,pattern),callback=parse_page)
        res_l.append(res)

    for i in res_l:
        i.get()

    # res=requests.get('http://maoyan.com/board/7')
    # print(re.findall(pattern,res.text))
相關文章
相關標籤/搜索