解锁并提取Linux客户端微信数据库 (vibe coded)
1# -*- coding: utf-8 -*-#
2# Linux-specific wx_info implementation for native WeChat (AppImage) builds.
3
4import json
5import os
6import re
7from typing import List, Union, Optional, Dict, Tuple
8
9from .utils import verify_raw_key, wx_core_error, wx_core_loger, CORE_DB_TYPE
10from ..linux import find_wechat_files_path
11from ..linux.memscan import find_wechat_pids, extract_all_keys
12
13
@wx_core_error
def get_wx_dir_by_linux(wxid: str = "all") -> Optional[str]:
    """Resolve the WeChat data directory, optionally narrowed to one account.

    The starting point is whatever path ``find_wechat_files_path()`` reports.

    :param wxid: account directory name to look for; an empty value or
        ``"all"`` means "return the base directory itself".
    :return: the resolved directory path, or ``None`` when no base path was
        found or the requested account directory does not exist.
    """
    root_dir = find_wechat_files_path()
    if not root_dir:
        return None

    # No specific account requested: the base directory is the answer.
    wants_everything = not wxid or wxid == "all"
    if wants_everything:
        return root_dir

    # The base path may already be an account directory ("wxid_..."); in that
    # case it is accepted as-is instead of appending `wxid` to it.
    root_is_account_dir = os.path.basename(root_dir).startswith("wxid_")
    if root_is_account_dir and os.path.exists(root_dir):
        return root_dir

    account_dir = os.path.join(root_dir, wxid)
    if os.path.exists(account_dir):
        return account_dir
    return None
27
28
@wx_core_error
def get_wx_raw_keys(wx_dir: str = "") -> Dict[str, Tuple[str, str]]:
    """
    Extract the raw keys of all databases from the running WeChat process memory.

    :param wx_dir: WeChat data directory; when empty, the directory is looked
        up via ``get_wx_dir_by_linux("all")``.
    :return: ``{db_path: (enc_key_hex, salt_hex)}``; an empty dict when the
        data directory cannot be found or extraction raises ``RuntimeError``.
    """
    target_dir = wx_dir or get_wx_dir_by_linux("all")
    if not target_dir:
        wx_core_loger.error("未找到微信数据目录")
        return {}

    try:
        keys = extract_all_keys(target_dir)
    except RuntimeError as err:
        # Extraction failures are logged and reported as "no keys".
        wx_core_loger.error(str(err))
        return {}
    return keys
45
46
@wx_core_error
def get_wx_info(WX_OFFS: dict = None, is_print: bool = False, save_path: str = None):
    """
    Linux: return minimal info (wx_dir + raw keys from process memory).

    :param WX_OFFS: accepted but not used by this implementation
        (presumably kept for signature compatibility with other platform
        implementations -- confirm against callers).
    :param is_print: when true, print every field of the result to stdout.
    :param save_path: optional JSON file path; the result is appended to the
        list already stored there (or to a fresh list) and written back.
    :return: a one-element list holding a dict with the keys ``pid``,
        ``version``, ``account``, ``mobile``, ``nickname``, ``mail``,
        ``wxid``, ``key`` and ``wx_dir``.
    """
    wx_dir = get_wx_dir_by_linux("all")
    raw_keys = get_wx_raw_keys(wx_dir or "")

    # Pick any key for backward compat
    key = None
    if raw_keys:
        key, _ = next(iter(raw_keys.values()))

    pids = find_wechat_pids()

    result = [{
        "pid": pids[0] if pids else None,
        "version": "linux",
        "account": None,
        "mobile": None,
        "nickname": None,
        "mail": None,
        "wxid": None,
        "key": key,
        "wx_dir": wx_dir,
    }]

    if is_print:
        print("=" * 32)
        for rlt in result:
            for k, v in rlt.items():
                print(f"[+] {k:>8}: {v if v else 'None'}")
            print("=" * 32)

    if save_path:
        infos = []
        if os.path.exists(save_path):
            try:
                # Context manager so the read handle is always closed.
                with open(save_path, "r", encoding="utf-8") as f:
                    loaded = json.load(f)
            except (OSError, ValueError):
                # Best effort: an unreadable or malformed file (JSON and
                # Unicode decode errors are ValueError subclasses) is
                # treated as "no previous entries".
                loaded = []
            # Only a JSON array can be extended; anything else starts fresh.
            if isinstance(loaded, list):
                infos = loaded

        # Serialize before opening for writing, so a merge or serialization
        # failure cannot leave a truncated file behind.
        payload = json.dumps(infos + result, ensure_ascii=False, indent=4)
        with open(save_path, "w", encoding="utf-8") as f:
            f.write(payload)
    return result
