Python併發編程

1. 進程、線程、協程

進程是cpu資源分配的最小單位,進程是正在進行的一個過程或者說一個任務。
線程是cpu調度的最小單位。
協程是一種用戶態的輕量級線程,即協程是由用戶程序本身控制調度的。(單線程下的併發)
進程線程區別:python

  • 線程開銷小
  • 主進程下多個進程PID不一樣;主進程下多個線程PID與主進程相同
  • 進程間地址空間相隔離;同一個進程內多個線程共享該進程的地址資源。

2. 殭屍進程

子進程結束後,子進程的狀態信息仍然保存在系統中,直到父進程結束。(全部進程都會經歷殭屍進程)mysql

3. 孤兒進程

父進程已經退出,但它的一個或多個子進程還在運行, 這些進程就會成爲孤兒進程,由init進程收養。sql

4. 守護進程

守護進程會在主進程代碼執行結束後就終止。安全

5. 建立子進程

方式一:多線程

import time
from multiprocessing import Process


def task(name):  # 子進程執行的任務
    print('%s is running' % name)
    time.sleep(3)
    print('%s is done' % name)


if __name__ == '__main__':

    # 實例化獲得對象
    p1 = Process(target=task, args=('子進程1',))  # 必須加逗號
    p2 = Process(target=task, kwargs={'name': '子進程2'})

    # 調用對象下的方法,開啓進程
    p1.start()  # 僅是給操做系統發送信號, 由操做系統開啓子進程
    p2.start()

    print('主')

方式二:併發

import time
from multiprocessing import Process

class MyProcess(Process):

    def __init__(self, name):
        super().__init__()
        self.name = name

    def run(self):  # 其實是start,可是必須叫run
        print('%s is running' % self.name)
        time.sleep(3)
        print('%s is done' % self.name)


if __name__ == '__main__':

    p1 = MyProcess('子進程1')
    p2 = MyProcess('子進程2')

    p1.start()  # 啓動進程,並調用該子進程中的p.run() 
    p2.start()

    print('主')

6. 查看進程的相關參數

進程名:print(p.name)
進程PID: print(os.getpid) print(p.pid)
父進程PID:print(os.getppid)app

7. 相關函數

等待進程運行結束:p.join()
查看進程是否在運行:print(p.is_alive())
終止進程:p.terminate() 進程結束時間由操做系統決定。
終止進程但不進行任何清理操做,若是p建立了子進程,該子進程就成了殭屍進程;若是p還保存了一個鎖那麼也將不會被釋放,進而致使死鎖。
開啓守護進程:p.daemon = True 必須在p.start()以前設置,守護進程將被禁止建立子進程。dom

8. 互斥鎖

例子:異步

from multiprocessing import Process, Lock
import time


def task(name, lock):
    lock.acquire()
    print('%s 1' % name)
    time.sleep(1)
    print('%s 2' % name)
    time.sleep(1)
    print('%s 3' % name)
    lock.release()


if __name__ == '__main__':

    lock = Lock()
    for i in range(3):
        p = Process(target=task, args=('進程%s' % i, lock))
        p.start()

9. 隊列 Queue

例子:socket

from multiprocessing import Queue

que = Queue(3)

que.put(1)
que.put({'a': 'hello'})
que.put([1, 2, 3])
print(que.full())  # 判斷隊列是否已滿
# que.put(4) #再放就阻塞住了

print(que.get())
print(que.get())
print(que.get())
print(que.empty())  # 隊列是否空了
# print(que.get())  # 再取就阻塞住了

10. 隊列 JoinableQueue

例子:

from multiprocessing import Process, JoinableQueue
import time
import random


def producer(q, name, food):
    for i in range(2):
        res = '%s%s' % (food, i)
        time.sleep(random.randint(1, 3))
        print('\033[34m%s 生產了 %s\033[0m' % (name, res))
        q.put(res)  # 入隊
    q.join()  # 等到消費者把隊列中的全部的數據都取走以後,生產者才結束


def consumer(q, name):
    while True:
        res = q.get()  # 出隊
        time.sleep(random.randint(1, 3))
        print('\033[32m%s 吃 %s\033[0m' % (name, res))
        q.task_done()  # 發送信號給q.join(),說明已經從隊列中取走一個數據並處理完畢了


