解锁并提取Linux客户端微信数据库 (vibe coded)
at 157 lines 5.2 kB view raw
1# -*- coding: utf-8 -*-# 2# Linux-specific wx_info implementation for native WeChat (AppImage) builds. 3 4import json 5import os 6import re 7from typing import List, Union, Optional, Dict, Tuple 8 9from .utils import verify_raw_key, wx_core_error, wx_core_loger, CORE_DB_TYPE 10from ..linux import find_wechat_files_path 11from ..linux.memscan import find_wechat_pids, extract_all_keys 12 13 14@wx_core_error 15def get_wx_dir_by_linux(wxid: str = "all") -> Optional[str]: 16 base = find_wechat_files_path() 17 if not base: 18 return None 19 20 if wxid and wxid != "all": 21 # if base already is wxid dir, accept it 22 if os.path.basename(base).startswith("wxid_") and os.path.exists(base): 23 return base 24 candidate = os.path.join(base, wxid) 25 return candidate if os.path.exists(candidate) else None 26 return base 27 28 29@wx_core_error 30def get_wx_raw_keys(wx_dir: str = "") -> Dict[str, Tuple[str, str]]: 31 """ 32 从运行中的微信进程内存中提取所有数据库的 raw key。 33 返回 {db_path: (enc_key_hex, salt_hex)}。 34 """ 35 if not wx_dir: 36 wx_dir = get_wx_dir_by_linux("all") 37 if not wx_dir: 38 wx_core_loger.error("未找到微信数据目录") 39 return {} 40 try: 41 return extract_all_keys(wx_dir) 42 except RuntimeError as e: 43 wx_core_loger.error(str(e)) 44 return {} 45 46 47@wx_core_error 48def get_wx_info(WX_OFFS: dict = None, is_print: bool = False, save_path: str = None): 49 """ 50 Linux: return minimal info (wx_dir + raw keys from process memory). 51 """ 52 if WX_OFFS is None: 53 WX_OFFS = {} 54 55 wx_dir = get_wx_dir_by_linux("all") 56 raw_keys = get_wx_raw_keys(wx_dir or "") 57 58 # Pick any key for backward compat 59 key = None 60 if raw_keys: 61 key, _ = next(iter(raw_keys.values())) 62 63 pids = find_wechat_pids() 64 65 result = [{ 66 "pid": pids[0] if pids else None, 67 "version": "linux", 68 "account": None, 69 "mobile": None, 70 "nickname": None, 71 "mail": None, 72 "wxid": None, 73 "key": key, 74 "wx_dir": wx_dir, 75 }] 76 77 if is_print: 78 print("=" * 32) 79 for rlt in result: 80 for k, v in rlt.items(): 81 print(f"[+] {k:>8}: {v if v else 'None'}") 82 print("=" * 32) 83 84 if save_path: 85 try: 86 infos = json.load(open(save_path, "r", encoding="utf-8")) if os.path.exists(save_path) else [] 87 except Exception: 88 infos = [] 89 with open(save_path, "w", encoding="utf-8") as f: 90 infos += result 91 json.dump(infos, f, ensure_ascii=False, indent=4) 92 return result 93 94 95@wx_core_error 96def get_wx_db(msg_dir: str = None, 97 db_types: Union[List[str], str] = None, 98 wxids: Union[List[str], str] = None) -> List[dict]: 99 r""" 100 Linux: 获取微信数据库路径 101 """ 102 result = [] 103 104 if not msg_dir or not os.path.exists(msg_dir): 105 wx_core_loger.warning(f"[-] 微信文件目录不存在: {msg_dir}, 将使用默认路径") 106 msg_dir = get_wx_dir_by_linux("all") 107 108 if not msg_dir or not os.path.exists(msg_dir): 109 wx_core_loger.error(f"[-] 目录不存在: {msg_dir}", exc_info=True) 110 return result 111 112 wxids = wxids.split(";") if isinstance(wxids, str) else wxids 113 if not isinstance(wxids, list) or len(wxids) <= 0: 114 wxids = None 115 db_types = db_types.split(";") if isinstance(db_types, str) and db_types else db_types 116 if not isinstance(db_types, list) or len(db_types) <= 0: 117 db_types = None 118 119 wxid_dirs = {} 120 if wxids or "all_users" in os.listdir(msg_dir) or "wmpf" in os.listdir(msg_dir) or "applet" in os.listdir(msg_dir): 121 for sub_dir in os.listdir(msg_dir): 122 if os.path.isdir(os.path.join(msg_dir, sub_dir)) and sub_dir.lower() not in ["all users", "all_users", "applet", "wmpf"]: 123 wxid_dirs[os.path.basename(sub_dir)] = os.path.join(msg_dir, sub_dir) 124 else: 125 wxid_dirs[os.path.basename(msg_dir)] = msg_dir 126 127 for wxid, wxid_dir in wxid_dirs.items(): 128 if wxids and wxid not in wxids: 129 continue 130 for root, _, files in os.walk(wxid_dir): 131 for file_name in files: 132 if not file_name.endswith(".db"): 133 continue 134 db_type = re.sub(r"\d*\.db$", "", file_name) 135 if db_types and db_type not in db_types: 136 continue 137 db_path = os.path.join(root, file_name) 138 result.append({"wxid": wxid, "db_type": db_type, "db_path": db_path, "wxid_dir": wxid_dir}) 139 return result 140 141 142@wx_core_error 143def get_core_db(wx_path: str, db_types: list = None): 144 if not os.path.exists(wx_path): 145 return False, f"[-] 目录不存在: {wx_path}" 146 147 if not db_types: 148 db_types = CORE_DB_TYPE 149 db_types = [dt for dt in db_types if dt in CORE_DB_TYPE] 150 msg_dir = os.path.dirname(wx_path) 151 my_wxid = os.path.basename(wx_path) 152 wxdbpaths = get_wx_db(msg_dir=msg_dir, db_types=db_types, wxids=my_wxid) 153 154 if len(wxdbpaths) == 0: 155 wx_core_loger.error(f"[-] get_core_db 未获取到数据库路径") 156 return False, "未获取到数据库路径" 157 return True, wxdbpaths