解锁并提取Linux客户端微信数据库 (vibe coded)
1# -*- coding: utf-8 -*-#
2# -------------------------------------------------------------------------------
3# Name: dbbase.py
4# Description:
5# Author: xaoyaoo
6# Date: 2024/04/15
7# -------------------------------------------------------------------------------
8import importlib
9import os
10import sqlite3
11import time
12
13from .utils import db_loger
14from dbutils.pooled_db import PooledDB
15
16
17# import logging
18#
19# db_loger = logging.getLogger("db_prepare")
20
21
22class DatabaseSingletonBase:
23 # _singleton_instances = {} # 使用字典存储不同db_path对应的单例实例
24 _class_name = "DatabaseSingletonBase"
25 _db_pool = {} # 使用字典存储不同db_path对应的连接池
26
27 # def __new__(cls, *args, **kwargs):
28 # if cls._class_name not in cls._singleton_instances:
29 # cls._singleton_instances[cls._class_name] = super().__new__(cls)
30 # return cls._singleton_instances[cls._class_name]
31
32 @classmethod
33 def connect(cls, db_config):
34 """
35 连接数据库,如果增加其他数据库连接,则重写该方法
36 :param db_config: 数据库配置
37 :return: 连接池
38 """
39 if not db_config:
40 raise ValueError("db_config 不能为空")
41 db_key = db_config.get("key", "xaoyaoo_741852963")
42 db_type = db_config.get("type", "sqlite")
43 if db_key in cls._db_pool and cls._db_pool[db_key] is not None:
44 return cls._db_pool[db_key]
45
46 if db_type == "sqlite":
47 db_path = db_config.get("path", "")
48 if not os.path.exists(db_path):
49 raise FileNotFoundError(f"文件不存在: {db_path}")
50 pool = PooledDB(
51 creator=sqlite3, # 使用 sqlite3 作为连接创建者
52 maxconnections=0, # 连接池最大连接数
53 mincached=4, # 初始化时,链接池中至少创建的空闲的链接,0表示不创建
54 maxusage=1, # 一个链接最多被重复使用的次数,None表示无限制
55 blocking=True, # 连接池中如果没有可用连接后,是否阻塞等待。True,等待;False,不等待然后报错
56 ping=0, # ping 数据库判断是否服务正常
57 database=db_path
58 )
59 elif db_type == "mysql":
60 mysql_config = {
61 'user': db_config['user'],
62 'host': db_config['host'],
63 'password': db_config['password'],
64 'database': db_config['database'],
65 'port': db_config['port']
66 }
67 pool = PooledDB(
68 creator=importlib.import_module('pymysql'), # 使用 mysql 作为连接创建者
69 ping=1, # ping 数据库判断是否服务正常
70 **mysql_config
71 )
72 else:
73 raise ValueError(f"不支持的数据库类型: {db_type}")
74
75 db_loger.info(f"{pool} 连接句柄创建 {db_config}")
76 cls._db_pool[db_key] = pool
77 return pool
78
79
80class DatabaseBase(DatabaseSingletonBase):
81 _class_name = "DatabaseBase"
82 existed_tables = []
83
84 def __init__(self, db_config):
85 """
86 db_config = {
87 "key": "test1",
88 "type": "sqlite",
89 "path": r"C:\***\wxdump_work\merge_all.db"
90 }
91 """
92 self.config = db_config
93 self.pool = self.connect(self.config)
94 self.__get_existed_tables()
95
96 def __get_existed_tables(self):
97 sql = "SELECT tbl_name FROM sqlite_master WHERE type = 'table' and tbl_name!='sqlite_sequence';"
98 existing_tables = self.execute(sql)
99 if existing_tables:
100 self.existed_tables = [row[0].lower() for row in existing_tables]
101 return self.existed_tables
102 else:
103 return None
104
105 def tables_exist(self, required_tables: str or list):
106 """
107 判断该类所需要的表是否存在
108 Check if all required tables exist in the database.
109 Args:
110 required_tables (list or str): A list of table names or a single table name string.
111 Returns:
112 bool: True if all required tables exist, False otherwise.
113 """
114 if isinstance(required_tables, str):
115 required_tables = [required_tables]
116 rbool = all(table.lower() in self.existed_tables for table in (required_tables or []))
117 if not rbool: db_loger.warning(f"{required_tables=}\n{self.existed_tables=}\n{rbool=}")
118 return rbool
119
120 def execute(self, sql, params=None):
121 """
122 执行SQL语句
123 :param sql: SQL语句 (str)
124 :param params: 参数 (tuple)
125 :return: 查询结果 (list)
126 """
127 connection = self.pool.connection()
128 try:
129 # connection.text_factory = bytes
130 cursor = connection.cursor()
131 if params:
132 cursor.execute(sql, params)
133 else:
134 cursor.execute(sql)
135 return cursor.fetchall()
136 except Exception as e1:
137 try:
138 connection.text_factory = bytes
139 cursor = connection.cursor()
140 if params:
141 cursor.execute(sql, params)
142 else:
143 cursor.execute(sql)
144 rdata = cursor.fetchall()
145 connection.text_factory = str
146 return rdata
147 except Exception as e2:
148 db_loger.error(f"{sql=}\n{params=}\n{e1=}\n{e2=}\n", exc_info=True)
149 return None
150 finally:
151 connection.close()
152
153 def close(self):
154 self.pool.close()
155 db_loger.info(f"关闭数据库 - {self.config}")
156
157 def __del__(self):
158 self.close()
159
160# class MsgDb(DatabaseBase):
161#
162# def p(self, *args, **kwargs):
163# sel = "select tbl_name from sqlite_master where type='table'"
164# data = self.execute(sel)
165# # print([i[0] for i in data])
166# return data
167#
168#
169# class MsgDb1(DatabaseBase):
170# _class_name = "MsgDb1"
171#
172# def p(self, *args, **kwargs):
173# sel = "select tbl_name from sqlite_master where type='table'"
174# data = self.execute(sel)
175# # print([i[0] for i in data])
176# return data
177#
178#
179# if __name__ == '__main__':
180# logging.basicConfig(level=logging.INFO,
181# style='{',
182# datefmt='%Y-%m-%d %H:%M:%S',
183# format='[{levelname[0]}] {asctime} [{name}:{levelno}] {pathname}:{lineno} {message}'
184# )
185#
186# config1 = {
187# "key": "test1",
188# "type": "sqlite",
189# "path": r"D:\e_all.db"
190# }
191# config2 = {
192# "key": "test2",
193# "type": "sqlite",
194# "path": r"D:\_call.db"
195# }
196#
197# t1 = MsgDb(config1)
198# t1.p()
199# t2 = MsgDb(config2)
200# t2.p()
201# t3 = MsgDb1(config1)
202# t3.p()
203# t4 = MsgDb1(config2)
204# t4.p()
205#
206# print(t4._db_pool)
207# # 销毁t1
208# del t1
209# # 销毁t2
210# del t2
211# del t3
212#
213# # 销毁t4
214# del t4
215# import time
216# time.sleep(1)
217#
218# t1 = MsgDb(config1)
219# t1.p()
220# t2 = MsgDb(config2)
221# t2.p()
222#
223#
224# print(t2._db_pool)