Django鏈接池試驗

前置

接口執行慢,須要優化,通過cProfile分析,時間大部分耗在數據庫鏈接查詢上,故去深究了下django關於db鏈接的狀況,發現django是不支持數據庫鏈接池的,遂查詢django關於爲啥不支持鏈接池的事情,以及試用了下目前開源的一些鏈接池,作此記錄。html

這篇主要解決個人如下疑問:python

  1. web請求過來的流程?
  2. wsgi server 和 wsgi application如何交互?
  3. django什麼時候創建db鏈接的?
  4. django什麼時候關閉db鏈接的?
  5. django長鏈接是怎麼回事?
  6. django爲什麼沒有鏈接池?
  7. django如何實現鏈接池?

工具

cProfile

拿它主要看耗時在哪裏,不作無用功。mysql

cProfile是Python自帶的性能分析的內置模塊,使用起來很方便,一段話就看的明白git

import cProfile
import re
cProfile.run('re.compile("foo|bar")')

輸出以下github

197 function calls (192 primitive calls) in 0.002 seconds

Ordered by: standard name

ncalls  tottime  percall  cumtime  percall filename:lineno(function)
     1    0.000    0.000    0.001    0.001 <string>:1(<module>)
     1    0.000    0.000    0.001    0.001 re.py:212(compile)
     1    0.000    0.000    0.001    0.001 re.py:268(_compile)
     1    0.000    0.000    0.000    0.000 sre_compile.py:172(_compile_charset)
     1    0.000    0.000    0.000    0.000 sre_compile.py:201(_optimize_charset)
     4    0.000    0.000    0.000    0.000 sre_compile.py:25(_identityfunction)
   3/1    0.000    0.000    0.000    0.000 sre_compile.py:33(_compile)

解釋:
ncall: 表示函數的調用次數,若是這一列有兩個數值,表示有遞歸調用,第一個是總調用次數,第二個是原生調用次數
tottime: 函數調用時間(不包括調用其餘函數的時間)
percall: (第一個)函數運行一次的平均時間,tottime/ncalls
cumtime: 函數調用時間(包括內部調用其餘函數的時間)
percall: (第二個)函數運行一次的平均時間,cumtime/ncalls
filename:lineno(function): 很直觀

也可指定輸出的文件web

import cProfile
import re
cProfile.run('re.compile("foo|bar")', 'result')

用pstats模塊查看結果,能夠按指定的字段進行排序,會使結果更易於分析sql

import pstats
p=pstats.Stats('result')
p.sort_stats('time').print_stats()

通常只關心時間,經常使用排序字段 tottime cumtime

除了在程序中導入cProflie外,還能夠直接經過命令行執行腳本的形式查看數據庫

python -m cProfile [-o output_file] [-s sort_order] myscript.py

python cProfile指令接受-o參數後,-s參數無效。-s參數僅在沒有-o參數存在下才會生效,即直接輸出到屏幕。django

Django

講完了工具,下面能夠考慮django數據庫鏈接的事情了,以前一直覺得每次objects.filter的時候就會開啓一個db鏈接,查詢完了以後會關閉掉,經過看文檔以及實驗發現,其實django是在一個request請求到來時,若是view裏有數據庫動做,那麼開啓鏈接,以後會一直複用,直到request結束才關閉鏈接,這個在django文檔裏有描述後端

持久鏈接¶
持久鏈接避免了在每一個請求中從新創建與數據庫的鏈接的開銷。它們由CONN_MAX_AGE定義鏈接的最大生存期的參數控制 。能夠爲每一個數據庫獨立設置。默認值爲0,保留了在每一個請求結束時關閉數據庫鏈接的歷史行爲。要啓用持久鏈接,請將其設置CONN_MAX_AGE爲秒的正整數。對於無限的持久鏈接,請將其設置爲None。

