解锁并提取Linux客户端微信数据库 (vibe coded)
at 224 lines 7.3 kB view raw
1# -*- coding: utf-8 -*-# 2# ------------------------------------------------------------------------------- 3# Name: dbbase.py 4# Description: 5# Author: xaoyaoo 6# Date: 2024/04/15 7# ------------------------------------------------------------------------------- 8import importlib 9import os 10import sqlite3 11import time 12 13from .utils import db_loger 14from dbutils.pooled_db import PooledDB 15 16 17# import logging 18# 19# db_loger = logging.getLogger("db_prepare") 20 21 22class DatabaseSingletonBase: 23 # _singleton_instances = {} # 使用字典存储不同db_path对应的单例实例 24 _class_name = "DatabaseSingletonBase" 25 _db_pool = {} # 使用字典存储不同db_path对应的连接池 26 27 # def __new__(cls, *args, **kwargs): 28 # if cls._class_name not in cls._singleton_instances: 29 # cls._singleton_instances[cls._class_name] = super().__new__(cls) 30 # return cls._singleton_instances[cls._class_name] 31 32 @classmethod 33 def connect(cls, db_config): 34 """ 35 连接数据库,如果增加其他数据库连接,则重写该方法 36 :param db_config: 数据库配置 37 :return: 连接池 38 """ 39 if not db_config: 40 raise ValueError("db_config 不能为空") 41 db_key = db_config.get("key", "xaoyaoo_741852963") 42 db_type = db_config.get("type", "sqlite") 43 if db_key in cls._db_pool and cls._db_pool[db_key] is not None: 44 return cls._db_pool[db_key] 45 46 if db_type == "sqlite": 47 db_path = db_config.get("path", "") 48 if not os.path.exists(db_path): 49 raise FileNotFoundError(f"文件不存在: {db_path}") 50 pool = PooledDB( 51 creator=sqlite3, # 使用 sqlite3 作为连接创建者 52 maxconnections=0, # 连接池最大连接数 53 mincached=4, # 初始化时,链接池中至少创建的空闲的链接,0表示不创建 54 maxusage=1, # 一个链接最多被重复使用的次数,None表示无限制 55 blocking=True, # 连接池中如果没有可用连接后,是否阻塞等待。True,等待;False,不等待然后报错 56 ping=0, # ping 数据库判断是否服务正常 57 database=db_path 58 ) 59 elif db_type == "mysql": 60 mysql_config = { 61 'user': db_config['user'], 62 'host': db_config['host'], 63 'password': db_config['password'], 64 'database': db_config['database'], 65 'port': db_config['port'] 66 } 67 pool = PooledDB( 68 creator=importlib.import_module('pymysql'), # 使用 mysql 作为连接创建者 69 ping=1, # ping 数据库判断是否服务正常 70 **mysql_config 71 ) 72 else: 73 raise ValueError(f"不支持的数据库类型: {db_type}") 74 75 db_loger.info(f"{pool} 连接句柄创建 {db_config}") 76 cls._db_pool[db_key] = pool 77 return pool 78 79 80class DatabaseBase(DatabaseSingletonBase): 81 _class_name = "DatabaseBase" 82 existed_tables = [] 83 84 def __init__(self, db_config): 85 """ 86 db_config = { 87 "key": "test1", 88 "type": "sqlite", 89 "path": r"C:\***\wxdump_work\merge_all.db" 90 } 91 """ 92 self.config = db_config 93 self.pool = self.connect(self.config) 94 self.__get_existed_tables() 95 96 def __get_existed_tables(self): 97 sql = "SELECT tbl_name FROM sqlite_master WHERE type = 'table' and tbl_name!='sqlite_sequence';" 98 existing_tables = self.execute(sql) 99 if existing_tables: 100 self.existed_tables = [row[0].lower() for row in existing_tables] 101 return self.existed_tables 102 else: 103 return None 104 105 def tables_exist(self, required_tables: str or list): 106 """ 107 判断该类所需要的表是否存在 108 Check if all required tables exist in the database. 109 Args: 110 required_tables (list or str): A list of table names or a single table name string. 111 Returns: 112 bool: True if all required tables exist, False otherwise. 113 """ 114 if isinstance(required_tables, str): 115 required_tables = [required_tables] 116 rbool = all(table.lower() in self.existed_tables for table in (required_tables or [])) 117 if not rbool: db_loger.warning(f"{required_tables=}\n{self.existed_tables=}\n{rbool=}") 118 return rbool 119 120 def execute(self, sql, params=None): 121 """ 122 执行SQL语句 123 :param sql: SQL语句 (str) 124 :param params: 参数 (tuple) 125 :return: 查询结果 (list) 126 """ 127 connection = self.pool.connection() 128 try: 129 # connection.text_factory = bytes 130 cursor = connection.cursor() 131 if params: 132 cursor.execute(sql, params) 133 else: 134 cursor.execute(sql) 135 return cursor.fetchall() 136 except Exception as e1: 137 try: 138 connection.text_factory = bytes 139 cursor = connection.cursor() 140 if params: 141 cursor.execute(sql, params) 142 else: 143 cursor.execute(sql) 144 rdata = cursor.fetchall() 145 connection.text_factory = str 146 return rdata 147 except Exception as e2: 148 db_loger.error(f"{sql=}\n{params=}\n{e1=}\n{e2=}\n", exc_info=True) 149 return None 150 finally: 151 connection.close() 152 153 def close(self): 154 self.pool.close() 155 db_loger.info(f"关闭数据库 - {self.config}") 156 157 def __del__(self): 158 self.close() 159 160# class MsgDb(DatabaseBase): 161# 162# def p(self, *args, **kwargs): 163# sel = "select tbl_name from sqlite_master where type='table'" 164# data = self.execute(sel) 165# # print([i[0] for i in data]) 166# return data 167# 168# 169# class MsgDb1(DatabaseBase): 170# _class_name = "MsgDb1" 171# 172# def p(self, *args, **kwargs): 173# sel = "select tbl_name from sqlite_master where type='table'" 174# data = self.execute(sel) 175# # print([i[0] for i in data]) 176# return data 177# 178# 179# if __name__ == '__main__': 180# logging.basicConfig(level=logging.INFO, 181# style='{', 182# datefmt='%Y-%m-%d %H:%M:%S', 183# format='[{levelname[0]}] {asctime} [{name}:{levelno}] {pathname}:{lineno} {message}' 184# ) 185# 186# config1 = { 187# "key": "test1", 188# "type": "sqlite", 189# "path": r"D:\e_all.db" 190# } 191# config2 = { 192# "key": "test2", 193# "type": "sqlite", 194# "path": r"D:\_call.db" 195# } 196# 197# t1 = MsgDb(config1) 198# t1.p() 199# t2 = MsgDb(config2) 200# t2.p() 201# t3 = MsgDb1(config1) 202# t3.p() 203# t4 = MsgDb1(config2) 204# t4.p() 205# 206# print(t4._db_pool) 207# # 销毁t1 208# del t1 209# # 销毁t2 210# del t2 211# del t3 212# 213# # 销毁t4 214# del t4 215# import time 216# time.sleep(1) 217# 218# t1 = MsgDb(config1) 219# t1.p() 220# t2 = MsgDb(config2) 221# t2.p() 222# 223# 224# print(t2._db_pool)