5分鐘看懂系列:Python 線程池原理及實現

概述

傳統多線程方案會使用「即時建立, 即時銷燬」的策略。儘管與建立進程相比,建立線程的時間已經大大的縮短,可是若是提交給線程的任務是執行時間較短,並且執行次數極其頻繁,那麼服務器將處於不停的建立線程,銷燬線程的狀態。python

一個線程的運行時間能夠分爲3部分:線程的啓動時間、線程體的運行時間和線程的銷燬時間。在多線程處理的情景中,若是線程不能被重用,就意味着每次建立都須要通過啓動、銷燬和運行3個過程。這必然會增長系統相應的時間,下降了效率。web

使用線程池:因爲線程預先被建立並放入線程池中,同時處理完當前任務以後並不銷燬而是被安排處理下一個任務,所以可以避免屢次建立線程,從而節省線程建立和銷燬的開銷,能帶來更好的性能和系統穩定性。算法

線程池原理圖.png

線程池模型

這裏使用建立Thread()實例來實現,下面會再用繼承threading.Thread()的類來實現服務器

# 建立隊列實例, 用於存儲任務
queue = Queue()

# 定義須要線程池執行的任務
def do_job():
    while True:
        i = queue.get()
        time.sleep(1)
        print 'index %s, curent: %s' % (i, threading.current_thread())
        queue.task_done()

if __name__ == '__main__':
    # 建立包括3個線程的線程池
    for i in range(3):
        t = Thread(target=do_job)
        t.daemon=True # 設置線程daemon  主線程退出,daemon線程也會推出,即時正在運行
        t.start()

    # 模擬建立線程池3秒後塞進10個任務到隊列
    time.sleep(3)
    for i in range(10):
        queue.put(i)

    queue.join()複製代碼
  • daemon說明:
    若是某個子線程的daemon屬性爲False,主線程結束時會檢測該子線程是否結束,若是該子線程還在運行,則主線程會等待它完成後再退出;
    若是某個子線程的daemon屬性爲True,主線程運行結束時不對這個子線程進行檢查而直接退出,同時全部daemon值爲True的子線程將隨主線程一塊兒結束,而不管是否運行完成。
    daemon=True 說明線程是守護線程,守護線程外部無法觸發它的退出,因此主線程退出就直接讓子線程跟隨退出
  • queue.task_done() 說明:
    queue.join()的做用是讓主程序阻塞等待隊列完成,就結束退出,可是怎麼讓主程序知道隊列已經所有取出而且完成呢?queue.get() 只能讓主程序知道隊列取完了,但不表明隊列裏的任務都完成,因此程序須要調用queue.task_done() 告訴主程序,又一個任務完成了,直到所有任務完成,主程序退出

輸出結果網絡

index 1, curent: <Thread(Thread-2, started daemon 139652180764416)>
index 0, curent: <Thread(Thread-1, started daemon 139652189157120)>
index 2, curent: <Thread(Thread-3, started daemon 139652172371712)>
index 4, curent: <Thread(Thread-1, started daemon 139652189157120)>
index 3, curent: <Thread(Thread-2, started daemon 139652180764416)>
index 5, curent: <Thread(Thread-3, started daemon 139652172371712)>
index 6, curent: <Thread(Thread-1, started daemon 139652189157120)>
index 7, curent: <Thread(Thread-2, started daemon 139652180764416)>
index 8, curent: <Thread(Thread-3, started daemon 139652172371712)>
index 9, curent: <Thread(Thread-1, started daemon 139652189157120)>
finish複製代碼

能夠看到全部任務都是在這幾個線程中完成Thread-(1-3)多線程

線程池原理

線程池基本原理: 咱們把任務放進隊列中去,而後開N個線程,每一個線程都去隊列中取一個任務,執行完了以後告訴系統說我執行完了,而後接着去隊列中取下一個任務,直至隊列中全部任務取空,退出線程。併發

上面這個例子生成一個有3個線程的線程池,每一個線程都無限循環阻塞讀取Queue隊列的任務全部任務都只會讓這3個預生成的線程來處理。框架

