併發編程(五)

前言

上篇博客的內容是守護進程,對於操做系統來講能夠在後臺執行一些程序.這篇的內容是互斥鎖,在上上篇博客上說到進程內存空間互相隔離,因此能夠經過共享文件來操做同一個文件,那麼這樣操做的話會發生什麼呢?python

互斥鎖

多個進程須要共享數據時,先將其鎖定,此時資源狀態爲'鎖定',其餘進程不能更改;知道該進程釋放資源,將資源的狀態變成非'鎖定',其餘的線程才能再次鎖定該資源.互斥鎖保證了每次只有一個進程進入寫入操做,從而保證了多進程狀況下數據的正確性.json

咱們使用一個demo 來模擬多個進程操做同一個文件:安全

import json
import time,random
from multiprocessing import Process

def show_tickets(name):
    time.sleep(random.randint(1,3))
    with open('ticket.json', 'rt', encoding='utf-8') as f:
        data = json.load(f)
        print('%s 查看 剩餘票數: %s' % (name, data['count']))

def buy_ticket(name):
    with open('ticket.json', 'rt', encoding='utf-8') as f:
        dic = json.load(f)

        if dic['count'] > 0:
            dic['count'] -= 1

            time.sleep(random.randint(1,3))

            with open('ticket.json', 'wt', encoding='utf-8') as f:
                json.dump(dic, f)
                print('%s: 購票成功' % name)

def task(name):
    show_tickets(name)
    buy_ticket(name)

if __name__ == '__main__':
    for i in range(1,11):
        p = Process(target=task, args=(i,))
        p.start()

運行結果:併發

在 ticket.json 裏面只有一張票,結果卻形成多個用戶購買成功,這很顯然是不符合實際狀況的.\dom

那麼怎麼解決呢?若是多個進程對同一個文件進行讀操做能夠不進行限制,可是對同一個文件進行寫操做就必要要進行限制,不能夠同時多我的對同一個文件進行寫操做.python 在多進程模塊裏提供一個類, Lock 類,當進程獲取到鎖的時候其餘的進程就必需要等待鎖釋放才能夠進行爭搶,在這個例子裏面就能夠加上一把鎖來保護數據安全.函數

from multiprocessing import Process, Lock
import json,time,random


def show_tickets(name):
    time.sleep(random.randint(1,3))
    with open('ticket.json', 'rt', encoding='utf-8') as f:
        data = json.load(f)
        print('%s 查看 剩餘票數: %s' % (name, data['count']))

def buy_ticket(name):
    time.sleep(random.randint(1,3))
    with open('ticket.json', 'rt', encoding='utf-8') as f:
        dic = json.load(f)

        if dic['count'] > 0:
            dic['count'] -= 1

            time.sleep(random.randint(1,3))

            with open('ticket.json', 'wt', encoding='utf-8') as f:
                json.dump(dic, f)
                print('%s: 購票成功' % name)

def task(name,lock):
    show_tickets(name)
    lock.acquire()
    buy_ticket(name)
    lock.release()

if __name__ == '__main__':
    mutex = Lock()
    for i in range(1,11):
        p = Process(target=task, args=(i,mutex))
        p.start()

運行結果:性能

這樣加了鎖(互斥鎖)就能夠解決同時操做同一個文件形成的數據混亂問題了.ui

當使用多進程開發時,若是多個進程同時讀寫同一個資源,可能會形成數據的混亂,爲了防止發生問題,使用鎖,或者使用 Process 的方法 join 將並行變爲串行.spa

join 和鎖的區別操作系統

  1. join 人爲控制進程的執行順序
  2. join 把整個進程所有串行,而鎖能夠指定部分代碼串行

一旦串行,效率就會下降,一旦並行,數據就可能會出錯.

進程間通訊

進程間通訊( internal-process communication),咱們在開啓子進程是但願子進程幫助完成任務,不少狀況下須要將數據返回給父進程,然而進程間內存是物理隔離的.

