深刻研究sqlalchemy鏈接池

簡介:

相對於最新的MySQL5.6,MariaDB在性能、功能、管理、NoSQL擴展方面包含了更豐富的特性。好比微秒的支持、線程池、子查詢優化、組提交、進度報告等。html

本文就主要探索MariaDB當中鏈接池的一些特性,配置。來配合咱們的sqlalchemy。python

一:原由

原本是不會寫這個東西的,可是,寫好了python--flask程序,使用sqlalchemy+mariadb,部署之後老是出問題,500錯誤之類的。mysql

使用默認鏈接參數linux

engine = create_engine('mysql+mysqlconnector://plan:plan@mysql/plan',)sql

錯誤提示是:docker

sqlalchemy.exc.OperationalError: (mysql.connector.errors.OperationalError) MySQL Connection not available. [SQL: 'SELECT public.id AS public_id, public.public_name AS public_public_name, public.public_email AS public_public_email \nFROM public \nWHERE public.public_name = %(public_name_1)s \n LIMIT %(param_1)s'] [parameters: [{}]] (Background on this error at: http://sqlalche.me/e/e3q8)數據庫

http://sqlalche.me/e/e3q8:flask

OperationalError:

Exception raised for errors that are related to the database’s operation andnot necessarily under the control of the programmer, e.g. an unexpecteddisconnect occurs, the data source name is not found, a transaction could notbe processed, a memory allocation error occurred during processing, etc.windows

This error is aDBAPI Errorand originates fromthe database driver (DBAPI), not SQLAlchemy itself.api

TheOperationalErroris the most common (but not the only) error class usedby drivers in the context of the database connection being dropped, or notbeing able to connect to the database. For tips on how to deal with this, seethe sectionDealing with Disconnects.

意思是沒有正確斷開和數據庫的鏈接。

二:處理斷開

http://docs.sqlalchemy.org/en/latest/core/pooling.html#pool-disconnects

官方給了三種方案來解決這個問題:

1.悲觀處理

engine = create_engine("mysql+pymysql://user:pw@host/db", pool_pre_ping=True)

pool_pre_ping=True

表示每次鏈接從池中檢查,若是有錯誤,監測爲斷開的狀態,鏈接將被當即回收。

2.自定義悲觀的ping

from sqlalchemy import exc
from sqlalchemy import event
from sqlalchemy import select

some_engine = create_engine(...)

@event.listens_for(some_engine, "engine_connect")
def ping_connection(connection, branch):
    if branch:
        # "branch" refers to a sub-connection of a connection,
        # we don't want to bother pinging on these.
        return

    # turn off "close with result".  This flag is only used with
    # "connectionless" execution, otherwise will be False in any case
    save_should_close_with_result = connection.should_close_with_result
    connection.should_close_with_result = False

    try:
        # run a SELECT 1.   use a core select() so that
        # the SELECT of a scalar value without a table is
        # appropriately formatted for the backend
        connection.scalar(select([1]))
    except exc.DBAPIError as err:
        # catch SQLAlchemy's DBAPIError, which is a wrapper
        # for the DBAPI's exception.  It includes a .connection_invalidated
        # attribute which specifies if this connection is a "disconnect"
        # condition, which is based on inspection of the original exception
        # by the dialect in use.
        if err.connection_invalidated:
            # run the same SELECT again - the connection will re-validate
            # itself and establish a new connection.  The disconnect detection
            # here also causes the whole connection pool to be invalidated
            # so that all stale connections are discarded.
            connection.scalar(select([1]))
        else:
            raise
    finally:
        # restore "close with result"
        connection.should_close_with_result = save_should_close_with_result

說實話,沒怎麼看明白。

像是try一個select 語句,若是沒問題就關閉。

 

3.樂觀處理

from sqlalchemy import create_engine, exc
e = create_engine(...)
c = e.connect()

try:
    # suppose the database has been restarted.
    c.execute("SELECT * FROM table")
    c.close()
except exc.DBAPIError, e:
    # an exception is raised, Connection is invalidated.
    if e.connection_invalidated:
        print("Connection was invalidated!")

# after the invalidate event, a new connection
# starts with a new Pool
c = e.connect()
c.execute("SELECT * FROM table")

這個看懂了,try一個select語句,若是無效,就返回Connection was invalidated!,而後開一個新的鏈接,再去執行select。這個應該寫個裝飾器,放在每一個查詢前面。

4.使用鏈接池回收

from sqlalchemy import create_engine
e = create_engine("mysql://scott:tiger@localhost/test", pool_recycle=3600)

這種方式就比較簡單了,在鏈接參數中寫上鍊接超時時間便可。

5.這是本身看文檔找到的方法

from sqlalchemy.pool import QueuePool,NullPool,AssertionPool,StaticPool,SingletonThreadPool,Pool

在sqlalchemy.pool下有已經配置好的鏈接池,直接使用這些鏈接池也應該能夠。

三:測試

docker run  --restart=always --privileged --name My_mariadb_01 -p 3301:3306 -e MYSQL_ROOT_PASSWORD=123456 -d  mariadb:10.2.13
docker run  --restart=always --privileged --name My_mariadb_02 -p 3302:3306 -e MYSQL_ROOT_PASSWORD=123456 -d  mariadb:10.2.13
docker run  --restart=always --privileged --name My_mariadb_03 -p 3303:3306 -e MYSQL_ROOT_PASSWORD=123456 -d  mariadb:10.2.13
docker run  --restart=always --privileged --name My_mariadb_04 -p 3304:3306 -e MYSQL_ROOT_PASSWORD=123456 -d  mariadb:10.2.13
docker run  --restart=always --privileged --name My_mariadb_05 -p 3305:3306 -e MYSQL_ROOT_PASSWORD=123456 -d  mariadb:10.2.13

爲避免因數據庫交叉鏈接,首先開啓5個MARIADB

Flask_Plan_01   8801       engine = create_engine('mysql+mysqlconnector://plan:plan@mysql/plan',)
Flask_Plan_02   8802       engine = create_engine('mysql+mysqlconnector://plan:plan@mysql/plan', pool_pre_ping=True)
Flask_Plan_03   8803       engine = create_engine('mysql+mysqlconnector://plan:plan@mysql/plan', poolclass=QueuePool)
Flask_Plan_04   8804       engine = create_engine('mysql+mysqlconnector://plan:plan@mysql/plan', poolclass=NullPool)
Flask_Plan_05   8805       engine = create_engine('mysql+mysqlconnector://plan:plan@mysql/plan', pool_recycle=3600)

用這5種鏈接參數進行鏈接測試。

若是你願意,也能夠繼續開,QueuePool,NullPool,AssertionPool,StaticPool,SingletonThreadPool,Pool,把這幾種都測試一下。

 

8801 8805 均會不一樣程度的出現500錯誤,8801頻率還高點。

sqlalchemy.exc.OperationalError: (mysql.connector.errors.OperationalError) MySQL Connection not available. [SQL: 'SELECT public.id AS public_id, public.public_name AS public_public_name, public.public_email AS public_public_email \nFROM public \nWHERE public.public_name = %(public_name_1)s \n LIMIT %(param_1)s'] [parameters: [{}]] (Background on this error at: http://sqlalche.me/e/e3q8)
sqlalchemy.exc.OperationalError: (mysql.connector.errors.OperationalError) MySQL Connection not available. [SQL: 'SELECT public.id AS public_id, public.public_name AS public_public_name, public.public_email AS public_public_email \nFROM public \nWHERE public.public_name = %(public_name_1)s \n LIMIT %(param_1)s'] [parameters: [{}]] (Background on this error at: http://sqlalche.me/e/e3q8)


 

Internal Server Error

The server encountered an internal error and was unable to complete your request. Either the server is overloaded or there is an error in the application.

等會兒看看8802  8803 8804如何。

四:深刻研究sqlalchemy源碼

VENV\Flask_Base\Lib\site-packages\sqlalchemy\engine\__init__.py

看起來,沒有默認值。因此engine = create_engine('mysql+mysqlconnector://plan:plan@mysql/plan',)報錯頻率比較高。

五:研究pool源碼

VENV\Flask_Base\Lib\site-packages\sqlalchemy\pool.py

看來poolclass的類型都定義在這裏了。

1.SingletonThreadPool

A Pool that maintains one connection per thread

每一個線程維護一個鏈接的池。

2.QueuePool

A :class:`.Pool` that imposes a limit on the number of open connections.

這種方式限制了鏈接數量,QueuePool是默認的鏈接池方式,除非使用了方言,也就是第三方連接庫。

難怪我使用MySQL-connector-python時老出錯呢,沒打開鏈接池啊。

3.NullPool

A Pool which does not pool connections...

不使用鏈接池

4.StaticPool

A Pool of exactly one connection, used for all requests.

一個完整的鏈接池,用於全部的鏈接。

5.AssertionPool

A :class:`.Pool` that allows at most one checked out connection at any given time.

任什麼時候間只給一個簽出鏈接?爲了debug模式?不懂了。

看的官方說明也沒這麼詳細。

這麼看來,若是我使用默認連接庫,能夠不加參數試試。

mysql-python是sqlalchemy默認的mysql連接庫,我在windows下裝不上。放棄測試默認連接庫,手動指定鏈接池爲QueuePool。

或者指定鏈接池類型爲:QueuePool   StaticPool   SingletonThreadPool(多線程的時候)

六:鏈接池類型測試

修改測試docker

docker run  --restart=always --privileged --name My_mariadb_01 -p 3301:3306 -e MYSQL_ROOT_PASSWORD=123456 -d  mariadb:10.2.13
docker run  --restart=always --privileged --name My_mariadb_02 -p 3302:3306 -e MYSQL_ROOT_PASSWORD=123456 -d  mariadb:10.2.13
docker run  --restart=always --privileged --name My_mariadb_03 -p 3303:3306 -e MYSQL_ROOT_PASSWORD=123456 -d  mariadb:10.2.13
docker run  --restart=always --privileged --name My_mariadb_04 -p 3304:3306 -e MYSQL_ROOT_PASSWORD=123456 -d  mariadb:10.2.13
docker run  --restart=always --privileged --name My_mariadb_05 -p 3305:3306 -e MYSQL_ROOT_PASSWORD=123456 -d  mariadb:10.2.13
docker run  --restart=always --privileged --name My_mariadb_06 -p 3306:3306 -e MYSQL_ROOT_PASSWORD=123456 -d  mariadb:10.2.13

Flask_Plan_01   8801       engine = create_engine('mysql+mysqlconnector://plan:plan@mysql/plan', pool_pre_ping=True))
Flask_Plan_02   8802       engine = create_engine('mysql+mysqlconnector://plan:plan@mysql/plan', poolclass=SingletonThreadPool)
Flask_Plan_03   8803       engine = create_engine('mysql+mysqlconnector://plan:plan@mysql/plan', poolclass=QueuePool)
Flask_Plan_04   8804       engine = create_engine('mysql+mysqlconnector://plan:plan@mysql/plan', poolclass=NullPool)
Flask_Plan_05   8805       engine = create_engine('mysql+mysqlconnector://plan:plan@mysql/plan', poolclass=StaticPool)
Flask_Plan_06   8806       engine = create_engine('mysql+mysqlconnector://plan:plan@mysql/plan', poolclass=AssertionPool)

七:編寫測試腳本

 

import requests
import time
i = 1
while True:
    try:
        r=requests.get('http://192.168.0.104:8801',timeout=5)
        if  r.status_code==200:
            print(time.strftime('%Y-%m-%d %H:%M:%S')+'---'+str(i)+'---'+str(r.status_code)+'---ok')
        else:
            print(time.strftime('%Y-%m-%d %H:%M:%S') + '---' + str(i) + '---' + str(r.status_code) + '-----------badr')
            break
        time.sleep(1)
        i+=1
    except:
        print('except')
        print(time.strftime('%Y-%m-%d %H:%M:%S') +'---'+str(i)+'-----------bad')
        break

修改地址,把幾個測試服務都開始跑。

出錯就會停了。

代碼很爛,湊活測試而已。

從晚上22:30睡覺到早上6:10起牀,pool_pre_ping=True,SingletonThreadPool,QueuePool,NullPool,StaticPool,AssertionPool,都很穩定,訪問代碼都是200

八:繼續研究相關代碼

http://docs.sqlalchemy.org/en/latest/core/pooling.html?highlight=use_threadlocal#using-connection-pools-with-multiprocessing

使用鏈接池進行多重處理

http://docs.sqlalchemy.org/en/latest/core/pooling.html?highlight=use_threadlocal#api-documentation-available-pool-implementations

api文檔--鏈接池的實現

classsqlalchemy.pool.Pool(creator,recycle=-1,echo=None,use_threadlocal=False,logging_name=None,reset_on_return=True,listeners=None,events=None,dialect=None,pre_ping=False,_dispatch=None)

 

Parameters:    
creator–可調用的函數返回對象。
recycle– 超時回收時間。若是鏈接超過這個時間,鏈接就被關閉,換一個新的鏈接
logging_name - 日誌標識名稱
echo– 是否打印sql語句
use_threadlocal–是否使用線程,在同一應用程序的線程使用相同的鏈接對象
reset_on_return–在返回前的操做
    rollback,大概是自動回滾
    True 同爲回滾
    commit 大概是自動提交的意思
    None 無操做
    none 無操做
    False 無操做
events– 列表元組,每一個表單會傳遞給listen………………沒搞懂
listeners - 棄用,被listen取代
dialect–連接庫,使用create_engine時不使用,由引擎建立時處理
pre_ping–是否測試鏈接

基本上這些參數都在engine-creation-api中

http://docs.sqlalchemy.org/en/rel_1_0/core/engines.html#engine-creation-api

Pool                  (creator,recycle=-1,echo=None,use_threadlocal=False,logging_name=None,reset_on_return=True,listeners=None,events=None,dialect=None,pre_ping=False,_dispatch=None)
StaticPool         (creator,recycle=-1,echo=None,use_threadlocal=False,logging_name=None,reset_on_return=True,listeners=None,events=None,dialect=None,pre_ping=False,_dispatch=None)
NullPool            (creator,recycle=-1,echo=None,use_threadlocal=False,logging_name=None,reset_on_return=True,listeners=None,events=None,dialect=None,pre_ping=False,_dispatch=None)
QueuePool          (creator,pool_size=5,max_overflow=10,timeout=30,**kw)
SingletonThreadPool(creator,pool_size=5,**kw)
AssertionPool      (*args,**kw)

這下清楚了,Pool,StaicPool,NullPool,都同樣,直接回收,效率必定低了。

咱們就指定默認的QueuePool好了。之後觀察着服務器的負載,負載大了之後,調整就行了。

自定義方法以下:

engine = create_engine('mysql+mysqlconnector://plan:plan@mysql/plan',
                       pool_size=5,
                       max_overflow=10,
                       pool_timeout=30,
                       pool_pre_ping=True)

九:總結

曲折的道路,終於找到了解決方案。

sqlalchemy的教程當中,不多有講如何部署的。不少又是linux開發。可能在linux下很容易裝默認連接庫,部署的時候就自動使用了QueuePool鏈接池。因此這種問題不多出現。

我在windows下開發,部署在linux,開發和部署都使用了非默認連接庫,致使沒有使用默認鏈接池。

那麼隨着深刻研究,找到了鏈接池的配置,並掌握這一知識,爲之後的開發部署工做,掃除了障礙。

雖然源碼裏面還有不少看不懂,可是讀書百遍其義自見,仍是要多讀(我是懶蛋,遇到問題,再去解決,下一個問題是什麼呢?)。

相關文章
相關標籤/搜索