Source Code Analysis: How the Python redis Module Implements Its Connection Pool

Installation

pip install --upgrade redis

Source files

/usr/lib/python2.7/site-packages/redis

Long vs. short connections

Short connections

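The client sends a request to the server. Once the server receives it, the two sides establish a connection, the client sends a message, the server replies, and a single read/write round trip is complete. At that point either side may send a close request, though it is usually the client. Short connections are easy to manage: every connection that exists is actually in use, so no extra control mechanism is needed.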

Long connections

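The client sends a request, the server accepts it, and the two sides establish a connection. After a read/write round trip completes, the connection is not closed, and subsequent operations keep reusing it. Once opened, the connection is not closed proactively unless the client crashes or disconnects. Even then, the server may keep a half-open connection waiting for client data. TCP keepalive and timeouts are commonly used to close such half-open connections.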
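On the client side, keepalive and timeouts are just socket options. The following is a minimal sketch using only the standard library. It enables `SO_KEEPALIVE` and sets a timeout on a TCP socket. The helper name `make_long_lived_socket` and the 5-second default are illustrative assumptions, not redis-py code. Per-probe tuning constants such as `TCP_KEEPIDLE` exist only on some platforms, so they are applied only when present.

```python
import socket


def make_long_lived_socket(timeout=5.0, keepalive_options=None):
    """Create a TCP socket configured for a long-lived connection (hypothetical helper)."""
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    # Disable Nagle's algorithm so small commands are sent immediately
    sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
    # Ask the kernel to send keepalive probes on an idle connection,
    # so a dead peer (a half-open connection) is eventually detected
    sock.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
    # Optional per-platform tuning, e.g. {socket.TCP_KEEPIDLE: 60}
    for option, value in (keepalive_options or {}).items():
        sock.setsockopt(socket.IPPROTO_TCP, option, value)
    # Blocking operations give up after `timeout` seconds instead of hanging forever
    sock.settimeout(timeout)
    return sock


options = {}
if hasattr(socket, "TCP_KEEPIDLE"):  # e.g. Linux; not available on every platform
    options[socket.TCP_KEEPIDLE] = 60

s = make_long_lived_socket(keepalive_options=options)
print(s.getsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE) != 0)  # True
print(s.gettimeout())  # 5.0
s.close()
```

The socket is only configured here, not connected. The same options take effect once `connect()` succeeds.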

How a connection pool works

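Each database connection object corresponds to one physical database connection. Opening a physical connection for every operation and closing it afterward drags down system performance. A connection pool solves this by creating enough connections when the application starts and keeping them in a pool. The application then acquires, uses, and releases pooled connections dynamically. Concurrent requests beyond the number of pooled connections are queued and wait their turn, and the application can grow or shrink the pool according to its utilization. Pooling reuses these expensive resources as much as possible (avoiding the cost of repeatedly opening and closing connections). This saves resources, improves service efficiency, and lets the system serve more clients. The pool's own management mechanism can also monitor the number of connections and how they are used.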

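1. The minimum connection count is the number of connections the pool always keeps open. If the application uses few database connections, many of these connections sit idle and are wasted.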

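2. The maximum connection count is the largest number of connections the pool can open. If connection requests exceed it, later requests are put in a wait queue, which delays subsequent database operations.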
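Both parameters can be illustrated with a minimal, generic pool. This is a sketch, not redis-py code: `SimplePool`, `min_size`, `max_size`, and the `factory` callable are all hypothetical names.

The pool works as follows:
- It opens `min_size` connections up front.
- It creates more on demand, up to `max_size`.
- Once that limit is reached, callers wait (up to a timeout) for a released connection.

```python
import itertools
import queue
import threading


class SimplePool:
    """Generic connection pool with a minimum and maximum size (illustrative only)."""

    def __init__(self, factory, min_size=2, max_size=4):
        self._factory = factory          # callable that opens one new connection
        self._max_size = max_size
        self._idle = queue.LifoQueue()   # idle connections, most recently used first
        self._created = 0
        self._lock = threading.Lock()
        # The minimum number of connections is opened eagerly and kept in the pool
        for _ in range(min_size):
            self._idle.put(self._new_connection())

    def _new_connection(self):
        self._created += 1
        return self._factory()

    @property
    def created(self):
        return self._created

    def acquire(self, timeout=None):
        try:
            return self._idle.get_nowait()      # reuse an idle connection if possible
        except queue.Empty:
            pass
        with self._lock:
            if self._created < self._max_size:  # still below the cap: open a new one
                return self._new_connection()
        # At the cap: wait in line until another caller releases a connection
        try:
            return self._idle.get(timeout=timeout)
        except queue.Empty:
            raise TimeoutError("no connection became available") from None

    def release(self, conn):
        self._idle.put(conn)  # return it to the pool instead of closing it


counter = itertools.count(1)
pool = SimplePool(factory=lambda: "conn-%d" % next(counter), min_size=2, max_size=3)
a = pool.acquire()
b = pool.acquire()
c = pool.acquire()           # third connection is created on demand
print(pool.created)          # 3
try:
    pool.acquire(timeout=0.1)  # pool exhausted: waits, then gives up
except TimeoutError as exc:
    print(exc)               # no connection became available
pool.release(a)
print(pool.acquire() == a)   # True: the released connection is reused
```

redis-py's `ConnectionPool`, analyzed below, takes a simpler route. It opens nothing up front and raises an error instead of queueing once `max_connections` is reached.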

Source code analysis

/usr/lib/python2.7/site-packages/redis/client.py

# Redis is a subclass of StrictRedis
class Redis(StrictRedis):
    """
    Provides backwards compatibility with older versions of redis-py that
    changed arguments to some commands to be more Pythonic, sane, or by
    accident.
    """

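Note: As the code above shows, Redis is a subclass of StrictRedis, and the two classes expose similar methods. StrictRedis implements most of the official Redis commands, while Redis exists only for backward compatibility with older versions of the module, so the official recommendation is to use StrictRedis. The LREM, ZADD, and SETEX methods of the Redis subclass are real pitfalls because their argument order differs from the official command syntax. Either test them yourself or simply stick with StrictRedis.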

# StrictRedis base class
class StrictRedis(object):
    """
    Implementation of the Redis protocol.

    This abstract class provides a Python interface to all Redis commands
    and an implementation of the Redis protocol.

    Connection and Pipeline derive from this, implementing how
    the commands are sent and received to the Redis server
    """
    
    def __init__(self, host='localhost', port=6379,
                 db=0, password=None, socket_timeout=None,
                 socket_connect_timeout=None,
                 socket_keepalive=None, socket_keepalive_options=None,
                 connection_pool=None, unix_socket_path=None,
                 encoding='utf-8', encoding_errors='strict',
                 charset=None, errors=None,
                 decode_responses=False, retry_on_timeout=False,
                 ssl=False, ssl_keyfile=None, ssl_certfile=None,
                 ssl_cert_reqs=None, ssl_ca_certs=None):
        # If no connection_pool was passed in, build one from the other settings
        if not connection_pool:
            if charset is not None:
                warnings.warn(DeprecationWarning(
                    '"charset" is deprecated. Use "encoding" instead'))
                encoding = charset
            if errors is not None:
                warnings.warn(DeprecationWarning(
                    '"errors" is deprecated. Use "encoding_errors" instead'))
                encoding_errors = errors

            kwargs = {
                'db': db,
                'password': password,
                'socket_timeout': socket_timeout,
                'encoding': encoding,
                'encoding_errors': encoding_errors,
                'decode_responses': decode_responses,
                'retry_on_timeout': retry_on_timeout
            }
            # based on input, setup appropriate connection args
            if unix_socket_path is not None:
                kwargs.update({
                    'path': unix_socket_path,
                    'connection_class': UnixDomainSocketConnection
                })
            else:
                # TCP specific options
                kwargs.update({
                    'host': host,
                    'port': port,
                    'socket_connect_timeout': socket_connect_timeout,
                    'socket_keepalive': socket_keepalive,
                    'socket_keepalive_options': socket_keepalive_options,
                })

                if ssl:
                    kwargs.update({
                        'connection_class': SSLConnection,
                        'ssl_keyfile': ssl_keyfile,
                        'ssl_certfile': ssl_certfile,
                        'ssl_cert_reqs': ssl_cert_reqs,
                        'ssl_ca_certs': ssl_ca_certs,
                    })
            connection_pool = ConnectionPool(**kwargs)
        # Use the pool that was passed in, or the one just created above, so a pool always exists
        self.connection_pool = connection_pool
        self._use_lua_lock = None

        self.response_callbacks = self.__class__.RESPONSE_CALLBACKS.copy()
        
    # COMMAND EXECUTION AND PROTOCOL PARSING
    def execute_command(self, *args, **options):
        "Execute a command and return a parsed response"
        pool = self.connection_pool
        command_name = args[0]
        # Get a connection from the pool to execute command_name
        connection = pool.get_connection(command_name, **options)
        try:
            connection.send_command(*args)
            return self.parse_response(connection, command_name, **options)
        except (ConnectionError, TimeoutError) as e:
            connection.disconnect()
            if not connection.retry_on_timeout and isinstance(e, TimeoutError):
                raise
            connection.send_command(*args)
            return self.parse_response(connection, command_name, **options)
        finally:
            pool.release(connection)
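The control flow of `execute_command` can be exercised without a Redis server. It borrows a connection, sends, reads, retries once after a connection error, and always releases in `finally`. The sketch below uses hypothetical stand-ins:
- `FakePool` replaces the pool.
- `FlakyConnection` replaces the connection.
- Local `RedisConnectionError` and `RedisTimeoutError` classes replace redis-py's exceptions.

`parse_response` is also replaced by a plain `read_response`. Only the shape of the logic mirrors the code above.

```python
class RedisConnectionError(Exception):
    """Stand-in for redis-py's ConnectionError."""


class RedisTimeoutError(Exception):
    """Stand-in for redis-py's TimeoutError."""


class FlakyConnection:
    """Fake connection whose first send fails, to exercise the retry path."""

    def __init__(self, retry_on_timeout=False, fail_with=RedisConnectionError):
        self.retry_on_timeout = retry_on_timeout
        self.fail_with = fail_with
        self.sent = []
        self.disconnects = 0

    def send_command(self, *args):
        self.sent.append(args)
        if len(self.sent) == 1 and self.fail_with is not None:
            raise self.fail_with("simulated failure")

    def read_response(self):
        return "OK"

    def disconnect(self):
        self.disconnects += 1


class FakePool:
    def __init__(self, connection):
        self.connection = connection
        self.released = 0

    def get_connection(self, command_name, **options):
        return self.connection

    def release(self, connection):
        self.released += 1


def execute_command(pool, *args):
    """Same control flow as StrictRedis.execute_command above (simplified)."""
    connection = pool.get_connection(args[0])
    try:
        connection.send_command(*args)
        return connection.read_response()
    except (RedisConnectionError, RedisTimeoutError) as e:
        # Drop the broken socket; the next send reconnects in the real client
        connection.disconnect()
        # Timeouts are only retried when retry_on_timeout is enabled
        if not connection.retry_on_timeout and isinstance(e, RedisTimeoutError):
            raise
        connection.send_command(*args)  # exactly one retry
        return connection.read_response()
    finally:
        # Runs on success and on failure: the connection always goes back to the pool
        pool.release(connection)


pool = FakePool(FlakyConnection())
print(execute_command(pool, "SET", "k", "v"))  # OK (after one retry)
print(pool.released)                           # 1

pool = FakePool(FlakyConnection(fail_with=RedisTimeoutError))
try:
    execute_command(pool, "GET", "k")
except RedisTimeoutError:
    print("timeout not retried; released =", pool.released)  # released = 1
```

Releasing in `finally` is what keeps the pool from leaking connections when a command raises.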

/usr/lib/python2.7/site-packages/redis/connection.py

# Connection pool class
class ConnectionPool(object):
    "Generic connection pool"
    @classmethod
    def from_url(cls, url, db=None, **kwargs):
        """
        Return a connection pool configured from the given URL.

        For example::

            redis://[:password]@localhost:6379/0
            rediss://[:password]@localhost:6379/0
            unix://[:password]@/path/to/socket.sock?db=0

        Three URL schemes are supported:
            redis:// creates a normal TCP socket connection
            rediss:// creates a SSL wrapped TCP socket connection
            unix:// creates a Unix Domain Socket connection

        There are several ways to specify a database number. The parse function
        will return the first specified option:
            1. A ``db`` querystring option, e.g. redis://localhost?db=0
            2. If using the redis:// scheme, the path argument of the url, e.g.
               redis://localhost/0
            3. The ``db`` argument to this function.

        If none of these options are specified, db=0 is used.

        Any additional querystring arguments and keyword arguments will be
        passed along to the ConnectionPool class's initializer. In the case
        of conflicting arguments, querystring arguments always win.
        """

    def __init__(self, connection_class=Connection, max_connections=None,
                 **connection_kwargs):
        """
        Create a connection pool. If max_connections is set, then this
        object raises redis.ConnectionError when the pool's limit is reached.

        By default, TCP connections are created unless connection_class is specified.
        Use redis.UnixDomainSocketConnection for unix sockets.

        Any additional keyword arguments are passed to the constructor of
        connection_class.
        """
        # The maximum number of connections defaults to 2 ** 31 (effectively unlimited)
        max_connections = max_connections or 2 ** 31
        if not isinstance(max_connections, (int, long)) or max_connections < 0:
            raise ValueError('"max_connections" must be a positive integer')

        self.connection_class = connection_class
        self.connection_kwargs = connection_kwargs
        self.max_connections = max_connections
        # Initialize the pool's bookkeeping
        self.reset()

    def __repr__(self):
        return "%s<%s>" % (
            type(self).__name__,
            self.connection_class.description_format % self.connection_kwargs,
        )
    # (Re)initialize the connection pool's state
    def reset(self):
        # Record the current PID; _checkpid() compares it with os.getpid() to detect use in a forked child process
        self.pid = os.getpid()
        # Number of connections created so far (a counter)
        self._created_connections = 0
        # Idle connections available for reuse (a list)
        self._available_connections = []
        # Connections currently in use (a set)
        self._in_use_connections = set()
        # Lock that serializes the fork check in _checkpid()
        self._check_lock = threading.Lock()

    # If we are in a forked child, discard the parent's connections and reset the pool
    def _checkpid(self):
        if self.pid != os.getpid():
            with self._check_lock:
                if self.pid == os.getpid():
                    # another thread already did the work while we waited
                    # on the lock.
                    return
                self.disconnect()
                self.reset()
    # Get a connection from the pool
    def get_connection(self, command_name, *keys, **options):
        "Get a connection from the pool"
        self._checkpid()
        try:
            # Pop an idle connection off the available list
            connection = self._available_connections.pop()
        except IndexError:
            # No idle connection left, so create a new one
            connection = self.make_connection()
        # Add it to the set of connections in use
        self._in_use_connections.add(connection)
        # and return the connection object
        return connection
    # Create a connection
    def make_connection(self):
        "Create a new connection"
        # Raise if the maximum number of connections has been reached
        if self._created_connections >= self.max_connections:
            raise ConnectionError("Too many connections")
        # Otherwise increment the created-connection counter
        self._created_connections += 1
        # Instantiate a connection from connection_class (the socket is opened lazily on first use)
        return self.connection_class(**self.connection_kwargs)
    # Release a connection
    def release(self, connection):
        "Releases the connection back to the pool"
        self._checkpid()
        if connection.pid != self.pid:
            return
        # The connection is not closed: it is removed from the in-use set and recycled
        self._in_use_connections.remove(connection)
        # by putting it back on the list of available connections
        self._available_connections.append(connection)
    # Close every connection in the pool
    def disconnect(self):
        "Disconnects all connections in the pool"
        # Chain both collections to iterate over every connection, idle or in use
        all_conns = chain(self._available_connections,
                          self._in_use_connections)
        # Disconnect each one in turn
        for connection in all_conns:
            connection.disconnect()
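
To see the bookkeeping of `ConnectionPool` in isolation, here is a stripped-down Python 3 re-implementation. It covers:
- LIFO reuse of idle connections.
- The `max_connections` cap.
- The PID check that resets the pool after `fork()`.

It is only a sketch. `MiniPool` and `DummyConnection` are hypothetical, no sockets are opened, and `_checkpid` is simplified to a plain PID comparison without the lock. It also raises `RuntimeError` where redis-py raises its own `ConnectionError`.

```python
import os
from itertools import chain


class DummyConnection:
    """Stand-in for redis.Connection: records its PID, opens no socket."""

    def __init__(self, **kwargs):
        self.pid = os.getpid()
        self.kwargs = kwargs
        self.connected = True

    def disconnect(self):
        self.connected = False


class MiniPool:
    def __init__(self, connection_class=DummyConnection, max_connections=None,
                 **connection_kwargs):
        self.connection_class = connection_class
        self.connection_kwargs = connection_kwargs
        self.max_connections = max_connections or 2 ** 31
        self.reset()

    def reset(self):
        self.pid = os.getpid()
        self._created_connections = 0
        self._available_connections = []   # idle, reused LIFO via list.pop()
        self._in_use_connections = set()   # currently checked out

    def _checkpid(self):
        # After fork() the child must not keep using the parent's connections
        if self.pid != os.getpid():
            self.disconnect()
            self.reset()

    def get_connection(self):
        self._checkpid()
        try:
            connection = self._available_connections.pop()
        except IndexError:
            connection = self.make_connection()
        self._in_use_connections.add(connection)
        return connection

    def make_connection(self):
        if self._created_connections >= self.max_connections:
            raise RuntimeError("Too many connections")  # redis-py: ConnectionError
        self._created_connections += 1
        return self.connection_class(**self.connection_kwargs)

    def release(self, connection):
        self._checkpid()
        if connection.pid != self.pid:
            return  # created in another process; ignore it
        self._in_use_connections.remove(connection)
        self._available_connections.append(connection)

    def disconnect(self):
        for connection in chain(self._available_connections,
                                self._in_use_connections):
            connection.disconnect()


pool = MiniPool(max_connections=2, host="localhost")
c1 = pool.get_connection()
pool.release(c1)
print(pool.get_connection() is c1)   # True: the idle connection is reused
c2 = pool.get_connection()           # a second, distinct connection
try:
    pool.get_connection()            # a third would exceed max_connections
except RuntimeError as exc:
    print(exc)                       # Too many connections

pool.pid = -1                        # simulate running in a forked child
c3 = pool.get_connection()
print(c1.connected, c3 is not c1)    # False True: old connections were dropped
```

Setting `pool.pid` by hand is only a way to trigger the fork branch without actually forking.
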

# Connection class
class Connection(object):
    "Manages TCP communication to and from a Redis server"
    description_format = "Connection<host=%(host)s,port=%(port)s,db=%(db)s>"

    def __init__(self, host='localhost', port=6379, db=0, password=None,
                 socket_timeout=None, socket_connect_timeout=None,
                 socket_keepalive=False, socket_keepalive_options=None,
                 retry_on_timeout=False, encoding='utf-8',
                 encoding_errors='strict', decode_responses=False,
                 parser_class=DefaultParser, socket_read_size=65536):
        self.pid = os.getpid()
        self.host = host
        self.port = int(port)
        self.db = db
        self.password = password
        self.socket_timeout = socket_timeout
        self.socket_connect_timeout = socket_connect_timeout or socket_timeout
        self.socket_keepalive = socket_keepalive
        self.socket_keepalive_options = socket_keepalive_options or {}
        self.retry_on_timeout = retry_on_timeout
        self.encoding = encoding
        self.encoding_errors = encoding_errors
        self.decode_responses = decode_responses
        self._sock = None
        self._parser = parser_class(socket_read_size=socket_read_size)
        self._description_args = {
            'host': self.host,
            'port': self.port,
            'db': self.db,
        }
        self._connect_callbacks = []

    def __repr__(self):
        return self.description_format % self._description_args
    # Call disconnect() when the object is garbage-collected
    def __del__(self):
        try:
            self.disconnect()
        except Exception:
            pass

    def register_connect_callback(self, callback):
        self._connect_callbacks.append(callback)

    def clear_connect_callbacks(self):
        self._connect_callbacks = []
    # The core connection logic is still implemented with the socket module
    def connect(self):
        "Connects to the Redis server if not already connected"
        if self._sock:
            return
        try:
            # Create the socket via the private _connect() method
            sock = self._connect()
        except socket.error:
            e = sys.exc_info()[1]
            raise ConnectionError(self._error_message(e))

        self._sock = sock
        try:
            self.on_connect()
        except RedisError:
            # clean up after any error in on_connect
            self.disconnect()
            raise

        # run any user callbacks. right now the only internal callback
        # is for pubsub channel/pattern resubscription
        for callback in self._connect_callbacks:
            callback(self)
    # Create a TCP socket
    def _connect(self):
        "Create a TCP socket connection"
        # we want to mimic what socket.create_connection does to support
        # ipv4/ipv6, but we want to set options prior to calling
        # socket.connect()
        err = None
        for res in socket.getaddrinfo(self.host, self.port, 0,
                                      socket.SOCK_STREAM):
            family, socktype, proto, canonname, socket_address = res
            sock = None
            try:
                sock = socket.socket(family, socktype, proto)
                # TCP_NODELAY
                sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)

                # TCP_KEEPALIVE
                # socket_keepalive=True (off by default) enables TCP keepalive probes,
                # which detect dead peers on idle, long-lived connections
                if self.socket_keepalive:
                    sock.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
                    for k, v in iteritems(self.socket_keepalive_options):
                        sock.setsockopt(socket.SOL_TCP, k, v)
                # A timeout of None (the default) leaves the socket in blocking mode with no time limit
                # set the socket_connect_timeout before we connect
                sock.settimeout(self.socket_connect_timeout)

                # connect
                sock.connect(socket_address)

                # set the socket_timeout now that we're connected
                sock.settimeout(self.socket_timeout)
                return sock

            except socket.error as _:
                err = _
                if sock is not None:
                    sock.close()

        if err is not None:
            raise err
        raise socket.error("socket.getaddrinfo returned an empty list")

    def _error_message(self, exception):
        # args for socket.error can either be (errno, "message")
        # or just "message"
        if len(exception.args) == 1:
            return "Error connecting to %s:%s. %s." % \
                (self.host, self.port, exception.args[0])
        else:
            return "Error %s connecting to %s:%s. %s." % \
                (exception.args[0], self.host, self.port, exception.args[1])

    def on_connect(self):
        "Initialize the connection, authenticate and select a database"
        self._parser.on_connect(self)

        # if a password is specified, authenticate
        if self.password:
            self.send_command('AUTH', self.password)
            if nativestr(self.read_response()) != 'OK':
                raise AuthenticationError('Invalid Password')

        # if a database is specified, switch to it
        if self.db:
            self.send_command('SELECT', self.db)
            if nativestr(self.read_response()) != 'OK':
                raise ConnectionError('Invalid Database')
    # Close the connection
    def disconnect(self):
        "Disconnects from the Redis server"
        self._parser.on_disconnect()
        if self._sock is None:
            return
        try:
            # shutdown() first, then close()
            self._sock.shutdown(socket.SHUT_RDWR)
            self._sock.close()
        except socket.error:
            pass
        self._sock = None

    def send_packed_command(self, command):
        "Send an already packed command to the Redis server"
        if not self._sock:
            self.connect()
        try:
            if isinstance(command, str):
                command = [command]
            for item in command:
                self._sock.sendall(item)
        except socket.timeout:
            self.disconnect()
            raise TimeoutError("Timeout writing to socket")
        except socket.error:
            e = sys.exc_info()[1]
            self.disconnect()
            if len(e.args) == 1:
                _errno, errmsg = 'UNKNOWN', e.args[0]
            else:
                _errno, errmsg = e.args
            raise ConnectionError("Error %s while writing to socket. %s." %
                                  (_errno, errmsg))
        except:
            self.disconnect()
            raise

    def send_command(self, *args):
        "Pack and send a command to the Redis server"
        self.send_packed_command(self.pack_command(*args))

    def can_read(self):
        "Poll the socket to see if there's data that can be read."
        sock = self._sock
        if not sock:
            self.connect()
            sock = self._sock
        return bool(select([sock], [], [], 0)[0]) or self._parser.can_read()

    def read_response(self):
        "Read the response from a previously sent command"
        try:
            response = self._parser.read_response()
        except:
            self.disconnect()
            raise
        if isinstance(response, ResponseError):
            raise response
        return response

    def encode(self, value):
        "Return a bytestring representation of the value"
        if isinstance(value, Token):
            return b(value.value)
        elif isinstance(value, bytes):
            return value
        elif isinstance(value, (int, long)):
            value = b(str(value))
        elif isinstance(value, float):
            value = b(repr(value))
        elif not isinstance(value, basestring):
            value = str(value)
        if isinstance(value, unicode):
            value = value.encode(self.encoding, self.encoding_errors)
        return value

    def pack_command(self, *args):
        "Pack a series of arguments into the Redis protocol"
        output = []
        # the client might have included 1 or more literal arguments in
        # the command name, e.g., 'CONFIG GET'. The Redis server expects these
        # arguments to be sent separately, so split the first argument
        # manually. All of these arguements get wrapped in the Token class
        # to prevent them from being encoded.
        command = args[0]
        if ' ' in command:
            args = tuple([Token(s) for s in command.split(' ')]) + args[1:]
        else:
            args = (Token(command),) + args[1:]

        buff = SYM_EMPTY.join(
            (SYM_STAR, b(str(len(args))), SYM_CRLF))

        for arg in imap(self.encode, args):
            # to avoid large string mallocs, chunk the command into the
            # output list if we're sending large values
            if len(buff) > 6000 or len(arg) > 6000:
                buff = SYM_EMPTY.join(
                    (buff, SYM_DOLLAR, b(str(len(arg))), SYM_CRLF))
                output.append(buff)
                output.append(arg)
                buff = SYM_CRLF
            else:
                buff = SYM_EMPTY.join((buff, SYM_DOLLAR, b(str(len(arg))),
                                       SYM_CRLF, arg, SYM_CRLF))
        output.append(buff)
        return output

    def pack_commands(self, commands):
        "Pack multiple commands into the Redis protocol"
        output = []
        pieces = []
        buffer_length = 0

        for cmd in commands:
            for chunk in self.pack_command(*cmd):
                pieces.append(chunk)
                buffer_length += len(chunk)

            if buffer_length > 6000:
                output.append(SYM_EMPTY.join(pieces))
                buffer_length = 0
                pieces = []

        if pieces:
            output.append(SYM_EMPTY.join(pieces))
        return output
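`pack_command` serializes a command into the Redis protocol (RESP). The format is `*<argc>\r\n`, followed by `$<len>\r\n<bytes>\r\n` for each argument. The sketch below is a simplified Python 3 version for illustration; the function name `pack_resp` is hypothetical. It does three things:
- Splits a multi-word command name such as `'CONFIG GET'`.
- Encodes every argument to bytes.
- Returns a single bytes object.

It omits the 6000-byte chunking the real method uses to avoid large string copies.

```python
def pack_resp(*args, encoding="utf-8"):
    """Serialize one command as a RESP array of bulk strings (simplified)."""
    # Split a command name like 'CONFIG GET' into separate arguments
    name, rest = args[0], args[1:]
    args = tuple(name.split(" ")) + rest

    def encode(value):
        if isinstance(value, bytes):
            return value
        if isinstance(value, float):
            return repr(value).encode(encoding)
        # ints, str and everything else go through str(), as Connection.encode does
        return str(value).encode(encoding)

    parts = [b"*%d\r\n" % len(args)]           # array header: argument count
    for arg in map(encode, args):
        # bulk string: byte length (not character count), payload, CRLF
        parts.append(b"$%d\r\n%s\r\n" % (len(arg), arg))
    return b"".join(parts)


print(pack_resp("SET", "greeting", "hello"))
# b'*3\r\n$3\r\nSET\r\n$8\r\ngreeting\r\n$5\r\nhello\r\n'
print(pack_resp("CONFIG GET", "maxmemory"))
# b'*3\r\n$6\r\nCONFIG\r\n$3\r\nGET\r\n$9\r\nmaxmemory\r\n'
```

Lengths are counted after encoding. That is why a non-ASCII value such as `"\u00e9"` is sent as `$2` rather than `$1`.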

Summary

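1. By default, every Redis instance constructs its own ConnectionPool instance. Each command takes a connection from that pool and puts it back when the operation completes; the connection itself is not closed.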

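2. You can construct a ConnectionPool yourself and pass it in through the connection_pool argument when creating instances. Subsequent operations then take connections from that pool, and no additional ConnectionPool is created.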

3. Leaving aside the underlying TCP behavior, the connections in a pool are all destroyed together in ConnectionPool.disconnect.
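Points 1 and 2 can be demonstrated with a toy model of how `StrictRedis.__init__` chooses its pool. `Pool` and `Client` are hypothetical, simplified stand-ins that open no sockets. With redis-py itself, the equivalent of point 2 is passing the same `ConnectionPool` object as the `connection_pool` argument to several `StrictRedis` instances.

```python
import itertools


class Pool:
    """Hypothetical pool that hands out numbered connection IDs."""

    _ids = itertools.count(1)

    def __init__(self, **settings):
        self.settings = settings
        self.idle = []
        self.created = 0

    def get_connection(self):
        if self.idle:
            return self.idle.pop()    # reuse an idle connection
        self.created += 1
        return next(self._ids)        # "open" a new one

    def release(self, conn):
        self.idle.append(conn)        # put it back; nothing is closed


class Client:
    """Mirrors how StrictRedis.__init__ picks its pool (simplified)."""

    def __init__(self, host="localhost", port=6379, connection_pool=None):
        if connection_pool is None:
            # Default: every client builds its own private pool
            connection_pool = Pool(host=host, port=port)
        self.connection_pool = connection_pool

    def execute(self):
        conn = self.connection_pool.get_connection()
        try:
            return conn               # a real client would send a command here
        finally:
            self.connection_pool.release(conn)


# Point 1: separate clients get separate pools
a, b = Client(), Client()
print(a.connection_pool is b.connection_pool)  # False

# Point 2: one explicit pool shared by several clients
shared = Pool(host="localhost", port=6379)
c, d = Client(connection_pool=shared), Client(connection_pool=shared)
c.execute()
d.execute()
print(shared.created)  # 1: d reused the connection that c released
```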