解決辦法:

  1. 將共享數據放到文件中
  2. 管道 多進程模塊中的一個類,須要有父子關係
  3. 共享一快內存區域 須要操做系統分配

管道通訊

Pipe類返回一個由管道鏈接的鏈接對象,默認狀況下爲雙工:

from multiprocessing import Process,Pipe

def f(conn):
    conn.send([42, None, 'hello'])
    conn.close()
    
if __name__ == '__main__':
    parent_conn, child_conn = Pipe()
    p = Process(target=f, args=(child_conn,))
    p.start()
    print(parent_conn.recv())
    p.join()

運行結果:

[42, None, 'hello']

實例化 Pipe 類會返回兩個鏈接對象表示管道的兩端.每一個鏈接對象都有 send() 和 recv() 方法(及其餘).請注意,若是兩個進程同時嘗試讀寫管道的同一端,則管道中的數據可能會損壞.固然,同時使用管道的不一樣端部的過程不存在損壞的風險.

共享內存通訊

Queue 通訊

Queue類會生成一個先進先出的容器,經過往隊列中存取數據而進行進程間通訊.

from multiprocessing import Process, Queue

def f(q):
    q.put([42, None, 'hello'])
    
if __name__ == '__main__':
    q = Queue()
    p = Process(target=f, args=(q,))
    p.start()
    print(q.get())
    p.join()

運行結果:

[42, None, 'hello']

隊列其餘特性

# 阻塞操做 必須掌握
q = Queue(3)
# # 存入數據
q.put("hello",block=False)
q.put(["1","2","3"],block=False)
q.put(1,block=False)
# 當容量滿的時候 再執行put 默認會阻塞直到執行力了get爲止
# 若是修改block=False 直接報錯 由於沒地方放了
# q.put({},block=False)

# # # 取出數據
print(q.get(block=False))
print(q.get(block=False))
print(q.get(block=False))
# 對於get   當隊列中中沒有數據時默認是阻塞的  直達執行了put
# 若是修改block=False 直接報錯 由於沒數據可取了
print(q.get(block=False))



# 瞭解
q = Queue(3)
q.put("q",timeout=3)
q.put("q2",timeout=3)
q.put("q3",timeout=3)
# 若是滿了 願意等3秒  若是3秒後還存不進去 就炸
# q.put("q4",timeout=3)

print(q.get(timeout=3))
print(q.get(timeout=3))
print(q.get(timeout=3))
# 若是沒了 願意等3秒  若是3秒後還取不到數據 就炸
print(q.get(timeout=3))

Manager 通訊

demo

from multiprocessing import  Process,Manager
import time

def task(dic):
    print("子進程xxxxx")
    # li[0] = 1
    # print(li[0])
    dic["name"] = "xx"

if __name__ == '__main__':
    m = Manager()
    # li = m.list([100])
    dic = m.dict({})
    # 開啓子進程
    p = Process(target=task,args=(dic,))
    p.start()
    time.sleep(3)

能夠建立一片共享內存區域用來存取數據.

生產者消費者模型

什麼是生產者消費者模型

在軟件開發過程當中,常常碰到這樣的場景:

某些模塊負責生產數據,這些數據由其餘模塊來負責處理(此處的模塊多是:函數,線程,進程等).生產數據的模塊稱爲生產者,而處理數據的模塊稱爲消費者.在生產者與消費者之間的緩衝區稱之爲倉庫.生產者負責往倉庫運輸商品,而消費者負責從倉庫裏取出商品,這就構成了生產者消費者模型.

結構圖以下:

爲了便於理解,咱們舉一個寄信的例子。假設你要寄一封信,大體過程以下:

  1. 你把信寫好——至關於生產者生產數據;
  2. 你把信放入郵箱——至關於生產者把數據放入緩衝區;
  3. 郵遞員把信從郵箱取出,作相應處理——至關於消費者把數據取出緩衝區,處理數據.

生產者消費者模型的優勢

  • 解耦