這個參數的原理就是在每次建立完數據庫鏈接以後,把鏈接放到一個Theard.local的實例中。在request請求開始結束的時候,打算關閉鏈接時會判斷是否超過CONN_MAX_AGE設置這個有效期。超過則關閉。每次進行數據庫請求的時候其實只是判斷local中有沒有已存在的鏈接,有則複用。

基於上述緣由,Django中對於CONN_MAX_AGE的使用是有些限制的,使用不當,會拔苗助長。由於保存的鏈接是基於線程局部變量的,所以若是你部署方式採用多線程,必需要注意保證你的最大線程數不會多餘數據庫能支持的最大鏈接數(一個線程一個鏈接)。另外,若是使用開發模式運行程序(直接runserver的方式),建議不要設置CONN_MAX_AGE,由於這種狀況下,每次請求都會建立一個Thread。同時若是你設置了CONN_MAX_AGE,將會致使你建立大量的不可複用的持久的鏈接。

鏈接管理¶
Django首次進行數據庫查詢時會打開與數據庫的鏈接。它使該鏈接保持打開狀態,並在後續請求中重用它。Django一旦超過了定義的最大CONN_MAX_AGE使用期限或再也不可用時,便關閉鏈接 。

詳細來講,Django會在須要數據庫且還沒有創建數據庫時自動打開與數據庫的鏈接-要麼是由於這是第一個鏈接,要麼是由於上一個鏈接已關閉。

在每一個請求的開始,若是Django已達到最大使用期限,則會關閉該鏈接。若是您的數據庫在一段時間後終止了空閒鏈接,則應將其設置CONN_MAX_AGE爲較低的值,以便Django不會嘗試使用已由數據庫服務器終止的鏈接。(此問題可能隻影響流量很是小的站點。)

在每一個請求結束時,若是Django已達到其最大使用期限或處於不可恢復的錯誤狀態,它將關閉該鏈接。若是在處理請求時發生任何數據庫錯誤,則Django會檢查鏈接是否仍然有效,若是沒有,則將其關閉。所以,數據庫錯誤最多影響一個請求。若是鏈接變得不可用,則下一個請求將得到新的鏈接。

注意:

因爲每一個線程都維護本身的鏈接,所以你的數據庫必須至少支持與工做線程同樣多的併發鏈接。

有時,大多數views都不會訪問數據庫,例如,由於它是外部系統的數據庫,或者歸功於緩存。 在這種狀況下,應該將CONN_MAX_AGE設置爲較小的值甚至0,由於維護不太可能重用的鏈接沒有意義。這將有助於將與數據庫的併發鏈接數量保持在較小的值。

開發模式的Server爲它處理的每一個請求建立一個新線程,無視長鏈接的做用。在開發過程當中不須要啓用長鏈接。

當Django創建與數據庫的鏈接時,它會根據所使用的後端設置恰當的參數。 若是啓用長鏈接,則不會再對每一個請求重複設置。 若是修改鏈接的隔離級別或時區等參數,則應在每一個請求結束時恢復Django的默認值,在每一個請求開始時強制使用適當的值,或者禁用長鏈接。

爲啥django不支持鏈接池

能夠參照好久遠的一個帖子https://groups.google.com/forum/#!topic/django-developers/NwY9CHM4xpU

大體意思就是:

  1. django不須要引入這個複雜度
  2. 好多第三方已經作的很好了
  3. MySQL的鏈接很是輕量和高效,大量的Web應用都沒有使用鏈接池
  4. django一個request內db鏈接是能夠複用的
  5. django1.6版本後提供了長鏈接的支持
  6. 。。。。

那爲什麼還要在Django中啓用鏈接池

依據上面描述,

  1. Django服務每一個線程都維護本身的鏈接,有多少線程就會就有多少鏈接;若是採用分佈式部署,線程數較多,則會創建較多的鏈接。不只很是消耗資源,還可能出現MySQL鏈接數不夠用的狀況。
  2. 從速度上來說,有了鏈接池,下次請求過來的直接拿到鏈接,至少節省了本次request中的db鏈接時間。

