# -*- coding: utf-8 -*-#
# -------------------------------------------------------------------------------
# Name:         dbbase.py
# Description:
# Author:       xaoyaoo
# Date:         2024/04/15
# -------------------------------------------------------------------------------
"""Connection-pool backed database helpers (sqlite by default, optional mysql)."""
import importlib
import os
import sqlite3
import time
from typing import Any, Dict, List, Optional, Sequence, Union

from dbutils.pooled_db import PooledDB

from .utils import db_loger

# import logging
#
# db_loger = logging.getLogger("db_prepare")

# Config keys whose values must never be written to the log.
_SENSITIVE_CONFIG_KEYS = ("password",)

# Per-backend query returning one row per user table, table name in column 0.
_LIST_TABLES_SQL = {
    "sqlite": "SELECT tbl_name FROM sqlite_master WHERE type = 'table' and tbl_name!='sqlite_sequence';",
    "mysql": "SELECT table_name FROM information_schema.tables WHERE table_schema = DATABASE();",
}


def _redact_config(db_config: Any) -> Any:
    """Return a loggable copy of ``db_config`` with credential values masked.

    Non-mapping values are returned unchanged so logging never fails.
    """
    try:
        items = dict(db_config).items()
    except (TypeError, ValueError):
        return db_config
    return {
        key: ("***" if key in _SENSITIVE_CONFIG_KEYS and value else value)
        for key, value in items
    }


class DatabaseSingletonBase:
    """Class-level registry of connection pools, keyed by ``db_config["key"]``."""

    # _singleton_instances = {}  # dict holding the singleton instance per db_path
    _class_name = "DatabaseSingletonBase"
    _db_pool: Dict[str, Any] = {}  # connection pool per db key, shared by all subclasses

    # def __new__(cls, *args, **kwargs):
    #     if cls._class_name not in cls._singleton_instances:
    #         cls._singleton_instances[cls._class_name] = super().__new__(cls)
    #     return cls._singleton_instances[cls._class_name]

    @classmethod
    def connect(cls, db_config):
        """
        Return the connection pool for ``db_config``, creating it on first use.

        Override this method to add support for other database backends.

        :param db_config: database configuration (dict). ``key`` identifies the
            pool, ``type`` is ``"sqlite"`` (default) or ``"mysql"``. sqlite needs
            ``path``; mysql needs ``user``, ``host``, ``password``, ``database``
            and ``port``.
        :return: connection pool (``PooledDB``)
        :raises ValueError: if ``db_config`` is empty or ``type`` is unsupported
        :raises FileNotFoundError: if the sqlite ``path`` does not exist
        :raises KeyError: if a required mysql setting is missing
        """
        if not db_config:
            raise ValueError("db_config 不能为空")
        db_key = db_config.get("key", "xaoyaoo_741852963")
        db_type = db_config.get("type", "sqlite")

        # Reuse an already-created pool for this key.
        cached_pool = cls._db_pool.get(db_key)
        if cached_pool is not None:
            return cached_pool

        if db_type == "sqlite":
            db_path = db_config.get("path", "")
            if not os.path.exists(db_path):
                raise FileNotFoundError(f"文件不存在: {db_path}")
            pool = PooledDB(
                creator=sqlite3,  # use the sqlite3 module to create connections
                maxconnections=0,  # max connections in the pool (0 = unlimited in DBUtils)
                mincached=4,  # idle connections opened at start-up, 0 = none
                maxusage=1,  # max number of reuses of a single connection, None = unlimited
                blocking=True,  # when the pool is exhausted: True = wait, False = raise
                ping=0,  # when to ping the server to check it is alive (0 = never in DBUtils)
                database=db_path
            )
        elif db_type == "mysql":
            mysql_config = {
                'user': db_config['user'],
                'host': db_config['host'],
                'password': db_config['password'],
                'database': db_config['database'],
                'port': db_config['port']
            }
            pool = PooledDB(
                # pymysql is imported lazily so it is only required for mysql configs
                creator=importlib.import_module('pymysql'),
                ping=1,  # ping the server to check it is alive (1 = whenever fetched from the pool)
                **mysql_config
            )
        else:
            raise ValueError(f"不支持的数据库类型: {db_type}")

        # Log a redacted copy so the mysql password never reaches the log.
        db_loger.info(f"{pool} 连接句柄创建 {_redact_config(db_config)}")
        cls._db_pool[db_key] = pool
        return pool


