python redis連接創建實現分析

  今天在寫zabbix storm job監控腳本的時候用到了python的redis模塊,以前也有用過,可是沒有過多的瞭解,今天看了下相關的api和源碼,看到有ConnectionPool的實現,這裏簡單說下。
在ConnectionPool以前,若是須要鏈接redis,我都是用StrictRedis這個類,在源碼中能夠看到這個類的具體解釋:
python

redis.StrictRedis Implementation of the Redis protocol.This abstract class provides a Python interface to all Redis commands and an 
implementation of the Redis protocol.Connection and Pipeline derive from this, implementing how the commands are sent and received to the Redis server

使用的方法:
redis

 r=redis.StrictRedis(host=xxxx, port=xxxx, db=xxxx)
 r.xxxx()

有了ConnectionPool這個類以後,可使用以下方法
api

pool = redis.ConnectionPool(host=xxx, port=xxx, db=xxxx)
r = redis.Redis(connection_pool=pool)

這裏Redis是StrictRedis的子類
簡單分析以下:
在StrictRedis類的__init__方法中,能夠初始化connection_pool這個參數,其對應的是一個ConnectionPool的對象:數組

class StrictRedis(object):
........
    def __init__(self, host='localhost', port=6379,
                 db=0, password=None, socket_timeout=None,
                 socket_connect_timeout=None,
                 socket_keepalive=None, socket_keepalive_options=None,
                 connection_pool=None, unix_socket_path=None,
                 encoding='utf-8', encoding_errors='strict',
                 charset=None, errors=None,
                 decode_responses=False, retry_on_timeout=False,
                 ssl=False, ssl_keyfile=None, ssl_certfile=None,
                 ssl_cert_reqs=None, ssl_ca_certs=None):
         if not connection_pool:
             ..........
              connection_pool = ConnectionPool(**kwargs)
         self.connection_pool = connection_pool

在StrictRedis的實例執行具體的命令時會調用execute_command方法,這裏能夠看到具體實現是從鏈接池中獲取一個具體的鏈接,而後執行命令,完成後釋放鏈接:

bash

   # COMMAND EXECUTION AND PROTOCOL PARSING
    def execute_command(self, *args, **options):
        "Execute a command and return a parsed response"
        pool = self.connection_pool
        command_name = args[0]
        connection = pool.get_connection(command_name, **options)  #調用ConnectionPool.get_connection方法獲取一個鏈接
        try:
            connection.send_command(*args)  #命令執行,這裏爲Connection.send_command
            return self.parse_response(connection, command_name, **options)
        except (ConnectionError, TimeoutError) as e:
            connection.disconnect()
            if not connection.retry_on_timeout and isinstance(e, TimeoutError):
                raise
            connection.send_command(*args)  
            return self.parse_response(connection, command_name, **options)
        finally:
            pool.release(connection)  #調用ConnectionPool.release釋放鏈接

在來看看ConnectionPool類:
app

     class ConnectionPool(object):  
       ...........
    def __init__(self, connection_class=Connection, max_connections=None,
                 **connection_kwargs):   #類初始化時調用構造函數
        max_connections = max_connections or 2 ** 31
        if not isinstance(max_connections, (int, long)) or max_connections < 0:  #判斷輸入的max_connections是否合法
            raise ValueError('"max_connections" must be a positive integer')
        self.connection_class = connection_class  #設置對應的參數
        self.connection_kwargs = connection_kwargs
        self.max_connections = max_connections
        self.reset()  #初始化ConnectionPool 時的reset操做
    def reset(self):
        self.pid = os.getpid()
        self._created_connections = 0  #已經建立的鏈接的計數器
        self._available_connections = []   #聲明一個空的數組,用來存放可用的鏈接
        self._in_use_connections = set()  #聲明一個空的集合,用來存放已經在用的鏈接
        self._check_lock = threading.Lock()