具體工做描述以下:dom

  1. 建立Queue.Queue()實例,而後對它填充數據或任務
  2. 生成守護線程池,把線程設置成了daemon守護線程
  3. 每一個線程無限循環阻塞讀取queue隊列的項目item,並處理
  4. 每次完成一次工做後,使用queue.task_done()函數向任務已經完成的隊列發送一個信號
  5. 主線程設置queue.join()阻塞,直到任務隊列已經清空了,解除阻塞,向下執行

這個模式下有幾個注意的點:socket

  • 將線程池的線程設置成daemon守護進程,意味着主線程退出時,守護線程也會自動退出,若是使用默認
    daemon=False的話, 非daemon的線程會阻塞主線程的退出,因此即便queue隊列的任務已經完成
    線程池依然阻塞無限循環等待任務,使得主線程也不會退出。
  • 當主線程使用了queue.join()的時候,說明主線程會阻塞直到queue已是清空的,而主線程怎麼知道queue已是清空的呢?就是經過每次線程queue.get()後並處理任務後,發送queue.task_done()信號,queue的數據就會減1,直到queue的數據是空的,queue.join()解除阻塞,向下執行。
  • 這個模式主要是以隊列queue的任務來作主導的,作完任務就退出,因爲線程池是daemon的,因此主退出線程池全部線程都會退出。 有別於咱們平時可能以隊列主導thread.join()阻塞,這種線程完成以前阻塞主線程。看需求使用哪一個join():
    若是是想作完必定數量任務的隊列就結束,使用queue.join(),好比爬取指定數量的網頁
    若是是想線程作完任務就結束,使用thread.join()

示例:使用線程池寫web服務器

import socket
import threading
from threading import Thread
import threading
import sys
import time
import random
from Queue import Queue

host = ''
port = 8888
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.bind((host, port))
s.listen(3)

class ThreadPoolManger():
    """線程池管理器"""
    def __init__(self, thread_num):
        # 初始化參數
        self.work_queue = Queue()
        self.thread_num = thread_num
        self.__init_threading_pool(self.thread_num)

    def __init_threading_pool(self, thread_num):
        # 初始化線程池,建立指定數量的線程池
        for i in range(thread_num):
            thread = ThreadManger(self.work_queue)
            thread.start()

    def add_job(self, func, *args):
        # 將任務放入隊列,等待線程池阻塞讀取,參數是被執行的函數和函數的參數
        self.work_queue.put((func, args))

class ThreadManger(Thread):
    """定義線程類,繼承threading.Thread"""
    def __init__(self, work_queue):
        Thread.__init__(self)
        self.work_queue = work_queue
        self.daemon = True

    def run(self):
        # 啓動線程
        while True:
            target, args = self.work_queue.get()
            target(*args)
            self.work_queue.task_done()

# 建立一個有4個線程的線程池
thread_pool = ThreadPoolManger(4)

# 處理http請求,這裏簡單返回200 hello world
def handle_request(conn_socket):
    recv_data = conn_socket.recv(1024)
    reply = 'HTTP/1.1 200 OK \r\n\r\n'
    reply += 'hello world'
    print 'thread %s is running ' % threading.current_thread().name
    conn_socket.send(reply)
    conn_socket.close()

# 循環等待接收客戶端請求
while True:
    # 阻塞等待請求
    conn_socket, addr = s.accept()
    # 一旦有請求了,把socket扔到咱們指定處理函數handle_request處理,等待線程池分配線程處理
    thread_pool.add_job(handle_request, *(conn_socket, ))

s.close()複製代碼
# 運行進程
[master][/data/web/advance_python/socket]$ python sock_s_threading_pool.py 

# 查看線程池情況
[master][/data/web/advance_python/socket]$ ps -eLf|grep sock_s_threading_pool
lisa+ 27488 23705 27488  0    5 23:22 pts/30   00:00:00 python sock_s_threading_pool.py
lisa+ 27488 23705 27489  0    5 23:22 pts/30   00:00:00 python sock_s_threading_pool.py
lisa+ 27488 23705 27490  0    5 23:22 pts/30   00:00:00 python sock_s_threading_pool.py
lisa+ 27488 23705 27491  0    5 23:22 pts/30   00:00:00 python sock_s_threading_pool.py
lisa+ 27488 23705 27492  0    5 23:22 pts/30   00:00:00 python sock_s_threading_pool.py

