Python併發編程之多進程

Python併發編程之多進程

1、什麼是進程

進程:正在進行的一個過程或者說一個任務。而負責執行任務則是cpu。html

進程是資源分配的基本單位python

進程有:代碼段,數據段,進程控制塊(PCB)組成git

2、進程與程序的區別

程序僅僅只是一堆代碼而已,而進程指的是程序的運行過程。github

舉例:算法

想象一位有一手好廚藝的計算機科學家正在爲他的女兒烘製生日蛋糕。編程

他有作生日蛋糕的食譜,json

廚房裏有所需的原料:麪粉、雞蛋、韭菜,蒜泥等。windows

在這個比喻中:數組

作蛋糕的食譜就是程序(即用適當形式描述的算法)安全

計算機科學家就是處理器(cpu)

而作蛋糕的各類原料就是輸入數據

進程就是廚師閱讀食譜、取來各類原料以及烘製蛋糕等一系列動做的總和

須要強調的是:同一個程序執行兩次,那也是兩個進程,好比打開暴風影音,雖然都是同一個軟件,可是一個能夠播放蒼井空,一個能夠播放飯島愛。

3、併發與並行

不管是並行仍是併發,在用戶看來都是'同時'運行的,無論是進程仍是線程,都只是一個任務而已,真是幹活的是cpu,cpu來作這些任務,而一個cpu同一時刻只能執行一個任務

  1. 併發:在同一個時間段內多個任務同時進行,僞並行,即看起來是同時運行。單個cpu+多道技術就能夠實現併發(並行也屬於併發)

    舉例:

    你是一個cpu,你同時談了三個女友,每個均可以是一個戀愛任務,你被這三個任務共享,要玩出併發戀愛的效果,應該是你先跟女朋友1去看電影,看了一會說:很差,我要拉肚子,而後跑去跟第二個女朋友吃飯,吃了一會說:那啥,我去趟洗手間,而後跑去跟女朋友3開了個房

  2. 並行:在同一個時間點上多個任務同時進行,同時運行,只有具有多個cpu才能實現並行

    單核下,能夠利用多道技術,多個核,每一個核也均可以利用多道技術(多道技術是針對單核而言的

    舉例:

    有四個核,六個任務,這樣同一時間有四個任務被執行,假設分別被分配給了cpu1,cpu2,cpu3,cpu4, 一旦任務1遇到I/O就被迫中斷執行,此時任務5就拿到cpu1的時間片去執行,這就是單核下的多道技術,而一旦任務1的I/O結束了,操做系統會從新調用它(需知進程的調度、分配給哪一個cpu運行,由操做系統說了算),可能被分配給四個cpu中的任意一個去執行

4、同步、異步、阻塞、非阻塞

  1. 同步

    同步:某一個任務的執行必須依賴於另外一個任務的返回結果

    所謂同步,就是在發出一個功能調用時,在沒有獲得結果以前,該調用就不會返回。按照這個定義,其實絕大多數函數都是同步調用。可是通常而言,咱們在說同步、異步的時候,特指那些須要其餘部件協做或者須要必定時間完成的任務

  2. 異步

    異步:某一個任務的執行,不須要依賴於另外一個任務的返回,只須要告訴另外一個任務一聲

    異步的概念和同步相對。當一個異步功能調用發出後,調用者不能馬上獲得結果。當該異步功能完成後,經過狀態、通知或回調來通知調用者。

  3. 阻塞

    阻塞:程序由於相似於IO等待、等待事件等致使沒法繼續執行。

    阻塞調用是指調用結果返回以前,當前線程會被掛起(如遇到io操做)。函數只有在獲得結果以後纔會將阻塞的線程激活。有人也許會把阻塞調用和同步調用等同起來,實際上他是不一樣的。對於同步調用來講,不少時候當前線程仍是激活的,只是從邏輯上當前函數沒有返回而已。

  4. 非阻塞

    程序遇到相似於IO操做時,再也不阻塞等待,若是沒有及時的處理IO,就報錯或者跳過等其餘操做

    非阻塞和阻塞的概念相對應,指在不能馬上獲得結果以前也會馬上返回,同時該函數不會阻塞當前線程。

5、進程的基本狀態

進程的三大基本狀態:

  1. 就緒狀態:全部進程須要的資源都獲取到了,等待着CPU的調用
  2. 執行狀態:獲取到了全部資源包括CPU,進程處於運行狀態
  3. 阻塞狀態:程停滯再也不運行,放棄了CPU,進程此時處於內存裏

6、multiprocessing模塊介紹

Python中的多線程沒法利用多核優點,若是想要充分地使用多核CPU的資源(os.cpu_count()查看),在Python中大部分狀況須要使用多進程。Python提供了multiprocessing。
multiprocessing模塊用來開啓子進程,並在子進程中執行咱們定製的任務(好比函數),該模塊與多線程模塊threading的編程接口相似。

multiprocessing模塊的功能衆多:支持子進程、通訊和共享數據、執行不一樣形式的同步,提供了Process、Queue、Pipe、Lock等組件。

須要再次強調的一點是:與線程不一樣,進程沒有任何共享狀態,進程修改的數據,改動僅限於該進程內。

7、Process類的介紹

  1. 建立進程的類

    Process([group [, target [, name [, args [, kwargs]]]]]),由該類實例化獲得的對象,表示一個子進程中的任務(還沒有啓動)
    
    強調:
    1. 須要使用關鍵字的方式來指定參數
    2. args指定的爲傳給target函數的位置參數,是一個元組形式,必須有逗號
  2. 參數介紹

    group參數未使用,值始終爲None
    
    target表示調用對象,即子進程要執行的任務
    
    args表示調用對象的位置參數元組,args=(1,2,'egon',)
    
    kwargs表示調用對象的字典,kwargs={'name':'egon','age':18}
    
    name爲子進程的名稱
  3. 方法介紹

    p.start():啓動進程,並調用該子進程中的p.run() 
     p.run():進程啓動時運行的方法,正是它去調用target指定的函數,咱們自定義類的類中必定要實現該方法  
    
     p.terminate():強制終止進程p,不會進行任何清理操做,若是p建立了子進程,該子進程就成了殭屍進程,使用該方法須要特別當心這種狀況。若是p還保存了一個鎖那麼也將不會被釋放,進而致使死鎖
     p.is_alive():若是p仍然運行,返回True
    
     p.join([timeout]):主線程等待p終止(強調:是主線程處於等的狀態,而p是處於運行的狀態)。timeout是可選的超時時間,須要強調的是,p.join只能join住start開啓的進程,而不能join住run開啓的進程
  4. 屬性介紹

    p.daemon:默認值爲False,若是設爲True,表明p爲後臺運行的守護進程,當p的父進程終止時,p也隨之終止,而且設定爲True後,p不能建立本身的新進程,必須在p.start()以前設置
    
    p.name:進程的名稱
    
    p.pid:進程的pid
    
    p.exitcode:進程在運行時爲None、若是爲–N,表示被信號N結束
    
    p.authkey:進程的身份驗證鍵,默認是由os.urandom()隨機生成的32字符的字符串。這個鍵的用途是爲涉及網絡鏈接的底層進程間通訊提供安全性,這類鏈接只有在具備相同的身份驗證鍵時才能成功

8、Process類的使用

注意:在windows中Process()必須放到# if name == 'main':下

  1. 建立並開啓子進程的兩種方式

    from multiprocessing import Process
    import os
    
    
    def child_process():
        print("這是子進程{0},父進程是{1}".format(os.getpid(), os.getppid()))
    
    if __name__ == '__main__':
        child_p = Process(target=child_process)
        child_p.start()
    
        # child_p.join()
        print("這是父進程{0}".format(os.getpid()))
    from multiprocessing import Process
    import os
    
    
    class ChildProcess(Process):
        def __init__(self):
            super(ChildProcess, self).__init__()
    
        def run(self):
            print("這是子進程{0},父進程是{1}".format(os.getpid(), os.getppid()))
    
    
    if __name__ == '__main__':
        child_p = ChildProcess()
        child_p.start()
    
        # child_p.join()
        print("這是父進程{0}".format(os.getpid()))
  2. 進程之間的內存空間是隔離的

    from multiprocessing import Process
    import os
    
    num = 100
    
    def chile_process():
        global num
        num = 0
        print("子進程中:{0}".format(num))
    
    if __name__ == '__main__':
        p = Process(target=chile_process)
        p.start()
    
        print("父進程中:{0}".format(num))
    
    # 父進程中:100
    # 子進程中:0
  3. Process中的join()方法

    join():主進程等待,等待子進程結束

    from multiprocessing import Process
    import os
    import time
    
    def child_process():
        time.sleep(3)
        print("這是子進程")
    
    
    if __name__ == '__main__':
        p = Process(target=child_process)
        p.start()
        # p.join()
    
        print("這是主進程")
    
    # 這是主進程
    # 這是子進程
    # 分析:若是不加join那麼則是先打印主進程中的「這是主進程」,而後等待三秒在打印「這是子進程」
    
    
    from multiprocessing import Process
    import os
    import time
    
    def child_process():
        time.sleep(3)
        print("這是子進程")
    
    
    if __name__ == '__main__':
        p = Process(target=child_process)
        p.start()
        p.join()
    
        print("這是主進程")
    # 這是子進程
    # 這是主進程
    # 分析:若是加了join那麼主進程會等待子進程執行完以後再執行主進程,也就是說會先等待三秒而後同時打印出「這是子進程」和「這是主進程」

9、守護進程

守護進程的特色:

  1. 守護進程會在主進程執行完成後終止

  2. 設置了守護進程後,守護進程不能再開啓子進程,不然會報異常

    from multiprocessing import Process
    import time
    
    def func(name):
        time.sleep(1)
        print("我是{0}".format(name))
    
    
    def foo(name):
        time.sleep(3)
        print("{0}是誰".format(name))
    
    
    if __name__ == '__main__':
        p1 = Process(target=func, args=("oldwang",))
        p2 = Process(target=foo, args=("oldwang",))
    
        p1.daemon = True
        # 必定要在p.start()前設置,設置p爲守護進程,禁止p建立子進程,而且父進程代碼執行結束,p即終止運行
        p1.start()
        p2.start()
    
        print("這是主進程...")
    
    # 執行結果:
    # 這是主進程...
    # oldwang是誰

10、進程同步

進程之間數據不共享,可是共享同一套文件系統,因此訪問同一個文件,或同一個打印終端,是沒有問題的,

而共享帶來的是競爭,競爭帶來的結果就是錯亂,如何控制,就是加鎖處理

# 模擬搶票,購票行爲由並行變成了串行,犧牲了效率,提升了數據安全性
from multiprocessing import Process, Lock
import json
import time
import os

def search_ticket():
    with open("file/ticket", mode="r", encoding="utf-8") as f:
        ticket_num = int(f.read())
        print("剩餘票數:{0}".format(ticket_num))

def get_ticket():
    with open("file/ticket", mode="r", encoding="utf-8") as f:
        ticket = int(f.read())
        time.sleep(0.1)  # 模擬搶票延時
        if ticket:
            ticket -= 1
            print("{0}搶到了一張票,還剩{1}張票".format(os.getpid(), ticket))

        else:
            print("{0}沒有搶到票".format(os.getpid()))

    f = open("file/ticket", mode="w", encoding="utf-8")
    f.write(str(ticket))

def task(lock):
    search_ticket()
    lock.acquire()
    get_ticket()
    lock.release()

if __name__ == '__main__':
    lock = Lock()
    for i in range(100):
        p = Process(target=task, args=(lock,))
        p.start()

總結:加鎖能夠保證多個進程修改同一塊數據時,同一時間只能有一個任務能夠進行修改,即串行的修改,沒錯,速度是慢了,但犧牲了速度卻保證了數據安全。

雖然能夠用文件共享數據實現進程間通訊,但問題是:

  1. 效率低(共享數據基於文件,而文件是硬盤上的數據)
  2. 須要本身加鎖處理

11、隊列(推薦使用)

進程彼此之間互相隔離,要實現進程間通訊(IPC),multiprocessing模塊支持兩種形式:隊列和管道,這兩種方式都是使用消息傳遞的

  1. Queue(底層就是以管道和鎖的方式實現

    方法介紹

    maxsize是隊列中容許最大項數,省略則無大小限制。    
    
    q.put方法用以插入數據到隊列中,put方法還有兩個可選參數:blocked和timeout。若是blocked爲True(默認值),而且timeout爲正值,該方法會阻塞timeout指定的時間,直到該隊列有剩餘的空間。若是超時,會拋出Queue.Full異常。若是blocked爲False,但該Queue已滿,會當即拋出Queue.Full異常。
    
    q.get方法能夠從隊列讀取而且刪除一個元素。一樣,get方法有兩個可選參數:blocked和timeout。若是blocked爲True(默認值),而且timeout爲正值,那麼在等待時間內沒有取到任何元素,會拋出Queue.Empty異常。若是blocked爲False,有兩種狀況存在,若是Queue有一個值可用,則當即返回該值,不然,若是隊列爲空,則當即拋出Queue.Empty異常.
    
    q.get_nowait():同q.get(False)
    q.put_nowait():同q.put(False)
    
    q.empty():調用此方法時q爲空則返回True,該結果不可靠,好比在返回True的過程當中,若是隊列中又加入了項目。
    q.full():調用此方法時q已滿則返回True,該結果不可靠,好比在返回True的過程當中,若是隊列中的項目被取走。
    q.qsize():返回隊列中目前項目的正確數量,結果也不可靠,理由同q.empty()和q.full()同樣
    
    q.cancel_join_thread():不會在進程退出時自動鏈接後臺線程。能夠防止join_thread()方法阻塞
    
    q.close():關閉隊列,防止隊列中加入更多數據。調用此方法,後臺線程將繼續寫入那些已經入隊列但還沒有寫入的數據,但將在此方法完成時立刻關閉。若是q被垃圾收集,將調用此方法。關閉隊列不會在隊列使用者中產生任何類型的數據結束信號或異常。例如,若是某個使用者正在被阻塞在get()操做上,關閉生產者中的隊列不會致使get()方法返回錯誤。
    
    q.join_thread():鏈接隊列的後臺線程。此方法用於在調用q.close()方法以後,等待全部隊列項被消耗。默認狀況下,此方法由不是q的原始建立者的全部進程調用。調用q.cancel_join_thread方法能夠禁止這種行爲
    from multiprocessing import Queue
    
    q = Queue(maxsize=3)
    
    q.put(1)
    q.put({"name":"dogfa"})
    q.put([1,2,3])
    
    print(q.full())      # True
    
    print(q.get())       # 1
    print(q.get())       # {'name': 'dogfa'}
    print(q.get())       # [1, 2, 3]
    
    print(q.empty()) # True
  2. 生產者消費者模型

    在併發編程中使用生產者和消費者模式可以解決絕大多數併發問題。該模式經過平衡生產線程和消費線程的工做能力來提升程序的總體處理數據的速度。

  3. 爲何使用生產者消費者模式

    在線程世界裏,生產者就是生產數據的線程,消費者就是消費數據的線程。在多線程開發當中,若是生產者處理速度很快,而消費者處理速度很慢,那麼生產者就必須等待消費者處理完,才能繼續生產數據。一樣的道理,若是消費者的處理能力大於生產者,那麼消費者就必須等待生產者。爲了解決這個問題因而引入了生產者和消費者模式。

  4. 什麼是生產者消費者模式

    生產者消費者模式是經過一個容器來解決生產者和消費者的強耦合問題。生產者和消費者彼此之間不直接通信,而經過阻塞隊列來進行通信,因此生產者生產完數據以後不用等待消費者處理,直接扔給阻塞隊列,消費者不找生產者要數據,而是直接從阻塞隊列裏取,阻塞隊列就至關於一個緩衝區,平衡了生產者和消費者的處理能力。

  5. JoinableQueue()

    #JoinableQueue([maxsize]):這就像是一個Queue對象,但隊列容許項目的使用者通知生成者項目已經被成功處理。通知進程是使用共享的信號和條件變量來實現的。
    
       #參數介紹:
        maxsize是隊列中容許最大項數,省略則無大小限制。    
      #方法介紹:
        JoinableQueue的實例p除了與Queue對象相同的方法以外還具備:
        q.task_done():使用者使用此方法發出信號,表示q.get()的返回項目已經被處理。若是調用此方法的次數大於從隊列中刪除項目的數量,將引起ValueError異常
        q.join():生產者調用此方法進行阻塞,直到隊列中全部的項目均被處理。阻塞將持續到隊列中的每一個項目均調用q.task_done()方法爲止
  6. 生產者消費者模型的實現

    from multiprocessing import JoinableQueue, Process
    import os
    import random
    import time
    
    def customer(q):
        while 1:
            time.sleep(0.5)
            print("{0}號顧客吃了{1}".format(os.getpid(), q.get()))
            q.task_done()
    
    
    def producter(food, q):
        for i in range(10):
            time.sleep(random.randint(1, 2))
            q.put(food)
            print("{0}號廚師完成了{1}的製做".format(os.getpid(), food))
    
        q.join()
    
    
    if __name__ == '__main__':
        q = JoinableQueue()
    
        pro1 = Process(target=producter, args=("包子", q))
        pro2 = Process(target=producter, args=("油條", q))
        pro3 = Process(target=producter, args=("花捲", q))
    
        cus1 = Process(target=customer, args=(q,))
        cus2 = Process(target=customer, args=(q,))
    
        cus1.daemon = True
        cus2.daemon = True
        lst = [pro1, pro2, pro3, cus1, cus2]
        [i.start() for i in lst]
    
        pro1.join()
        pro2.join()
        pro3.join()
    
        print("ending...")
        # 主進程等待pro1,Pro2,pro3執行完成,當pro執行完成意味着cus一定執行完成,因此能夠將cus設置成守護進程
  7. 生產者消費者模式總結

    #程序中有兩類角色
            一類負責生產數據(生產者)
            一類負責處理數據(消費者)
    
        #引入生產者消費者模型爲了解決的問題是:
            平衡生產者與消費者之間的工做能力,從而提升程序總體處理數據的速度
    
        #如何實現:
            生產者<-->隊列<——>消費者
        #生產者消費者模型實現類程序的解耦和

12、管道(不推薦使用)

  1. 管道

    #建立管道的類:
    Pipe([duplex]):在進程之間建立一條管道,並返回元組(conn1,conn2),其中conn1,conn2表示管道兩端的鏈接對象,強調一點:必須在產生Process對象以前產生管道
    #參數介紹:
    dumplex:默認管道是全雙工的,若是將duplex射成False,conn1只能用於接收,conn2只能用於發送。
    #主要方法:
        conn1.recv():接收conn2.send(obj)發送的對象。若是沒有消息可接收,recv方法會一直阻塞。若是鏈接的另一端已經關閉,那麼recv方法會拋出EOFError。
        conn1.send(obj):經過鏈接發送對象。obj是與序列化兼容的任意對象
     #其餘方法:
    conn1.close():關閉鏈接。若是conn1被垃圾回收,將自動調用此方法
    conn1.fileno():返回鏈接使用的整數文件描述符
    conn1.poll([timeout]):若是鏈接上的數據可用,返回True。timeout指定等待的最長時限。若是省略此參數,方法將當即返回結果。若是將timeout射成None,操做將無限期地等待數據到達。
    
    conn1.recv_bytes([maxlength]):接收c.send_bytes()方法發送的一條完整的字節消息。maxlength指定要接收的最大字節數。若是進入的消息,超過了這個最大值,將引起IOError異常,而且在鏈接上沒法進行進一步讀取。若是鏈接的另一端已經關閉,不再存在任何數據,將引起EOFError異常。
    conn.send_bytes(buffer [, offset [, size]]):經過鏈接發送字節數據緩衝區,buffer是支持緩衝區接口的任意對象,offset是緩衝區中的字節偏移量,而size是要發送字節數。結果數據以單條消息的形式發出,而後調用c.recv_bytes()函數進行接收    
    
    conn1.recv_bytes_into(buffer [, offset]):接收一條完整的字節消息,並把它保存在buffer對象中,該對象支持可寫入的緩衝區接口(即bytearray對象或相似的對象)。offset指定緩衝區中放置消息處的字節位移。返回值是收到的字節數。若是消息長度大於可用的緩衝區空間,將引起BufferTooShort異常。
  2. 利用管道實現進程間的通訊

    from multiprocessing import Process,Pipe
    
    import time,os
    def consumer(p,name):
        left,right=p
        left.close()
        while True:
            try:
                baozi=right.recv()
                print('%s 收到包子:%s' %(name,baozi))
            except EOFError:
                right.close()
                break
    def producer(seq,p):
        left,right=p
        right.close()
        for i in seq:
            left.send(i)
            # time.sleep(1)
        else:
            left.close()
    if __name__ == '__main__':
        left,right=Pipe()
    
        c1=Process(target=consumer,args=((left,right),'c1'))
        c1.start()
    
    
        seq=(i for i in range(10))
        producer(seq,(left,right))
    
        right.close()
        left.close()
    
        c1.join()
        print('主進程')

十3、數據共享

進程間數據是獨立的,能夠藉助於隊列或管道實現通訊,兩者都是基於消息傳遞的,雖然進程間數據獨立,但能夠經過Manager實現數據共享

from multiprocessing import Process, Manager, Lock
import os
import random
import time


def func(dic, lock):
    lock.acquire()  # 不加鎖確定會形成數據混亂
    time.sleep(random.randrange(2))
    dic["count"] -= 1
    lock.release()



if __name__ == '__main__':
    lock = Lock()
    with Manager() as m:
        dic = m.dict({"count": 100})
        lst = []
        for i in range(100):
            p = Process(target=func, args=(dic, lock))
            lst.append(p)
            p.start()

        [i.join() for i in lst]
        print(dic["count"])

進程間通訊應該儘可能避免使用上述共享數據的方式

十3、進程池

在利用Python進行系統管理的時候,特別是同時操做多個文件目錄,或者遠程控制多臺主機,並行操做能夠節約大量的時間。多進程是實現併發的手段之一,須要注意的問題是:

  1. 很明顯須要併發執行的任務一般要遠大於核數
  2. 一個操做系統不可能無限開啓進程,一般有幾個核就開幾個進程
  3. 進程開啓過多,效率反而會降低(開啓進程是須要佔用系統資源的,並且開啓多餘核數目的進程也沒法作到並行)

若是當被操做對象數目不大時,能夠直接利用multiprocessing中的Process動態成生多個進程,十幾個還好,但若是是上百個,上千個。。。手動的去限制進程數量卻又太過繁瑣,此時能夠發揮進程池的功效。咱們就能夠經過維護一個進程池來控制進程數目。

ps:對於遠程過程調用的高級應用程序而言,應該使用進程池,Pool能夠提供指定數量的進程,供用戶調用,當有新的請求提交到pool中時,若是池尚未滿,那麼就會建立一個新的進程用來執行該請求;但若是池中的進程數已經達到規定最大值,那麼該請求就會等待,直到池中有進程結束,就重用進程池中的進程。

建立進程池的類:若是指定numprocess爲3,則進程池會從無到有建立三個進程,而後自始至終使用這三個進程去執行全部任務,不會開啓其餘進程

  1. 建立進程池

    Pool([numprocess  [,initializer [, initargs]]]):建立進程池
  2. 參數介紹

    numprocess:要建立的進程數,若是省略,將默認使用cpu_count()的值
    initializer:是每一個工做進程啓動時要執行的可調用對象,默認爲None
    initargs:是要傳給initializer的參數組
  3. 方法介紹

    p.apply(func [, args [, kwargs]]):在一個池工做進程中執行func(*args,**kwargs),而後返回結果。須要強調的是:此操做並不會在全部池工做進程中並執行func函數。若是要經過不一樣參數併發地執行func函數,必須從不一樣線程調用p.apply()函數或者使用p.apply_async()
    
    p.apply_async(func [, args [, kwargs]]):在一個池工做進程中執行func(*args,**kwargs),而後返回結果。此方法的結果是AsyncResult類的實例,callback是可調用對象,接收輸入參數。當func的結果變爲可用時,將理解傳遞給callback。callback禁止執行任何阻塞操做,不然將接收其餘異步操做中的結果。
    
    p.close():關閉進程池,防止進一步操做。若是全部操做持續掛起,它們將在工做進程終止前完成
    P.jion():等待全部工做進程退出。此方法只能在close()或teminate()以後調用方法apply_async()和map_async()的返回值是AsyncResul的實例obj。實例具備如下方法:
    
    obj.get():返回結果,若是有必要則等待結果到達。
    obj.ready():若是調用完成,返回True
    obj.successful():若是調用完成且沒有引起異常,返回True,若是在結果就緒以前調用此方法,引起異常
    obj.wait([timeout]):等待結果變爲可用。
    obj.terminate():當即終止全部工做進程,同時不執行任何清理或結束任何掛起工做。若是p被垃圾回收,將自動調用此函數
  4. 進程池的使用

    1. 使用進程池(異步調用,apply_async)

      from multiprocessing import Process, Pool
      import os
      import time
      import socket
      
      def func(i):
          print(i)
          time.sleep(1)
          return i ** 2
      
      if __name__ == '__main__':
          pool = Pool(os.cpu_count() + 1)
          ret_lst = []
      
          for i in range(100):
              # 維持執行的進程總數爲processes,當一個進程執行完畢後會添加新的進程進去
              ret = pool.apply_async(func, args=(i,))
              ret_lst.append(ret)
      
          # 沒有後面的join,或get,則程序總體結束,進程池中的任務還沒來得及所有執行完也都跟着主進程一塊兒結束了
          print("=======================")
           # 關閉進程池,防止進一步操做。若是全部操做持續掛起,它們將在工做進程終止前完成
          pool.close()
          # 調用join以前,先調用close函數,不然會出錯。執行完close後不會有新的進程加入到pool,join函數等待全部子進程結束
          pool.join()
          # 看到的是<multiprocessing.pool.ApplyResult object at 0x10357c4e0>對象組成的列表,而非最終的結果,但這一步是在join後執行的,證實結果已經計算完畢,剩下的事情就是調用每一個對象下的get方法去獲取結果
          print(ret_lst)
        # 使用get來獲取apply_aync的結果,若是是apply,則沒有get方法,由於apply是同步執行,馬上獲取結果,也根本無需get
          [print(i.get()) for i in ret_lst]
    2. 使用進程池(同步調用,apply)

      from multiprocessing import Process, Pool
      import os
      import time
      import socket
      
      def func(i):
          print(i)
          time.sleep(1)
          return i ** 2
      
      if __name__ == '__main__':
        # 維持執行的進程總數爲processes,當一個進程執行完畢後會添加新的進程進去
          pool = Pool(os.cpu_count() + 1)
          # 同步執行,即執行完一個拿到結果,再去執行另一個
          ret_lst = []
      
          for i in range(100):
              ret = pool.apply(func, args=(i,))
              ret_lst.append(ret)
        # 看到的就是最終的結果組成的列表,apply是同步的,因此直接獲得結果,沒有get()方法
          print(ret_lst)
    3. 進程池實現基於TCP協議的socket併發效果

      # 服務端
      from multiprocessing import Process, Pool
      import os
      import time
      import socket
      
      def func(conn, client_addr):
          print("進程:{0}".format(os.getpid()))
          while 1:
              try:
                  c_msg = conn.recv(1024).decode("utf-8")
                  if not c_msg: break
                  print(c_msg)
                  conn.send(c_msg.upper().encode("utf-8"))
              except Exception:
                  break
      
      if __name__ == '__main__':
          sk = socket.socket()
          sk.bind(("127.0.0.1", 8080))
          sk.listen(5)
          pool = Pool(os.cpu_count() + 1)
          while 1:
              conn, addr = sk.accept()
              pool.apply_async(func, args=(conn, addr))
      
      
      
      # 服務端
      import socket
      
      sk = socket.socket()
      
      sk.connect(("127.0.0.1", 8080))
      
      while 1:
          c_msg = input(">>")
          if not c_msg: continue
          sk.send(c_msg.encode("utf-8"))
          s_msg = sk.recv(1024).decode("utf-8")
          print(s_msg)

      當鏈接數達到開啓的進程池中的最大進程數量時,再有其它客戶端進行鏈接,將會阻塞等待,當另外的客戶端結束鏈接時纔會創建起會話鏈接。

  5. 回調函數(callback())

    須要回調函數的場景:進程池中任何一個任務一旦處理完了,就當即告知主進程:我好了額,你能夠處理個人結果了。主進程則調用一個函數去處理該結果,該函數即回調函數

    咱們能夠把耗時間(阻塞)的任務放到進程池中,而後指定回調函數(主進程負責執行),這樣主進程在執行回調函數時就省去了I/O的過程,直接拿到的是任務的結果。

    from multiprocessing import Pool
    import requests
    import json
    import os
    
    def get_page(url):
        print('<進程%s> get %s' %(os.getpid(),url))
        respone=requests.get(url)
        if respone.status_code == 200:
            return {'url':url,'text':respone.text}
    
    def pasrse_page(res):
        print('<進程%s> parse %s' %(os.getpid(),res['url']))
        parse_res='url:<%s> size:[%s]\n' %(res['url'],len(res['text']))
        with open('db.txt','a') as f:
            f.write(parse_res)
    
    
    if __name__ == '__main__':
        urls=[
            'https://www.baidu.com',
            'https://www.python.org',
            'https://www.openstack.org',
            'https://help.github.com/',
            'http://www.sina.com.cn/'
        ]
    
        p=Pool(3)
        res_l=[]
        for url in urls:
            res=p.apply_async(get_page,args=(url,),callback=pasrse_page)
            res_l.append(res)
    
        p.close()
        p.join()
        print([res.get() for res in res_l]) # 拿到的是get_page的結果,其實徹底不必拿該結果,該結果已經傳給回調函數處理了
    
    '''
    打印結果:
    <進程3388> get https://www.baidu.com
    <進程3389> get https://www.python.org
    <進程3390> get https://www.openstack.org
    <進程3388> get https://help.github.com/
    <進程3387> parse https://www.baidu.com
    <進程3389> get http://www.sina.com.cn/
    <進程3387> parse https://www.python.org
    <進程3387> parse https://help.github.com/
    <進程3387> parse http://www.sina.com.cn/
    <進程3387> parse https://www.openstack.org
    [{'url': 'https://www.baidu.com', 'text': '<!DOCTYPE html>\r\n...',...}]
    '''

    若是在主進程中等待進程池中全部任務都執行完畢後,再統一處理結果,則無需回調函數

十4、信號量

互斥鎖 同時只容許一個線程更改數據,而Semaphore是同時容許必定數量的線程更改數據 ,好比廁全部3個坑,那最多隻容許3我的上廁所,後面的人只能等裏面有人出來了才能再進去,若是指定信號量爲3,那麼來一我的得到一把鎖,計數加1,當計數等於3時,後面的人均須要等待。一旦釋放,就有人能夠得到一把鎖

信號量與進程池的概念很像,可是要區分開,信號量涉及到加鎖的概念

from multiprocessing import Process,Semaphore
import time,random

def go_wc(sem,user):
    sem.acquire()
    print('%s 佔到一個茅坑' %user)
    time.sleep(random.randint(0,3)) #模擬每一個人拉屎速度不同,0表明有的人蹲下就起來了
    sem.release()

if __name__ == '__main__':
    sem=Semaphore(5)
    p_l=[]
    for i in range(13):
        p=Process(target=go_wc,args=(sem,'user%s' %i,))
        p.start()
        p_l.append(p)

    for i in p_l:
        i.join()
    print('============》')

十5、事件

Python線程的事件用於主線程控制其餘線程的執行,事件主要提供了三個方法 set、wait、clear。

事件處理的機制:全局定義了一個「Flag」,若是「Flag」值爲 False,那麼當程序執行 event.wait 方法時就會阻塞,若是「Flag」值爲True,那麼event.wait 方法時便再也不阻塞。

clear:將「Flag」設置爲False
set:將「Flag」設置爲True

from multiprocessing import Process,Event
import time,random

def car(e,n):
    while True:
        if not e.is_set(): #Flase
            print('\033[31m紅燈亮\033[0m,car%s等着' %n)
            e.wait()
            print('\033[32m車%s 看見綠燈亮了\033[0m' %n)
            time.sleep(random.randint(3,6))
            if not e.is_set():
                continue
            print('走你,car', n)
            break

def police_car(e,n):
    while True:
        if not e.is_set():
            print('\033[31m紅燈亮\033[0m,car%s等着' % n)
            e.wait(1)
            print('燈的是%s,警車走了,car %s' %(e.is_set(),n))
            break

def traffic_lights(e,inverval):
    while True:
        time.sleep(inverval)
        if e.is_set():
            e.clear() #e.is_set() ---->False
        else:
            e.set()

if __name__ == '__main__':
    e=Event()
    # for i in range(10):
    #     p=Process(target=car,args=(e,i,))
    #     p.start()

    for i in range(5):
        p = Process(target=police_car, args=(e, i,))
        p.start()
    t=Process(target=traffic_lights,args=(e,10))
    t.start()

    print('============》')

..............

相關文章
相關標籤/搜索