python併發編程-進程間通訊-Queue隊列使用-生產者消費者模型-線程理論-建立及對象屬性方法-線程互斥鎖-守護線程-02

進程補充

進程通訊前言

要想實現進程間通訊,能夠用管道或者隊列python

隊列比管道更好用(隊列自帶管道和鎖)編程

隊列特色:先進先出安全

堆棧特色:先進後出網絡

咱們採用隊列來實現進程間數據通訊,下面先介紹一下隊列多線程

Queue隊列的基本使用

基本方法:q.put(元素) q.get() q.get_nowait() q.full() q.empty() 併發

from multiprocessing import Process, Queue

q = Queue(5)  # 實例化出一個對象
# --------------------------------------
# q.put(元素) 往隊列裏放東西
#   若是隊列滿了還往裏面放,就會等在這裏
# --------------------------------------
# q.put(1)
# q.put(2)
# q.put(3)
# --------------------------------------
# # q.full() 判斷隊列有沒有滿
# --------------------------------------
# print(q.full())  # q.full 判斷隊列有沒有滿
# # False
# q.put(4)
# q.put(5)
# # q.put(6)  # 若是隊列滿了還往裏面放,就會等在這裏
# print(q.full())
# # True

for i in range(5):
    q.put(i)
print(q.full())
# True
# --------------------------------------
# q.get() 從隊列頭取一個值
#   若是隊列空了,就會等在這裏,等數據過來
# --------------------------------------
print(q.get())
print(q.full())
# 0
# False
print(q.get())
print(q.get())
# print(q.get())
# --------------------------------------
# q.get_nowait() 從隊列頭取一個值
#   在隊列有數據的狀況下,與get取值同樣
#   當隊列沒有值的狀況下,取值直接報錯
# --------------------------------------
print(q.get_nowait())  # 在隊列有數據的狀況下,與get取值同樣,當隊列沒有值的狀況下,取值直接報錯
# --------------------------------------
# q.empty() 判斷隊列是否爲空
#   在併發的狀況下,這個方法不許確
# --------------------------------------
print(q.empty())  # 判斷隊列是否爲空,須要注意的是在併發的狀況下,這個方法不許確
print(q.get())
# 1
# 2
# 3
# False
# 4
# print(q.get())  # 若是隊列空了,就會等在這裏,等數據過來
print(q.empty())
# True
# print(q.get_nowait())
# 直接報錯 queue.Empty

經過Queue隊列實現進程間通訊(IPC機制)

數據的互通,可實現主進程與子進程之間的互通,子進程與子進程之間的互通
數據只有一份,取完就沒了,沒法重複獲取同一份數據app

from multiprocessing import Queue, Process


def producer(q):
    q.put('hello baby.')


def consumer(q):
    print(q.get())


if __name__ == '__main__':
    q = Queue()  # 生成一個隊列對象
    p1 = Process(target=producer, args=(q,))
    c1 = Process(target=consumer, args=(q,))
    p1.start()
    c1.start()  # 子進程獲取到了另外一個子進程的數據
    # hello baby.
    # print(q.get())  # 主進程獲取到了子進程的數據
    # hello baby.

生產者消費者模型

生產者:生產/製造數據的dom

消費者:消費/處理數據的異步

例子:作包子的,賣包子的
	1.作的包子遠比買包子的多
	2.作的包子遠比買包子的少
	--> 供需不平衡

用處:解決供需不平衡的問題測試

以作包子買包子爲例實現當包子賣完了中止消費行爲

方式一

from multiprocessing import Process, Queue
import time
import random


def producer(name, food, q: Queue):
    for i in range(10):
        data = f'{name} 生產了 {food}{i}'
        time.sleep(random.random())
        q.put(data)
        print(data)


def consumer(name, q):
    while True:
        res = q.get()
        if not res:  # 已經把生產者作的東西所有吃完了,那麼本消費者也結束食用
            break
        data = res.split(' ')[2]
        data = f'{name} 吃了 {data}'
        print(data)
        time.sleep(random.random())