93
94
@wx_core_error
def get_wx_db(msg_dir: str = None,
              db_types: Union[List[str], str] = None,
              wxids: Union[List[str], str] = None) -> List[dict]:
    r"""
    Linux: collect the paths of WeChat database files.

    :param msg_dir: WeChat files directory; when missing or nonexistent the
        directory from ``get_wx_dir_by_linux("all")`` is used instead.
    :param db_types: database types to keep, as a list or a ``;``-separated
        string; empty/``None`` means no filtering by type.
    :param wxids: account directory names to keep, as a list or a
        ``;``-separated string; empty/``None`` means no filtering by account.
    :return: a list of dicts with the keys ``wxid``, ``db_type``, ``db_path``
        and ``wxid_dir``; empty when no usable directory exists.
    """
    result = []

    if not msg_dir or not os.path.exists(msg_dir):
        wx_core_loger.warning(f"[-] 微信文件目录不存在: {msg_dir}, 将使用默认路径")
        msg_dir = get_wx_dir_by_linux("all")

    if not msg_dir or not os.path.exists(msg_dir):
        # No exception is active here, so exc_info is not passed (it would
        # only append "NoneType: None" to the log record).
        wx_core_loger.error(f"[-] 目录不存在: {msg_dir}")
        return result

    # An empty string must mean "no filter" for both arguments; splitting ""
    # would otherwise yield [""], a filter that matches nothing.
    wxids = wxids.split(";") if isinstance(wxids, str) and wxids else wxids
    if not isinstance(wxids, list) or len(wxids) <= 0:
        wxids = None
    db_types = db_types.split(";") if isinstance(db_types, str) and db_types else db_types
    if not isinstance(db_types, list) or len(db_types) <= 0:
        db_types = None

    # List the directory once and reuse the snapshot for every check below.
    entries = os.listdir(msg_dir)
    shared_markers = ("all_users", "wmpf", "applet")
    skipped_dirs = ("all users", "all_users", "applet", "wmpf")

    wxid_dirs = {}
    if wxids or any(marker in entries for marker in shared_markers):
        # msg_dir is a container: every sub-directory except the shared
        # ones is treated as one account directory.
        for sub_dir in entries:
            sub_path = os.path.join(msg_dir, sub_dir)
            if os.path.isdir(sub_path) and sub_dir.lower() not in skipped_dirs:
                wxid_dirs[sub_dir] = sub_path
    else:
        # Otherwise msg_dir itself is treated as the single account directory.
        wxid_dirs[os.path.basename(msg_dir)] = msg_dir

    # "name123.db" -> "name": drop the ".db" suffix and any digits before it.
    db_suffix = re.compile(r"\d*\.db$")

    for wxid, wxid_dir in wxid_dirs.items():
        if wxids and wxid not in wxids:
            continue
        for root, _, files in os.walk(wxid_dir):
            for file_name in files:
                if not file_name.endswith(".db"):
                    continue
                db_type = db_suffix.sub("", file_name)
                if db_types and db_type not in db_types:
                    continue
                db_path = os.path.join(root, file_name)
                result.append({"wxid": wxid, "db_type": db_type, "db_path": db_path, "wxid_dir": wxid_dir})
    return result
140
141
@wx_core_error
def get_core_db(wx_path: str, db_types: list = None):
    """
    Collect the core database files found under one account directory.

    :param wx_path: path of the account directory; its parent is passed to
        ``get_wx_db`` as ``msg_dir`` and its last component as the wxid.
    :param db_types: database types to look for; entries not present in
        ``CORE_DB_TYPE`` are dropped, and an empty/``None`` value selects
        all of ``CORE_DB_TYPE``.
    :return: ``(True, list_of_db_dicts)`` on success, otherwise
        ``(False, error_message)``.
    """
    if not os.path.exists(wx_path):
        return False, f"[-] 目录不存在: {wx_path}"

    if not db_types:
        db_types = CORE_DB_TYPE
    # NOTE(review): if none of the requested types is in CORE_DB_TYPE this
    # list is empty, and get_wx_db treats an empty list as "no type filter".
    db_types = [dt for dt in db_types if dt in CORE_DB_TYPE]

    # Normalize first: with a trailing separator ("/a/wxid_x/") basename()
    # returns "" and dirname() returns the account directory itself.
    normalized_path = os.path.normpath(wx_path)
    msg_dir = os.path.dirname(normalized_path)
    my_wxid = os.path.basename(normalized_path)
    wxdbpaths = get_wx_db(msg_dir=msg_dir, db_types=db_types, wxids=my_wxid)

    # Truthiness check also covers a non-list falsy return value.
    if not wxdbpaths:
        wx_core_loger.error("[-] get_core_db 未获取到数据库路径")
        return False, "未获取到数据库路径"
    return True, wxdbpaths