解锁并提取Linux客户端微信数据库 (vibe coded)
at 295 lines 13 kB view raw
# -*- coding: utf-8 -*-#
# -------------------------------------------------------------------------------
# Name:         merge_db.py
# Description:
# Author:       xaoyaoo
# Date:         2023/12/03
# -------------------------------------------------------------------------------
import os
import shutil
import sqlite3
import time
from typing import List, Tuple

from .decryption import batch_decrypt
from .utils import wx_core_loger, wx_core_error


def _quote_identifier(name: str) -> str:
    """Return *name* as a double-quoted SQLite identifier (embedded quotes doubled)."""
    return '"' + name.replace('"', '""') + '"'


def _as_text(value):
    """Decode *value* when it is ``bytes``; return anything else unchanged."""
    return value.decode() if isinstance(value, bytes) else value


@wx_core_error
def execute_sql(connection, sql, params=None):
    """
    Execute a SQL statement and return all fetched rows.

    The statement is first run with the connection's current ``text_factory``.
    If that raises, it is retried once with ``text_factory = bytes`` so that
    text which cannot be decoded is returned as raw bytes instead.

    :param connection: SQLite connection
    :param sql: SQL statement to execute
    :param params: optional parameters bound to the statement
    :return: list of rows, or None when both attempts fail (the error is logged)
    """

    def _run():
        cursor = connection.cursor()
        if params:
            cursor.execute(sql, params)
        else:
            cursor.execute(sql)
        return cursor.fetchall()

    try:
        return _run()
    except Exception:
        saved_factory = None
        swapped = False
        try:
            saved_factory = connection.text_factory
            connection.text_factory = bytes
            swapped = True
            return _run()
        except Exception as e:
            wx_core_loger.error(f"**********\nSQL: {sql}\nparams: {params}\n{e}\n**********", exc_info=True)
            return None
        finally:
            # Always undo the bytes override, including when the retry fails;
            # otherwise every later query on this connection would yield bytes.
            if swapped:
                connection.text_factory = saved_factory


@wx_core_error
def check_create_sync_log(connection):
    """
    Make sure the ``sync_log`` table exists, creating it (and its indexes) if needed.

    ``sync_log`` records, per source database path and table name, the source
    row count and the row count of the merged table, plus create/update times.

    :param connection: SQLite connection of the merged (output) database
    :return: True when the table exists or was created, False when the lookup failed
    """
    sync_log_status = execute_sql(connection, "SELECT name FROM sqlite_master WHERE type='table' AND name='sync_log'")
    if sync_log_status is None:
        # execute_sql already logged the failure; report it instead of raising on len(None).
        return False

    out_cursor = connection.cursor()
    try:
        if len(sync_log_status) < 1:
            # db_path: source database path, tbl_name: table name,
            # src_count: rows in the source table, current_count: rows in the merged table
            sync_record_create_sql = ("CREATE TABLE sync_log ("
                                      "id INTEGER PRIMARY KEY AUTOINCREMENT,"
                                      "db_path TEXT NOT NULL,"
                                      "tbl_name TEXT NOT NULL,"
                                      "src_count INT,"
                                      "current_count INT,"
                                      "createTime INT DEFAULT (strftime('%s', 'now')), "
                                      "updateTime INT DEFAULT (strftime('%s', 'now'))"
                                      ");")
            out_cursor.execute(sync_record_create_sql)
            # Lookup indexes
            out_cursor.execute("CREATE INDEX idx_sync_log_db_path ON sync_log (db_path);")
            out_cursor.execute("CREATE INDEX idx_sync_log_tbl_name ON sync_log (tbl_name);")
            # Composite unique index so each (db_path, tbl_name) pair is stored once
            out_cursor.execute("CREATE UNIQUE INDEX idx_sync_log_db_tbl ON sync_log (db_path, tbl_name);")
        connection.commit()
    finally:
        out_cursor.close()
    return True


@wx_core_error
def check_create_file_md5(connection):
    """
    Placeholder for a ``file_md5`` table (file records for later de-duplication).

    Currently unused: the function does nothing.
    """
    pass


