Python 的併發編程

本文最早發佈在博客: https://blog.ihypo.net/151628...

這篇文章將講解 Python 併發編程的基本操做。併發和並行是對孿生兄弟,概念常常混淆。併發是指可以多任務處理,並行則是是可以同時多任務處理。Erlang 之父 Joe Armstrong 有一張很是有趣的圖說明這兩個概念:
html

我我的更喜歡的一種說法是:併發是宏觀並行而微觀串行。python

GIL

雖然 Python 自帶了很好的類庫支持多線程/進程編程,但衆所周知,由於 GIL 的存在,Python 很難作好真正的並行。編程

GIL 指全局解釋器鎖,對於 GIL 的介紹:安全

全局解釋器鎖(英語:Global Interpreter Lock,縮寫GIL),是計算機程序設計語言解釋器用於同步線程的一種機制,它使得任什麼時候刻僅有一個線程在執行。多線程

  • 維基百科

其實與其說 GIL 是 Python 解釋器的限制,不如說是 CPython 的限制,由於 Python 爲了保障性能,底層大多使用 C 實現的,而 CPython 的內存管理並非線程安全的,爲了保障總體的線程安全,解釋器便禁止多線程的並行執行。併發

由於 Python 社區認爲操做系統的線程調度已經很是成熟了,沒有必要本身再實現一遍,所以 Python 的線程切換基本是依賴操做系統,在實際的使用中,對於單核 CPU,GIL 並無太大的影響,但對於多核 CPU 卻引入了線程顛簸(thrashing)問題。app

線程顛簸是指做爲單一資源的 GIL 鎖,在被多核心競爭強佔時資源額外消耗的現象。dom

好比下圖,線程1 在釋放 GIL 鎖後,操做系統喚醒了 線程2,並將 線程2 分配給 核心2 執行,可是若是此時 線程2 卻沒有成功得到 GIL 鎖,只能再次被掛起。此時切換線程、切換上下文的資源都將白白浪費。異步

所以,Python 多線程程序在多核 CPU 機器下的性能不必定比單核高。那麼若是是計算密集型的程序,通常仍是考慮用 C 重寫關鍵部分,或者使用多進程避開 GIL。async

多線程

在 Python 中使用多線程,有 threadthreading 可供原則,thread 提供了低級別的、原始的線程以及一個簡單的鎖,由於 thread 過於簡陋,線程管理容易出現人爲失誤,所以官方更建議使用 threading,而 threading 也不過是對 thread 的封裝和補充。(Python3 中 thread 被更名爲 _thread)。

在 Python 中建立線程很是簡單:

import time
import threading


def do_task(task_name):
    print("Get task: {}".format(task_name))
    time.sleep(1)
    print("Finish task: {}".format(task_name))


if __name__ == "__main__":
    tasks = []
    for i in range(0, 10):
        # 建立 task
        tasks.append(threading.Thread(
            target=do_task,
            args=("task_{}".format(i),)))
    for t in tasks:
        # 開始執行 task
        t.start()

    for t in tasks:
        # 等待 task 執行完畢
        # 完畢前會阻塞住主線程
        t.join()
    print("Finish.")

直接建立線程簡單優雅,若是邏輯複雜,也能夠經過繼承 Thread 基類完成多線程:

import time
import threading


class MyTask(threading.Thread):
    def __init__(self, task_name):
        super(MyTask, self).__init__()
        self.task_name = task_name

    def run(self):
        print("Get task: {}".format(self.task_name))
        time.sleep(1)
        print("Finish task: {}".format(self.task_name))


if __name__ == "__main__":
    tasks = []
    for i in range(0, 10):
        # 建立 task
        tasks.append(MyTask("task_{}".format(i)))
    for t in tasks:
        # 開始執行 task
        t.start()

    for t in tasks:
        # 等待 task 執行完畢
        # 完畢前會阻塞住主線程
        t.join()
    print("Finish.")

多進程

在 Python 中,可使用 multiprocessing 庫來實現多進程編程,和多線程同樣,有兩種方法可使用多進程編程。

直接建立進程:

import time
import random
import multiprocessing


def do_something(task_name):
    print("Get task: {}".format(task_name))
    time.sleep(random.randint(1, 5))
    print("Finish task: {}".format(task_name))


if __name__ == "__main__":
    tasks = []
    for i in range(0, 10):
        # 建立 task
        tasks.append(multiprocessing.Process(
            target=do_something,
            args=("task_{}".format(i),)))
    for t in tasks:
        # 開始執行 task
        t.start()

    for t in tasks:
        # 等待 task 執行完畢
        # 完畢前會阻塞住主線程
        t.join()
    print("Finish.")

繼承進程父類:

import time
import random
import multiprocessing


class MyTask(multiprocessing.Process):
    def __init__(self, task_name):
        super(MyTask, self).__init__()
        self.task_name = task_name

    def run(self):
        print("Get task: {}".format(self.task_name))
        time.sleep(random.randint(1, 5))
        print("Finish task: {}".format(self.task_name))


