解锁并提取Linux客户端微信数据库 (vibe coded)
1# -*- coding: utf-8 -*-#
2# -------------------------------------------------------------------------------
3# Name: merge_db.py
4# Description:
5# Author: xaoyaoo
6# Date: 2023/12/03
7# -------------------------------------------------------------------------------
8import os
9import shutil
10import sqlite3
11import time
12from typing import List
13
14from .decryption import batch_decrypt
15from .utils import wx_core_loger, wx_core_error
16
17
18@wx_core_error
19def execute_sql(connection, sql, params=None):
20 """
21 执行给定的SQL语句,返回结果。
22 参数:
23 - connection: SQLite连接
24 - sql:要执行的SQL语句
25 - params:SQL语句中的参数
26 """
27 try:
28 # connection.text_factory = bytes
29 cursor = connection.cursor()
30 if params:
31 cursor.execute(sql, params)
32 else:
33 cursor.execute(sql)
34 return cursor.fetchall()
35 except Exception as e:
36 try:
37 connection.text_factory = bytes
38 cursor = connection.cursor()
39 if params:
40 cursor.execute(sql, params)
41 else:
42 cursor.execute(sql)
43 rdata = cursor.fetchall()
44 connection.text_factory = str
45 return rdata
46 except Exception as e:
47 wx_core_loger.error(f"**********\nSQL: {sql}\nparams: {params}\n{e}\n**********", exc_info=True)
48 return None
49
50
51@wx_core_error
52def check_create_sync_log(connection):
53 """
54 检查是否存在表 sync_log,用于记录同步记录,包括微信数据库路径,表名,记录数,同步时间
55 :param connection: SQLite连接
56 :return: True or False
57 """
58
59 out_cursor = connection.cursor()
60 # 检查是否存在表 sync_log,用于记录同步记录,包括微信数据库路径,表名,记录数,同步时间
61 sync_log_status = execute_sql(connection, "SELECT name FROM sqlite_master WHERE type='table' AND name='sync_log'")
62 if len(sync_log_status) < 1:
63 # db_path 微信数据库路径,tbl_name 表名,src_count 源数据库记录数,current_count 当前合并后的数据库对应表记录数
64 sync_record_create_sql = ("CREATE TABLE sync_log ("
65 "id INTEGER PRIMARY KEY AUTOINCREMENT,"
66 "db_path TEXT NOT NULL,"
67 "tbl_name TEXT NOT NULL,"
68 "src_count INT,"
69 "current_count INT,"
70 "createTime INT DEFAULT (strftime('%s', 'now')), "
71 "updateTime INT DEFAULT (strftime('%s', 'now'))"
72 ");")
73 out_cursor.execute(sync_record_create_sql)
74 # 创建索引
75 out_cursor.execute("CREATE INDEX idx_sync_log_db_path ON sync_log (db_path);")
76 out_cursor.execute("CREATE INDEX idx_sync_log_tbl_name ON sync_log (tbl_name);")
77 # 创建联合索引,防止重复
78 out_cursor.execute("CREATE UNIQUE INDEX idx_sync_log_db_tbl ON sync_log (db_path, tbl_name);")
79 connection.commit()
80 out_cursor.close()
81 return True
82
83
84@wx_core_error
85def check_create_file_md5(connection):
86 """
87 检查是否存在表 file_md5,用于记录文件信息,后续用于去重等操作,暂时闲置
88 """
89 pass
90
91
92@wx_core_error
93def merge_db(db_paths: List[dict], save_path: str = "merge.db", is_merge_data: bool = True,
94 startCreateTime: int = 0, endCreateTime: int = 0):
95 """
96 合并数据库 会忽略主键以及重复的行。
97 :param db_paths: [{"db_path": "xxx", "de_path": "xxx"},...]
98 db_path表示初始路径,de_path表示解密后的路径;初始路径用于保存合并的日志情况,解密后的路径用于读取数据
99 :param save_path: str 输出文件路径
100 :param is_merge_data: bool 是否合并数据(如果为False,则只解密,并创建表,不插入数据)
101 :param startCreateTime: 开始时间戳 主要用于MSG数据库的合并
102 :param endCreateTime: 结束时间戳 主要用于MSG数据库的合并
103 :return:
104 """
105 if os.path.isdir(save_path):
106 save_path = os.path.join(save_path, f"merge_{int(time.time())}.db")
107
108 if isinstance(db_paths, list):
109 # alias, file_path
110 databases = {f"dbi_{i}": (db['db_path'],
111 db.get('de_path', db['db_path'])
112 ) for i, db in enumerate(db_paths)
113 }
114 else:
115 raise TypeError("db_paths 类型错误")
116 outdb = sqlite3.connect(save_path)
117
118 is_sync_log = check_create_sync_log(outdb)
119 if not is_sync_log:
120 wx_core_loger.warning("创建同步记录表失败")
121
122 out_cursor = outdb.cursor()
123
124 # 将MSG_db_paths中的数据合并到out_db_path中
125 for alias, db in databases.items():
126 db_path = db[0]
127 de_path = db[1]
128
129 # 附加数据库
130 sql_attach = f"ATTACH DATABASE '{de_path}' AS {alias}"
131 out_cursor.execute(sql_attach)
132 outdb.commit()
133 sql_query_tbl_name = f"SELECT tbl_name, sql FROM {alias}.sqlite_master WHERE type='table' ORDER BY tbl_name;"
134 tables = execute_sql(outdb, sql_query_tbl_name)
135 for table in tables:
136 table, init_create_sql = table[0], table[1]
137 table = table if isinstance(table, str) else table.decode()
138 init_create_sql = init_create_sql if isinstance(init_create_sql, str) else init_create_sql.decode()
139 if table == "sqlite_sequence":
140 continue
141 if "CREATE TABLE".lower() not in str(init_create_sql).lower():
142 continue
143 # 获取表中的字段名
144 sql_query_columns = f"PRAGMA table_info({table})"
145 columns = execute_sql(outdb, sql_query_columns)
146 if table == "ChatInfo" and len(columns) > 12: # bizChat中的ChatInfo表与MicroMsg中的ChatInfo表字段不同
147 continue
148 col_type = {
149 (i[1] if isinstance(i[1], str) else i[1].decode(),
150 i[2] if isinstance(i[2], str) else i[2].decode())
151 for i in columns}
152 columns = [i[0] for i in col_type]
153 if not columns or len(columns) < 1:
154 continue
155 # 创建表table
156 sql_create_tbl = f"CREATE TABLE IF NOT EXISTS {table} AS SELECT * FROM {alias}.{table} WHERE 0 = 1;"
157 out_cursor.execute(sql_create_tbl)
158 # 创建包含 NULL 值比较的 UNIQUE 索引
159 index_name = f"{table}_unique_index"
160 coalesce_columns = ','.join(f"COALESCE({column}, '')" for column in columns)
161 sql = f"CREATE UNIQUE INDEX IF NOT EXISTS {index_name} ON {table} ({coalesce_columns})"
162 out_cursor.execute(sql)
163
164 # 插入sync_log
165 sql_query_sync_log = f"SELECT src_count FROM sync_log WHERE db_path=? AND tbl_name=?"
166 sync_log = execute_sql(outdb, sql_query_sync_log, (db_path, table))
167 if not sync_log or len(sync_log) < 1:
168 sql_insert_sync_log = "INSERT INTO sync_log (db_path, tbl_name, src_count, current_count) VALUES (?, ?, ?, ?)"
169 out_cursor.execute(sql_insert_sync_log, (db_path, table, 0, 0))
170 outdb.commit()
171
172 if is_merge_data:
173 # 比较源数据库和合并后的数据库记录数
174 log_src_count = execute_sql(outdb, sql_query_sync_log, (db_path, table))[0][0]
175 src_count = execute_sql(outdb, f"SELECT COUNT(*) FROM {alias}.{table}")[0][0]
176 if src_count <= log_src_count:
177 wx_core_loger.info(f"忽略 {db_path} {de_path} {table} {src_count} {log_src_count}")
178 continue
179
180 # 构建数据查询sql
181 sql_base = f"SELECT {','.join([i for i in columns])} FROM {alias}.{table} "
182 where_clauses, params = [], []
183 if "CreateTime" in columns:
184 if startCreateTime > 0:
185 where_clauses.append("CreateTime > ?")
186 params.append(startCreateTime)
187 if endCreateTime > 0:
188 where_clauses.append("CreateTime < ?")
189 params.append(endCreateTime)
190 # 如果有WHERE子句,将其添加到SQL语句中,并添加ORDER BY子句
191 sql = f"{sql_base} WHERE {' AND '.join(where_clauses)} ORDER BY CreateTime" if where_clauses else sql_base
192 src_data = execute_sql(outdb, sql, tuple(params))
193 if not src_data or len(src_data) < 1:
194 continue
195 # 插入数据
196 sql = f"INSERT OR IGNORE INTO {table} ({','.join([i for i in columns])}) VALUES ({','.join(['?'] * len(columns))})"
197 try:
198 out_cursor.executemany(sql, src_data)
199
200 # update sync_log
201 sql_update_sync_log = ("UPDATE sync_log "
202 "SET src_count = ? ,"
203 f"current_count=(SELECT COUNT(*) FROM {table}) "
204 "WHERE db_path=? AND tbl_name=?")
205 out_cursor.execute(sql_update_sync_log, (src_count, db_path, table))
206
207 except Exception as e:
208 wx_core_loger.error(
209 f"error: {db_path}\n{de_path}\n{table}\n{sql}\n{src_data}\n{len(src_data)}\n{e}\n",
210 exc_info=True)
211 # 分离数据库
212 sql_detach = f"DETACH DATABASE {alias}"
213 out_cursor.execute(sql_detach)
214 outdb.commit()
215 out_cursor.close()
216 outdb.close()
217 return save_path
218
219
220@wx_core_error
221def decrypt_merge(wx_path: str, outpath: str = "",
222 merge_save_path: str = None,
223 is_merge_data=True, is_del_decrypted: bool = True,
224 startCreateTime: int = 0, endCreateTime: int = 0,
225 db_type=None) -> (bool, str):
226 """
227 解密合并数据库(Linux 版)。自动从进程内存提取密钥,解密后合并。
228 :param wx_path: 微信数据目录
229 :param outpath: 输出路径
230 :param merge_save_path: 合并后的数据库路径
231 :param is_merge_data: 是否合并数据(如果为False,则只解密,并创建表,不插入数据)
232 :param is_del_decrypted: 是否删除解密后的数据库(除了合并后的数据库)
233 :param startCreateTime: 开始时间戳 主要用于MSG数据库的合并
234 :param endCreateTime: 结束时间戳 主要用于MSG数据库的合并
235 :param db_type: 数据库类型,从核心数据库中选择
236 :return: (true,合并后的数据库路径) or (false,错误信息)
237 """
238 from ..linux.memscan import extract_all_keys
239
240 if db_type is None:
241 db_type = []
242
243 outpath = outpath if outpath else "decrypt_merge_tmp"
244 merge_save_path = os.path.join(outpath,
245 f"merge_{int(time.time())}.db") if merge_save_path is None else merge_save_path
246 decrypted_path = os.path.join(outpath, "decrypted")
247
248 if not wx_path or not os.path.exists(wx_path):
249 wx_core_loger.error("参数错误: wx_path 不存在", exc_info=True)
250 return False, "参数错误: wx_path 不存在"
251
252 # 从进程内存提取密钥
253 try:
254 db_keys = extract_all_keys(wx_path)
255 except RuntimeError as e:
256 wx_core_loger.error(f"提取密钥失败: {e}", exc_info=True)
257 return False, f"提取密钥失败: {e}"
258
259 if not db_keys:
260 wx_core_loger.error("未提取到任何数据库密钥")
261 return False, "未提取到任何数据库密钥"
262
263 # 判断out_path是否为空目录
264 if os.path.exists(decrypted_path) and os.listdir(decrypted_path):
265 for root, dirs, files in os.walk(decrypted_path, topdown=False):
266 for name in files:
267 os.remove(os.path.join(root, name))
268 for name in dirs:
269 os.rmdir(os.path.join(root, name))
270
271 if not os.path.exists(decrypted_path):
272 os.makedirs(decrypted_path)
273
274 # 解密
275 code, ret = batch_decrypt(db_keys, decrypted_path)
276 if not code:
277 wx_core_loger.error(f"解密失败{ret}", exc_info=True)
278 return False, ret
279
280 out_dbs = []
281 for code1, ret1 in ret:
282 if code1:
283 out_dbs.append(ret1)
284
285 parpare_merge_db_path = []
286 for db_path, out_path, _ in out_dbs:
287 parpare_merge_db_path.append({"db_path": db_path, "de_path": out_path})
288 merge_save_path = merge_db(parpare_merge_db_path, merge_save_path, is_merge_data=is_merge_data,
289 startCreateTime=startCreateTime, endCreateTime=endCreateTime)
290 if is_del_decrypted:
291 shutil.rmtree(decrypted_path, True)
292 if isinstance(merge_save_path, str):
293 return True, merge_save_path
294 else:
295 return False, "未知错误"