鏈接池方案

目前測試的是下面2個方案,都是基於SQLAlchemy的鏈接池作的。

1.djorm-ext-pool

2.django-db-connection-pool

使用方式都很是簡單, 參照github上安裝便可

注: djorm-ext-pool在python3,django3下跑還有bug,以下方式修正

djorm-pool/__init__.py
...
def patch_mysql():
    class hashabledict(dict):
        def __hash__(self):
            # return hash(tuple(sorted(self.items())))  註釋掉這個,換成下面的
            return hash(frozenset(self))
...

 

django裏的wsgi

django框架實現了wsgi接口(關於wsgi見捅開web應用的那層紗),通常都是和實現了wsgi協議的服務器對接,這裏就想了解wsgi在django裏如何實現的。

按wsgi要求,應用程序要提供一個可調用對象,來接收2個參數,簡單示例以下:

def application(environ, start_response):
    status = '200 OK'
    response_headers = [('Content-Type', 'text/plain')]
    start_response(status, response_headers)
    return ['Hello world']

好,咱們看Django的實現方式。

好比gunicorn的啓動

gunicorn dj_db_pool.wsgi:application -b 0.0.0.0:80 --workers=5

gunicorn實現了server端的功能,監控HTTP請求,提供入口,調用應用程序端的application。

此處的application爲dj_db_pool.wsgi:application,即:

# dj_db_pool/wsgi.py

import os

from django.core.wsgi import get_wsgi_application

os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'dj_db_pool.settings')

application = get_wsgi_application()

get_wsgi_application爲:

# django/core/wsgi.py

import django
from django.core.handlers.wsgi import WSGIHandler


def get_wsgi_application():
    """
    The public interface to Django's WSGI support. Return a WSGI callable.

    Avoids making django.core.handlers.WSGIHandler a public API, in case the
    internal WSGI implementation changes or moves in the future.
    """
    django.setup(set_prefix=False)  # 初始化django環境
    return WSGIHandler()  # 返回server端可調用的應用程序對象

WSGIHandler爲:

# django/core/handlers/wsgi.py

class WSGIHandler(base.BaseHandler):
    request_class = WSGIRequest

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.load_middleware()  # 加載django中間件

    def __call__(self, environ, start_response):  # 此做爲可調用對象傳給wsgi server端,每次有請求進來這裏都會執行
        set_script_prefix(get_script_name(environ))
        signals.request_started.send(sender=self.__class__, environ=environ)  # 觸發了request_started的信號
        request = self.request_class(environ)  # 調用WSGIRequest實例化請求
        response = self.get_response(request)  # 處理response

        response._handler_class = self.__class__

        status = '%d %s' % (response.status_code, response.reason_phrase)
        response_headers = [
            *response.items(),
            *(('Set-Cookie', c.output(header='')) for c in response.cookies.values()),
        ]
        start_response(status, response_headers)  # 此處wsgi協議要求的第二個可調函數,把HTTP狀態和頭部信息返回給server
        if getattr(response, 'file_to_stream', None) is not None and environ.get('wsgi.file_wrapper'):
            # If `wsgi.file_wrapper` is used the WSGI server does not call
            # .close on the response, but on the file wrapper. Patch it to use
            # response.close instead which takes care of closing all files.
            response.file_to_stream.close = response.close
            response = environ['wsgi.file_wrapper'](response.file_to_stream, response.block_size)
        return response  # 返回處理結果

http請求到達django內部的過程大概如上所述。

django request裏的db鏈接

好,咱們看哪裏對db作了處理?

發現前面在WSGIHandler裏有對request_started信號的觸發:

signals.request_started.send(sender=self.__class__, environ=environ)  # 觸發了request_started的信號

request_started信號

那麼信號的處理在這裏

