赞
踩
使用数据库连接池,需要安装三方库 dbutils:
pip install pymysql
pip install dbutils
import threading import pymysql from dbutils.pooled_db import PooledDB POOL = PooledDB( creator=pymysql, # 使用连接数据库的模块 mincached=2, # 初始化时,连接池中最少创建的连接个数,0表示不创建 maxconnections=6, # 连接池允许的最大连接数,0和None表示不限制连接数 blocking=True, # 连接池中如果没有可用连接,是否阻塞等待。True=等待,False=不等待然后报错 ping=0, # ping mysql服务器,检查服务是否可用。0=None=never host='localhost', port=3306, user="xxx", password='xxx', database="database_name", charset="utf8" ) def search_all(idx): conn = POOL.connection() # 去连接池中获取一个连接 cursor = conn.cursor() cursor.execute("SELECT SLEEP(3)") result = cursor.fetchall() cursor.close() conn.close() # 将连接放回到连接池中 print(f"{idx} =========> {result}") for i in range(50): td = threading.Thread(target=search_all, args=(i, )) td.start() 结果打印输出: 1 =========> ((0,),) 0 =========> ((0,),) 2 =========> ((0,),) 3 =========> ((0,),) 4 =========> ((0,),) 5 =========> ((0,),) 等三秒后继续打印 6 =========> ((0,),) 7 =========> ((0,),) 8 =========> ((0,),) 9 =========> ((0,),) 10 =========> ((0,),) 11 =========> ((0,),) ...
common_db.py
# 使用:单独作为一个文件,别的py文件要操作数据库时,需要 import 本文件的db import pymysql from dbutils.pooled_db import PooledDB class SqlHelper: def __init__(self): self.pool = PooledDB( creator=pymysql, # 使用连接数据库的模块 mincached=2, # 初始化时,连接池中最少创建的连接个数,0表示不创建 maxconnections=6, # 连接池允许的最大连接数,0和None表示不限制连接数 blocking=True, # 连接池中如果没有可用连接,是否阻塞等待。True=等待,False=不等待然后报错 ping=0, # ping mysql服务器,检查服务是否可用。0=None=never host='localhost', port=3306, user="xxx", password='xxx', database="database_name", charset="utf8" ) def open(self): """从连接池获取一个连接并创建cursor""" conn = self.pool.connection() cursor = conn.cursor() return conn, cursor def close(self, cursor, conn): """关闭cursor,并把连接放回到连接池中""" cursor.close() conn.close() def search_one(self, sql, *args): """查询单条数据""" conn, cursor = self.open() cursor.execute(sql, args) result = cursor.fetchone() self.close(cursor, conn) return result def search_all(self, sql, *args): """查询全部数据""" conn, cursor = self.open() cursor.execute(sql, args) result = cursor.fetchall() self.close(cursor, conn) return result db = SqlHelper()
# 在其他文件中要使用先导入db
from common_db import db
search_res = db.search_one("SELECT * FROM test_primary_key WHERE value = %s", 114)
print(search_res)
common_db.py
import threading import pymysql from dbutils.pooled_db import PooledDB class SqlHelper: def __init__(self): self.pool = PooledDB( creator=pymysql, # 使用连接数据库的模块 mincached=2, # 初始化时,连接池中最少创建的连接个数,0表示不创建 maxconnections=6, # 连接池允许的最大连接数,0和None表示不限制连接数 blocking=True, # 连接池中如果没有可用连接,是否阻塞等待。True=等待,False=不等待然后报错 ping=0, # ping mysql服务器,检查服务是否可用。0=None=never host='localhost', port=3306, user="xxx", password='xxx', database="database_name", charset="utf8" ) self.local = threading.local() def open(self): """从连接池获取一个连接并创建cursor""" conn = self.pool.connection() cursor = conn.cursor() return conn, cursor def close(self, cursor, conn): """关闭cursor,并把连接放回到连接池中""" cursor.close() conn.close() def __enter__(self): """利用threading.local特性(为每一个线程开辟一个栈 来存数据), 存放创建的连接和游标,然后返回游标""" conn, cursor = self.open() cc = getattr(self.local, "stack", None) if not cc: self.local.stack = [(conn, cursor), ] # 给对象.local添加stack属性 else: cc.append((conn, cursor)) self.local.stack = cc return cursor def __exit__(self, exc_type, exc_val, exc_tb): """利用栈特性(后进先出),弹出之前创建的连接和游标,关闭游标,连接放回到连接池""" cc = getattr(self.local, "stack", None) if not cc: return conn, cursor = cc.pop() self.close(cursor, conn) db = SqlHelper()
# 在其他文件中要使用先导入db
from common_db import db
with db as cursor:
cursor.execute("SELECT * FROM test_primary_key")
query_all = cursor.fetchall()
print(f"query all result : {query_all}")
with db as cursor:
cursor.execute("SELECT * FROM test_primary_key WHERE value = 114")
query_one = cursor.fetchone()
print(f"query one result : {query_one}")
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。