看了一圈, 没看到稍微好用的ConnectionPool, 除了一个aiomysql, 但是这个是异步的, 我暂时没有用到这么高版本的Python, 所以就动手造一个轮子. 原理比较简单, 先造一个线程安全的集合, 无非就是Lock+Set, 然后修改PyMySQL的close方法, 把实例对象和我的这个集合关联起来, close的时候丢进集合里面 这里是代码: 复制代码
import threading
import pymysql def new_close(conn):
if conn.pooling != None:
conn.pooling.put(conn)
elif conn.old_close != None:
conn.old_close() class Pool(object):
def __init__(self, create_instance, max_count=10, timeout=10):
self.lock = threading.Lock()
self.condition = threading.Condition(self.lock)
self.in_use_list = set()
self.free_list = set()
self.max_count = max_count or 10
self.timeout = timeout or 10
self.new_instance = create_instance
assert (create_instance != None)
self.__change_pymysql_close() def __change_pymysql_close(self):
old_close = pymysql.connections.Connection.close
if old_close == new_close:
return
pymysql.connections.Connection.close = new_close
pymysql.connections.Connection.old_close = old_close def get(self):
"""get one from free list"""
with self.lock:
if len(self.free_list) > 0:
one = self.free_list.pop()
self.in_use_list.add(one)
return one
if len(self.in_use_list) < self.max_count:
one = self.new_instance()
one.pooling = self
self.free_list.add(one)
if len(self.free_list) <= 0:
self.condition.wait(self.timeout)
if len(self.free_list) <= 0:
raise TimeoutError()
one = self.free_list.pop()
self.in_use_list.add(one)
return one def put(self, value):
"""put one into free list"""
with self.lock:
self.in_use_list.remove(value)
self.free_list.add(value)
self.condition.notify_all() def size(self):
with self.lock:
return len(self.free_list) + len(self.in_use_list) def max_size(self):
return self.max_count
复制代码 这里是使用的代码, 你只需要像往常一样写代码, 不需要调用额外的put函数把connection还回去: 复制代码
def create_conn():
return pymysql.connect(host=mysql_host, port=mysql_port, user=mysql_user, password=mysql_password, database=mysql_db,
charset='utf8', autocommit=True) pool = pymysqlpool.Pool(create_instance=create_conn) conn = pool.get()
cur = conn.cursor()
cur.execute("select 1")
for x in cur:
print(x) cur.close()
conn.close()
复制代码