實現一個簡單的ConnectionPool

看了一圈, 沒看到稍微好用的ConnectionPool, 除了一個aiomysql, 可是這個是異步的, 我暫時沒有用到這麼高版本的Python, 因此就動手造一個輪子.mysql

原理比較簡單, 先造一個線程安全的集合, 無非就是Lock+Set, 而後修改PyMySQL的close方法, 把實例對象和個人這個集合關聯起來, close的時候丟進集合裏面git

這裏是代碼:github

import threading
import pymysql


def new_close(conn):
    if conn.pooling != None:
        conn.pooling.put(conn)
    elif conn.old_close != None:
        conn.old_close()

class Pool(object):
    def __init__(self, create_instance, max_count=10, timeout=10):
        self.lock = threading.Lock()
        self.condition = threading.Condition(self.lock)
        self.in_use_list = set()
        self.free_list = set()
        self.max_count = max_count or 10
        self.timeout = timeout or 10
        self.new_instance = create_instance
        assert (create_instance != None)
        self.__change_pymysql_close()

    def __change_pymysql_close(self):
        old_close = pymysql.connections.Connection.close
        if old_close == new_close:
            return
        pymysql.connections.Connection.close = new_close
        pymysql.connections.Connection.old_close = old_close

    def get(self):
        """get one from free list"""
        with self.lock:
            if len(self.free_list) > 0:
                one = self.free_list.pop()
                self.in_use_list.add(one)
                return one
            if len(self.in_use_list) < self.max_count:
                one = self.new_instance()
                one.pooling = self
                self.free_list.add(one)
            if len(self.free_list) <= 0:
                self.condition.wait(self.timeout)
                if len(self.free_list) <= 0:
                    raise TimeoutError()
            one = self.free_list.pop()
            self.in_use_list.add(one)
            return one

    def put(self, value):
        """put one into free list"""
        with self.lock:
            self.in_use_list.remove(value)
            self.free_list.add(value)
            self.condition.notify_all()

    def size(self):
        with self.lock:
            return len(self.free_list) + len(self.in_use_list)

    def max_size(self):
        return self.max_count

 

這裏是使用的代碼, 你只須要像往常同樣寫代碼, 不須要調用額外的put函數把connection還回去:sql

def create_conn():
    return pymysql.connect(host=mysql_host, port=mysql_port, user=mysql_user, password=mysql_password, database=mysql_db,
                    charset='utf8', autocommit=True)


pool = pymysqlpool.Pool(create_instance=create_conn)

conn = pool.get()
cur = conn.cursor()
cur.execute("select 1")
for x in cur:
    print(x)

cur.close()
conn.close()

 

https://github.com/egmkang/PyMySQLPool安全

https://github.com/aio-libs/aiomysql異步

相關文章
相關標籤/搜索