# django/db/__init__.py

......

connections = ConnectionHandler()

router = ConnectionRouter()

......


# For backwards compatibility. Prefer connections['default'] instead.
connection = DefaultConnectionProxy()


# Register an event to reset saved queries when a Django request is started.
def reset_queries(**kwargs):
    for conn in connections.all():
        conn.queries_log.clear()


signals.request_started.connect(reset_queries)  # 這裏被處理,調用了reset_queries


# Register an event to reset transaction state and close connections past
# their lifetime.
def close_old_connections(**kwargs):
    for conn in connections.all():  #1. connections.all()會給出一個列表,裏面的元素爲DatabaseWrapper類
        conn.close_if_unusable_or_obsolete()  #2. 關閉不可用和超時的db鏈接


signals.request_started.connect(close_old_connections)   # 請求開始時request_started被調用,調用了close_old_connections,初始化各db實例化類,如開啓長鏈接,那麼檢測時間關閉db鏈接,self.connection爲空則不作任務操做
signals.request_finished.connect(close_old_connections)  # 請求完成之後request_finished被調用,關閉了不可用鏈接

能夠看到這裏request_started觸發調用了close_old_connections,而close_old_connections裏調用了connections(即ConnetionHandler實例)的all方法,其實它返回的是一個列表,裏面的元素爲DatabaseWrapper類,其實就是循環setting配置的DATABASES,實例化各ENGINE指定的db後端。

# django/db/utils.py
# 
# for conn in connections.all() -->
#                 |
#                 |
#                 v
class ConnectionHandler:
    ......
    def all(self):  # 1  connections.all()調用這裏
        return [self[alias] for alias in self]  #2 self可迭代,alias即爲self.databases。而self[alias]即調用的__getitem__的實現

    def __iter__(self):  # 2
        return iter(self.databases)  # 3  self.databases即爲setting配置文件裏的DATABASES

    @cached_property
    def databases(self):  # 3
        if self._databases is None:
            self._databases = settings.DATABASES  # 4  here
        """
        忽略細節
        """
        return self._databases

上面的self[alias]就是各db實現的後端,接着往下看

# django/db/utils.py

class ConnectionHandler:
    ......
    def __getitem__(self, alias):  # 1.1
        #1.2 關鍵點,若是local內有的話直接返回,調用方式 connections['default']
        if hasattr(self._connections, alias):
            return getattr(self._connections, alias)

        self.ensure_defaults(alias)
        self.prepare_test_settings(alias)
        db = self.databases[alias]  #1.3 獲取配置文件裏獲取db信息
        backend = load_backend(db['ENGINE'])  #1.4 加載對應數據庫ENGINE,如:django.db.backends.mysql
        conn = backend.DatabaseWrapper(db, alias)  #1.5 上面load的base.py文件裏都有DatabaseWrapper類,這裏實例化這個類。它主要負責對應db後端的鏈接和關閉
        setattr(self._connections, alias, conn)  #1.6 鏈接放到local裏,以接下來的複用
        return conn

    ...... 
    def close_all(self):  # 關閉數據庫鏈接
        for alias in self:
            try:
                connection = getattr(self._connections, alias)
            except AttributeError:
                continue
            connection.close()

......

# 101行處
def load_backend(backend_name):  #1.4
    # This backend was renamed in Django 1.9.
    if backend_name == 'django.db.backends.postgresql_psycopg2':
        backend_name = 'django.db.backends.postgresql'

    try:
        return import_module('%s.base' % backend_name)  #1.4.1 加載對應的數據庫處理類,實際就是django.db.backends.mysql.base類
    except ImportError as e_user:

上面能夠看到connections.all()裏就是DatabaseWrapper類,每一個都繼承於BaseDatabaseWrapper,提供了基本的connect函數用於數據庫鏈接,且賦值於屬性self.connection(self.connection = self.get_new_connection(conn_params) )