# 跟咱們預期同樣一共有5個線程,一個主線程,4個線程池線程複製代碼

這個線程池web服務器編寫框架包括下面幾個組成部分及步驟:

  • 定義線程池管理器ThreadPoolManger,用於建立並管理線程池,提供add_job()接口,給線程池加任務
  • 定義工做線程ThreadManger, 定義run()方法,負責無限循環工做隊列,並完成隊列任務
  • 定義socket監聽請求s.accept() 和處理請求 handle_requests() 任務。
  • 初始化一個4個線程的線程池,都阻塞等待這讀取隊列queue的任務
  • 當socket.accept()有請求,則把connsocket作爲參數,handlerequest方法,丟給線程池,等待線程池分配線程處理

GIL 對多線程的影響

由於Python的線程雖然是真正的線程,但解釋器執行代碼時,有一個GIL鎖:Global Interpreter Lock,任何Python線程執行前,必須先得到GIL鎖,而後,每執行100條字節碼,解釋器就自動釋放GIL鎖,讓別的線程有機會執行。這個GIL全局鎖實際上把全部線程的執行代碼都給上了鎖,因此,多線程在Python中只能交替執行,即便100個線程跑在100核CPU上,也只能用到1個核。

可是對於IO密集型的任務,多線程仍是起到很大效率提高,這是協同式多任務當一項任務好比網絡 I/O啓動,而在長的或不肯定的時間,沒有運行任何 Python 代碼的須要,一個線程便會讓出GIL,從而其餘線程能夠獲取 GIL 而運行 Python。這種禮貌行爲稱爲協同式多任務處理,它容許併發;多個線程同時等待不一樣事件。

兩個線程在同一時刻只能有一個執行 Python ,但一旦線程開始鏈接,它就會放棄 GIL ,這樣其餘線程就能夠運行。這意味着兩個線程能夠併發等待套接字鏈接,這是一件好事。在一樣的時間內它們能夠作更多的工做。

線程池要設置爲多少?

服務器CPU核數有限,可以同時併發的線程數有限,並非開得越多越好,以及線程切換是有開銷的,若是線程切換過於頻繁,反而會使性能下降

線程執行過程當中,計算時間分爲兩部分:

  • CPU計算,佔用CPU
  • 不須要CPU計算,不佔用CPU,等待IO返回,好比recv(), accept(), sleep()等操做,具體操做就是好比
    訪問cache、RPC調用下游service、訪問DB,等須要網絡調用的操做

那麼若是計算時間佔50%, 等待時間50%,那麼爲了利用率達到最高,能夠開2個線程:假如工做時間是2秒, CPU計算完1秒後,線程等待IO的時候須要1秒,此時CPU空閒了,這時就能夠切換到另一個線程,讓CPU工做1秒後,線程等待IO須要1秒,此時CPU又能夠切回去,第一個線程這時恰好完成了1秒的IO等待,可讓CPU繼續工做,就這樣循環的在兩個線程以前切換操做。

那麼若是計算時間佔20%, 等待時間80%,那麼爲了利用率達到最高,能夠開5個線程:能夠想象成完成任務須要5秒,CPU佔用1秒,等待時間4秒,CPU在線程等待時,能夠同時再激活4個線程,這樣就把CPU和IO等待時間,最大化的重疊起來

抽象一下,計算線程數設置的公式就是:N核服務器,經過執行業務的單線程分析出本地計算時間爲x,等待時間爲y,則工做線程數(線程池線程數)設置爲 N*(x+y)/x,能讓CPU的利用率最大化。因爲有GIL的影響,python只能使用到1個核,因此這裏設置N=1

關於我

若是文章對你有收穫,能夠收藏轉發,這會給我一個大大鼓勵喲!另外能夠關注我公衆號【碼農富哥】 (coder2025),我會持續輸出原創的算法,計算機基礎文章!

相關文章
相關標籤/搜索