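# Like / Dislike  (apparently page-widget residue from the web page this listing was copied from; not part of the module)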
# -*- coding: utf-8 -*-
#import os
import datetime
import json
import logging
import sqlite3
import sys
class SqliteTool(object):
    """Helper that runs SQL statements through a small list-backed pool of SQLite connections.

    ``host`` is only used in log messages; ``database`` is the path handed to
    ``sqlite3.connect``. Every ``exec_*`` / ``*_table`` method borrows one
    connection from the pool, runs a single statement and returns the
    connection to the pool afterwards.

    On any database error the methods log the problem and terminate the
    process via ``sys.exit`` (i.e. they raise ``SystemExit``).

    NOTE: connections are opened with ``check_same_thread=False`` so they may
    be used from other threads, but the pool list itself is not guarded by a
    lock.
    """

    def __init__(self, host, database):
        """Remember the connection settings; the pool is created lazily.

        :param host: label used in log messages only
        :param database: path of the SQLite database file
        """
        self._host = host
        self._database = database
        self._pool = None  # None means "not initialised"; [] means "all connections checked out"
        self._maxconns = 6  # maximum number of idle connections kept in the pool
        print("__init__", self._database)

    def init_pool(self):
        """Open ``_maxconns`` connections and install them as the pool.

        :raises SystemExit: if any connection cannot be opened
        """
        fresh = []
        try:
            logging.info('Begin to create {0} sqlite pool on:{1}.\n'.format(self._host, datetime.datetime.now()))
            for _ in range(self._maxconns):
                # check_same_thread=False allows use from multiple threads
                fresh.append(sqlite3.connect(self._database, check_same_thread=False))
            # Close idle connections of a previous pool before replacing it.
            self.close_pool()
            self._pool = fresh
            logging.info('SUCCESS: create {0} sqlite pool success on {1}.\n'.format(self._host, datetime.datetime.now()))
        except Exception as e:
            logging.error('ERROR: create {0} sqlite pool failed on {1}.\n'.format(self._host, datetime.datetime.now()))
            # Do not leak the connections that were opened before the failure.
            for conn in fresh:
                conn.close()
            self.close_pool()
            sys.exit('ERROR: create sqlite pool error caused by {0}'.format(str(e)))

    def close_pool(self):
        """Close every idle connection and mark the pool as uninitialised."""
        pool, self._pool = self._pool, None
        if pool is not None:
            for conn in pool:
                conn.close()

    def get_conn(self):
        """Borrow a connection, creating the pool on first use.

        When every pooled connection is checked out, one extra connection is
        opened instead of rebuilding the whole pool.

        :return: an open ``sqlite3.Connection``
        """
        if self._pool is None:
            self.init_pool()
        if self._pool:
            # LIFO: the most recently returned connection is reused first.
            return self._pool.pop()
        return sqlite3.connect(self._database, check_same_thread=False)

    def close_conn(self, conn):
        """Return a borrowed connection to the pool.

        The connection is closed instead when the pool has been shut down or
        already holds ``_maxconns`` idle connections.
        """
        if conn is None:
            return
        if self._pool is not None and len(self._pool) < self._maxconns:
            self._pool.append(conn)
        else:
            conn.close()

    @staticmethod
    def _abort(subject, caller, action, error):
        """Log a failed statement and terminate via ``sys.exit``."""
        logging.error('ERROR: execute {0} causes error {1} in {2}'.format(subject, str(error), caller))
        sys.exit('ERROR: {0} from database error caused {1}'.format(action, str(error)))

    def _run_write(self, sql, caller, action, params=(), subject=None):
        """Execute one modifying statement and commit it.

        :param sql: statement text
        :param caller: public method name, used in the log message
        :param action: verb phrase used in the exit message (e.g. ``'insert data'``)
        :param params: values bound to ``?`` placeholders
        :param subject: what to name in the log message (defaults to ``sql``)
        :return: True on success
        :raises SystemExit: on any failure
        """
        conn = None
        cursor = None
        try:
            conn = self.get_conn()
            cursor = conn.cursor()
            cursor.execute(sql, params)
            # Commit only after the statement succeeded.
            conn.commit()
        except Exception as e:
            if conn is not None:
                # Leave the pooled connection without a dangling transaction.
                try:
                    conn.rollback()
                except sqlite3.Error as rollback_error:
                    logging.warning('rollback failed in %s: %s', caller, rollback_error)
            self._abort(sql if subject is None else subject, caller, action, e)
        finally:
            # Either name may still be None if acquiring it failed.
            if cursor is not None:
                cursor.close()
            if conn is not None:
                self.close_conn(conn)
        return True

    def create_table(self, sql: str):
        """Create a table.

        :param sql: the ``create table`` statement
        :return: True when the table was created
        :raises SystemExit: on any database error
        """
        print("create_table", sql)
        created = self._run_write(sql, 'create_table', 'create table')
        print("[create table success]")
        return created

    def drop_table(self, sql: str):
        """Drop a table.

        :param sql: the ``drop table`` statement
        :return: True when the table was dropped
        :raises SystemExit: on any database error
        """
        dropped = self._run_write(sql, 'drop_table', 'drop table')
        print("[drop table success]")
        return dropped

    def exec_insert(self, sql):
        """Execute a complete ``insert`` statement.

        :return: True on success
        :raises SystemExit: on any database error
        """
        return self._run_write(sql, 'exec_insert', 'insert data')

    def exec_insert_plus(self, table: str, params: dict):
        """Insert one row built from a column-name -> value mapping.

        Values are bound through ``?`` placeholders; list and dict values are
        stored as JSON text. The table and column names are concatenated into
        the statement, so they must come from trusted code.

        :param table: target table name
        :param params: mapping of column name to value
        :return: True on success
        :raises SystemExit: on any failure
        """
        try:
            columns = tuple(params.keys())
            # SQLite uses "?" placeholders (unlike psycopg's "%s").
            placeholders = ",".join(("?",) * len(columns))
            sql_str = "insert into " + table + " (" + ",".join(columns) + ") values (" + placeholders + ")"
            values = tuple(
                json.dumps(item) if isinstance(item, (list, dict)) else item
                for item in params.values()
            )
        except Exception as e:
            self._abort(table, 'exec_insert_plus', 'insert data', e)
        return self._run_write(sql_str, 'exec_insert_plus', 'insert data', params=values, subject=table)

    # sql = "DELETE from users where user_id='83f7d86b594e4b26a7196ab761afcc7c';"
    def exec_delete(self, sql):
        """Execute a ``delete`` statement.

        :return: True on success
        :raises SystemExit: on any database error
        """
        return self._run_write(sql, 'exec_delete', 'delete data')

    # Update a single value:
    #   update tasks set status='running' where task_id='0791216839b04d5c88846817f78280cc';
    # Update several values:
    #   update tasks set status='running',score='10' where task_id='0791216839b04d5c88846817f78280cc';
    def exec_update(self, sql):
        """Execute an ``update`` statement.

        :return: True on success
        :raises SystemExit: on any database error
        """
        return self._run_write(sql, 'exec_update', 'update data')

    # select * from users where user_name='hello';
    def exec_select(self, sql):
        """Execute a query and return all rows.

        :return: list of row tuples as produced by ``cursor.fetchall()``
        :raises SystemExit: on any database error
        """
        conn = None
        cursor = None
        rows = None
        try:
            conn = self.get_conn()
            cursor = conn.cursor()
            cursor.execute(sql)
            rows = cursor.fetchall()
        except Exception as e:
            self._abort(sql, 'exec_select', 'load data', e)
        finally:
            if cursor is not None:
                cursor.close()
            if conn is not None:
                self.close_conn(conn)
        logging.debug('exec_select: maxconns=%s idle=%s', self._maxconns, len(self._pool or ()))
        return rows

    def test_select(self, sql):
        """Run ``exec_select`` and print the rows (debug helper)."""
        result = self.exec_select(sql)
        print("test_select", result)
        return result
# for test
from typing import Optional, List, Dict, Union
from pydantic import BaseModel, Field

# NOTE(review): the original listing had an elided ``class ......`` placeholder
# at this point (not valid Python) -- presumably standing for the SqliteTool
# class defined above; confirm against the original source.

if __name__ == '__main__':
    # Smoke test: rebuild a "tests" table, insert two rows and read them back.
    dbhost = ""
    dbdatabase = "./test.db"
    db = SqliteTool(dbhost, dbdatabase)

    class TaskInDB(BaseModel):
        # One row of the "tests" table.
        task_id: str
        disabled: int

    def create_tests_table(db):
        """Create the ``tests`` table if it does not exist yet."""
        sql_str = (
            "create table if not exists tests("
            "task_id char(32) primary key,"
            "disabled int not null"
            ");"
        )
        return db.create_table(sql_str)

    def drop_tests_table(db):
        """Drop the ``tests`` table if it exists."""
        return db.drop_table("drop table if exists tests;")

    def get_tests_indb(db):
        """Return every row of the ``tests`` table."""
        return db.exec_select("select * from tests;")

    def create_tests_indb(db, task_indb: TaskInDB):
        """Insert one ``TaskInDB`` record into the ``tests`` table."""
        # pydantic v2 names this model_dump(); v1 only has dict().
        if hasattr(task_indb, "model_dump"):
            row = task_indb.model_dump()
        else:
            row = task_indb.dict()
        return db.exec_insert_plus("tests", row)

    try:
        # Rebuild the table from scratch.
        if not drop_tests_table(db) or not create_tests_table(db):
            print("ERROR")

        # Insert two records.
        create_tests_indb(db, TaskInDB(task_id="11111111", disabled=1))
        create_tests_indb(db, TaskInDB(task_id="22222222", disabled=0))

        # Read the records back; "select *" returns columns in table order,
        # which matches the field order declared on TaskInDB.
        model_fields = getattr(TaskInDB, "model_fields", None)
        if model_fields is None:
            model_fields = TaskInDB.__fields__  # pydantic v1
        key_tup = tuple(model_fields.keys())
        for ret_task in get_tests_indb(db):
            print(ret_task)
            task_indb = TaskInDB(**dict(zip(key_tup, ret_task)))
            print(task_indb)
        print("OK")
    finally:
        # Release the pooled connections before the script ends.
        db.close_pool()
# Copyright © 2003-2013 www.wpsshop.cn. All rights reserved.