# django/db/backends/base/base.py

class BaseDatabaseWrapper:
    ......
    @async_unsafe
    def connect(self):
        """Connect to the database. Assume that the connection is closed."""
        # Check for invalid configurations.
        self.check_settings()
        # In case the previous connection was closed while in an atomic block
        self.in_atomic_block = False
        self.savepoint_ids = []
        self.needs_rollback = False
        # Reset parameters defining when to close the connection
        max_age = self.settings_dict['CONN_MAX_AGE']  # 默認值0
        self.close_at = None if max_age is None else time.monotonic() + max_age  # 也就是這裏是當前時間
        self.closed_in_transaction = False
        self.errors_occurred = False
        # Establish the connection
        conn_params = self.get_connection_params()
        self.connection = self.get_new_connection(conn_params)  # 調用各子類實現的特定後端方法鏈接數據庫
        self.set_autocommit(self.settings_dict['AUTOCOMMIT'])
        self.init_connection_state()
        connection_created.send(sender=self.__class__, connection=self)

        self.run_on_commit = []
    ......
    @async_unsafe
    def ensure_connection(self):  # 主要調這裏創建鏈接
        """Guarantee that a connection to the database is established."""
        if self.connection is None:
            with self.wrap_database_errors:
                self.connect()
    ......
    def _close(self):  # 關閉db鏈接
        if self.connection is not None:
            with self.wrap_database_errors:
                return self.connection.close()
    ......
    @async_unsafe
    def close(self):
        """Close the connection to the database."""
        self.validate_thread_sharing()
        self.run_on_commit = []

        # Don't call validate_no_atomic_block() to avoid making it difficult
        # to get rid of a connection in an invalid state. The next connect()
        # will reset the transaction state anyway.
        if self.closed_in_transaction or self.connection is None:
            return
        try:
            self._close()
        finally:
            if self.in_atomic_block:
                self.closed_in_transaction = True
                self.needs_rollback = True
            else:
                self.connection = None

對於mysql後端來講,self.get_new_connection就是經過MySQLdb新建db鏈接

# django/db/backends/mysql/base.py

class DatabaseWrapper(BaseDatabaseWrapper):
    ......
    def get_new_connection(self, conn_params):
        return Database.connect(**conn_params)  #經過MySQLdb新建db鏈接

# 而Database就是
try:
    import MySQLdb as Database
except ImportError as err:
    raise ImproperlyConfigured(
        'Error loading MySQLdb module.\n'
        'Did you install mysqlclient?'
    ) from err

到這裏,一個request的過程基本就完了,在這裏並無觸發db的鏈接,只是處理了DatabaseWrapper類的實例化,關閉超時的長鏈接等。

DB操做

具體的db鏈接建立是在第一次執行orm操做的時候,好比在for row in xxx.objects.filter()時

# django/db/models/query.py
class ModelIterable(BaseIterable):
    """Iterable that yields a model instance for each row."""

    def __iter__(self):
        queryset = self.queryset
        db = queryset.db
        compiler = queryset.query.get_compiler(using=db)  # 獲取具體sql編譯器
        # Execute the query. This will also fill compiler.select, klass_info,
        # and annotations.
        results = compiler.execute_sql(chunked_fetch=self.chunked_fetch, chunk_size=self.chunk_size) #執行sql

compiler.execute_sql見下

# django/db/models/query.py

class SQLCompiler:
    ......
    def execute_sql(self, result_type=MULTI, chunked_fetch=False, chunk_size=GET_ITERATOR_CHUNK_SIZE):
        result_type = result_type or NO_RESULTS
        try:
            sql, params = self.as_sql()
            if not sql:
                raise EmptyResultSet
        except EmptyResultSet:
            if result_type == MULTI:
                return iter([])
            else:
                return
        if chunked_fetch:
            cursor = self.connection.chunked_cursor()
        else:
            cursor = self.connection.cursor()  # 獲取數據庫鏈接
        try:
            cursor.execute(sql, params)  # 執行db操做
        except Exception:
            # Might fail for server-side cursors (e.g. connection closed)
            cursor.close()
            raise

        """
        暫時忽略,主要是result返回的處理
        """
        return result

