帶你進入數據庫鏈接池

[原文連接](https://mp.weixin.qq.com/s/7wT_mw4uC0GuhhsJJIV0Pg)複製代碼

概述

鏈接池的做用就是爲了提升性能,將已經建立好的鏈接保存在池中,當有請求來時,直接使用已經建立好的鏈接對Server端進行訪問。這樣省略了建立鏈接和銷燬鏈接的過程(TCP鏈接創建時的三次握手和銷燬時的四次握手),從而在性能上獲得了提升。
鏈接池設計的基本原理是這樣的:
(1)創建鏈接池對象(服務啓動)。
(2)按照事先指定的參數建立初始數量的鏈接(即:空閒鏈接數)。
(3)對於一個訪問請求,直接從鏈接池中獲得一個鏈接。若是鏈接池對象中沒有空閒的鏈接,且鏈接數沒有達到最大(即:最大活躍鏈接數),建立一個新的鏈接;若是達到最大,則設定必定的超時時間,來獲取鏈接。
(4)運用鏈接訪問服務。
(5)訪問服務完成,釋放鏈接(此時的釋放鏈接,並不是真正關閉,而是將其放入空閒隊列中。如實際空閒鏈接數大於初始空閒鏈接數則釋放鏈接)。
(6)釋放鏈接池對象(服務中止、維護期間,釋放鏈接池對象,並釋放全部鏈接)。python

說的通俗點,能夠把鏈接池理解爲一個一個的管道,在管道空閒時,即可以取出使用;同時,也能夠鋪設新的管道(固然不能超過最大鏈接數的限制)。使用完以後,管道就變爲空閒了。數據庫

一般比較經常使用的鏈接池是數據庫鏈接池,HTTP Client鏈接池,我也本身編寫過鏈接池,如Thrift鏈接池及插入Rabbitmq隊列的鏈接池。數組

下面分析三個典型的鏈接池的設計。緩存

數據庫鏈接池

首先剖析一下數據庫鏈接池的設計與實現的原理。DBUtils 屬於數據庫鏈接池實現模塊,用於鏈接DB-API 2模塊,對數據庫鏈接線程化,使能夠安全和高效的訪問數據庫的模塊。本文主要分析一下PooledDB的流程。安全

DBUtils.PooledDB使用DB-API 2模塊實現了一個強硬的、線程安全的、有緩存的、可複用的數據庫鏈接。bash

以下圖展現了使用PooledDB時的工做流程:服務器

本文主要考慮dedicated connections,即專用數據庫鏈接,在初始化時鏈接池時,就須要指定mincached、maxcached以及maxconnections等參數,分別表示鏈接池的最小鏈接數、鏈接池的最大鏈接數以及系統可用的最大鏈接數,同時,blocking參數表徵了當獲取不到鏈接的時候是阻塞等待獲取鏈接仍是返回異常:架構

if not blocking:
    def wait():
        raise TooManyConnections
    self._condition.wait = wait複製代碼

在鏈接池初始化時,就會創建mincached個鏈接,代碼以下:併發

# Establish an initial number of idle database connections:
idle = [self.dedicated_connection() for i in range(mincached)]
while idle:
    idle.pop().close()複製代碼

裏面有close方法,看一下鏈接close方法的實現:app

def close(self):
    """Close the pooled dedicated connection."""
    # Instead of actually closing the connection,
    # return it to the pool for future reuse.
    if self._con:
        self._pool.cache(self._con)
        self._con = None複製代碼

主要是實現了cache方法,看一下具體代碼:

def cache(self, con):
    """Put a dedicated connection back into the idle cache."""
    self._condition.acquire()
    try:
        if not self._maxcached or len(self._idle_cache) < self._maxcached:
            con._reset(force=self._reset) # rollback possible transaction
            # the idle cache is not full, so put it there
            self._idle_cache.append(con) # append it to the idle cache
        else: # if the idle cache is already full,
            con.close() # then close the connection
        self._connections -= 1
        self._condition.notify()
    finally:
        self._condition.release()複製代碼

由上述代碼可見,close並非把鏈接關閉,而是在鏈接池的數目小於maxcached的時候,將鏈接放回鏈接池,而大於此值時,關閉該鏈接。同時能夠注意到,在放回鏈接池以前,須要將事務進行回滾,避免在使用鏈接池的時候有存活的事務沒有提交。這能夠保證進入鏈接池的鏈接都是可用的。

而獲取鏈接的過程正如以前討論的,先從鏈接池中獲取鏈接,若是獲取鏈接失敗,則新創建鏈接:

# try to get a dedicated connection
    self._condition.acquire()
    try:
        while (self._maxconnections
                and self._connections >= self._maxconnections):
            self._condition.wait()
        # connection limit not reached, get a dedicated connection
        try: # first try to get it from the idle cache
            con = self._idle_cache.pop(0)
        except IndexError: # else get a fresh connection
            con = self.steady_connection()
        else:
            con._ping_check() # check connection
        con = PooledDedicatedDBConnection(self, con)
        self._connections += 1
    finally:
        self._condition.release()複製代碼

關閉鏈接正如剛剛建立mincached個鏈接後關閉鏈接的流程,在鏈接池的數目小於maxcached的時候,將鏈接放回鏈接池,而大於此值時,關閉該鏈接。

RabbitMQ隊列插入消息鏈接池

異步消息傳遞是高併發系統經常使用的一種技術手段。而這其中就少不了消息隊列。頻繁的向消息隊列裏面插入消息,創建鏈接釋放鏈接會是比較大的開銷。因此,可使用鏈接池來提升系統性能。

鏈接池的設計實現以下:

在獲取鏈接的時候,先從隊列裏面獲取鏈接,若是獲取不到,則新創建一個鏈接,若是不能新創建鏈接,則根據超時時間,阻塞等待從隊列裏面獲取連接。若是沒成功,則作最後的嘗試,從新創建鏈接。代碼實現以下:

def get_connection_pipe(self):
        """ 獲取鏈接 :return: """
        try:
            connection_pipe = self._queue.get(False)
        except Queue.Empty:
            try:
                connection_pipe = self.get_new_connection_pipe()
            except GetConnectionException:
                timeout = self.timeout
                try:
                    connection_pipe = self._queue.get(timeout=timeout)
                except Queue.Empty:
                    try:
                        connection_pipe = self.get_new_connection_pipe()
                    except GetConnectionException:
                        logging.error("Too much connections, Get Connection Timeout!")
        if (time.time() - connection_pipe.use_time) > self.disable_time:
            self.close(connection_pipe)
            return self.get_connection_pipe()
        return connection_pipe複製代碼

一個RabbitMQ插入消息隊列的完整鏈接池設計以下:

# coding:utf-8
import logging
import threading
import Queue
from kombu import Connection
import time

class InsertQueue():
    def __init__(self, host=None, port=None, virtual_host=None, heartbeat_interval=3, name=None, password=None, logger=None, maxIdle=10, maxActive=50, timeout=30, disable_time=20):
        """ :param str host: Hostname or IP Address to connect to :param int port: TCP port to connect to :param str virtual_host: RabbitMQ virtual host to use :param int heartbeat_interval: How often to send heartbeats :param str name: auth credentials name :param str password: auth credentials password """
        self.logger = logging if logger is None else logger
        self.host = host
        self.port = port
        self.virtual_host = virtual_host
        self.heartbeat_interval = heartbeat_interval
        self.name = name
        self.password = password
        self.mutex = threading.RLock()
        self.maxIdle = maxIdle
        self.maxActive = maxActive
        self.available = self.maxActive
        self.timeout = timeout
        self._queue = Queue.Queue(maxsize=self.maxIdle)
        self.disable_time = disable_time

    def get_new_connection_pipe(self):
        """ 產生新的隊列鏈接 :return: """

        with self.mutex:
            if self.available <= 0:
                raise GetConnectionException
            self.available -= 1
        try:

            conn = Connection(hostname=self.host,
                              port=self.port,
                              virtual_host=self.virtual_host,
                              heartbeat=self.heartbeat_interval,
                              userid=self.name,
                              password=self.password)
            producer = conn.Producer()

            return ConnectionPipe(conn, producer)
        except:
            with self.mutex:
                self.available += 1
            raise GetConnectionException

    def get_connection_pipe(self):
        """ 獲取鏈接 :return: """
        try:
            connection_pipe = self._queue.get(False)
        except Queue.Empty:
            try:
                connection_pipe = self.get_new_connection_pipe()
            except GetConnectionException:
                timeout = self.timeout
                try:
                    connection_pipe = self._queue.get(timeout=timeout)
                except Queue.Empty:
                    try:
                        connection_pipe = self.get_new_connection_pipe()
                    except GetConnectionException:
                        logging.error("Too much connections, Get Connection Timeout!")
        if (time.time() - connection_pipe.use_time) > self.disable_time:
            self.close(connection_pipe)
            return self.get_connection_pipe()
        return connection_pipe

    def close(self, connection_pipe):
        """ close the connection and the correlative channel :param connection_pipe: :return: """
        with self.mutex:
            self.available += 1
            connection_pipe.close()
        return

    def insert_message(self, exchange=None, body=None, routing_key='', mandatory=True):
        """ insert message to queue :param str exchange: exchange name :param str body: message :param str routing_key: routing key :param bool mandatory: is confirm: True means confirm, False means not confirm :return: """

        put_into_queue_flag = True
        insert_result = False
        connection_pipe = None
        try:

            connection_pipe = self.get_connection_pipe()
            producer = connection_pipe.channel
            use_time = time.time()
            producer.publish(exchange=exchange,
                                             body=body,
                                             delivery_mode=2,
                                             routing_key=routing_key,
                                             mandatory=mandatory
                                             )
            insert_result = True

        except Exception:
            insert_result = False
            put_into_queue_flag = False
        finally:

            if put_into_queue_flag is True:
                try:
                    connection_pipe.use_time = use_time
                    self._queue.put_nowait(connection_pipe)
                except Queue.Full:
                    self.close(connection_pipe)
            else:
                if connection_pipe is not None:
                    self.close(connection_pipe)

        return insert_result

class ConnectionPipe(object):
    """ connection和channel對象的封裝 """

    def __init__(self, connection, channel):
        self.connection = connection
        self.channel = channel
        self.use_time = time.time()

    def close(self):
        try:
            self.connection.close()
        except Exception as ex:
            pass

class GetConnectionException():
    """ 獲取鏈接異常 """
    pass複製代碼

Thrift鏈接池

Thrift是什麼呢?簡而言之,Thrift定義一個簡單的文件,包含數據類型和服務接口,以做爲輸入文件,編譯器生成代碼用來方便地生成RPC客戶端和服務器通訊的方式。實際上就是一種遠程調用的方式,由於協議棧爲TCP層,因此相對於HTTP層效率會更高。

Thrift鏈接池的設計同數據庫鏈接池相似,流程圖以下:

思路依舊是,在獲取鏈接時,先從鏈接池中獲取鏈接,若池中無鏈接,則判斷是否能夠新建鏈接,若不能新建鏈接,則阻塞等待鏈接。

在從池中獲取不到隊列的時候的處理方式,本設計處理方式爲:當獲取不到鏈接時,將這部分請求放入一個等待隊列,等待獲取鏈接;而當關閉鏈接放回鏈接池時,優先判斷這個隊列是否有等待獲取鏈接的請求,如有,則優先分配給這些請求。

獲取不到鏈接時處理代碼以下,將請求放入一個隊列進行阻塞等待獲取鏈接:

async_result = AsyncResult()
self.no_client_queue.appendleft(async_result)
client = async_result.get()  # blocking複製代碼

而當有鏈接釋放須要放回鏈接池時,須要優先考慮這部分請求,代碼以下:

def put_back_connections(self, client):
    """ 線程安全 將鏈接放回鏈接池,邏輯以下: 一、若是有請求還沒有獲取到鏈接,請求優先 二、若是鏈接池中的鏈接的數目小於maxIdle,則將該鏈接放回鏈接池 三、關閉鏈接 :param client: :return: """
    with self.lock:
        if self.no_client_queue.__len__() > 0:
            task = self.no_client_queue.pop()
            task.set(client)
        elif self.connections.__len__() < self.maxIdle:
            self.connections.add(client)
        else:
            client.close()
            self.pool_size -= 1複製代碼

最後,基於thrift鏈接池,介紹一個簡單的服務化框架的實現。

服務化框架分爲兩部分:RPC、註冊中心。
一、RPC:遠程調用,遠程調用的傳輸協議有不少種,能夠走http、Webservice、TCP等。Thrift也是世界上主流的RPC框架。其重點在於安全、快速、最好能跨語言。
二、註冊中心:用於存放,服務的IP地址和端口信息等。比較好的存放服務信息的方案有:Zookeeper、Redis等。其重點在於避免單點問題,而且好維護。

一般的架構圖爲:

經過Thrift鏈接池做爲客戶端,而Zookeeper做爲註冊中心,設計服務框架。具體就是服務端在啓動服務的時候到Zookeeper進行註冊,而客戶端在啓動的時候經過Zookeeper發現服務端的IP和端口,經過Thrift鏈接池輪詢創建鏈接訪問服務端的服務。

具體設計的代碼以下,代碼有點長,細細研讀必定有所收穫的:

# coding: utf-8

import threading
from collections import deque
import logging
import socket
import time
from kazoo.client import KazooClient
from thriftpy.protocol import TBinaryProtocolFactory
from thriftpy.transport import (
    TBufferedTransportFactory,
    TSocket,
)
from gevent.event import AsyncResult
from gevent import Timeout

from error import CTECThriftClientError
from thriftpy.thrift import TClient
from thriftpy.transport import TTransportException

class ClientPool:
    def __init__(self, service, server_hosts=None, zk_path=None, zk_hosts=None, logger=None, max_renew_times=3, maxActive=20, maxIdle=10, get_connection_timeout=30, socket_timeout=30, disable_time=3):
        """ :param service: Thrift的Service名稱 :param server_hosts: 服務提供者地址,數組類型,['ip:port','ip:port'] :param zk_path: 服務提供者在zookeeper中的路徑 :param zk_hosts: zookeeper的host地址,多個請用逗號隔開 :param max_renew_times: 最大重連次數 :param maxActive: 最大鏈接數 :param maxIdle: 最大空閒鏈接數 :param get_connection_timeout:獲取鏈接的超時時間 :param socket_timeout: 讀取數據的超時時間 :param disable_time: 鏈接失效時間 """
        # 負載均衡隊列
        self.load_balance_queue = deque()
        self.service = service
        self.lock = threading.RLock()
        self.max_renew_times = max_renew_times
        self.maxActive = maxActive
        self.maxIdle = maxIdle
        self.connections = set()
        self.pool_size = 0
        self.get_connection_timeout = get_connection_timeout
        self.no_client_queue = deque()
        self.socket_timeout = socket_timeout
        self.disable_time = disable_time
        self.logger = logging if logger is None else logger

        if zk_hosts:
            self.kazoo_client = KazooClient(hosts=zk_hosts)
            self.kazoo_client.start()
            self.zk_path = zk_path
            self.zk_hosts = zk_hosts
            # 定義Watcher
            self.kazoo_client.ChildrenWatch(path=self.zk_path,
                                            func=self.watcher)
            # 刷新鏈接池中的鏈接對象
            self.__refresh_thrift_connections(self.kazoo_client.get_children(self.zk_path))
        elif server_hosts:
            self.server_hosts = server_hosts
            # 複製新的IP地址到負載均衡隊列中
            self.load_balance_queue.extendleft(self.server_hosts)
        else:
            raise CTECThriftClientError('沒有指定服務器獲取方式!')

    def get_new_client(self):
        """ 輪詢在每一個ip:port的鏈接池中獲取鏈接(線程安全) 從當前隊列右側取出ip:port信息,獲取client 將鏈接池對象放回到當前隊列的左側 請求或鏈接超時時間,默認30秒 :return: """
        with self.lock:
            if self.pool_size < self.maxActive:
                try:
                    ip = self.load_balance_queue.pop()
                except IndexError:
                    raise CTECThriftClientError('沒有可用的服務提供者列表!')
                if ip:
                    self.load_balance_queue.appendleft(ip)
                    # 建立新的thrift client
                    t_socket = TSocket(ip.split(':')[0], int(ip.split(':')[1]),
                                       socket_timeout=1000 * self.socket_timeout)
                    proto_factory = TBinaryProtocolFactory()
                    trans_factory = TBufferedTransportFactory()
                    transport = trans_factory.get_transport(t_socket)
                    protocol = proto_factory.get_protocol(transport)
                    transport.open()
                    client = TClient(self.service, protocol)
                    self.pool_size += 1
                return client
            else:
                return None

    def close(self):
        """ 關閉全部鏈接池和zk客戶端 :return: """
        if getattr(self, 'kazoo_client', None):
            self.kazoo_client.stop()

    def watcher(self, children):
        """ zk的watcher方法,負責檢測zk的變化,刷新當前雙端隊列中的鏈接池 :param children: 子節點,即服務提供方的列表 :return: """
        self.__refresh_thrift_connections(children)

    def __refresh_thrift_connections(self, children):
        """ 刷新服務提供者在當前隊列中的鏈接池信息(線程安全),主要用於zk刷新 :param children: :return: """
        with self.lock:
            # 清空負載均衡隊列
            self.load_balance_queue.clear()
            # 清空鏈接池
            self.connections.clear()
            # 複製新的IP地址到負載均衡隊列中
            self.load_balance_queue.extendleft(children)

    def __getattr__(self, name):
        """ 函數調用,最大重試次數爲max_renew_times :param name: :return: """

        def method(*args, **kwds):

            # 從鏈接池獲取鏈接
            client = self.get_client_from_pool()

            # 鏈接池中無鏈接
            if client is None:
                # 設置獲取鏈接的超時時間
                time_out = Timeout(self.get_connection_timeout)
                time_out.start()
                try:
                    async_result = AsyncResult()
                    self.no_client_queue.appendleft(async_result)
                    client = async_result.get()  # blocking
                except:
                    with self.lock:
                        if client is None:
                            self.no_client_queue.remove(async_result)
                            self.logger.error("Get Connection Timeout!")
                finally:
                    time_out.cancel()

            if client is not None:

                for i in xrange(self.max_renew_times):

                    try:
                        put_back_flag = True
                        client.last_use_time = time.time()
                        fun = getattr(client, name, None)
                        return fun(*args, **kwds)
                    except socket.timeout:
                        self.logger.error("Socket Timeout!")
                        # 關閉鏈接,不關閉會致使亂序
                        put_back_flag = False
                        self.close_one_client(client)
                        break

                    except TTransportException, e:
                        put_back_flag = False

                        if e.type == TTransportException.END_OF_FILE:
                            self.logger.warning("Socket Connection Reset Error,%s", e)
                            with self.lock:
                                client.close()
                                self.pool_size -= 1
                                client = self.get_new_client()
                        else:
                            self.logger.error("Socket Error,%s", e)
                            self.close_one_client(client)
                            break

                    except socket.error, e:
                        put_back_flag = False
                        if e.errno == socket.errno.ECONNABORTED:
                            self.logger.warning("Socket Connection aborted Error,%s", e)
                            with self.lock:
                                client.close()
                                self.pool_size -= 1
                                client = self.get_new_client()
                        else:
                            self.logger.error("Socket Error, %s", e)
                            self.close_one_client(client)
                            break

                    except Exception as e:
                        put_back_flag = False

                        self.logger.error("Thrift Error, %s", e)
                        self.close_one_client(client)
                        break

                    finally:
                        # 將鏈接放回鏈接池
                        if put_back_flag is True:
                            self.put_back_connections(client)
            return None

        return method

    def close_one_client(self, client):
        """ 線程安全 關閉鏈接 :param client: :return: """
        with self.lock:
            client.close()
            self.pool_size -= 1

    def put_back_connections(self, client):
        """ 線程安全 將鏈接放回鏈接池,邏輯以下: 一、若是有請求還沒有獲取到鏈接,請求優先 二、若是鏈接池中的鏈接的數目小於maxIdle,則將該鏈接放回鏈接池 三、關閉鏈接 :param client: :return: """
        with self.lock:
            if self.no_client_queue.__len__() > 0:
                task = self.no_client_queue.pop()
                task.set(client)
            elif self.connections.__len__() < self.maxIdle:
                self.connections.add(client)
            else:
                client.close()
                self.pool_size -= 1

    def get_client_from_pool(self):
        """ 線程安全 從鏈接池中獲取鏈接,若鏈接池中有鏈接,直接取出,不然, 新建一個鏈接,若一直沒法獲取鏈接,則返回None :return: """
        client = self.get_one_client_from_pool()

        if client is not None and (time.time() - client.last_use_time) < self.disable_time:
            return client
        else:
            if client is not None:
                self.close_one_client(client)

        client = self.get_new_client()
        if client is not None:
            return client

        return None

    def get_one_client_from_pool(self):
        """ 線程安全 從鏈接池中獲取一個鏈接,若取不到鏈接,則返回None :return: """
        with self.lock:
            if self.connections:
                try:
                    return self.connections.pop()
                except KeyError:
                    return None
            return None複製代碼
相關文章
相關標籤/搜索