if __name__ == '__main__':
    q = Queue()
    p = Process(target=producer, args=('大廚egon', '饅頭', q))
    p2 = Process(target=producer, args=('跟班tank', '生蠔', q))
    c = Process(target=consumer, args=('jason', q))
    c2 = Process(target=consumer, args=('吃貨kevin', q))
    p.start()
    p2.start()
    c.start()
    c2.start()
    # 不知道何時生產者何時生成完
    p.join()
    p2.join()
    q.put(None)  # 經過 None來標誌生產者已生產完成
    q.put(None)
    # 能夠實現,可是很差

方式二

改用JoinableQueue模塊的隊列守護進程來實現

from multiprocessing import Process, JoinableQueue
import time
import random


def producer(name, food, q: JoinableQueue):
    for i in range(10):
        data = f'{name} 生產了 {food}{i}'
        time.sleep(random.random())
        q.put(data)
        print(data)


def consumer(name, q):
    while True:
        res = q.get()
        if not res:
            break
        data = res.split(' ')[2]
        data = f'{name} 吃了 {data}'
        print(data)
        time.sleep(random.random())
        q.task_done()  # 告訴隊列,你已經從隊列中取出了一個數據,而且處理完畢了


if __name__ == '__main__':
    q = JoinableQueue()
    p = Process(target=producer, args=('大廚egon', '饅頭', q))
    p2 = Process(target=producer, args=('跟班tank', '生蠔', q))
    c = Process(target=consumer, args=('jason', q))
    c2 = Process(target=consumer, args=('吃貨kevin', q))
    p.start()
    p2.start()
    c.daemon = True  # 配合join,結束程序消費者也結束(注意join是主進程的最後一句代碼)
    c.start()
    c2.daemon = True
    c2.start()
    # 不知道何時生產者何時生成完
    p.join()
    p2.join()

    q.join()  # 等待隊列中數據所有取出,執行完了這句話,也就意味着隊列中沒有數據了(消費者那裏仍是會卡住,get不到東西等待)
    # 配合上 守護進程 來實現....

線程

什麼是線程

進程和線程其實都是虛擬單位,都是用來幫助咱們形象的描述某種事物

進程:資源單位(一塊獨立的內存空間)

線程:執行單位

將內存比喻成工廠,那麼進程就至關於工廠裏的車間,而你的線程就至關因而車間裏面的流水線

CPU其實運行的實際上是線程,進程只是資源單位

線程執行時須要的資源單位都跟進程要

ps:每一個進程都自帶一個線程,線程纔是真正的執行單位,進程只是在線程運行過程當中提供代碼運行所須要的資源

每一個進程都會自帶一個線程

線程沒有主次之分,只不過咱們默認就把主進程自帶的那個線程叫作主線程

爲何要有線程

開進程

  • 申請內存空間 ---> 耗資源
  • 「拷貝代碼」 ---> 耗資源

開線程

  • 一個進程內能夠起多個線程,而且線程與線程之間數據是共享的

ps:開啓線程的開銷要遠遠小於開啓進程的開銷(可能剛執行完建立線程的代碼線程就建立好了)

開啓線程的兩種方式

方式一

from threading import Thread
import time


def task(name):
    print(f"{name} is running")
    time.sleep(3)
    print(f"{name} is over")


t = Thread(target=task, args=('egon', ))  # 開線程不須要在 __main__ 代碼塊內,可是習慣性的仍是寫在 __main__ 內
t.start()  # 告訴操做系統開啓一個線程
# 線程的開銷遠遠小於進程,小到以致於能夠代碼執行完,線程就已經開啓了
print("主")  # 線程沒有主次之分,都在同一個進程的名稱空間裏,只是人爲把進程自帶的線程叫作主線程
# egon is running
# 主線程  # 進程的時候這個主線程可能會是最早打印的
# egon is over

方式二

from threading import Thread
import time