關鍵點cursor = self.connection.cursor() 處觸發數據庫鏈接,並獲取對應的cursor,見代碼部分

# django/db/backends/base/base.py

class BaseDatabaseWrapper:
    ......
    @async_unsafe
    def cursor(self):  # 上面的入口
        """Create a cursor, opening a connection if necessary."""
        return self._cursor()  #1 建立一個cursor,必要時打開數據庫鏈接

    def _cursor(self, name=None):  #1
        self.ensure_connection()  #2 無數據庫鏈接時會新建db鏈接
        with self.wrap_database_errors:
            return self._prepare_cursor(self.create_cursor(name)) #3 

    @async_unsafe
    def ensure_connection(self):  #2
        """Guarantee that a connection to the database is established."""
        if self.connection is None:  # 無數據庫鏈接時會新建
            with self.wrap_database_errors:
                self.connect()  #2.1 新建db鏈接
    # self.connect()參照上面self.connection = self.get_new_connection(conn_params)處

    def _prepare_cursor(self, cursor):  #3 此處爲MySQLdb的cursor
        self.validate_thread_sharing()
        if self.queries_logged:
            wrapped_cursor = self.make_debug_cursor(cursor)
        else:
            wrapped_cursor = self.make_cursor(cursor)  #3.1 調用mysql的
        return wrapped_cursor

    def make_cursor(self, cursor):  #3.1
        """Create a cursor without debug logging."""
        return utils.CursorWrapper(cursor, self)  #3.2 調用utils包裏的CursorWrapper類包裝

上面self._prepare_cursor(self.create_cursor(name)) 黑體部分回調mysql對應的DatabaseWrapper類裏的create_cursor,返回對應的mysql的CursorWrapper,這個類裏有對應的execute

# django/db/backends/mysql/base.py

class DatabaseWrapper(BaseDatabaseWrapper):
    ......
    @async_unsafe
    def create_cursor(self, name=None):
        cursor = self.connection.cursor()  # 返回MySQLdb鏈接的cursor
        return CursorWrapper(cursor)  # 對cursor進行包裝,方便循環之類的

接着看self._prepare_cursor(self.create_cursor(name)),self._prepare_cursor部分代碼

# django/db/backends/base/base.py

class BaseDatabaseWrapper:
    ......
    def _prepare_cursor(self, cursor):  #3 此處爲MySQLdb的cursor
        self.validate_thread_sharing()
        if self.queries_logged:
            wrapped_cursor = self.make_debug_cursor(cursor)
        else:
            wrapped_cursor = self.make_cursor(cursor)  #3.1
        return wrapped_cursor

    def make_cursor(self, cursor):  #3.1
        """Create a cursor without debug logging."""
        return utils.CursorWrapper(cursor, self)  #3.2 調用utils包裏的CursorWrapper類包裝

utils裏的CursorWrapper類比較重要的兩個方法

# django/db/backends/utils.py 
class CursorWrapper:
    def __init__(self, cursor, db):
        self.cursor = cursor
        self.db = db
    ......
    def execute(self, sql, params=None):  # 上面query.py裏的cursor.execute(sql, params)調用部分
        return self._execute_with_wrappers(sql, params, many=False, executor=self._execute)  # self._execute的定義見下,其實就是前面的mysql對應的CursorWrapper

    def _execute_with_wrappers(self, sql, params, many, executor):
        context = {'connection': self.db, 'cursor': self}
        for wrapper in reversed(self.db.execute_wrappers):
            executor = functools.partial(wrapper, executor)
        return executor(sql, params, many, context)  # 這裏執行的execute實際就是mysql/base.py裏的CursorWrapper類裏的execute方法

    def _execute(self, sql, params, *ignored_wrapper_args):
        self.db.validate_no_broken_transaction()
        with self.db.wrap_database_errors:
            if params is None:
                # params default might be backend specific.
                return self.cursor.execute(sql)
            else:
                return self.cursor.execute(sql, params)

