In software development we often have to manage various kinds of "connection" resources, and we usually manage them with a corresponding connection pool: MySQL database connections can be managed with the pool in SQLAlchemy, Thrift connections can be managed through thriftpool, redis-py's StrictRedis implementation is itself built on a connection pool, and so on. socketpool, introduced here, is a general-purpose Python connection pool library that can manage connections of any type. It is not perfect, but when you cannot find a suitable pool implementation and do not want to reinvent the wheel yourself, it saves a lot of effort.
ConnectionPool's initialization function:
def __init__(self, factory, retry_max=3, retry_delay=.1,
             timeout=-1, max_lifetime=600., max_size=10, options=None,
             reap_connections=True, reap_delay=1, backend="thread"):
    if isinstance(backend, str):
        self.backend_mod = load_backend(backend)
        self.backend = backend
    else:
        self.backend_mod = backend
        self.backend = str(getattr(backend, '__name__', backend))
    self.max_size = max_size
    self.pool = getattr(self.backend_mod, 'PriorityQueue')()
    self._free_conns = 0
    self.factory = factory
    self.retry_max = retry_max
    self.retry_delay = retry_delay
    self.timeout = timeout
    self.max_lifetime = max_lifetime
    if options is None:
        self.options = {"backend_mod": self.backend_mod, "pool": self}
    else:
        self.options = options
        self.options["backend_mod"] = self.backend_mod
        self.options["pool"] = self
    # bounded semaphore to make self._alive 'safe'
    self._sem = self.backend_mod.Semaphore(1)
    self._reaper = None
    if reap_connections:
        self.reap_delay = reap_delay
        self.start_reaper()
The meanings of the main parameters:

- factory: the Connector class (or factory callable) used to create new connections; it is invoked with the contents of options as keyword arguments.
- retry_max / retry_delay: how many times to retry obtaining a usable connection, and how long to sleep between retries.
- max_lifetime: how long, in seconds, a connection may live before the pool considers it expired.
- max_size: the maximum number of idle connections kept in the pool.
- options: extra keyword arguments forwarded to the factory; the pool also injects backend_mod and pool into it, as the code above shows.
- reap_connections / reap_delay: whether to start a background reaper, and how often it runs to collect expired connections.
- backend: the concurrency backend ("thread" by default; gevent and eventlet backends also exist).
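Note how options ends up as the keyword arguments of the Connector's constructor: when the pool cannot find a matching idle connection, it simply calls the factory with these options to build a new one. Roughly (a simplified sketch, not the verbatim library code):

# sketch: how a new connection gets created when nothing in the pool matches;
# `options` is self.options merged with the keyword arguments of get()
new_item = self.factory(**options)   # e.g. TcpConnector(host=..., port=..., backend_mod=..., pool=...)
if new_item.is_connected():
    return new_item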
The reaper that gets started is a separate thread that periodically calls the following method to collect expired connections:
def murder_connections(self):
    current_pool_size = self.pool.qsize()
    if current_pool_size > 0:
        for priority, candidate in self.pool:
            current_pool_size -= 1
            if not self.too_old(candidate):
                self.pool.put((priority, candidate))
            else:
                self._reap_connection(candidate)
            if current_pool_size <= 0:
                break
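Whether a connection counts as expired is decided by too_old, which compares the connection's get_lifetime() against the pool's max_lifetime. It is roughly the following (a sketch based on the parameters described above, not necessarily the verbatim implementation):

def too_old(self, conn):
    # a connection is expired once it has lived longer than max_lifetime
    return time.time() - conn.get_lifetime() > self.max_lifetime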
_reap_connection eventually calls back the conn object's invalidate method (part of the Connector interface) to destroy it. Every time you are done using a conn, release_connection is called; its logic is:
def release_connection(self, conn):
    if self._reaper is not None:
        self._reaper.ensure_started()

    with self._sem:
        if self.pool.qsize() < self.max_size:
            connected = conn.is_connected()
            if connected and not self.too_old(conn):
                self.pool.put((conn.get_lifetime(), conn))
            else:
                self._reap_connection(conn)
        else:
            self._reap_connection(conn)
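For reference, the _reap_connection mentioned above essentially just invalidates the connection, along these lines (a sketch; the real method may do a bit more bookkeeping):

def _reap_connection(self, conn):
    # destroy the connection via the Connector interface
    if conn.is_connected():
        conn.invalidate()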
If the connection has not expired and is still connected, it is put back into the priority queue. By implementing the Connector interface's get_lifetime, users can control the priority with which a conn is put back; the conn with the smallest priority value is taken out first the next time.
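To see the ordering concretely, here is a tiny standalone demo using Python's standard queue.PriorityQueue, purely to illustrate the priority behaviour (this is not socketpool code):

import queue

q = queue.PriorityQueue()
q.put((30.0, "conn created later"))
q.put((10.0, "conn created earliest"))
q.put((20.0, "conn created in between"))

# The entry with the smallest priority value comes out first, so the
# connection whose get_lifetime() is smallest is reused first.
print(q.get())  # (10.0, 'conn created earliest')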
What interfaces does Connector define?
class Connector(object):
    def matches(self, **match_options):
        raise NotImplementedError()

    def is_connected(self):
        raise NotImplementedError()

    def handle_exception(self, exception):
        raise NotImplementedError()

    def get_lifetime(self):
        raise NotImplementedError()

    def invalidate(self):
        raise NotImplementedError()
The matches method is mainly used when the pool takes a conn out: besides preferring the conn with the smallest priority, the conn must also match the parameters passed to get(**options), and that match is performed by calling back the conn's matches method. The other interfaces have all come up above.
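Putting this together, the selection loop inside get() looks roughly like the following (a simplified sketch of the behaviour just described, not the verbatim library code):

# sketch: walk the priority queue, drop expired conns, skip non-matching
# ones, and return the first live conn whose matches(**options) is True
unmatched = []
found = None
for priority, candidate in self.pool:
    if self.too_old(candidate):
        self._reap_connection(candidate)
        continue
    if not candidate.matches(**options):
        unmatched.append((priority, candidate))   # put it back later
    elif candidate.is_connected():
        found = candidate
        break
    else:
        self._reap_connection(candidate)          # dead conn, reap it
for item in unmatched:
    self.pool.put(item)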
Let's look at the TcpConnector implementation that ships with socketpool; it implements the factory for TCP sockets:
class TcpConnector(Connector):

    def __init__(self, host, port, backend_mod, pool=None):
        self._s = backend_mod.Socket(socket.AF_INET, socket.SOCK_STREAM)
        self._s.connect((host, port))
        self.host = host
        self.port = port
        self.backend_mod = backend_mod
        self._connected = True
        # use a 'jiggle' value to make sure there is some
        # randomization to expiry, to avoid many conns expiring very
        # closely together.
        self._life = time.time() - random.randint(0, 10)
        self._pool = pool

    def __del__(self):
        self.release()

    def matches(self, **match_options):
        target_host = match_options.get('host')
        target_port = match_options.get('port')
        return target_host == self.host and target_port == self.port

    def is_connected(self):
        if self._connected:
            return util.is_connected(self._s)
        return False

    def handle_exception(self, exception):
        print('got an exception')
        print(str(exception))

    def get_lifetime(self):
        return self._life

    def invalidate(self):
        self._s.close()
        self._connected = False
        self._life = -1

    def release(self):
        if self._pool is not None:
            if self._connected:
                self._pool.release_connection(self)
            else:
                self._pool = None

    def send(self, data):
        return self._s.send(data)

    def recv(self, size=1024):
        return self._s.recv(size)
It needs little extra explanation.
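As a quick usage sketch, a pool of TcpConnector connections can be used like this (host and port are placeholders and assume some TCP service, e.g. an echo server, is listening there):

from socketpool import ConnectionPool, TcpConnector

options = {'host': 'localhost', 'port': 6000}   # placeholder address
pool = ConnectionPool(factory=TcpConnector, options=options)

with pool.connection(**options) as conn:
    conn.send(b"hello")
    echoed = conn.recv(1024)
    print(echoed)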
Based on my own project's needs, I implemented a Hive connection pool with pyhs2:
class HiveConnector(Connector):

    def __init__(self, host, port, backend_mod, pool=None,
                 authMechanism='NOSASL', **options):
        self.host = host
        self.port = port
        self.backend_mod = backend_mod
        self._pool = pool
        self._connected = False
        self._conn = pyhs2.connect(host=host,
                                   port=port,
                                   authMechanism=authMechanism)
        self._connected = True
        # use a 'jiggle' value to make sure there is some
        # randomization to expiry, to avoid many conns expiring very
        # closely together.
        self._life = time.time() - random.randint(0, 10)

    def __del__(self):
        self.release()

    def matches(self, **match_options):
        target_host = match_options.get('host')
        target_port = match_options.get('port')
        return target_host == self.host and target_port == self.port

    def is_connected(self):
        return self._connected

    def handle_exception(self, exception):
        logger.exception("error: %s" % str(exception))

    def get_lifetime(self):
        return self._life

    def invalidate(self):
        try:
            self._conn.close()
        except:
            pass
        finally:
            self._connected = False
            self._life = -1

    def release(self):
        if self._pool is not None:
            if self._connected:
                self._pool.release_connection(self)
            else:
                self._pool = None

    def cursor(self):
        return self._conn.cursor()

    def execute(self, hql):
        with self.cursor() as cur:
            return cur.execute(hql)


hive_pool = ConnectionPool(factory=HiveConnector, **HIVE_CONNECTOR_CONFIG)
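HIVE_CONNECTOR_CONFIG is not shown above; a hypothetical example of what it might contain (the host, port, and authMechanism values are placeholders to adapt to your own HiveServer2 deployment):

# hypothetical config: keyword arguments for ConnectionPool, with the
# Hive-specific settings nested under "options" so they reach HiveConnector
HIVE_CONNECTOR_CONFIG = {
    "max_size": 10,
    "max_lifetime": 600,
    "options": {
        "host": "127.0.0.1",        # placeholder HiveServer2 host
        "port": 10000,              # common HiveServer2 port
        "authMechanism": "NOSASL",
    },
}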
Using this hive_pool to execute HQL statements is very easy:
with hive_pool.connection() as conn:
    with conn.cursor() as cur:
        print(cur.getDatabases())
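And a slightly fuller sketch that runs a query and iterates over the results, assuming a table named sample_table exists and relying on pyhs2's cursor API (execute / fetch):

with hive_pool.connection() as conn:
    with conn.cursor() as cur:
        # sample_table is a placeholder; replace it with a real table
        cur.execute("SELECT * FROM sample_table LIMIT 10")
        for row in cur.fetch():
            print(row)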
This post has introduced socketpool's internal implementation, and how to use it to build your own connection pool.