進程之間的通訊

互斥鎖

from multiprocessing import Process,Lock
import os,time
def work(lock):
    lock.acquire()    上鎖
    print('%s is running' %os.getpid())
    time.sleep(2)
    print('%s is done' %os.getpid())
    lock.release()    解鎖
if __name__ == '__main__':
    lock=Lock()      造鎖(實例化一個鎖)
    for i in range(3):
        p=Process(target=work,args=(lock,))
        p.start()
調用「鎖」的方式

示例背景

三臺電腦同一時刻共同調用打印機完成打印任務,即三個進程同一時刻共搶同一個資源(輸出平臺)json

多個進程共搶一個資源,應結果第一位,效率第二位,因此應該犧牲效率,保求結果(串行)安全

示例代碼

以上的代碼雖然完成串行結果,但沒有實現公平,執行的順序都是人爲寫好的,應該作到公平的搶佔資源,誰先搶到就執行誰網絡

from multiprocessing import Process
from multiprocessing import Lock
import time
import random


def task1(lock):
    print('task1')  # 驗證cpu遇到io切換了
    lock.acquire()
    print('task1: 開始打印')
    time.sleep(random.randint(1, 3))
    print('task1: 打印完成')
    lock.release()

def task2(lock):
    print('task2')  # 驗證cpu遇到io切換了
    lock.acquire()
    print('task2: 開始打印')
    time.sleep(random.randint(1, 3))
    print('task2: 打印完成')
    lock.release()


def task3(lock):
    print('task3') # 驗證cpu遇到io切換了
    lock.acquire()
    print('task3: 開始打印')
    time.sleep(random.randint(1, 3))
    print('task3: 打印完成')
    lock.release()


if __name__ == '__main__':

    lock = Lock()

    p1 = Process(target=task1, args=(lock,))
    p2 = Process(target=task2, args=(lock,))
    p3 = Process(target=task3, args=(lock,))

    p1.start()
    p2.start()
    p3.start()
正確代碼

上鎖時必定要給進程上同一把鎖,並且上鎖一次,就必定要解鎖一次併發

 互斥鎖和join的區別

共同點

都是完成了進程之間的串行dom

區別

join是經過人爲控制使進程串行ide

互斥鎖是隨機搶佔資源,保證了公平性ui

業務需求分析:

買票以前先要查票,必須經歷的流程: 你在查票的同時,100我的也在查本此列票.
買票時,你要先從服務端獲取到票數,票數>0 ,買票,而後服務端票數減一. 中間確定有網絡延遲.
from multiprocessing import Process
from multiprocessing import Lock
import time
import json
import os
import random
多進程原則上是不能互相通訊的,它們在內存級別數據隔離的.不表明磁盤上數據隔離.
它們能夠共同操做一個文件.

def search():
    time.sleep(random.random())
    with open('db.json',encoding='utf-8') as f1:
        dic = json.load(f1)
    print(f'剩餘票數{dic["count"]}')


def get():
    with open('db.json',encoding='utf-8') as f1:
        dic = json.load(f1)
    time.sleep(random.randint(1,3))
    if dic['count'] > 0:
        dic['count'] -= 1
        with open('db.json', encoding='utf-8', mode='w') as f1:
            json.dump(dic,f1)
        print(f'{os.getpid()}用戶購買成功')
    else:
        print('沒票了.....')

def task(lock):
    search()
    lock.acquire()
    get()
    lock.release()


if __name__ == '__main__':
    lock = Lock()
    for i in range(5):
        p = Process(target=task,args=(lock,))
        p.start()
模擬搶票系統

隊列

進程之間的通訊最好的方式是基於隊列spa

from multiprocessing import Process
from multiprocessing import Queue
import time
import os
import random
# 多進程原則上是不能互相通訊的,它們在內存級別數據隔離的.不表明磁盤上數據隔離.
# 它們能夠共同操做一個文件.

def search(ticket):
    time.sleep(random.random())
    print(f'剩餘票數{ticket}')