if __name__ == "__main__":
    tasks = []
    for i in range(0, 10):
        # 建立 task
        tasks.append(MyTask("task_{}".format(i)))
    for t in tasks:
        # 開始執行 task
        t.start()

    for t in tasks:
        # 等待 task 執行完畢
        # 完畢前會阻塞住主線程
        t.join()
    print("Finish.")

multiprocessing 除了經常使用的多進程編程外,我認爲它最大的意義在於提供了一套規範,在該庫下有一個 dummy 模塊,即 multiprocessing.dummy,裏面對 threading 進行封裝,提供了和 multiprocessing 相同 API 的線程實現,換句話說,class::multiprocessing.Process 提供的是進程任務類,而 class::multiprocessing.dummy.Process,也正是有 multiprocessing.dummy 的存在,能夠快速的講一個多進程程序改成多線程:

import time
import random
from multiprocessing.dummy import Process


class MyTask(Process):
    def __init__(self, task_name):
        super(MyTask, self).__init__()
        self.task_name = task_name

    def run(self):
        print("Get task: {}".format(self.task_name))
        time.sleep(random.randint(1, 5))
        print("Finish task: {}".format(self.task_name))


if __name__ == "__main__":
    tasks = []
    for i in range(0, 10):
        # 建立 task
        tasks.append(MyTask("task_{}".format(i)))
    for t in tasks:
        # 開始執行 task
        t.start()

    for t in tasks:
        # 等待 task 執行完畢
        # 完畢前會阻塞住主線程
        t.join()
    print("Finish.")

不管是多線程仍是多進程編程,這也是我通常會選擇 multiprocessing 的緣由。

除了直接建立進程,還能夠用進程池(或者 multiprocessing.dummy 裏的進程池):

import time
import random
from multiprocessing import Pool


def do_task(task_name):
    print("Get task: {}".format(task_name))
    time.sleep(random.randint(1, 5))
    print("Finish task: {}".format(task_name))


if __name__ == "__main__":
    pool = Pool(5)
    for i in range(0, 10):
        #     建立 task
        pool.apply_async(do_task, ("task_{}".format(i),))
    pool.close()
    pool.join()
    print("Finish.")

線程池:

import time
import random
from multiprocessing.dummy import Pool


def do_task(task_name):
    print("Get task: {}".format(task_name))
    time.sleep(random.randint(1, 5))
    print("Finish task: {}".format(task_name))


if __name__ == "__main__":
    pool = Pool(5)
    for i in range(0, 10):
        #     建立 task
        pool.apply_async(do_task, ("task_{}".format(i),))
    pool.close()
    pool.join()
    print("Finish.")

這裏示例有個問題,pool 在 join 前須要 close 掉,不然就會拋出異常,不過 Python 之禪的做者 Tim Peters 給出解釋:

As to Pool.close(), you should call that when - and only when - you're never going to submit more work to the Pool instance. So Pool.close() is typically called when the parallelizable part of your main program is finished. Then the worker processes will terminate when all work already assigned has completed.

It's also excellent practice to call Pool.join() to wait for the worker processes to terminate. Among other reasons, there's often no good way to report exceptions in parallelized code (exceptions occur in a context only vaguely related to what your main program is doing), and Pool.join() provides a synchronization point that can report some exceptions that occurred in worker processes that you'd otherwise never see.

同步原語

在多進程編程中,由於進程間的資源隔離,不須要考慮內存的線程安全問題,而在多線程編程中便須要同步原語來保存線程安全,由於 Python 是一門簡單的語言,不少操做都是封裝的操做系統 API,所以支持的同步原語蠻全,但這裏只寫兩種常見的同步原語:鎖和信號量。

經過使用鎖能夠用來保護一段內存空間,而信號量能夠被多個線程共享。

threading 中能夠看到 Lock 鎖和 RLock 重用鎖兩種鎖,區別如名。這兩種鎖都只能被一個線程擁有,第一種鎖只能被得到一次,而重用鎖能夠被屢次得到,但也須要一樣次數的釋放才能真正的釋放。

當多個線程對同一塊內存空間同時進行修改的時候,常常遇到奇怪的問題:

import time
import random
from threading import Thread, Lock

count = 0


def do_task():
    global count
    time.sleep(random.randint(1, 10) * 0.1)
    tmp = count
    tmp += 1
    time.sleep(random.randint(1, 10) * 0.1)
    count = tmp
    print(count)


if __name__ == "__main__":
    tasks = []
    for i in range(0, 10):
        tasks.append(Thread(target=do_task))
    for t in tasks:
        t.start()
    for t in tasks:
        t.join()
    print("Finish. Count = {}".format(count))

如上就是典型的非線程安全致使 count 沒有達到預期的效果。而經過鎖即可以控制某一段代碼,或者說某段內存空間的訪問:

import time
import random
from threading import Thread, Lock