class DatabaseBase(DatabaseSingletonBase):
    """Base class for database accessors: owns a pool and runs SQL through it."""

    _class_name = "DatabaseBase"
    existed_tables: List[str] = []  # class-level default; instances get their own list

    def __init__(self, db_config):
        r"""
        Connect to the database and cache the names of its tables.

        db_config = {
            "key": "test1",
            "type": "sqlite",
            "path": r"C:\***\wxdump_work\merge_all.db"
        }
        """
        self.config = db_config
        self.pool = self.connect(self.config)
        self.__get_existed_tables()

    def __get_existed_tables(self):
        """
        Load the lower-cased table names of the database into ``existed_tables``.

        :return: list of table names, or None when none were found or the query failed
        """
        db_type = self.config.get("type", "sqlite")
        # Unknown types fall back to the sqlite query (the historical behaviour).
        sql = _LIST_TABLES_SQL.get(db_type, _LIST_TABLES_SQL["sqlite"])
        existing_tables = self.execute(sql)
        if existing_tables:
            self.existed_tables = [row[0].lower() for row in existing_tables]
            return self.existed_tables
        # Bind a per-instance list so the class-level default is never shared/mutated.
        self.existed_tables = []
        return None

    def tables_exist(self, required_tables: Union[str, Sequence[str], None]) -> bool:
        """
        Check if all required tables exist in the database.

        The comparison is case-insensitive. An empty/None argument yields True.

        Args:
            required_tables (list or str): A list of table names or a single table name string.
        Returns:
            bool: True if all required tables exist, False otherwise.
        """
        if isinstance(required_tables, str):
            required_tables = [required_tables]
        rbool = all(table.lower() in self.existed_tables for table in (required_tables or []))
        if not rbool:
            db_loger.warning(f"{required_tables=}\n{self.existed_tables=}\n{rbool=}")
        return rbool

    @staticmethod
    def _fetch_all(connection, sql, params=None):
        """Run ``sql`` on a fresh cursor of ``connection`` and return all rows."""
        cursor = connection.cursor()
        if params:
            cursor.execute(sql, params)
        else:
            cursor.execute(sql)
        return cursor.fetchall()

    def execute(self, sql, params=None) -> Optional[list]:
        """
        Execute an SQL statement and fetch every row.

        If the first attempt fails, the statement is retried once with
        ``text_factory`` set to ``bytes`` (for text that cannot be decoded).

        :param sql: SQL statement (str)
        :param params: parameters (tuple)
        :return: query result (list), or None when both attempts fail
        """
        connection = self.pool.connection()
        try:
            # connection.text_factory = bytes
            return self._fetch_all(connection, sql, params)
        except Exception as e1:
            try:
                # NOTE(review): ``connection`` is a DBUtils pooled wrapper; confirm this
                # assignment actually reaches the underlying sqlite3 connection.
                connection.text_factory = bytes
                rdata = self._fetch_all(connection, sql, params)
                connection.text_factory = str
                return rdata
            except Exception as e2:
                db_loger.error(f"{sql=}\n{params=}\n{e1=}\n{e2=}\n", exc_info=True)
                return None
        finally:
            # Always hand the connection back to the pool.
            connection.close()

    def close(self):
        """Close the pool's connections; a no-op if the pool was never created."""
        pool = getattr(self, "pool", None)
        if pool is None:
            # __init__ failed before the pool was assigned (e.g. missing db file).
            return
        pool.close()
        db_loger.info(f"关闭数据库 - {_redact_config(getattr(self, 'config', None))}")

    def __del__(self):
        try:
            self.close()
        except Exception:
            # Finalizers may run during interpreter shutdown, when the pool or the
            # logger is already torn down; errors here cannot be handled usefully.
            pass

# class MsgDb(DatabaseBase):
#
#     def p(self, *args, **kwargs):
#         sel = "select tbl_name from sqlite_master where type='table'"
#         data = self.execute(sel)
#         # print([i[0] for i in data])
#         return data
#
#
# class MsgDb1(DatabaseBase):
#     _class_name = "MsgDb1"
#
#     def p(self, *args, **kwargs):
#         sel = "select tbl_name from sqlite_master where type='table'"
#         data = self.execute(sel)
#         # print([i[0] for i in data])
#         return data
#
#
# if __name__ == '__main__':
#     logging.basicConfig(level=logging.INFO,
#                         style='{',
#                         datefmt='%Y-%m-%d %H:%M:%S',
#                         format='[{levelname[0]}] {asctime} [{name}:{levelno}] {pathname}:{lineno} {message}'
#                         )
#
#     config1 = {
#         "key": "test1",
#         "type": "sqlite",
#         "path": r"D:\e_all.db"
#     }
#     config2 = {
#         "key": "test2",
#         "type": "sqlite",
#         "path": r"D:\_call.db"
#     }
#
#     t1 = MsgDb(config1)
#     t1.p()
#     t2 = MsgDb(config2)
#     t2.p()
#     t3 = MsgDb1(config1)
#     t3.p()
#     t4 = MsgDb1(config2)
#     t4.p()
#
#     print(t4._db_pool)
#     # destroy t1
#     del t1
#     # destroy t2
#     del t2
#     del t3
#
#     # destroy t4
#     del t4
#     import time
#     time.sleep(1)
#
#     t1 = MsgDb(config1)
#     t1.p()
#     t2 = MsgDb(config2)
#     t2.p()
#
#
#     print(t2._db_pool)