python socketpool: a general-purpose connection pool (repost)

Introduction

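In software development we often have to manage all kinds of "connection" resources, and usually we do it with a dedicated pool for each. MySQL connections can be managed with the pool in sqlalchemy, Thrift connections with thriftpool, and redis-py's StrictRedis is itself built on a connection pool. socketpool, introduced here, is a general-purpose Python connection pool library that can manage connections of any type. It is not perfect, but it can save a lot of effort when no suitable pool implementation exists and you don't want to reinvent the wheel.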

Key points of the internal implementation

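  • The library's code is not especially pretty, but its structure is well designed. Most importantly, it leaves extension hooks that let users customize the pool to their own needs
  • The main internal components are ConnectionPool, Connector and backend_mod
    • ConnectionPool implements the generic pool logic. It keeps idle connections in a priority queue and supports customizing a connection's lifecycle through an optional reap mechanism. The idea is that every conn has a maximum lifetime, say 600 seconds, after which it must be reclaimed. A reaper thread (or a greenlet under gevent or eventlet) periodically checks for expired conns and reclaims them
    • Connector is an interface that can be seen as a factory for conns: whenever ConnectionPool needs a new conn, it asks this factory to create one. To build your own connection factory, you only need to implement the Connector interface methods
    • backend_mod is a backend module abstracted out to support different concurrency models (native Python threads, gevent or eventlet). It wraps the concurrency-dependent pieces such as Socket, PriorityQueue and Semaphore, and a constructor argument selects which backend a ConnectionPool uses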
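To illustrate the backend idea, here is a minimal sketch. It assumes a backend is simply a module-like object that exposes the same names (PriorityQueue, Semaphore, Socket, sleep) whatever the concurrency model. `THREAD_BACKEND` and `load_backend_sketch` are hypothetical names, and socketpool's own `load_backend` may work differently. Only a thread backend is built, from the standard library.

```python
import queue
import socket
import threading
import time
import types

# Hypothetical "thread" backend: one namespace exposing the concurrency
# primitives a pool needs, all taken from the standard library.
THREAD_BACKEND = types.SimpleNamespace(
    PriorityQueue=queue.PriorityQueue,
    Semaphore=threading.Semaphore,
    Socket=socket.socket,
    sleep=time.sleep,
)

# A gevent or eventlet backend would register a namespace with the same
# attribute names but cooperative implementations.
_BACKENDS = {"thread": THREAD_BACKEND}


def load_backend_sketch(backend):
    """Accept a backend name, or pass an already-built backend through."""
    if isinstance(backend, str):
        try:
            return _BACKENDS[backend]
        except KeyError:
            raise ValueError("unknown backend: %r" % backend) from None
    return backend


# Pool code only touches the backend through these attributes.
backend_mod = load_backend_sketch("thread")
idle_queue = backend_mod.PriorityQueue()
lock = backend_mod.Semaphore(1)
```

Because the pool only reaches concurrency primitives through `backend_mod`, switching to another concurrency model means providing another namespace with the same attribute names. The pool logic itself does not change.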

Reading some of the code

ConnectionPool's initializer:

     def __init__(self, factory,
                  retry_max=3, retry_delay=.1,
                  timeout=-1, max_lifetime=600.,
                  max_size=10, options=None,
                  reap_connections=True, reap_delay=1,
                  backend="thread"):

         if isinstance(backend, str):
             self.backend_mod = load_backend(backend)
             self.backend = backend
         else:
             self.backend_mod = backend
             self.backend = str(getattr(backend, '__name__', backend))
         self.max_size = max_size
         self.pool = getattr(self.backend_mod, 'PriorityQueue')()
         self._free_conns = 0
         self.factory = factory
         self.retry_max = retry_max
         self.retry_delay = retry_delay
         self.timeout = timeout
         self.max_lifetime = max_lifetime
         if options is None:
             self.options = {"backend_mod": self.backend_mod,
                             "pool": self}
         else:
             self.options = options
             self.options["backend_mod"] = self.backend_mod
             self.options["pool"] = self

         # bounded semaphore to make self._alive 'safe'
         self._sem = self.backend_mod.Semaphore(1)

         self._reaper = None
         if reap_connections:
             self.reap_delay = reap_delay
             self.start_reaper()

What the parameters mean:

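  • factory is a class implementing the Connector interface and is used to create conns; options holds the keyword arguments passed when the factory is called (the pool adds backend_mod and pool to it)
  • retry_max is the maximum number of retries when getting a conn fails
  • max_lifetime is the maximum lifetime of each conn, in seconds; see the reap mechanism described above
  • max_size is the pool's size limit; judging from release_connection below, it caps how many idle conns are kept in the queue
  • backend selects the concurrency model
  • reap_connections controls whether the reap mechanism is enabled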
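The following sketch shows how factory and options relate. From `__init__` above, the pool stores `options` and adds `backend_mod` and `pool` to it. The sketch also assumes that the pool calls `factory(**options)` whenever it needs a new connection. Under that assumption, a Connector's constructor has to accept `backend_mod` and `pool`. `DummyConnector`, `MiniPool` and `make_connection` are hypothetical names, not socketpool code.

```python
class DummyConnector(object):
    """Hypothetical connector: just records what the pool passed in."""

    def __init__(self, host, port, backend_mod, pool=None):
        self.host = host
        self.port = port
        self.backend_mod = backend_mod
        self.pool = pool


class MiniPool(object):
    def __init__(self, factory, options=None, backend_mod="thread"):
        self.factory = factory
        self.backend_mod = backend_mod
        # Same idea as ConnectionPool.__init__: user options plus two keys
        # the pool injects itself (copied here instead of mutated).
        self.options = dict(options or {})
        self.options["backend_mod"] = self.backend_mod
        self.options["pool"] = self

    def make_connection(self, **extra):
        # Assumption: per-call options are merged with the pool's options
        # and handed to the factory as keyword arguments.
        kwargs = dict(extra)
        kwargs.update(self.options)
        return self.factory(**kwargs)


user_options = {"host": "127.0.0.1", "port": 10000}
pool = MiniPool(DummyConnector, options=user_options)
conn = pool.make_connection()
```

One design difference: the original `__init__` writes `backend_mod` and `pool` straight into the dict the caller passed in. This sketch copies the dict first, so the caller's `user_options` stays unchanged.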

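Once started, the reaper is a separate thread (with the thread backend) that periodically calls the following method to reclaim expired conns: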

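     def murder_connections(self):
         current_pool_size = self.pool.qsize()
         if current_pool_size > 0:
             # iterating the queue removes entries, smallest priority first
             for priority, candidate in self.pool:
                 current_pool_size -= 1
                 if not self.too_old(candidate):
                     # still within max_lifetime: put it back
                     self.pool.put((priority, candidate))
                 else:
                     # expired: destroy it
                     self._reap_connection(candidate)
                 # look at no more entries than were queued at the start
                 if current_pool_size <= 0:
                     break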

 

_reap_connection eventually calls the conn object's invalidate method (part of the Connector interface) to destroy it. After each use of a conn, release_connection is called. Its logic is:

     def release_connection(self, conn):
         if self._reaper is not None:
             self._reaper.ensure_started()

         with self._sem:
             if self.pool.qsize() < self.max_size:
                 connected = conn.is_connected()
                 if connected and not self.too_old(conn):
                     self.pool.put((conn.get_lifetime(), conn))
                 else:
                     self._reap_connection(conn)
             else:
                 self._reap_connection(conn)

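If the connection has neither expired nor been disconnected, and the queue holds fewer than max_size conns, it is put back into the priority queue; otherwise it is reaped. By implementing get_lifetime from the Connector interface, users control the priority of the conn being put back, and the conn with the smallest priority is taken out first next time.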
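Here is a sketch of that release path and the "smallest priority first" rule, using only the standard library. `StubConn` and `release` are hypothetical, and the max_lifetime check from release_connection is left out to keep it short. As in the original, `max_size` here limits the number of idle conns kept in the queue.

```python
import itertools
import queue

_seq = itertools.count()


class StubConn(object):
    def __init__(self, lifetime, connected=True):
        self._life = lifetime
        self._connected = connected
        self.invalidated = False

    def is_connected(self):
        return self._connected

    def get_lifetime(self):
        return self._life

    def invalidate(self):
        self._connected = False
        self.invalidated = True


def release(pool, conn, max_size):
    """Requeue a healthy conn if there is room, otherwise destroy it."""
    if pool.qsize() < max_size and conn.is_connected():
        # the counter breaks ties, so equal lifetimes never compare conns
        pool.put((conn.get_lifetime(), next(_seq), conn))
    else:
        conn.invalidate()


pool = queue.PriorityQueue()
a, b, c = StubConn(300.0), StubConn(100.0), StubConn(200.0)
for conn in (a, b, c):
    release(pool, conn, max_size=2)

# c arrived when 2 idle conns were already queued, so it was destroyed;
# the smallest lifetime value (b) is handed out first.
first = pool.get_nowait()[2]
```

The extra counter is a deliberate deviation from the original `(conn.get_lifetime(), conn)` entries. On Python 3, if two lifetimes were ever equal, the heap would compare the conn objects themselves and raise `TypeError` unless the class defines ordering.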

What interface does Connector define?

 class Connector(object):
     def matches(self, **match_options):
         raise NotImplementedError()

     def is_connected(self):
         raise NotImplementedError()

     def handle_exception(self, exception):
         raise NotImplementedError()

     def get_lifetime(self):
         raise NotImplementedError()

     def invalidate(self):
         raise NotImplementedError()

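matches is used mainly when the pool takes out a conn. Besides preferring the conn with the smallest priority, the pool also requires the conn to match the arguments passed to get(**options), and it checks this by calling the conn's matches method. The other methods have all come up above.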

The TcpConnector implementation

Let's look at the TcpConnector that ships with socketpool, a factory for TCP sockets:

 class TcpConnector(Connector):

     def __init__(self, host, port, backend_mod, pool=None):
         self._s = backend_mod.Socket(socket.AF_INET, socket.SOCK_STREAM)
         self._s.connect((host, port))
         self.host = host
         self.port = port
         self.backend_mod = backend_mod
         self._connected = True
         # use a 'jiggle' value to make sure there is some
         # randomization to expiry, to avoid many conns expiring very
         # closely together.
         self._life = time.time() - random.randint(0, 10)
         self._pool = pool

     def __del__(self):
         self.release()

     def matches(self, **match_options):
         target_host = match_options.get('host')
         target_port = match_options.get('port')
         return target_host == self.host and target_port == self.port

     def is_connected(self):
         if self._connected:
             return util.is_connected(self._s)
         return False

     def handle_exception(self, exception):
         print('got an exception')
         print(str(exception))

     def get_lifetime(self):
         return self._life

     def invalidate(self):
         self._s.close()
         self._connected = False
         self._life = -1

     def release(self):
         if self._pool is not None:
             if self._connected:
                 self._pool.release_connection(self)
             else:
                 self._pool = None

     def send(self, data):
         return self._s.send(data)

     def recv(self, size=1024):
         return self._s.recv(size)
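`TcpConnector.is_connected` delegates to `util.is_connected`, whose body the article does not show. A common way to run this kind of check is to poll the socket without blocking and peek at one byte: if the socket is readable but returns `b""`, the peer has closed the connection. This is sketched below as a hypothetical `socket_looks_connected`, which is not necessarily how socketpool does it.

```python
import select
import socket


def socket_looks_connected(sock):
    """Best-effort, non-blocking liveness check for a stream socket."""
    try:
        if sock.fileno() < 0:  # already closed locally
            return False
        readable, _, _ = select.select([sock], [], [], 0)
        if not readable:
            return True  # nothing pending: assume the peer is still there
        # Data or EOF is pending; peek so real data is not consumed.
        return sock.recv(1, socket.MSG_PEEK) != b""
    except (OSError, ValueError):
        return False
```

A check like this only catches a peer that has already closed or reset the connection. A silently dead network path still looks "connected", which is one reason the pool also relies on max_lifetime.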

 

It needs little further explanation.

 

Extension: implementing HiveConnector

For my own project's needs, I used pyhs2 to implement a Hive connection pool:

 import logging
 import random
 import time

 import pyhs2
 from socketpool.conn import Connector
 from socketpool.pool import ConnectionPool

 logger = logging.getLogger(__name__)


 class HiveConnector(Connector):

     def __init__(self, host, port, backend_mod, pool=None, authMechanism='NOSASL',
                  **options):
         self.host = host
         self.port = port
         self.backend_mod = backend_mod
         self._pool = pool
         self._connected = False
         self._conn = pyhs2.connect(host=host,
                                    port=port,
                                    authMechanism=authMechanism
                                    )
         self._connected = True
         # use a 'jiggle' value to make sure there is some
         # randomization to expiry, to avoid many conns expiring very
         # closely together.
         self._life = time.time() - random.randint(0, 10)

     def __del__(self):
         self.release()

     def matches(self, **match_options):
         target_host = match_options.get('host')
         target_port = match_options.get('port')
         return target_host == self.host and target_port == self.port

     def is_connected(self):
         return self._connected

     def handle_exception(self, exception):
         logger.exception("error: %s", exception)

     def get_lifetime(self):
         return self._life

     def invalidate(self):
         try:
             self._conn.close()
         except Exception:
             pass  # the connection is being discarded anyway
         finally:
             self._connected = False
             self._life = -1

     def release(self):
         if self._pool is not None:
             if self._connected:
                 self._pool.release_connection(self)
             else:
                 self._pool = None

     def cursor(self):
         return self._conn.cursor()

     def execute(self, hql):
         # note: result rows must be fetched (e.g. cur.fetch()) before
         # the cursor is closed at the end of this block
         with self.cursor() as cur:
             return cur.execute(hql)

 # HIVE_CONNECTOR_CONFIG is the project's own settings dict (not shown); it holds
 # ConnectionPool keyword arguments, e.g. options carrying the Hive host and port
 hive_pool = ConnectionPool(factory=HiveConnector, **HIVE_CONNECTOR_CONFIG)
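pyhs2 needs a running HiveServer2. To try the same Connector shape anywhere, the sketch below applies it to the standard library's `sqlite3` driver. `SqliteConnector` is a hypothetical stand-in, not part of socketpool. It mirrors HiveConnector's methods, and it leaves out the Connector base class so it runs without socketpool installed.

```python
import random
import sqlite3
import time


class SqliteConnector(object):
    """Stand-in for HiveConnector, backed by an sqlite3 connection."""

    def __init__(self, database, backend_mod=None, pool=None, **options):
        self.database = database
        self.backend_mod = backend_mod
        self._pool = pool
        self._conn = sqlite3.connect(database)
        self._connected = True
        # same 'jiggle' as TcpConnector: spread out expiry times
        self._life = time.time() - random.randint(0, 10)

    def matches(self, **match_options):
        return match_options.get("database") == self.database

    def is_connected(self):
        return self._connected

    def handle_exception(self, exception):
        pass  # a real connector would log it

    def get_lifetime(self):
        return self._life

    def invalidate(self):
        try:
            self._conn.close()
        except sqlite3.Error:
            pass
        finally:
            self._connected = False
            self._life = -1

    def execute(self, sql):
        cur = self._conn.cursor()
        try:
            cur.execute(sql)
            return cur.fetchall()  # fetch before the cursor is closed
        finally:
            cur.close()


conn = SqliteConnector(":memory:")
rows = conn.execute("SELECT 1 + 1")
```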

Using hive_pool to talk to Hive is then very easy:

     with hive_pool.connection() as conn:
         with conn.cursor() as cur:
             print(cur.getDatabases())
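Around the `with` block, a method like `hive_pool.connection()` needs to do three things: take a connection, hand it to the caller, and release it back afterwards even if the block raised. The sketch below shows this with a hypothetical `TinyPool`, not socketpool's implementation. It re-raises the exception after notifying the connector. The article does not show whether socketpool's `connection()` re-raises.

```python
import contextlib


class TinyPool(object):
    """Hypothetical pool showing only the get/release bracket."""

    def __init__(self, factory):
        self.factory = factory
        self.idle = []

    def get(self):
        return self.idle.pop() if self.idle else self.factory()

    def release_connection(self, conn):
        self.idle.append(conn)

    @contextlib.contextmanager
    def connection(self):
        conn = self.get()
        try:
            yield conn
        except Exception as exc:
            conn.handle_exception(exc)  # let the connector log it
            raise
        finally:
            self.release_connection(conn)  # always give it back


class EchoConn(object):
    def __init__(self):
        self.errors = []

    def handle_exception(self, exc):
        self.errors.append(exc)


pool = TinyPool(EchoConn)
with pool.connection() as c1:
    pass
with pool.connection() as c2:
    pass  # reuses the conn released by the first block
```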

Summary

This post introduced socketpool's internal implementation and how to use it to build your own connection pool.

 

Reposted from: http://www.javashuo.com/article/p-sjlcjbci-k.html