count = 0
lock = Lock()


def do_task():
    lock.acquire()
    global count
    time.sleep(random.randint(1, 10) * 0.1)
    tmp = count
    tmp += 1
    time.sleep(random.randint(1, 10) * 0.1)
    count = tmp
    print(count)
    lock.release()


if __name__ == "__main__":
    tasks = []
    for i in range(0, 10):
        tasks.append(Thread(target=do_task))
    for t in tasks:
        t.start()
    for t in tasks:
        t.join()
    print("Finish. Count = {}".format(count))

固然,上述例子很是暴力,直接強行把併發改成串行。

對於信號量常見於有限資源強佔的場景,能夠定義固定大小的信號量供多個線程獲取或者釋放,從而控制線程的任務執行,好比下面的例子,控制最多有 5 個任務在執行:

import time
import random
from threading import Thread, BoundedSemaphore

sep = BoundedSemaphore(5)


def do_task(task_name):
    sep.acquire()
    print("do Task: {}".format(task_name))
    time.sleep(random.randint(1, 10))
    sep.release()


if __name__ == "__main__":
    tasks = []
    for i in range(0, 10):
        tasks.append(Thread(target=do_task, args=("task_{}".format(i),)))
    for t in tasks:
        t.start()
    for t in tasks:
        t.join()
    print("Finish.")

Queue 和 Pipe

由於多進程的內存隔離,不會存在內存競爭的問題。但同時,多個進程間的數據共享成爲了新的問題,而進程間通訊常見:隊列,管道,信號。

這裏只講解隊列和管道。

隊列常見於雙進程模型,通常用做生產者-消費者模式,由生產者進程向隊列中發佈任務,並由消費者從隊列首部拿出任務進行執行:

import time
from multiprocessing import Process, Queue


class Task1(Process):
    def __init__(self, queue):
        super(Task1, self).__init__()
        self.queue = queue

    def run(self):
        item = self.queue.get()
        print("get item: [{}]".format(item))


class Task2(Process):
    def __init__(self, queue):
        super(Task2, self).__init__()
        self.queue = queue

    def run(self):
        print("put item: [Hello]")
        time.sleep(1)
        self.queue.put("Hello")


if __name__ == "__main__":
    queue = Queue()
    t1 = Task1(queue)
    t2 = Task2(queue)
    t1.start()
    t2.start()
    t1.join()
    print("Finish.")

理論上每一個進程均可以向隊列裏的讀或者寫,能夠認爲隊列是半雙工路線。可是每每只有特定的讀進程(好比消費者)和寫進程(好比生產者),儘管這些進程只是開發者本身定義的。

而 Pipe 更像一個全工路線:

import time
from multiprocessing import Process, Pipe


class Task1(Process):
    def __init__(self, pipe):
        super(Task1, self).__init__()
        self.pipe = pipe

    def run(self):
        item = self.pipe.recv()
        print("Task1: recv item: [{}]".format(item))
        print("Task1: send item: [Hi]")
        self.pipe.send("Hi")


class Task2(Process):
    def __init__(self, pipe):
        super(Task2, self).__init__()
        self.pipe = pipe

    def run(self):
        print("Task2: send item: [Hello]")
        time.sleep(1)
        self.pipe.send("Hello")
        time.sleep(1)
        item = self.pipe.recv()
        print("Task2: recv item: [{}]".format(item))


if __name__ == "__main__":
    pipe = Pipe()
    t1 = Task1(pipe[0])
    t2 = Task2(pipe[1])
    t1.start()
    t2.start()
    t1.join()
    t2.join()
    print("Finish.")

除了上面介紹的 threadingmultiprocessing 兩個庫外,還有一個好用的使人髮指的庫 concurrent.futures。和前面兩個庫不一樣,這個庫是更高等級的抽象,隱藏了不少底層的東西,但也所以很是好用。用官方的例子:

with ThreadPoolExecutor(max_workers=1) as executor:
    future = executor.submit(pow, 323, 1235)
    print(future.result())

該庫中自帶了進程池和線程池,能夠經過上下文管理器來管理,並且對於異步任務執行完後,結果的得到也很是簡單。再拿一個官方的多進程計算的例子做爲結束:

import concurrent.futures
import math

PRIMES = [
    112272535095293,
    112582705942171,
    112272535095293,
    115280095190773,
    115797848077099,
    1099726899285419]

def is_prime(n):
    if n % 2 == 0:
        return False

    sqrt_n = int(math.floor(math.sqrt(n)))
    for i in range(3, sqrt_n + 1, 2):
        if n % i == 0:
            return False
    return True

def main():
    with concurrent.futures.ProcessPoolExecutor() as executor:
        for number, prime in zip(PRIMES, executor.map(is_prime, PRIMES)):
            print('%d is prime: %s' % (number, prime))

if __name__ == '__main__':
    main()

歡迎關注我的公衆號:CS實驗室

相關文章
相關標籤/搜索