day29

進程互斥鎖

加鎖能夠保證多個進程修改同一塊數據時,同一時間只能有一個任務能夠進行修改,即串行的修改,沒錯,速度是慢了,但犧牲了速度卻保證了數據安全。json

雖然能夠用文件共享數據實現進程間通訊,但問題是:安全

  1. 效率低(共享數據基於文件,而文件是硬盤上的數據)
  2. 須要本身加鎖處理

多進程同時搶購餘票

# 文件db的內容爲:{"count":1}
# 注意必定要用雙引號,否則json沒法識別
# 併發運行,效率高,但競爭寫同一文件,數據寫入錯亂
from multiprocessing import Process,Lock
import time,json,random
def search():
    dic=json.load(open('db'))
    print('剩餘票數%s' %dic['count'])

def get():
    dic=json.load(open('db'))
    time.sleep(0.1)  # 模擬讀數據的網絡延遲
    if dic['count'] >0:
        dic['count']-=1
        time.sleep(0.2)  # 模擬寫數據的網絡延遲
        json.dump(dic,open('db','w'))
        print('購票成功')

def task():
    search()
    get()

if __name__ == '__main__':
    for i in range(100):  # 模擬併發100個客戶端搶票
        p=Process(target=task)
        p.start()

使用鎖來保證數據安全

# 文件db的內容爲:{"count":5}
# 注意必定要用雙引號,否則json沒法識別
# 併發運行,效率高,但競爭寫同一文件,數據寫入錯亂
from multiprocessing import Process,Lock
import time,json,random
def search():
    dic=json.load(open('db'))
    print('剩餘票數%s' %dic['count'])

def get():
    dic=json.load(open('db'))
    time.sleep(random.random())  # 模擬讀數據的網絡延遲
    if dic['count'] >0:
        dic['count']-=1
        time.sleep(random.random())  # 模擬寫數據的網絡延遲
        json.dump(dic,open('db','w'))
        print('購票成功')
    else:
        print('購票失敗')

def task(lock):
    search()
    lock.acquire() #lock.acquire要放在打開文件以前
    get()
    lock.release()

if __name__ == '__main__':
    lock = Lock()
    for i in range(100):  # 模擬併發100個客戶端搶票
        p=Process(target=task,args=(lock,))
        p.start()

隊列

隊列和管道都是將數據存放於內存中,隊列又是基於(管道+鎖)實現的,可讓咱們從複雜的鎖問題中解脫出來,咱們應該儘可能避免使用共享數據,儘量使用消息傳遞和隊列,避免處理複雜的同步和鎖問題,並且在進程數目增多時,每每能夠得到更好的可獲展性網絡

建立共享的進程隊列,Queue是多進程安全的隊列,可使用Queue實現多進程之間的數據傳遞。多線程

Queue([maxsize])建立共享的進程隊列。
參數 :maxsize是隊列中容許的最大項數。若是省略此參數,則無大小限制。併發

底層隊列使用管道和鎖定實現。app

from multiprocessing import Queue
q=Queue(3)
print(q.empty())
q.put(1)
q.put(2)
q.put(3)
print(q.full())
q.get()
q.get()
q.get()
print(q.empty())
print(q.put_niwait(4))
print(q._buffer)  #查看列表內容

子進程發送數據給父進程

mport time
from multiprocessing import Process, Queue

def f(q):
    #調用主函數中p進程傳遞過來的進程參數 put函數爲向隊列中添加一條數據。
    q.put([time.asctime(), 'from Eva', 'hello'])  

if __name__ == '__main__':
    q = Queue() #建立一個Queue對象
    p = Process(target=f, args=(q,)) #建立一個進程
    p.start()
    print(q.get())
    p.join()

IPC(進程間通訊)

進程間數據是相互隔離的,若想實現進程間通訊,能夠利用隊列dom

基於隊列實現生產者消費者模型

from multiprocessing import Process,Queue
import time,random,os
def consumer(q):
    while True:
        print(222)
        res=q.get()
        time.sleep(1)

        print('%s 吃 %s' %(os.getpid(),res))

def producer(q):
    for i in range(5):
        print(11111)
        time.sleep(2)
        res='包子%s' %i
        q.put(res)

        # print('生產了 %s' %(os.getpid(),res))