.......
    def get_connection(self, command_name, *keys, **options):  #在鏈接池中獲取鏈接的方法
        "Get a connection from the pool"
        self._checkpid()
        try:
            connection = self._available_connections.pop()  #獲取並刪除表明鏈接的元素,在第一次獲取connectiong時,由於_available_connections是一個空的數組,
            會直接調用make_connection方法
        except IndexError:
            connection = self.make_connection()
        self._in_use_connections.add(connection)   #向表明正在使用的鏈接的集合中添加元素
        return connection   
    def make_connection(self): #在_available_connections數組爲空時獲取鏈接調用的方法
        "Create a new connection"
        if self._created_connections >= self.max_connections:   #判斷建立的鏈接是否已經達到最大限制,max_connections能夠經過參數初始化
            raise ConnectionError("Too many connections")
        self._created_connections += 1   #把表明已經建立的鏈接的數值+1
        return self.connection_class(**self.connection_kwargs)     #返回有效的鏈接,默認爲Connection(**self.connection_kwargs)
    def release(self, connection):  #釋放鏈接,連接並無斷開,只是存在連接池中
        "Releases the connection back to the pool"
        self._checkpid()
        if connection.pid != self.pid:
            return
        self._in_use_connections.remove(connection)   #從集合中刪除元素
        self._available_connections.append(connection) #並添加到_available_connections 的數組中
    def disconnect(self): #斷開全部鏈接池中的連接
        "Disconnects all connections in the pool"
        all_conns = chain(self._available_connections,
                          self._in_use_connections)
        for connection in all_conns:
            connection.disconnect()

execute_command最終調用的是Connection.send_command方法,關閉連接爲 Connection.disconnect方法,而Connection類的實現:
socket

class Connection(object):
    "Manages TCP communication to and from a Redis server"
    def __del__(self):   #對象刪除時的操做,調用disconnect釋放鏈接
        try:
            self.disconnect()
        except Exception:
            pass

核心的連接創建方法是經過socket模塊實現:
tcp

    def _connect(self):
        err = None
        for res in socket.getaddrinfo(self.host, self.port, 0,
                                      socket.SOCK_STREAM):
            family, socktype, proto, canonname, socket_address = res
            sock = None
            try:
                sock = socket.socket(family, socktype, proto)
                # TCP_NODELAY
                sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
                # TCP_KEEPALIVE
                if self.socket_keepalive:   #構造函數中默認 socket_keepalive=False,所以這裏默認爲短鏈接
                    sock.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
                    for k, v in iteritems(self.socket_keepalive_options):
                        sock.setsockopt(socket.SOL_TCP, k, v)
                # set the socket_connect_timeout before we connect
                sock.settimeout(self.socket_connect_timeout)  #構造函數中默認socket_connect_timeout=None,即鏈接爲blocking的模式
                # connect
                sock.connect(socket_address)
                # set the socket_timeout now that we're connected
                sock.settimeout(self.socket_timeout)  #構造函數中默認socket_timeout=None
                return sock
            except socket.error as _:
                err = _
                if sock is not None:
                    sock.close()
.....

關閉連接的方法:
ide

    def disconnect(self):
        "Disconnects from the Redis server"
        self._parser.on_disconnect()
        if self._sock is None:
            return
        try:
            self._sock.shutdown(socket.SHUT_RDWR)  #先shutdown再close
            self._sock.close()
        except socket.error:
            pass
        self._sock = None

        
能夠小結以下
1)默認狀況下每建立一個Redis實例都會構造出一個ConnectionPool實例,每一次訪問redis都會從這個鏈接池獲得一個鏈接,操做完成後會把該鏈接放回鏈接池(鏈接並無釋放),能夠構造一個統一的ConnectionPool,在建立Redis實例時,能夠將該ConnectionPool傳入,那麼後續的操做會從給定的ConnectionPool得到鏈接,不會再重複建立ConnectionPool。
2)默認狀況下沒有設置keepalive和timeout,創建的鏈接是blocking模式的短鏈接。
3)不考慮底層tcp的狀況下,鏈接池中的鏈接會在ConnectionPool.disconnect中統一銷燬。
函數

相關文章
相關標籤/搜索