假設生產者和消費者分別是兩個線程.若是讓生產者直接調用消費者的某個方法,那麼生產者對於消費者就會產生依賴(耦合).若是將來消費者的代碼發生改變,可能會影響到生產者的代碼.而若是二者都依賴於某個緩衝區,二者之間不直接依賴,耦合也就相應下降了.

舉個例子,咱們去郵局投遞信件,若是不使用郵箱(也就是緩衝區,你必須得把信直接交給郵遞員.有同窗會說,直接給郵遞員不是挺簡單的嘛?其實不簡單,你必須 得認識誰是郵遞員,才能把信給他.這就產生了你和郵遞員之間的依賴(至關於生產者和消費者的強耦合).萬一哪天郵遞員換人了,你還要從新認識一下(至關於消費者變化致使修改生產者代碼).而郵箱相對來講比較固定,你依賴它的成本就比較低(至關於和緩衝區之間的弱耦合).

  • 併發

因爲生產者與消費者是兩個獨立的併發體,它們之間是使用緩衝區通訊的,生產者只須要往緩衝區裏丟數據,就能夠接着生產下一個數據了,而消費者只須要從緩衝區拿數據便可,這樣就不會由於彼此的處理速度而發生阻塞.

繼續上面的例子,若是沒有郵箱,就得在郵局等郵遞員,知道他回來,把信交給他,這期間咱們什麼事都幹不了(生產者阻塞).或者郵遞員挨家挨戶問,誰要寄信(消費者阻塞).

  • 支持忙閒不均

當生產者製造數據快的時候,消費者來不及處理,爲處理的數據能夠暫時存在緩衝區中,慢慢處理,而不至於由於消費者的性能過慢形成數據丟失或影響生產者生產數據.

再拿寄信的例子,假設郵遞員一次只能帶走1000封信,萬一碰上情人節或者其餘的緊急任務,須要寄出的信超過了1000封,這個時候郵箱做爲緩衝區就派上用場了.郵遞員把來不及帶走的信暫存在郵箱中,等下次過來時在拿走.

使用

from multiprocessing import Process, Queue
import time, random

def producer(name, food, q):
    for i in range(10):
        res = '%s %s' % (food, i)
        time.sleep(random.randint(1,3))
        q.put(res)
        print('%s 生產了 %s' % (name, res))
        
def consumer(name, q):
    while True:
        res = q.get()
        time.sleep(random.randint(1,3))
        print('%s 消費了 %s' % (name, res))

if __name__ == '__main__':
    q = Queue()
    p = Process(target=producer, args=('musibii', '🍔', q))
    c = Process(target=consumer, args=('thales', q))
    p.start()
    c.start()
    
    p.join()
    c.join()
    print('主進程')

運行結果:

這樣的話該進程並不會結束,由於 get 方法是阻塞的,數據消費完就會一直等待知道生產者生產新的數據,而生產者只能生產9個數據.因此會一直阻塞.

改進使用

咱們須要在消費者消費的時候知道隊列裏面有多少數據,應該何時消費完了,因此能夠在生產者裏面生產結束後添加一個標誌,好比 None.

import time, random
from multiprocessing import Process, Queue


# 製做熱狗
def make_hotdog(queue, name):
    for i in range(1, 4):
        time.sleep(random.randint(1, 2))
        print("%s 製做了一個🌭 %s" % (name, i))
        # 生產獲得的數據
        data = "%s 生產的🌭%s" % (name, i)
        # 存到隊列中
        queue.put(data)
    # 裝入一個特別的數據 告訴消費方 沒有了
    # queue.put(None)


# 吃熱狗
def eat_hotdog(queue, name):
    while True:
        data = queue.get()
        if not data: break
        time.sleep(random.randint(1, 2))
        print("%s 吃了 %s" % (name, data))


if __name__ == '__main__':
    # 建立隊列
    q = Queue()
    p1 = Process(target=make_hotdog, args=(q, "musibii的熱狗店"))
    p2 = Process(target=make_hotdog, args=(q, "egon的熱狗店"))
    p3 = Process(target=make_hotdog, args=(q, "eureka的熱狗店"))

    c1 = Process(target=eat_hotdog, args=(q, "thales"))
    c2 = Process(target=eat_hotdog, args=(q, "maffia"))

    p1.start()
    p2.start()
    p3.start()

    c1.start()
    c2.start()

    # 讓主進程等三家店全都作完後....
    p1.join()
    p2.join()
    p3.join()

    # 添加結束標誌   注意這種方法有幾個消費者就加幾個None 不太合適 不清楚未來有多少消費者
    q.put(None)
    q.put(None)

    # 如今 須要知道何時作完熱狗了 生產者不知道  消費者也不知道
    # 只有隊列知道

    print("主進程over")

    # 生產方不生產了 然而消費方不知道 因此已知等待  get函數阻塞
    # 三家店都放了一個空表示沒熱狗了  可是消費者只有兩個 他們只要看見None 就認爲沒有了
    # 因而進程也就結束了  形成一些數據沒有被處理
    # 等待作有店都作完熱狗在放None

運行結果:

這樣就解決了最第一版本消費之由於沒有數據而阻塞的問題了,可是這裏仍是有問題,由於不知道到底有多少消費者,由於想讓消費者知道數據已經結束了的話,須要給每一個消費者一個標誌位,這樣是不現實的.

完美使用

python 多進程模塊提供了一個JoinableQueue類,追根溯源繼承於 Queue,源碼看的頭疼.

import time, random
from multiprocessing import Process, JoinableQueue


# 製做熱狗
def make_hotdog(queue, name):
    for i in range(1,4):
        time.sleep(random.randint(1, 2))
        print("%s 製做的🌭 %s" % (name, i))
        # 生產獲得的數據
        data = "%s 生產的🌭 %s" % (name, i)
        # 存到隊列中
        queue.put(data)
    # 裝入一個特別的數據 告訴消費方 沒有了
    # queue.put(None)


# 吃熱狗
def eat_hotdog(queue, name):
    while True:
        data = queue.get()
        time.sleep(random.randint(1, 2))
        print("%s 吃了%s" % (name, data))
        # 該函數就是用來記錄一共給消費方多少數據了 就是get次數
        queue.task_done()


if __name__ == '__main__':
    # 建立隊列
    q = JoinableQueue()
    p1 = Process(target=make_hotdog, args=(q, "musibii的熱狗店"))
    p2 = Process(target=make_hotdog, args=(q, "egon的熱狗店"))
    p3 = Process(target=make_hotdog, args=(q, "eureka的熱狗店"))

    c1 = Process(target=eat_hotdog, args=(q, "thales"))
    c2 = Process(target=eat_hotdog, args=(q, "maffia"))

    p1.start()
    p2.start()
    p3.start()

    # 將消費者做爲主進程的守護進程
    c1.daemon = True
    c2.daemon = True

    c1.start()
    c2.start()

    # 讓主進程等三家店全都作完後....
    p1.join()
    p2.join()
    p3.join()

    # 如何知道生產方生產完了 而且 消費方也吃完了
    # 方法一:等待作有店都作完熱狗在放None
    # # 添加結束標誌   注意這種方法有幾個消費者就加幾個None 不太合適 不清楚未來有多少消費者
    # q.put(None)
    # q.put(None)

    # 主進程等到隊列結束時再繼續  那隊列何時算結束? 生產者已經生產完了 而且消費者把數據全取完了
    q.join()  # 已經明確生產放一共有多少數據

    # 如今 須要知道何時作完熱狗了 生產者不知道  消費者也不知道
    # 只有隊列知道

    print("主進程over")
    # 生產方不生產了 然而消費方不知道 因此一直等待  get函數阻塞
    # 三家店都放了一個空表示沒熱狗了  可是消費者只有兩個 他們只要看見None 就認爲沒有了
    # 因而進程也就結束了  形成一些數據沒有被處理

運行結果:

查看 JoinableQueue 類方法 task_done 源碼:

看不懂.........

相關文章
相關標籤/搜索