快速瞭解Python併發編程的工程實現(上)

關於我
編程界的一名小程序猿,目前在一個創業團隊任team lead,技術棧涉及Android、Python、Java和Go,這個也是咱們團隊的主要技術棧。html

0x00 前言

前面的文章中Python協程的概念和實現作了簡單地介紹。爲了對Python併發編程有更加全面地認識,我也對Python線程和進程的概念和相關技術的使用進行了學習,因而有了這篇文字。python

0x01 線程與進程

當咱們在手機或者PC上打開一個應用時,操做系統就會建立一個進程實例,並開始執行進程裏的主線程,它有獨立的內存空間和數據結構。線程是輕量級的進程。在同一個進程裏,多個線程共享內存和數據結構,線程間的通訊也更加容易。數據庫

0x02 使用線程實現併發

熟悉Java編程的同窗就會發現Python中的線程模型與Java很是相似。下文咱們將主要使用Python中的線程模塊threading包。(對於低級別的API模塊thread不推薦初學者使用。本文全部代碼將使用Python 3.7的環境)編程

threading

要使用線程咱們要導入threading包,這個包是在_thread包(即上文提到的低級別thread模塊)的基礎上封裝了許多高級的API,在開發中應當首選threading包。小程序

常見地,有兩種方式構建一個線程:經過Thread的構造函數傳遞一個callable對象,或繼承Thread類並重寫run方法。數據結構

import threading

import time


def do_in_thread(arg):
    print('do in thread {}'.format(arg))
    time.sleep(2)


if __name__ == '__main__':
    start_time = time.time()
    
    t1 = threading.Thread(target=do_in_thread, args=(1,), name='t1')
    t2 = threading.Thread(target=do_in_thread, args=(2,), name='t2')

    t1.start()
    t2.start()
    
    # join方法讓主線程等待子線程執行完畢
    t1.join()
    t2.join()
    print("\nduration {} ".format(time.time() - start_time))
    
# do in thread 1
# do in thread 2
# duration 2.001628875732422 

複製代碼

還能夠經過繼承threading.Thread類定義線程併發

import threading

import time


def do_in_thread(arg):
    print('do in thread {}'.format(arg))
    time.sleep(2)

    
class MyThread(threading.Thread):

    def __init__(self, arg):
        super().__init__()
        self.arg = arg

    def run(self):
        start_time = time.time()
        do_in_thread(self.arg)
        print("duration {} ".format(time.time() - start_time))


def start_thread_2():
    start_time = time.time()

    print("duration {} ".format(time.time() - start_time))


if __name__ == '__main__':
    mt1 = MyThread(3)
    mt2 = MyThread(4)

    mt1.start()
    mt2.start()
    
    # join方法讓主線程等待子線程執行完畢
    mt1.join()
    mt2.join() 
    
    
# do in thread 3
# do in thread 4
# duration 2.004937171936035 
複製代碼

join方法的做用是讓調用它的線程等待其執行完畢。app

class threading.Thread(group=None, target=None, name=None, args=(), kwargs={}, *, daemon=None)
複製代碼

定義線程時能夠經過指定構造方法的name參數設置線程名稱。
target用於指定callable對象,將在run方法中被調用。
args設置target對象被調用時的參數,類型是元組(),例如上文中的do_in_thread(arg)方法的參數。
kwargs是一個字典類型的參數,也用於target對象的參數。
daemon設置守護線程的標識,若是設置爲True那麼這個線程就是守護線程,此時若是主線程結束了,那麼守護線程也會當即被殺死。因此當有在守護線程中打開文件、數據庫等資源操做時,釋放資源就有可能出錯。dom

線程池

程序中如有大量的線程建立和銷燬,則對性能影響較大。咱們可使用線程池。一樣地,它的APIJava極爲類似。函數

Executor
concurrent.futures.Executor
複製代碼

這是一個抽象類,定義了線程池的接口。

  • submit(fn, *args, **kwargs)
    執行fn(args,kwargs) 並會返回一個future對象,經過future可獲取到執行結果
  • map(func, *iterables, timeout=None, chunksize=1)
    這個方法與map(func,*iterables)相似
  • shutdown(wait=True)
    關閉線程池
from concurrent.futures import ThreadPoolExecutor
# 使用max_workers參數指定線程池中線程的最大數量爲2
with ThreadPoolExecutor(max_workers=2) as executor:
    # 提交任務到線程池
    future = executor.submit(pow, 2, 31) # 計算2^31
    future2 = executor.submit(pow, 1024, 2)
    # 使用future 獲取執行結果
    print(future.result())
    print(future2.result())