def _merge_table(outdb, out_cursor, alias, db_path, de_path, table,
                 is_merge_data, startCreateTime, endCreateTime):
    """Create *table* in the output database if needed and copy its rows from the attached *alias*."""
    q_table = _quote_identifier(table)

    # Column names of the table. The PRAGMA is not schema-qualified, so SQLite
    # resolves the name itself (presumably main before attached schemas — confirm).
    columns_info = execute_sql(outdb, f"PRAGMA table_info({q_table})")
    if not columns_info:
        return
    # ChatInfo in bizChat has a different layout from ChatInfo in MicroMsg; skip the wide one.
    if table == "ChatInfo" and len(columns_info) > 12:
        return
    # Ordered list (not a set) so the generated SQL is deterministic.
    columns = [_as_text(info[1]) for info in columns_info]
    q_columns = [_quote_identifier(column) for column in columns]
    column_list = ','.join(q_columns)

    # Create an empty table with the same columns as the source.
    out_cursor.execute(f"CREATE TABLE IF NOT EXISTS {q_table} AS SELECT * FROM {alias}.{q_table} WHERE 0 = 1;")
    # UNIQUE index over every column; COALESCE makes NULLs compare equal so
    # INSERT OR IGNORE can drop fully duplicated rows.
    index_name = _quote_identifier(f"{table}_unique_index")
    coalesce_columns = ','.join(f"COALESCE({column}, '')" for column in q_columns)
    out_cursor.execute(f"CREATE UNIQUE INDEX IF NOT EXISTS {index_name} ON {q_table} ({coalesce_columns})")

    # Ensure a sync_log row exists for this (source database, table) pair.
    sql_query_sync_log = "SELECT src_count FROM sync_log WHERE db_path=? AND tbl_name=?"
    sync_log = execute_sql(outdb, sql_query_sync_log, (db_path, table))
    if not sync_log:
        sql_insert_sync_log = "INSERT INTO sync_log (db_path, tbl_name, src_count, current_count) VALUES (?, ?, ?, ?)"
        out_cursor.execute(sql_insert_sync_log, (db_path, table, 0, 0))
    outdb.commit()

    if not is_merge_data:
        return

    # Compare the source row count with the count recorded at the last merge.
    logged = execute_sql(outdb, sql_query_sync_log, (db_path, table))
    counted = execute_sql(outdb, f"SELECT COUNT(*) FROM {alias}.{q_table}")
    if not logged or not counted:
        # execute_sql has already logged the failure; skip this table.
        return
    log_src_count = logged[0][0] or 0
    src_count = counted[0][0]
    if src_count <= log_src_count:
        wx_core_loger.info(f"忽略 {db_path} {de_path} {table} {src_count} {log_src_count}")
        return

    # Build the source query, optionally limited to a CreateTime window.
    sql_base = f"SELECT {column_list} FROM {alias}.{q_table} "
    where_clauses, params = [], []
    if "CreateTime" in columns:
        if startCreateTime > 0:
            where_clauses.append("CreateTime > ?")
            params.append(startCreateTime)
        if endCreateTime > 0:
            where_clauses.append("CreateTime < ?")
            params.append(endCreateTime)
    # WHERE and ORDER BY are only added when at least one time bound applies.
    sql = f"{sql_base} WHERE {' AND '.join(where_clauses)} ORDER BY CreateTime" if where_clauses else sql_base
    src_data = execute_sql(outdb, sql, tuple(params))
    if not src_data:
        return

    # Insert the rows; duplicates are ignored thanks to the UNIQUE index.
    sql = f"INSERT OR IGNORE INTO {q_table} ({column_list}) VALUES ({','.join(['?'] * len(columns))})"
    try:
        out_cursor.executemany(sql, src_data)

        # NOTE(review): src_count stored here is the unfiltered source row count,
        # even when a CreateTime window limited what was copied — confirm intended.
        sql_update_sync_log = ("UPDATE sync_log "
                               "SET src_count = ? ,"
                               f"current_count=(SELECT COUNT(*) FROM {q_table}) "
                               "WHERE db_path=? AND tbl_name=?")
        out_cursor.execute(sql_update_sync_log, (src_count, db_path, table))
    except Exception as e:
        # Best effort: a failing table is logged and the merge continues.
        # NOTE(review): this message embeds every fetched row — consider trimming.
        wx_core_loger.error(
            f"error: {db_path}\n{de_path}\n{table}\n{sql}\n{src_data}\n{len(src_data)}\n{e}\n",
            exc_info=True)