至此,觸發db鏈接的過程也分析結束了

request_finished信號

那麼,關閉db鏈接是在什麼地方?

咱們發現前面的django/db/__init__.py文件裏不光request_started的信號在這裏處理,request_finished的信號也在這裏處理。

那麼request_finished的信號是在哪裏觸發的?見下

# django/http/response.py
......
class HttpResponseBase:

    ......
    # The WSGI server must call this method upon completion of the request.
    # See http://blog.dscpl.com.au/2012/10/obligations-for-calling-close-on.html
    def close(self):  # 此處不是django裏代碼調用,而是由wsgi server去調用,這裏注意
        for closer in self._resource_closers:
            try:
                closer()
            except Exception:
                pass
        # Free resources that were still referenced.
        self._resource_closers.clear()
        self.closed = True
        signals.request_finished.send(sender=self._handler_class)  # 請求結束時會統一調用close,這裏觸發了request_finished的信號

注意:HttpResponseBase裏的close是由wsgi server去調用的,具體分析見此處

close_if_unusable_or_obsolete

好,知道了請求在哪裏結束,那麼咱們回頭來看下close_if_unusable_or_obsolete這個方法裏都幹了啥

# django/db/backends/base/base.py

class BaseDatabaseWrapper:
    ......
    @async_unsafe
    def connect(self):
        ......
        max_age = self.settings_dict['CONN_MAX_AGE']  # 默認是0
        self.close_at = None if max_age is None else time.monotonic() + max_age  #1 那close_at這裏就是鏈接開始的時間
        ......

    ......
    def close_if_unusable_or_obsolete(self):
        # 有鏈接才處理,對於request_started信號觸發的調用,通常狀況下self.connection都是None,這裏通常都忽略掉
        if self.connection is not None:
            if self.get_autocommit() != self.settings_dict['AUTOCOMMIT']:
                self.close()
                return
  
            if self.errors_occurred:
                if self.is_usable():
                    self.errors_occurred = False
                else:
                    self.close()
                    return

            # 請求結束時db鏈接關閉就是在這裏作的,看上面1註釋,close_at有值,max_age默認0的狀況下time.monotonic()確定大於close_at,因此請求結束db鏈接是關閉的。
            if self.close_at is not None and time.monotonic() >= self.close_at:
                self.close()
                return

上面特別注意close_at,以前就是看了半天都覺得不是在這裏關閉的,其實max_age默認是0,不配置默認也是0,坑爹。

因此 if self.close_at is not None and time.monotonic() >= self.close_at,這個地方就確定是True了,因此默認狀況下請求結束db鏈接是關閉的。

至此,咱們也分析完了db什麼時候關閉。

試驗鏈接池

咱們拿django-db-connection-pool來測試。

安裝

pip install django-db-connection-pool

配置

'default': {
    ......
    'ENGINE': 'dj_db_conn_pool.backends.mysql',
    ......
    'POOL_OPTIONS': {
        'POOL_SIZE': 10,  # 池大小
        'MAX_OVERFLOW': 0  # 池滿了以後容許溢出的大小,最大鏈接數就是POOL_SIZE+MAX_OVERFLOW
    }
}

測試

隨便查詢一個表用cProfile看查看時間

爲了測試效果pool_size我配置1,且gunicorn啓動時我只啓動了一個出來進程

gunicorn dj_db_pool.wsgi:application -b 0.0.0.0:80 --workers=1
這樣,第一次請求應該是須要鏈接db的,第二次應該是不須要的。

可見,鏈接池起效果了,👍