class MyThread(Thread):
    def __init__(self, name):
        super().__init__()
        self.name = name

    def run(self):
        print(f"{self.name} is running")
        time.sleep(1)
        print(f"{self.name} is over")


if __name__ == '__main__':
    t = MyThread('jason')
    t.start()  # 開啓線程的速度很是快,幾乎代碼執行完線程就已經開啓
    print("主")

# jason is running
# 主
# jason is over

線程之間數據共享

from threading import Thread

money = 666


def task():
    global money
    money = 999


t = Thread(target=task)
t.start()
t.join()  # 確保是線程運行結束後
print(money)
# 999  # 主線程與子線程之間數據是通用的

線程間想要實現數據通訊,不須要藉助於隊列(線程間支持數據通訊)

線程對象的其餘屬性和方法

import time
from threading import Thread, active_count, current_thread
import os


def task(name):
    print(f"{name} is running {os.getpid()}")
    # # ------------------------------------------------
    # # current_thread().name current_thread().getname() 當前線程名
    # #   記得導入模塊
    # # ------------------------------------------------
    # print(f"current_thread().name:{current_thread().name}")
    # current_thread().name:Thread-1
    time.sleep(1)
    print(f"{name} is over")


# t = Thread(target=task, args=('jason', ))
# t.start()
# # ------------------------------------------------
# # os.getpid() os.getppid() 獲取進程號 父進程號
# #   多個線程屬於同一進程
# # ------------------------------------------------
# print(f"pid {os.getpid()}")
# # jason is running 5572
# # pid 5572
# # jason is over


t = Thread(target=task, args=('jason', ))
t.start()
# ------------------------------------------------
# active_count()  統計當前存活的線程數
#   記得導入模塊
# ------------------------------------------------
print(active_count())
print(f"pid {os.getpid()}")
# jason is running 5728
# 2
# pid 5728
print(f"主 current_thread().name:{current_thread().name}")
# 主 current_thread().name:MainThread

t.join()  # 主線程等待子線程運行結束
# jason is over
print("主 active_count", active_count())  # 可能會有問題,多線程是異步,可能join的線程結束了,其餘線程也正好結束了(多個線程時)
# 主 active_count 1
# Thread.join(t)  # 能夠考慮用類調用對象方法,傳入對象來在循環裏對線程對象進行操做

守護線程

主線程要等待全部非守護線程結束後纔會結束(不是主線程的代碼執行完了就立馬結束了)

主線程結束後,守護(子)線程也會當即結束

主線程運行結束以後爲何須要等待子線程結束才能結束呢?

主線程的結束也就意味着進程的結束
主線程必須等待其餘非守護線程的結束才能結束
由於子線程在運行的時候須要使用進程中的資源,而主線程一旦結束了,資源也就銷燬了

# from threading import Thread, current_thread
# import time
#
#
# def task(i):
#     print(f"{current_thread().name}")
#     time.sleep(i)
#     print("GG")
#
#
# for i in range(3):
#     t = Thread(target=task, args=(i, ))
#     t.start()
#
#
# print("主")
# # 循環的時候就已經打印了部分數據了(異步)
# # Thread-1
# # GG
# # Thread-2
# # Thread-3
# # 主
# # GG
# # GG

# 主線程運行結束以後爲何須要等待子線程結束才能結束呢?
'''
主線程的結束也就意味着進程的結束
    主線程必須等待其餘非守護線程的結束才能結束
由於子線程在運行的時候須要使用進程中的資源,而主線程一旦結束了,資源也就銷燬了
'''
from threading import Thread, current_thread
import time


def task(i):
    print(f"{current_thread().name}")
    time.sleep(i)
    print("GG")


for i in range(3):
    t = Thread(target=task, args=(i,))
    t.daemon = True
    t.start()

print("主")
# Thread-1
# GG
# Thread-2
# Thread-3
# 主

測試

下面程序的執行結果是什麼?

from threading import Thread
import time


def foo():
    print(123)
    time.sleep(1)
    print("end123")


def bar():
    print(456)
    time.sleep(3)
    print("end456")