# 執行結果
# 2147483648
# 1048576
複製代碼
同步

如有多個線程對同一個資源或內存進行訪問或操做就有會產生競爭條件。
Python提供了鎖、信號量、條件和事件等同步原語可幫忙咱們實現線程的同步機制。

Lock

Lock有兩種狀態:lockedunlocked。它有兩個基本方法:acquire()release(),且都是原子操做的。
一個線程經過acquire()獲取到鎖,Lock的狀態變成locked,而其它線程調用acquire()時只能等待鎖被釋放。 當線程調用了release()Lock的狀態就變成了unlocked,此時其它等待線程中只有一個線程將得到鎖。

import threading

share_mem_lock = 0
share_mem = 0
count = 1000000

locker = threading.Lock()


def add_in_thread_with_lock():
    global share_mem_lock
    for i in range(count):
        locker.acquire()
        share_mem_lock += 1
        locker.release()


def minus_in_thread_with_lock():
    global share_mem_lock
    for i in range(count):
        locker.acquire()
        share_mem_lock -= 1
        locker.release()


def add_in_thread():
    global share_mem

    for i in range(count):
        share_mem += 1


def minus_in_thread():
    global share_mem

    for i in range(count):
        share_mem -= 1


if __name__ == '__main__':
    t1 = threading.Thread(target=add_in_thread_with_lock)
    t2 = threading.Thread(target=minus_in_thread_with_lock)

    t3 = threading.Thread(target=add_in_thread)
    t4 = threading.Thread(target=minus_in_thread)

    t1.start()
    t2.start()

    t3.start()
    t4.start()

    t1.join()
    t2.join()

    t3.join()
    t4.join()

    print("share_mem_lock : ", share_mem_lock)
    print("share_mem : ", share_mem)

# 執行結果
# share_mem_lock : 0
# share_mem : 51306
複製代碼

沒有使用鎖機制的代碼執行後最後的值頗有可能就不爲0。而有鎖的代碼則能夠保證同步。

RLock

RLockReentrant Lock,就是能夠重複進入的鎖,也叫遞歸鎖。它有3個特色:

  • 誰獲取鎖誰釋放。如A線程獲取了鎖,那麼只有A線程才能釋放該鎖
  • 同一線程可重複屢次獲取該鎖。便可以調用acquire屢次
  • acquire多少次,對應release就多少次,且最後一次release纔會釋放鎖。
Condition

條件是另外一種同步原語機制。其實它的內部是封裝了RLock,它的acquire()release()方法就是RLock的方法。
Condition經常使用的API還有wait()notify()notify_all()方法。 wait()方法會釋放鎖,而後進入阻塞狀態直到其它線程經過notify()notify_all()喚醒本身。wait()方法從新獲取到鎖就會返回。
notify()會喚醒其中一個等待的線程,而notify_all()會喚醒全部等待的線程。
須要注意的是notify()notify_all()執行後並不會釋放鎖,只有調用了release()方法後鎖纔會釋放。
讓咱們看一個來自於《Python並行編程手冊》中的一個生產者與消費者例子

from threading import Thread, Condition
import time

items = []
condition = Condition()


class consumer(Thread):

    def __init__(self):
        Thread.__init__(self)

    def consume(self):
        global condition
        global items
        # 獲取鎖
        condition.acquire()
        if len(items) == 0:
            # 當items爲空時,釋放了鎖,並等待生產者notify
            condition.wait()
            print("Consumer notify : no item to consume")
        # 開始消費
        items.pop()
        print("Consumer notify : consumed 1 item")
        print("Consumer notify : items to consume are " + str(len(items)))
        # 消費以後notify喚醒生產者,由於notify不會釋放鎖,因此還要調用release釋放鎖
        condition.notify()
        condition.release()

    def run(self):
        for i in range(0, 10):
            time.sleep(2)
            self.consume()


class producer(Thread):

    def __init__(self):
        Thread.__init__(self)

    def produce(self):
        global condition
        global items
        condition.acquire()
        if len(items) == 5:
            # 若items時滿的,則執行wait,釋放鎖,並等待消費者notify
            condition.wait()
            print("Producer notify : items producted are " + str(len(items)))
            print("Producer notify : stop the production!!")
        # 開始生產
        items.append(1)
        print("Producer notify : total items producted " + str(len(items)))
        # 生產後notify消費者,由於notify不會釋放鎖,因此還執行了release釋放鎖。
        condition.notify()
        condition.release()

    def run(self):
        for i in range(0, 10):
            time.sleep(1)
            self.produce()


