一 概述
一般python 鏈接redis數據庫用的redis-py 這個三方庫,最近研究了一下它的源碼python
它鏈接服務器的函數是這個 _connect 函數redis
def _connect(self****): """建立一個tcp socket 鏈接""" # we want to mimic wha****t socket.create_connection does to support # ipv4/ipv6, but we want to set options prior to calling # socket.connect() err = None for res in socket.getaddrinfo(self.host, self.port, 0, socket.SOCK_STREAM): family, socktype, proto, canonname, socket_address = res sock = None try: sock = socket.socket(family, socktype, proto) # TCP_NODELAY sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1) # TCP 長鏈接 if self.socket_keepalive: sock.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1) for k, v in iteritems(self.socket_keepalive_options): sock.setsockopt(socket.SOL_TCP, k, v) # 咱們鏈接以前設置socket_connect_timeout超時時間 sock.settimeout(self.socket_connect_timeout) # 鏈接 sock.connect(socket_address) # 鏈接成功後設置 socket_timeout 時間 sock.settimeout(self.socket_timeout) return sock except socket.error as _: err = _ if sock is not None: sock.close() if err is not None: raise err raise socket.error("socket.getaddrinfo returned an empty list")
on_connect 函數,實列化鏈接,有密碼則發送密碼,選擇數據表數據庫
def on_connect(self): "Initialize the connection, authenticate and select a database" self._parser.on_connect(self) # if a password is specified, authenticate if self.password: # 若是redis服務作了進一步的安全加固,也是在這裏進行鑑權 self.send_command('AUTH', self.password) if nativestr(self.read_response()) != 'OK': raise AuthenticationError('Invalid Password') # if a database is specified, switch to it if self.db: self.send_command('SELECT', self.db) if nativestr(self.read_response()) != 'OK': raise ConnectionError('Invalid Database')
執行命令函數 execute_command,從鏈接池獲取鏈接併發送命令安全
def execute_command(self, *args, **options): "Execute a command and return a parsed response" pool = self.connection_pool command_name = args[0] connection = pool.get_connection(command_name, **options) try: connection.send_command(*args) return self.parse_response(connection, command_name, **options) except (ConnectionError, TimeoutError) as e: connection.disconnect() if not connection.retry_on_timeout and isinstance(e, TimeoutError): raise connection.send_command(*args) return self.parse_response(connection, command_name, **options) finally: pool.release(connection)