@wx_core_error
def merge_db(db_paths: List[dict], save_path: str = "merge.db", is_merge_data: bool = True,
             startCreateTime: int = 0, endCreateTime: int = 0):
    """
    Merge several SQLite databases into one; fully duplicated rows are ignored.

    :param db_paths: [{"db_path": "xxx", "de_path": "xxx"}, ...]
        db_path is the original path (used as the key in sync_log);
        de_path is the decrypted path the data is read from (defaults to db_path)
    :param save_path: output file path; if it is a directory, a file named
        merge_<timestamp>.db is created inside it
    :param is_merge_data: when False, only the tables are created and no rows are inserted
    :param startCreateTime: lower CreateTime bound (exclusive), applied to tables with a CreateTime column
    :param endCreateTime: upper CreateTime bound (exclusive), applied to tables with a CreateTime column
    :return: path of the merged database
    :raises TypeError: when db_paths is not a list
    """
    if os.path.isdir(save_path):
        save_path = os.path.join(save_path, f"merge_{int(time.time())}.db")

    if not isinstance(db_paths, list):
        raise TypeError("db_paths 类型错误")
    # alias -> (original path, decrypted path)
    databases = {f"dbi_{i}": (db['db_path'], db.get('de_path', db['db_path']))
                 for i, db in enumerate(db_paths)}

    outdb = sqlite3.connect(save_path)
    try:
        if not check_create_sync_log(outdb):
            wx_core_loger.warning("创建同步记录表失败")

        out_cursor = outdb.cursor()

        # Merge every source database into the output database.
        for alias, (db_path, de_path) in databases.items():
            # Attach the source; the path is bound as a parameter so quotes in it are safe.
            out_cursor.execute(f"ATTACH DATABASE ? AS {alias}", (str(de_path),))
            outdb.commit()

            sql_query_tbl_name = f"SELECT tbl_name, sql FROM {alias}.sqlite_master WHERE type='table' ORDER BY tbl_name;"
            tables = execute_sql(outdb, sql_query_tbl_name)
            for row in tables or []:
                table = _as_text(row[0])
                init_create_sql = _as_text(row[1])
                # Names starting with "sqlite_" are reserved by SQLite and cannot be created.
                if table.startswith("sqlite_"):
                    continue
                if "CREATE TABLE".lower() not in str(init_create_sql).lower():
                    continue
                _merge_table(outdb, out_cursor, alias, db_path, de_path, table,
                             is_merge_data, startCreateTime, endCreateTime)

            # Finish the pending transaction first so DETACH is not refused.
            outdb.commit()
            out_cursor.execute(f"DETACH DATABASE {alias}")
            outdb.commit()
        out_cursor.close()
    finally:
        # Release the output file even when a statement above raised.
        outdb.close()
    return save_path


@wx_core_error
def decrypt_merge(wx_path: str, outpath: str = "",
                  merge_save_path: str = None,
                  is_merge_data=True, is_del_decrypted: bool = True,
                  startCreateTime: int = 0, endCreateTime: int = 0,
                  db_type=None) -> Tuple[bool, str]:
    """
    Decrypt and merge databases (Linux version): keys are obtained through
    ``extract_all_keys``, the databases are decrypted, then merged with ``merge_db``.

    :param wx_path: WeChat data directory
    :param outpath: output directory (defaults to "decrypt_merge_tmp")
    :param merge_save_path: path of the merged database (defaults to merge_<timestamp>.db in outpath)
    :param is_merge_data: when False, only decrypt and create tables without inserting rows
    :param is_del_decrypted: whether to delete the decrypted databases afterwards (the merged one is kept)
    :param startCreateTime: start timestamp, forwarded to merge_db
    :param endCreateTime: end timestamp, forwarded to merge_db
    :param db_type: database types to select; accepted for compatibility, not used here
    :return: (True, merged database path) or (False, error message)
    """
    from ..linux.memscan import extract_all_keys

    if db_type is None:
        db_type = []

    outpath = outpath if outpath else "decrypt_merge_tmp"
    if merge_save_path is None:
        merge_save_path = os.path.join(outpath, f"merge_{int(time.time())}.db")
    decrypted_path = os.path.join(outpath, "decrypted")

    if not wx_path or not os.path.exists(wx_path):
        wx_core_loger.error("参数错误: wx_path 不存在")
        return False, "参数错误: wx_path 不存在"

    # Obtain the database keys.
    try:
        db_keys = extract_all_keys(wx_path)
    except RuntimeError as e:
        wx_core_loger.error(f"提取密钥失败: {e}", exc_info=True)
        return False, f"提取密钥失败: {e}"

    if not db_keys:
        wx_core_loger.error("未提取到任何数据库密钥")
        return False, "未提取到任何数据库密钥"

    # Start from an empty "decrypted" directory.
    if os.path.isdir(decrypted_path):
        shutil.rmtree(decrypted_path)
    os.makedirs(decrypted_path, exist_ok=True)

    # Decrypt
    code, ret = batch_decrypt(db_keys, decrypted_path)
    if not code:
        wx_core_loger.error(f"解密失败{ret}")
        return False, ret

    # Keep only the successfully decrypted databases: (db_path, out_path, _)
    out_dbs = [ret1 for code1, ret1 in ret if code1]
    parpare_merge_db_path = [{"db_path": db_path, "de_path": out_path}
                             for db_path, out_path, _ in out_dbs]

    merge_save_path = merge_db(parpare_merge_db_path, merge_save_path, is_merge_data=is_merge_data,
                               startCreateTime=startCreateTime, endCreateTime=endCreateTime)
    if is_del_decrypted:
        shutil.rmtree(decrypted_path, True)
    if isinstance(merge_save_path, str):
        return True, merge_save_path
    return False, "未知错误"