if __name__ == '__main__':
    q=Queue()
    #生產者們:即廚師們
    p1=Process(target=producer,args=(q,))

    #消費者們:即吃貨們
    c1=Process(target=consumer,args=(q,))

    #開始
    p1.start()
    c1.start()
    print('主')
    
'''
此時的問題是主進程永遠不會結束,緣由是:生產者p在生產完後就結束了,可是消費者c在取空了q以後,則一直處於死循環中且卡在q.get()這一步。

解決方式無非是讓生產者在生產完畢後,往隊列中再發一個結束信號,這樣消費者在接收到結束信號後就能夠break出死循環
'''

線程

進程是資源分配的最小單位,線程是CPU調度的最小單位。每個進程中至少有一個線程函數

進程: 資源單位 - 線程: 執行單位ui

線程與進程都是虛擬單位,目的是爲了更好地描述某種事物

GIL鎖

Python代碼的執行由Python虛擬機(也叫解釋器主循環)來控制。Python在設計之初就考慮到要在主循環中,同時只有一個線程在執行。雖然 Python 解釋器中能夠「運行」多個線程,但在任意時刻只有一個線程在解釋器中運行。

對Python虛擬機的訪問由全局解釋器鎖(GIL)來控制,正是這個鎖能保證同一時刻只有一個線程在運行。

在多線程環境中,Python 虛擬機按如下方式執行:

  1. 設置 GIL;
  2. 切換到一個線程去運行;
  3. 運行指定數量的字節碼指令或者線程主動讓出控制(能夠調用 time.sleep(0));
  4. 把線程設置爲睡眠狀態;
  5. 解鎖 GIL;
  6. 再次重複以上全部步驟。

線程和進程的區別

  1. 地址空間和其餘資源(如打開文件):進程間相互獨立,同一進程的各線程間共享。某進程內的線程在其餘進程不可見
  2. 通訊:進程間通訊IPC,線程間能夠直接讀寫進程數據段(如全局變量)來通訊,須要進程同步和互斥手段的輔助,以保證數據的一致性
  3. 調度和切換:線程上下文切換比進程上下文切換要快的多
  4. 在多線程操做系統中,進程不是一個可執行的實體
  5. 注意:線程不能實現並行, 線程只能實現併發, 進程能夠實現並行

建立線程的兩種方式

from threading import Thread
import time


# 開啓線程方式1:
def task():
    print('線程開啓')
    time.sleep(1)
    print('線程結束')


# t = Thread()
if __name__ == '__main__':
    # 調用Thread線程類實例化獲得線程對象
    t = Thread(target=task)
    t.start()
# 開啓線程方式2:
class MyThread(Thread):
    def run(self):
        print('線程開啓')
        time.sleep(1)
        print('線程結束')
        
if __name__ == '__main__':
    t = MyThread()
    t.start()

線程對象的屬性

from threading import Thread
from threading import current_thread
import time


def task():
    print(f'線程開啓{current_thread().name}')
    time.sleep(3)
    print(f'線程結束{current_thread().name}')
    
if __name__ == '__main__':
    t = Thread(target=task)
    t.daemon = True
    t.start()

    print('主')

線程互斥鎖

#線程間數據共享

from threading import Thread
import time
x = 100


def task():
    print('開啓線程...')
    time.sleep(1)
    global x
    x = 200
    print('關閉線程...')

if __name__ == '__main__':
    t = Thread(target=task)
    t.start()
    t.join()  # 關鍵
    print(x)  # x = 200
    print('主')
from threading import Thread, Lock
import time

mutex = Lock()

n = 100
def task(i):
    print(f'線程{i}啓動...')
    global n
    mutex.acquire()
    temp = n
    # time.sleep(0.1)  # 一共等待10秒
    n = temp-1
    print(n)
    mutex.release()

if __name__ == '__main__':
    t_l=[]
    for i in range(5):
        t = Thread(target=task, args=(i, ))
        t_l.append(t)
        t.start()
        
    for t in t_l:
        t.join()
'''
>>>                 
線程0啓動...        
99                 
線程1啓動...            
98
線程2啓動...
97
線程3啓動...
96
線程4啓動...
95
'''
相關文章
相關標籤/搜索