if __name__ == '__main__':

    que = JoinableQueue()

    # 3個生產者
    p1 = Process(target=producer, args=(que, '生產者1', '包子'))
    p2 = Process(target=producer, args=(que, '生產者2', '餃子'))
    p3 = Process(target=producer, args=(que, '生產者3', '月餅'))

    # 2個消費者
    c1 = Process(target=consumer, args=(que, '消費者1'))
    c2 = Process(target=consumer, args=(que, '消費者2'))
    c1.daemon = True  # 將消費者設爲守護進程
    c2.daemon = True  # 消費者在生產者結束後, 隨主進程一塊兒結束

    l1 = [p1, p2, p3]
    l2 = [c1, c2]

    # 開始生產
    for p in l1:
        p.start()

    # 開始消費
    for c in l2:
        c.start()

    # 主進程等生產者p一、p二、p3結束
    # 而p一、p二、p3在消費者把全部數據都取乾淨以後結束
    for p in l1:
        p.join()
    print('主')

11. 協程

使用greenlet庫:

from greenlet import greenlet

def eat(name):
    print('%s eat 1' % name)
    g2.switch('egon')  # 傳入第一次參數, 以後不用再傳
    print('%s eat 2' % name)
    g2.switch()
    
    
def play(name):
    print('%s play 1' % name)
    g1.switch()
    print('%s play 2' % name)


g1 = greenlet(eat)
g2 = greenlet(play)

g1.switch('egon')  # 在第一次switch時傳入參數,之後都不須要

使用gevent(沒法識別socket time等模塊的阻塞,須要使用gevent自帶的阻塞)

import gevent


def eat(name):
    print('%s eat 1' % name)
    gevent.sleep(2)  # gevent識別到阻塞,進行切換
    print('%s eat 2' % name)


def play(name):
    print('%s play 1' %name)
    gevent.sleep(1)
    print('%s play 2' %name)


g1 = gevent.spawn(eat, 'egon')
g2 = gevent.spawn(play, name='egon')
g1.join()
g2.join()
# gevent.joinall([g1,g2])  # 等待列表中全部協程對象結束
print('主')

導入monkey 可識別socket time等阻塞:

from gevent import monkey; monkey.patch_all()
# patch_all()必須放在導入socket、time等模塊前,不然gevent沒法識別socket、time的阻塞
import gevent
import time


def eat():
    print('eat food 1')
    time.sleep(2)
    print('eat food 2')


def play():
    print('play 1')
    time.sleep(1)
    print('play 2')


g1 = gevent.spawn(eat)
g2 = gevent.spawn(play)
gevent.joinall([g1, g2])
print('主')

12. 進程池、線程池

導入模塊:

from concurrent.futures import ProcessPoolExecutor  # 進程池
from concurrent.futures import ThreadPoolExecutor  # 線程池

建立進程/線程池:

executor = ProcessPoolExecutor(max_worker=3)  # 進程池
executor = ThreadPoolExecutor(max_workers=3)  #線程池

將進程/線程放入池內:

future = exector.submit(task, parm)

"""
executor.map(task, range(1,12))
至關於:
for i in range(11):
    exector.submit(task, i)
"""

關閉進程/線程池:

exector.shutdown()  # 默認wait參數爲True

wait=True 等待池內全部任務執行完畢回收完資源後再執行後續代碼。
wait=False 不join,直接執行後續代碼。
關閉進程/線程池後,不容許再向已關閉的進程/線程池內加入進程/線程。

拿到進程/線程運行結果:

print(future.result())

回調函數:
future.add_done_callback(func) futuretask結束後,會自動把future對象當作參數傳給回調函數func。

例子:

from concurrent.futures import ProcessPoolExecutor
import time
import random


def task(name):
    print("%s is running" % name)
    time.sleep(random.randint(3, 5))
    return name


def func(future):
    name = future.result()
    print("%s's callback function" % name)


if __name__ == '__main__':
    executor = ProcessPoolExecutor(5)
    futures = []

    for i in range(3):
        future = executor.submit(task, "task"+str(i))
        future.add_done_callback(func)
        futures.append(future)

    executor.shutdown(True)

    print("主")

13. IO模型

  • IO涉及的兩個對象:進程/線程內核
  • IO操做的兩個階段:
    1. 等待數據準備
    2. 將數據從內存拷貝到進程中
  • 5種IO模型:
    • 阻塞IO(blocking IO): 在內核將數據準備好以前,系統調用會一直等待。
    • 非阻塞IO(nonblocking IO): 每次客戶詢問內核是否有數據準備好,當有數據報準備好時,就進行拷貝數據報的操做,當沒有數據報準備好時,也不阻塞程序,內核直接返回未準備就緒的信號,等待用戶程序的下一個輪尋。
    • 多路複用IO(IO multiplexing): 多路複用IO屬於阻塞IO,但能夠對多個文件描述符進行阻塞監聽,因此效率較阻塞IO的高。
    • 信號驅動IO(signal driven IO): 在信號驅動IO模型中,當用戶線程發起一個IO請求操做,會給對應的socket註冊一個信號函數,而後用戶線程會繼續執行,當內核數據就緒時會發送一個信號給用戶線程,用戶線程接收到信號以後,便在信號函數中調用IO讀寫操做來進行實際的IO請求操做。
    • 異步IO(asynchronous IO): 數據準備、將數據從內從拷貝到進程均由內核完成,完成後告知進程該IO操做已完成。期間程序不阻塞。