原理

原理不復雜,主要就是覆蓋了mysql/base.py的DatabaseWrapper裏的get_new_connection和close方法,用sqlalchemy裏的pool來實現鏈接池。

class PooledDatabaseWrapperMixin(object):
    def get_new_connection(self, conn_params):
        """
        覆蓋 Django 的 get_new_connection 方法
        在 Django 調用此方法時,檢查 pool_container 中是否有 self.alias 的鏈接池
        若是沒有,則初始化 self.alias 的鏈接池,而後從池中取出一個鏈接
        若是有,則直接從池中取出一個鏈接返回
        :return:
        """
        with pool_container.lock:
            # 獲取鎖後,判斷當前數據庫(self.alias)的池是否存在
            # 不存在,開始初始化
            if not pool_container.has(self.alias):
                # 複製一份默認參數給當前數據庫
                pool_params = deepcopy(pool_container.pool_default_params)

                # 開始解析、組裝當前數據庫的鏈接配置
                pool_setting = {
                    # 把 POOL_OPTIONS 內的參數名轉換爲小寫
                    # 與 QueuePool 的參數對應
                    key.lower(): value
                    # 取每一個 POOL_OPTIONS 內參數
                    for key, value in
                    # self.settings_dict 由 Django 提供,是 self.alias 的鏈接參數
                    self.settings_dict.get('POOL_OPTIONS', {}).items()
                    # 此處限制 POOL_OPTIONS 內的參數:
                    # POOL_OPTIONS 內的參數名必須是大寫的
                    # 並且其小寫形式必須在 pool_default_params 內
                    if key == key.upper() and key.lower() in pool_container.pool_default_params
                }

                # 如今 pool_setting 已經組裝完成
                # 覆蓋 pool_params 的參數(以輸入用戶的配置)
                pool_params.update(**pool_setting)

                # 如今參數已經具有
                # 建立 self.alias 的鏈接池實例
                alias_pool = pool.QueuePool(
                    # QueuePool 的 creator 參數
                    # 在獲取一個新的數據庫鏈接時,SQLAlchemy 會調用這個匿名函數
                    lambda: super(PooledDatabaseWrapperMixin, self).get_new_connection(conn_params),
                    # 數據庫方言
                    # 用於 SQLAlchemy 維護該鏈接池
                    dialect=self.SQLAlchemyDialect(dbapi=self.Database),
                    # 一些固定的參數
                    pre_ping=True, echo=False, timeout=None, **pool_params
                )
                logger.debug(_("%s's pool has been created, parameter: %s"), self.alias, pool_params)

                # 數據庫鏈接池已建立
                # 放到 pool_container,以便重用
                pool_container.put(self.alias, alias_pool)

        # 調用 SQLAlchemy 從鏈接池內取一個鏈接
        conn = pool_container.get(self.alias).connect()
        logger.debug(_("got %s's connection from its pool"), self.alias)
        return conn

    def close(self, *args, **kwargs):
        logger.debug(_("release %s's connection to its pool"), self.alias)
        return super(PooledDatabaseWrapperMixin, self).close(*args, **kwargs)

gunicorn多worker表現

上面啓動了一個worker,多個worker如何表現

gunicorn dj_db_pool.wsgi:application -b 0.0.0.0:80 --workers=3

--worker是多進程方式,上面起了3個進程

還能夠--threads=2,每一個進程再啓動2個線程

會看到請求第一次到一個worker上仍是有新的db鏈接創建,以後就走池裏鏈接了

因此起鏈接池時,要算好起的進程數(or線程數)以及POOL_SIZE的大小,以防止起過多DB鏈接。

結語

如今數據庫鏈接已經很快了,我測試過程當中,基本在50-100ms之間,對於須要精準耗時的,這個時間優化仍是能夠的,對於正常1,2秒返回的,可能優化效果不大。

相關文章
相關標籤/搜索