pip install --upgrade redis
/usr/lib/python2.7/site-packages/redis
客戶端向服務端發送請求,服務端收到請求,雙方創建鏈接,客戶端給服務端發送消息,服務端迴應客戶端,而後一次讀寫就完成了,這時雙方任何一個均可以發送關閉請求,不過通常是客戶端,短連接管理起來比較方便,存在的鏈接都是有用的,不須要額外的控制手段python
客戶端向服務端發送請求,服務器接收客戶端請求,雙方創建鏈接,客戶端和服務端之間完成一次讀寫以後,它們之間的的鏈接不會主動關閉,後續的操做會繼續使用這個鏈接,可是一旦開啓此鏈接將不會主動斷開,除非客戶端掛掉/主動斷開,可是服務端依然保持半開式鏈接等待客戶端數據,長用TCP保活功能keepalive和超時時間timeout來關閉這些半開式鏈接redis
一個數據庫鏈接對象均爲一個物理數據庫鏈接,每次操做都打開一個物理鏈接,使用完後都關閉鏈接,這樣會形成系統的性能低下,數據庫鏈接池的解決方案是在應用程序啓動時創建足夠多的數據庫鏈接,並將這些鏈接放在一個鏈接池中,由應用程序動態對池中的鏈接進行申請,使用和釋放,對於多餘鏈接池中鏈接數的併發請求,會放在請求中排列等待,而且應用程序能夠根據鏈接池中的鏈接使用率,動態的增長或是減小池中的鏈接數,鏈接池技術儘量多地重用了消耗內存的資源(避免了重複性的鏈接關閉的消耗),大大節省了內存,提升了服務效率,支持更多的客戶端,同時能夠經過其自身的管理機制來監視數據庫的鏈接數和使用狀況數據庫
1.最小鏈接數是鏈接池一直保持的數據庫鏈接,因此全部應用程序對數據庫鏈接的使用量不大,將會形成大量的數據庫鏈接資源被浪費服務器
2.最大鏈接數是鏈接池能申請的最大鏈接數,若是數據庫鏈接請求超過此數,後面的請求將被加入到等待隊列,這會影響以後的數據庫操做併發
/usr/lib/python2.7/site-packages/redis/client.pyapp
# Redis是StrictRedis的子類 class Redis(StrictRedis): """ Provides backwards compatibility with older versions of redis-py that changed arguments to some commands to be more Pythonic, sane, or by accident. """
說明:從如上代碼能夠看出Redis是StrictRedis的子類,兩個類下的方法相似,後者用於實現大部分官方的命令,而前者爲了向後兼容舊版模塊兒,因此官方推薦使用StrictRedis,Redis子類下的LREM/ZADD/SETEX絕對是個坑~要麼本身嘗試,要麼乖乖用StrictRedis~python2.7
# StrictRedis基類 class StrictRedis(object): """ Implementation of the Redis protocol. This abstract class provides a Python interface to all Redis commands and an implementation of the Redis protocol. Connection and Pipeline derive from this, implementing how the commands are sent and received to the Redis server """ def __init__(self, host='localhost', port=6379, db=0, password=None, socket_timeout=None, socket_connect_timeout=None, socket_keepalive=None, socket_keepalive_options=None, connection_pool=None, unix_socket_path=None, encoding='utf-8', encoding_errors='strict', charset=None, errors=None, decode_responses=False, retry_on_timeout=False, ssl=False, ssl_keyfile=None, ssl_certfile=None, ssl_cert_reqs=None, ssl_ca_certs=None): # 若是connection_pool沒有定義的話就嘗試獲取其它設置後並從新建立鏈接池 if not connection_pool: if charset is not None: warnings.warn(DeprecationWarning( '"charset" is deprecated. Use "encoding" instead')) encoding = charset if errors is not None: warnings.warn(DeprecationWarning( '"errors" is deprecated. Use "encoding_errors" instead')) encoding_errors = errors kwargs = { 'db': db, 'password': password, 'socket_timeout': socket_timeout, 'encoding': encoding, 'encoding_errors': encoding_errors, 'decode_responses': decode_responses, 'retry_on_timeout': retry_on_timeout } # based on input, setup appropriate connection args if unix_socket_path is not None: kwargs.update({ 'path': unix_socket_path, 'connection_class': UnixDomainSocketConnection }) else: # TCP specific options kwargs.update({ 'host': host, 'port': port, 'socket_connect_timeout': socket_connect_timeout, 'socket_keepalive': socket_keepalive, 'socket_keepalive_options': socket_keepalive_options, }) if ssl: kwargs.update({ 'connection_class': SSLConnection, 'ssl_keyfile': ssl_keyfile, 'ssl_certfile': ssl_certfile, 'ssl_cert_reqs': ssl_cert_reqs, 'ssl_ca_certs': ssl_ca_certs, }) connection_pool = ConnectionPool(**kwargs) # 若是沒有已經建立鏈接池則使用已經建立的鏈接池若是沒有鏈接池則默認也會建立鏈接池 self.connection_pool = connection_pool self._use_lua_lock = None self.response_callbacks = self.__class__.RESPONSE_CALLBACKS.copy() # COMMAND EXECUTION AND PROTOCOL PARSING def execute_command(self, *args, **options): "Execute a command and return a parsed response" pool = self.connection_pool command_name = args[0] # 從鏈接池中獲取鏈接執行command_name connection = pool.get_connection(command_name, **options) try: connection.send_command(*args) return self.parse_response(connection, command_name, **options) except (ConnectionError, TimeoutError) as e: connection.disconnect() if not connection.retry_on_timeout and isinstance(e, TimeoutError): raise connection.send_command(*args) return self.parse_response(connection, command_name, **options) finally: pool.release(connection)
/usr/lib/python2.7/site-packages/redis/connection.py:socket
# 建立鏈接池類 class ConnectionPool(object): "Generic connection pool" @classmethod def from_url(cls, url, db=None, **kwargs): """ Return a connection pool configured from the given URL. For example:: redis://[:password]@localhost:6379/0 rediss://[:password]@localhost:6379/0 unix://[:password]@/path/to/socket.sock?db=0 Three URL schemes are supported: redis:// creates a normal TCP socket connection rediss:// creates a SSL wrapped TCP socket connection unix:// creates a Unix Domain Socket connection There are several ways to specify a database number. The parse function will return the first specified option: 1. A ``db`` querystring option, e.g. redis://localhost?db=0 2. If using the redis:// scheme, the path argument of the url, e.g. redis://localhost/0 3. The ``db`` argument to this function. If none of these options are specified, db=0 is used. Any additional querystring arguments and keyword arguments will be passed along to the ConnectionPool class's initializer. In the case of conflicting arguments, querystring arguments always win. """ def __init__(self, connection_class=Connection, max_connections=None, **connection_kwargs): """ Create a connection pool. If max_connections is set, then this object raises redis.ConnectionError when the pool's limit is reached. By default, TCP connections are created connection_class is specified. Use redis.UnixDomainSocketConnection for unix sockets. Any additional keyword arguments are passed to the constructor of connection_class. """ # 最大鏈接數默認爲62 max_connections = max_connections or 2 ** 31 if not isinstance(max_connections, (int, long)) or max_connections < 0: raise ValueError('"max_connections" must be a positive integer') self.connection_class = connection_class self.connection_kwargs = connection_kwargs self.max_connections = max_connections # 初始化線程池 self.reset() def __repr__(self): return "%s<%s>" % ( type(self).__name__, self.connection_class.description_format % self.connection_kwargs, ) # 初始化線程池 def reset(self): # 獲取當前的進程號,後面判斷進程是否掛掉 self.pid = os.getpid() # 存放已經建立的鏈接(計數) self._created_connections = 0 # 存放可用的鏈接對象(列表) self._available_connections = [] # 存放正在使用的鏈接對象(集合) self._in_use_connections = set() # 建立線程鎖 self._check_lock = threading.Lock() def _checkpid(self): if self.pid != os.getpid(): with self._check_lock: if self.pid == os.getpid(): # another thread already did the work while we waited # on the lock. return self.disconnect() self.reset() # 從鏈接池中獲取鏈接 def get_connection(self, command_name, *keys, **options): "Get a connection from the pool" self._checkpid() try: # 從鏈接池中pop出一個鏈接對象 connection = self._available_connections.pop() except IndexError: # 若是鏈接池中已經沒有鏈接的話從新建立鏈接 connection = self.make_connection() # 當前正在使用的鏈接集合中添加此鏈接 self._in_use_connections.add(connection) # 並返回此鏈接對象 return connection # 建立鏈接 def make_connection(self): "Create a new connection" # 若是大於最大鏈接數則拋出異常 if self._created_connections >= self.max_connections: raise ConnectionError("Too many connections") # 不然建立的鏈接數++ self._created_connections += 1 # 利用建立鏈接類實例化一個鏈接 return self.connection_class(**self.connection_kwargs) # 釋放鏈接 def release(self, connection): "Releases the connection back to the pool" self._checkpid() if connection.pid != self.pid: return # 並無關閉鏈接而是從在使用的鏈接列表中刪除此鏈接,回收鏈接對象 self._in_use_connections.remove(connection) # 可用鏈接就回收了一個鏈接對象 self._available_connections.append(connection) # 關閉鏈接 def disconnect(self): "Disconnects all connections in the pool" # 關聯獲取全部可迭代對象獲取全部鏈接對象 all_conns = chain(self._available_connections, self._in_use_connections) # 便歷關閉全部鏈接 for connection in all_conns: connection.disconnect()
# 建立鏈接類 class Connection(object): "Manages TCP communication to and from a Redis server" description_format = "Connection<host=%(host)s,port=%(port)s,db=%(db)s>" def __init__(self, host='localhost', port=6379, db=0, password=None, socket_timeout=None, socket_connect_timeout=None, socket_keepalive=False, socket_keepalive_options=None, retry_on_timeout=False, encoding='utf-8', encoding_errors='strict', decode_responses=False, parser_class=DefaultParser, socket_read_size=65536): self.pid = os.getpid() self.host = host self.port = int(port) self.db = db self.password = password self.socket_timeout = socket_timeout self.socket_connect_timeout = socket_connect_timeout or socket_timeout self.socket_keepalive = socket_keepalive self.socket_keepalive_options = socket_keepalive_options or {} self.retry_on_timeout = retry_on_timeout self.encoding = encoding self.encoding_errors = encoding_errors self.decode_responses = decode_responses self._sock = None self._parser = parser_class(socket_read_size=socket_read_size) self._description_args = { 'host': self.host, 'port': self.port, 'db': self.db, } self._connect_callbacks = [] def __repr__(self): return self.description_format % self._description_args # 對象刪除時調用disconnect方法關閉對象 def __del__(self): try: self.disconnect() except Exception: pass def register_connect_callback(self, callback): self._connect_callbacks.append(callback) def clear_connect_callbacks(self): self._connect_callbacks = [] # 核心的鏈接方法仍是經過socket模塊兒實現 def connect(self): "Connects to the Redis server if not already connected" if self._sock: return try: # 私有方法建立鏈接 sock = self._connect() except socket.error: e = sys.exc_info()[1] raise ConnectionError(self._error_message(e)) self._sock = sock try: self.on_connect() except RedisError: # clean up after any error in on_connect self.disconnect() raise # run any user callbacks. right now the only internal callback # is for pubsub channel/pattern resubscription for callback in self._connect_callbacks: callback(self) # 建立tcp socket def _connect(self): "Create a TCP socket connection" # we want to mimic what socket.create_connection does to support # ipv4/ipv6, but we want to set options prior to calling # socket.connect() err = None for res in socket.getaddrinfo(self.host, self.port, 0, socket.SOCK_STREAM): family, socktype, proto, canonname, socket_address = res sock = None try: sock = socket.socket(family, socktype, proto) # TCP_NODELAY sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1) # TCP_KEEPALIVE # 默認使用的是短鏈接,設置socket_keepalive=True保持長鏈接 if self.socket_keepalive: sock.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1) for k, v in iteritems(self.socket_keepalive_options): sock.setsockopt(socket.SOL_TCP, k, v) # 設置超市時間,默認不設置爲阻塞模式 # set the socket_connect_timeout before we connect sock.settimeout(self.socket_connect_timeout) # connect sock.connect(socket_address) # set the socket_timeout now that we're connected sock.settimeout(self.socket_timeout) return sock except socket.error as _: err = _ if sock is not None: sock.close() if err is not None: raise err raise socket.error("socket.getaddrinfo returned an empty list") def _error_message(self, exception): # args for socket.error can either be (errno, "message") # or just "message" if len(exception.args) == 1: return "Error connecting to %s:%s. %s." % \ (self.host, self.port, exception.args[0]) else: return "Error %s connecting to %s:%s. %s." % \ (exception.args[0], self.host, self.port, exception.args[1]) def on_connect(self): "Initialize the connection, authenticate and select a database" self._parser.on_connect(self) # if a password is specified, authenticate if self.password: self.send_command('AUTH', self.password) if nativestr(self.read_response()) != 'OK': raise AuthenticationError('Invalid Password') # if a database is specified, switch to it if self.db: self.send_command('SELECT', self.db) if nativestr(self.read_response()) != 'OK': raise ConnectionError('Invalid Database') # 關閉鏈接 def disconnect(self): "Disconnects from the Redis server" self._parser.on_disconnect() if self._sock is None: return try: # 先shutdown而後再close self._sock.shutdown(socket.SHUT_RDWR) self._sock.close() except socket.error: pass self._sock = None def send_packed_command(self, command): "Send an already packed command to the Redis server" if not self._sock: self.connect() try: if isinstance(command, str): command = [command] for item in command: self._sock.sendall(item) except socket.timeout: self.disconnect() raise TimeoutError("Timeout writing to socket") except socket.error: e = sys.exc_info()[1] self.disconnect() if len(e.args) == 1: _errno, errmsg = 'UNKNOWN', e.args[0] else: _errno, errmsg = e.args raise ConnectionError("Error %s while writing to socket. %s." % (_errno, errmsg)) except: self.disconnect() raise def send_command(self, *args): "Pack and send a command to the Redis server" self.send_packed_command(self.pack_command(*args)) def can_read(self): "Poll the socket to see if there's data that can be read." sock = self._sock if not sock: self.connect() sock = self._sock return bool(select([sock], [], [], 0)[0]) or self._parser.can_read() def read_response(self): "Read the response from a previously sent command" try: response = self._parser.read_response() except: self.disconnect() raise if isinstance(response, ResponseError): raise response return response def encode(self, value): "Return a bytestring representation of the value" if isinstance(value, Token): return b(value.value) elif isinstance(value, bytes): return value elif isinstance(value, (int, long)): value = b(str(value)) elif isinstance(value, float): value = b(repr(value)) elif not isinstance(value, basestring): value = str(value) if isinstance(value, unicode): value = value.encode(self.encoding, self.encoding_errors) return value def pack_command(self, *args): "Pack a series of arguments into the Redis protocol" output = [] # the client might have included 1 or more literal arguments in # the command name, e.g., 'CONFIG GET'. The Redis server expects these # arguments to be sent separately, so split the first argument # manually. All of these arguements get wrapped in the Token class # to prevent them from being encoded. command = args[0] if ' ' in command: args = tuple([Token(s) for s in command.split(' ')]) + args[1:] else: args = (Token(command),) + args[1:] buff = SYM_EMPTY.join( (SYM_STAR, b(str(len(args))), SYM_CRLF)) for arg in imap(self.encode, args): # to avoid large string mallocs, chunk the command into the # output list if we're sending large values if len(buff) > 6000 or len(arg) > 6000: buff = SYM_EMPTY.join( (buff, SYM_DOLLAR, b(str(len(arg))), SYM_CRLF)) output.append(buff) output.append(arg) buff = SYM_CRLF else: buff = SYM_EMPTY.join((buff, SYM_DOLLAR, b(str(len(arg))), SYM_CRLF, arg, SYM_CRLF)) output.append(buff) return output def pack_commands(self, commands): "Pack multiple commands into the Redis protocol" output = [] pieces = [] buffer_length = 0 for cmd in commands: for chunk in self.pack_command(*cmd): pieces.append(chunk) buffer_length += len(chunk) if buffer_length > 6000: output.append(SYM_EMPTY.join(pieces)) buffer_length = 0 pieces = [] if pieces: output.append(SYM_EMPTY.join(pieces)) return output
1.默認狀況下每建立一個Redis實例都會構造出一個ConnectionPool實例,每一次redis都會從這個鏈接池中拿到一個鏈接,操做完成後把該鏈接放回鏈接池(鏈接並無釋放)tcp
2.能夠構造一個ConnectionPool,在建立實例時,能夠將ConnectionPool傳入,後續的操做將會從給定的ConnectionPool得到鏈接,不會再重複建立ConnectionPoolide
3.若是不考慮底層TCP的狀況,鏈接池中的鏈接會在ConnectionPool.disconnect中統一銷燬