阻塞程度:阻塞IO>非阻塞IO>多路轉接IO>信號驅動IO>異步IO,效率是由低到高。

14. 信號量

信號量也是一把鎖,但同一時間能夠被指定大小的任務獲取。

模塊導入:

from threading import Semaphore

設置計數器大小:

sm = Semaphore(value=1)  # 計數器大小默認爲1

兩個主要的方法:

acquire()  # 內置計數器-1, 當計數器爲0時阻塞,等待其餘線程調用release()
release()  # 內置計數器+1

例子:

from threading import Thread, Semaphore
import threading
import time


def func():
    sm.acquire()
    print('%s get sm' % threading.current_thread().getName())
    time.sleep(3)
    sm.release()


if __name__ == '__main__':
    sm = Semaphore(5)
    for _ in range(23):
        t = Thread(target=func)
        t.start()

15. Event

Event對象:
用於線程間通訊,即程序中的其一個線程須要經過判斷某個線程的狀態來肯定本身下一步的操做,就用到了Event對象。

模塊導入:

from threading import Event

建立Event對象:

event = Event()

相關方法:

event.isSet()  # 返回event的狀態值
event.wait([maxtime])  # 若是 event.isSet() == False將阻塞線程, [maxtime]-超時時間
event.set()  # 設置event的狀態值爲True,全部阻塞池的線程激活進入就緒狀態, 等待操做系統調度
event.clear()  # 恢復event的狀態值爲False

例子:

from threading import Thread, Event
import threading
import time
import random


def conn_mysql():
    count = 1
    while not event.is_set():
        if count > 3:
            print('\033[31m[%s]鏈接失敗\033[0m' % threading.current_thread().getName())
            exit(0)
        print('<%s>第%s次嘗試鏈接' % (threading.current_thread().getName(), count))
        event.wait(1)
        count += 1
    print('<%s>鏈接成功' % threading.current_thread().getName())


def check_mysql():
    print('\033[45m[%s]正在檢查mysql\033[0m' % threading.current_thread().getName())
    time.sleep(random.randint(1, 4))
    event.set()


if __name__ == '__main__':
    event = Event()
    conn1 = Thread(target=conn_mysql)
    conn2 = Thread(target=conn_mysql)
    check = Thread(target=check_mysql)

    conn1.start()
    conn2.start()
    check.start()

16. 定時器

定時器Timer:指定n秒後執行某項操做(不會像sleep同樣阻塞)
例子:

from threading import Timer


def hello():
    print("hello, world")


t = Timer(1, hello)  // 一秒後執行hello函數
t.start()

17. 遞歸鎖

死鎖現象: 指兩個或兩個以上的進程或線程在執行過程當中,因爭奪資源而形成的一種互相等待的現象,這種永遠在互相等待的進程稱爲死鎖進程。
遞歸鎖: RLock能夠連續acquire屢次。RLock內部維護着一個Lock和一個counter變量,counter記錄了acquire的次數,從而使得資源能夠被屢次require。直到一個線程全部的acquire都被release,其餘的線程才能得到資源。

例子:

from threading import Thread, RLock
import time

mutexA = mutexB = RLock()


class MyThread(Thread):
    def run(self):
        self.func1()
        self.func2()

    def func1(self):
        mutexA.acquire()
        print('\033[41m%s 拿到A鎖\033[0m' % self.name)

        mutexB.acquire()
        print('\033[42m%s 拿到B鎖\033[0m' % self.name)

        mutexB.release()
        print('\033[41m%s 釋放A鎖\033[0m' % self.name)

        mutexA.release()
        print('\033[42m%s 釋放B鎖\033[0m' % self.name)

    def func2(self):
        mutexB.acquire()
        print('\033[43m%s 拿到B鎖\033[0m' % self.name)
        time.sleep(2)

        mutexA.acquire()
        print('\033[44m%s 拿到A鎖\033[0m' % self.name)

        mutexA.release()
        print('\033[43m%s 釋放B鎖\033[0m' % self.name)

        mutexB.release()
        print('\033[44m%s 釋放A鎖\033[0m' % self.name)


if __name__ == '__main__':
    for _ in range(4):
        t = MyThread()
        t.start()

18. GIL全局解釋器鎖

cpython中引進GIL,保證同一時刻同一進程中只有一個線程被執行,獲取鎖並獲取資源,避免了多線程併發執行,保證了線程的安全,但沒法使用多核優點。

結論:

多線程用於IO密集型
多進程用於計算密集型

相關文章
相關標籤/搜索