Django異步任務線程池

當數據庫數據量很大時(百萬級),許多批量數據修改請求的響應會很是慢,一些不須要即時響應的任務能夠放到後臺的異步線程中完成,發起異步任務的請求就能夠當即響應python

選擇用線程池的緣由是:線程比進程更爲可控。不像子進程,子線程會在所屬進程結束時當即結束。線程可共享內存。mysql

請求任務異步處理的原理

使用python manage.py runserver模式啓動的Django應用只有一個進程,對於每一個請求,主線程會開啓一個子線程來處理請求。請求子線程向主線程申請一個新線程,而後把耗時的任務交給新線程,自身當即響應,這就是請求任務異步處理的原理。sql

可視化線程池

若是想要管理這批異步線程,知道他們是否在運行中,可使用線程池(ThreadPoolExecutor)。數據庫

線程池會先啓動若干數量的線程,並讓這些線程都處於睡眠狀態,當向線程池submit一個任務後,會喚醒線程池中的某一個睡眠線程,讓它來處理這個任務,當處理完這個任務,線程又處於睡眠狀態。django

submit任務後會返回一個期程(future),這個對象能夠查看線程池中執行此任務的線程是否仍在處理中json

所以能夠構建一個全局可視化線程池:異步

from concurrent.futures.thread import ThreadPoolExecutor


class ThreadPool(object):
    def __init__(self):
        # 線程池
        self.executor = ThreadPoolExecutor(20)
        # 用於存儲每一個項目批量任務的期程
        self.future_dict = {}

    # 檢查某個項目是否有正在運行的批量任務
    def is_project_thread_running(self, project_id):
        future = self.future_dict.get(project_id, None)
        if future and future.running():
            # 存在正在運行的批量任務
            return True
        return False

    # 展現全部的異步任務
    def check_future(self):
        data = {}
        for project_id, future in self.future_dict.items():
            data[project_id] = future.running()
        return data

    def __del__(self):
        self.executor.shutdown()

# 主線程中的全局線程池
# global_thread_pool的生命週期是Django主線程運行的生命週期
global_thread_pool = ThreadPool()

使用:ui

# 檢查異步任務
if global_thread_pool.is_project_thread_running(project_id):
    raise exceptions.ValidationError(detail='存在正在處理的批量任務,請稍後重試')

# 提交一個異步任務
future = global_thread_pool.executor.submit(self.batch_thread, project_id)
global_thread_pool.future_dict[project_id] = future

# 查看全部異步任務
@login_required
def check_future(request):
    data = global_thread_pool.check_future()
    return HttpResponse(status=status.HTTP_200_OK, content=json.dumps(data))

串行執行

使用線程鎖線程

在全局線程池中初始化線程鎖code

class ThreadPool(object):
    def __init__(self):
        self.executor = ThreadPoolExecutor(20)
        self.future_dict = {}
        self.lock = threading.Lock()

而後執行線程前須要獲取鎖並再執行結束後釋放鎖

def batch_thread(self):
    global_thread_pool.lock.acquire()
    try:
        ...
        global_thread_pool.lock.release()
    except Exception:
        trace_log = traceback.format_exc()
        logger.error('異步任務執行失敗:\n %s' % trace_log)
        global_thread_pool.lock.release()

須要捕捉異常預防子線程出錯而沒法釋放鎖的狀況

異步線程任務執行前先檢查數據庫鏈接是否可用,而後關掉不可用鏈接

因爲django的數據庫鏈接是保存到線程本地變量中的,經過ThreadPoolExecutor建立的線程會保存各自的數據庫鏈接。

當鏈接被保存的時間超過mysql鏈接的最大超時時間,鏈接失效,但不會被線程釋放。

以後再調起線程執行涉及到數據庫操做的異步任務時,會用到失效的數據庫鏈接,致使報錯「MySQL server has gone away」。

解決方案是在線程池的全部異步任務執行前先檢查數據庫鏈接是否可用,而後關掉不可用鏈接

def batch_thread(self):
    for conn in connections.all():
        conn.close_if_unusable_or_obsolete()
    ...
相關文章
相關標籤/搜索