t1 = Thread(target=foo)
t2 = Thread(target=bar)

t1.daemon = True
t1.start()
t2.start()
print("main-------")


# 123
# 456
# main-------
# end123
# end456

線程互斥鎖

從線程間通訊那裏的案例能夠看出,線程間數據是相通的,那麼多個線程對同一份數據進行操做會產生問題

下面一樣模擬一個網絡延遲來對數據進行操做(確保全部線程都執行完的操做能夠記一下)

不加鎖遇到延遲的狀況

# 模擬網絡延遲的現象
#   多個線程操做同一個數據,也會形成數據不安全
import time
from threading import Thread

n = 10


def task():
    global n
    tmp = n
    time.sleep(1)
    n = tmp - 1


# -------------------------------
t_list = []
for i in range(10):
    t = Thread(target=task)
    t.start()
    t_list.append(t)

# 確保其餘線程都執行完了以後再打印
for t in t_list:
    t.join()
# -------------------------------

print(n)
# 9

加鎖後遇到延遲

# 加鎖解決問題
import time
from threading import Thread, Lock

n = 10


def task(mutex):
    mutex.acquire()  # 搶鎖
    global n
    tmp = n
    time.sleep(1)
    n = tmp - 1
    mutex.release()  # 釋放鎖


t_list = []
mutex = Lock()
for i in range(10):
    t = Thread(target=task, args=(mutex, ))
    t.start()
    t_list.append(t)

# 確保其餘線程都執行完了以後再打印
for t in t_list:
    t.join()
print(n)
# 0  # 等10s多點 後打印出結果,數據未受延遲影響,保證了數據安全

爲何用互斥鎖不用 線程/進程對象.join()

雖然互斥鎖也是將併發改爲串行,犧牲效率來保證數據安全,這一點線程對象.join()也能夠實現將併發改爲串行,一樣保證數據安全,但線程對象.join()是將每個線程的運行都變成串行的,對比互斥鎖的只將數據操做部分編程串行消耗的時間要多得多,若果線程耗時長,執行效率就會低的可怕

# # 不加鎖:未加鎖部分併發執行,加鎖部分串行執行,速度慢,數據安全
# from threading import current_thread, Thread, Lock
# import os
# import time
#
#
# def task():
#     # 未加鎖的代碼併發運行
#     time.sleep(3)
#     print('%s start to run' % current_thread().getName())
#     global n
#     # 加鎖的代碼串行運行
#     lock.acquire()
#     temp = n
#     time.sleep(0.5)
#     n = temp - 1
#     lock.release()
#
#
# if __name__ == '__main__':
#     n = 100
#     lock = Lock()
#     threads = []
#     start_time = time.time()
#     for i in range(100):
#         t = Thread(target=task)
#         threads.append(t)
#         t.start()
#     for t in threads:
#         t.join()
#     stop_time = time.time()
#     print('主:%s n:%s' % (stop_time - start_time, n))
#
# '''
# Thread-3 start to run
# Thread-1 start to run
# ......
# Thread-100 start to run
# Thread-96 start to run
# 主:53.06105661392212 n:0
# '''

# 利用 join 保證數據安全
from threading import current_thread, Thread, Lock
import os
import time


def task():
    time.sleep(3)
    print('%s start to run' % current_thread().getName())
    global n
    temp = n
    time.sleep(0.5)
    n = temp - 1


if __name__ == '__main__':
    n = 100
    lock = Lock()
    start_time = time.time()
    for i in range(100):
        t = Thread(target=task)
        t.start()
        t.join()
    stop_time = time.time()
    print('主:%s n:%s' % (stop_time - start_time, n))

'''
Thread-1 start to run
Thread-2 start to run
......
Thread-100 start to run
主:350.1616487503052 n:0 # 耗時是多麼的恐怖
'''

線程和進程的用戶大同小異,能夠對比着來記

後續能夠畫圖或表格用對比的方式來整理一下,方便記憶~

相關文章
相關標籤/搜索