def _get(q):
    dic = q.get()
    time.sleep(random.randint(1,3))
    if dic['count'] > 0:
        dic['count'] -= 1
        print(f'{os.getpid()}用戶購買成功')
    else:
        print('沒票了.....')
    q.put(dic)

def task(ticket,q):
    search(ticket)
    _get(q)


if __name__ == '__main__':
    q = Queue(1)
    q.put({'count': 1})
    ticket = 1
    for i in range(5):
        p = Process(target=task,args=(ticket,q))
        p.start()
經過隊列模擬搶票系統

什麼是隊列

隊列是存在與內存中的一個容器,最大的特色:先進先出(FIFO)3d

利用隊列進行進程之間的通訊,簡單,方便,不用本身手動加鎖,隊列自帶優質阻塞,可持續化取數據code

from multiprocessing import Queue

q = Queue(3)  # 能夠設置元素個數

def func():
     print('in func')

q.put('alex')
q.put({'count': 1})
q.put(func)
q.put(666)  # 當隊列數據已經達到上限,在插入數據的時候,程序就會夯住.
                     當你將數據所有取完繼續再取值時,程序也會夯住

put中有block參數 默認爲True 當你插入的數據超過最大限度,默認阻塞.改爲False 數據超過最大限度,不阻塞了直接報錯.

put中有timeout參數 延時報錯,好比q.put(3,timeout = 3) 超過三秒再put不進,程序就會報錯

get也有這兩個參數
隊列的基本方法
# 小米:搶手環4.預期發售10個.
# 有100我的去搶.
import os
from multiprocessing import Queue
from multiprocessing import Process

def task(q):
    try:
        q.put(f'{os.getpid()}',block=False)
    except Exception:
        return


if __name__ == '__main__':
    q = Queue(10)
    for i in range(100):
        p = Process(target=task,args=(q,))
        p.start()
    for i in range(1,11):
        print(f'排名第{i}的用戶: {q.get()}',)
用進程通訊隊列模擬實例

棧與隊列性質相似,特色與隊列相反:先進後出(FILO)

import queue

q = queue.LifoQueue()
q.put(1)
q.put(3)
q.put('barry')

print(q.get())
print(q.get())
print(q.get())
print(q.get())
棧的基本方法

優先級隊列

須要經過元組的形式放入,(int,數據) int 表明優先級,數字越低,優先級越高

import queue
q = queue.PriorityQueue(3)

q.put((10, '垃圾消息'))
q.put((-9, '緊急消息'))
q.put((3, '通常消息'))

print(q.get())
print(q.get())
print(q.get())
隊列的基本方法

生產者消費者模型(多應用於併發)

生產者:生產數據進程

消費者:對生產者生產出來的數據作進一步處理進程

from multiprocessing import Process
from multiprocessing import Queue
import time
import random


def producer(name,q):
    for i in range(1,6):
        time.sleep(random.randint(1,3))
        res = f'{i}號包子'
        q.put(res)

        print(f'\033[0;32m 生產者{name}: 生產了{res}\033[0m')



def consumer(name,q):
    while 1:
        try:
            time.sleep(random.randint(1,3))
            ret = q.get(timeout=5)
            print(f'消費者{name}: 吃了{ret}')
        except Exception:
            return




if __name__ == '__main__':

    q = Queue()

    p1 = Process(target=producer, args=('alex',q))
    p2 = Process(target=consumer, args=('barry',q))

    p1.start()
    p2.start()

 模型中有三個主體:生產者,消費者,容器隊列

合理的調控多個進程去生成數據以及提取數據,中間有個必不可少的環節就是容器隊列

若是沒有容器,生產者與消費者造成強耦合性,不合理,因此要有一個緩衝區(容器),平衡了生產力與消費力

總結

進程之間的通訊

基於文件 + 鎖    效率低,麻煩

基於隊列       推薦

基於管道       管道本身加鎖,底層可能會出現數據丟失損壞

應用

多個進程搶佔一個資源

串行,有序以及數據安全

多個進程實現併發效果

生產者消費者模型

相關文章
相關標籤/搜索