if __name__ == "__main__":
    producer = producer()
    consumer = consumer()
    producer.start()
    consumer.start()
    producer.join()
    consumer.join()

複製代碼
Semaphore

信號量內部維護一個計數器。acquire()會減小這個計數,release()會增長這個計數,這個計數器永遠不會小於0。當計數器等於0時,acquire()方法就會等待其它線程調用release()
仍是藉助一個生產者與消費者的例子來理解

# -*- coding: utf-8 -*-

"""Using a Semaphore to synchronize threads"""
import threading
import time
import random

# 默認狀況內部計數爲1,這裏設置爲0。
# 若設置爲負數則會拋出ValueError
semaphore = threading.Semaphore(0)


def consumer():
    print("consumer is waiting.")
    # 獲取一個信號量,由於初始化時內部計數設置爲0,因此這裏一開始時是處於等待狀態
    semaphore.acquire()
    # 開始消費
    print("Consumer notify : consumed item number %s " % item)


def producer():
    global item
    time.sleep(2)
    # create a random item
    item = random.randint(0, 1000)
    # 開始生產
    print("producer notify : produced item number %s" % item)
    # 釋放信號量, 內部計數器+1。當有等待的線程發現計數器大於0時,就會喚醒並從acquire方法中返回
    semaphore.release()


if __name__ == '__main__':
    for i in range(0, 5):
        t1 = threading.Thread(target=producer)
        t2 = threading.Thread(target=consumer)
        t1.start()
        t2.start()
        t1.join()
        t2.join()
    print("program terminated")

複製代碼

信號量常常會用於資源容量肯定的場景,好比數據庫鏈接池等。

Event

事件在線程間的通訊方式很是簡單。一個線程發送事件另外一個線程等待接收。
Event對象內部維護了一個bool變量flag。經過set()方法設置該變量爲Trueclear()方法設置flagFalsewait()方法會一直等待直到flag變成True

結合例子

# -*- coding: utf-8 -*-

import time
from threading import Thread, Event
import random

items = []
event = Event()

class consumer(Thread):
    def __init__(self, items, event):
        Thread.__init__(self)
        self.items = items
        self.event = event

    def run(self):
        while True:
            time.sleep(2)
            # 等待事件
            self.event.wait()
            # 開始消費
            item = self.items.pop()
            print('Consumer notify : %d popped from list by %s' % (item, self.name))


class producer(Thread):
    def __init__(self, integers, event):
        Thread.__init__(self)
        self.items = items
        self.event = event

    def run(self):
        global item
        while True:
            time.sleep(2)
            # 開始生產
            item = random.randint(0, 256)
            self.items.append(item)
            print('Producer notify : item N° %d appended to list by %s' % (item, self.name))
            print('Producer notify : event set by %s' % self.name)
            # 發送事件通知消費者消費
            self.event.set()
            print('Produce notify : event cleared by %s ' % self.name)
            # 設置事件內部變量爲False,隨後消費者線程調用wait()方法時,進入阻塞狀態
            self.event.clear()


if __name__ == '__main__':
    t1 = producer(items, event)
    t2 = consumer(items, event)
    t1.start()
    t2.start()
    t1.join()
    t2.join()

複製代碼
Timer

定時器TimerThread的子類。用於處理定時執行的任務。啓動定時器使用start(),取消定時器使用cancel()

from threading import Timer

def hello():
    print("hello, world")

t = Timer(3.0, hello)
t.start()  # 3秒後 打印 "hello, world"
複製代碼
with語法

LockRLockConditionSemaphore可使用with語法。 這幾個對象都實現擁有acquire()release()方法,且都實現了上下文管理協議。

with some_lock:
    # do something...
複製代碼

等價於

some_lock.acquire()
try:
    # do something...
finally:
    some_lock.release()
複製代碼

0x03 小結

本文主要介紹Python中線程的使用,主要是對threading模塊中Thread對象、線程池Executor常見用法的展現。還了解了線程的同步原語LockRLockConditionSemaphoreEvent以及TimerAPI的使用。

0x04 引用

相關文章
相關標籤/搜索