解锁并提取Linux客户端微信数据库 (vibe coded)
1# Extract SQLCipher raw keys from a running WeChat process by scanning /proc/<pid>/mem.
2#
# WeChat 4.x (Linux) uses SQLCipher 4 (AES-256-CBC, HMAC-SHA512, 256,000 PBKDF2 iterations).
4# WCDB caches the derived raw key in process memory as an ASCII string:
5# x'<64 hex enc_key><32 hex salt>'
6# where the 32-hex salt matches the first 16 bytes of the corresponding .db file.
7#
8# This module reads /proc/<pid>/maps + /proc/<pid>/mem to find these patterns,
9# then returns a mapping of {db_path: (raw_key_hex, salt_hex)}.
10
11import glob
12import os
13import re
14import subprocess
15from typing import Dict, List, Optional, Tuple
16
17
def find_wechat_pids() -> List[int]:
    """Return PIDs of running WeChat main processes.

    ``pgrep -f`` is tried first; if it is unavailable or yields nothing
    usable, ``/proc/<pid>/exe`` links are inspected instead.  In both paths
    the calling process itself and crashpad helper processes are left out.

    Returns:
        A list of PIDs, empty when no matching process is found (or when
        neither ``pgrep`` nor ``/proc`` is available).
    """
    own_pid = os.getpid()

    def _is_crashpad(pid: int) -> bool:
        # Only report True on positive evidence: an unreadable exe link must
        # not cause an otherwise valid pgrep hit to be dropped.
        try:
            return "crashpad" in os.readlink(f"/proc/{pid}/exe").lower()
        except OSError:
            return False

    pids: List[int] = []

    try:
        result = subprocess.run(
            ["pgrep", "-f", r"/wechat\b"],
            capture_output=True, text=True, check=False,
        )
    except (OSError, subprocess.SubprocessError):
        # pgrep missing or not executable: use the /proc fallback below.
        result = None

    if result is not None and result.returncode == 0:
        for line in result.stdout.splitlines():
            token = line.strip()
            if not token.isdigit():
                continue
            pid = int(token)
            # "-f" matches whole command lines, so this script (when its own
            # path contains "/wechat") and crashpad helpers living under the
            # wechat directory can show up here; the fallback already skips
            # crashpad, so do the same for pgrep hits.
            if pid == own_pid or _is_crashpad(pid):
                continue
            pids.append(pid)

    if pids:
        return pids

    # Fallback: scan /proc manually.
    try:
        entries = os.listdir("/proc")
    except OSError:
        return pids
    for entry in entries:
        if not entry.isdigit() or int(entry) == own_pid:
            continue
        try:
            exe = os.readlink(f"/proc/{entry}/exe").lower()
        except OSError:
            # Process exited or its exe link is not readable by this user.
            continue
        if "wechat" in exe and "crashpad" not in exe:
            pids.append(int(entry))
    return pids
46
47
def _read_process_memory(pid: int, max_region_bytes: int = 200 * 1024 * 1024) -> List[bytes]:
    """Return the contents of every readable mapping of process ``pid``.

    The mapping list comes from ``/proc/<pid>/maps`` and the bytes from
    ``/proc/<pid>/mem``.

    Args:
        pid: Target process id.
        max_region_bytes: Mappings larger than this many bytes are skipped.

    Returns:
        One ``bytes`` object per mapping that could be read, in maps order.
        An empty list when the maps file cannot be opened.

    Raises:
        OSError: If ``/proc/<pid>/mem`` itself cannot be opened.
    """
    regions: List[bytes] = []

    try:
        with open(f"/proc/{pid}/maps", "r") as maps_file:
            map_lines = maps_file.readlines()
    except Exception:
        return regions

    with open(f"/proc/{pid}/mem", "rb") as mem_file:
        for map_line in map_lines:
            fields = map_line.split()
            # fields[0] is "start-end" (hex), fields[1] the permission flags.
            readable = len(fields) >= 2 and "r" in fields[1]
            if not readable:
                continue

            bounds = fields[0].split("-")
            low = int(bounds[0], 16)
            high = int(bounds[1], 16)
            size = high - low
            if size > max_region_bytes:
                continue

            try:
                mem_file.seek(low)
                data = mem_file.read(size)
            except (OSError, ValueError):
                # Region not actually readable, or offset not seekable.
                continue
            regions.append(data)

    return regions
73
74
def _collect_db_salts(wx_dir: str) -> Dict[str, str]:
    """Map each encrypted ``.db`` file under ``wx_dir`` to its salt.

    The salt is the first 16 bytes of the file, hex-encoded.  Files shorter
    than 16 bytes, files that start with the plain SQLite header, and files
    that cannot be read are left out.

    Returns:
        ``{db_path: salt_hex}``.
    """
    plain_sqlite_header = b"SQLite format 3\x00"
    salts_by_path: Dict[str, str] = {}

    for folder, _subdirs, filenames in os.walk(wx_dir):
        for filename in filenames:
            if not filename.endswith(".db"):
                continue
            db_path = os.path.join(folder, filename)
            try:
                with open(db_path, "rb") as handle:
                    head = handle.read(16)
            except Exception:
                continue
            if len(head) != 16 or head == plain_sqlite_header:
                continue
            salts_by_path[db_path] = head.hex()

    return salts_by_path
91
92
# Lowercase hex alphabet.  No longer used by the scanner below; retained in
# case other code references it.
_HEX_SET = set("0123456789abcdef")

# One cached raw key in the form x'<64 hex enc_key><32 hex salt>'.
_RAW_KEY_RE = re.compile(rb"x'([0-9a-f]{64})([0-9a-f]{32})'")


def _match_raw_keys(memory: List[bytes], wanted_salts: List[str]) -> Dict[str, str]:
    """Return {salt_hex: enc_key_hex}, keeping the first key seen for each wanted salt."""
    pending = set(wanted_salts)
    found: Dict[str, str] = {}
    for chunk in memory:
        if not pending:
            break
        for match in _RAW_KEY_RE.finditer(chunk):
            salt_hex = match.group(2).decode("ascii")
            if salt_hex not in pending:
                continue
            found[salt_hex] = match.group(1).decode("ascii")
            pending.discard(salt_hex)
            if not pending:
                break
    return found


def scan_memory_for_raw_keys(
    pid: int,
    wx_dir: str,
    *,
    memory: Optional[List[bytes]] = None,
    db_salts: Optional[Dict[str, str]] = None,
) -> Dict[str, Tuple[str, str]]:
    """
    Scan process memory for SQLCipher raw keys matching database salts.

    Every memory chunk is scanned once for the raw-key pattern
    x'<64hex><32hex>'; a hit is kept when its trailing 32 hex characters
    equal the salt of a known database.  When the same salt occurs several
    times, the first occurrence (in chunk order) wins.

    Args:
        pid: Process whose memory is read when ``memory`` is not supplied.
        wx_dir: Directory walked for ``.db`` files when ``db_salts`` is not
            supplied.
        memory: Optional pre-read memory chunks, so one snapshot can be
            shared between several scans.
        db_salts: Optional precomputed ``{db_path: salt_hex}`` mapping with
            lowercase hex salts (the form ``bytes.hex()`` produces).

    Returns:
        {db_path: (enc_key_hex, salt_hex)} for each database whose salt was
        found, in the iteration order of the salt mapping.  Empty when there
        are no encrypted databases or no readable memory.
    """
    if db_salts is None:
        db_salts = _collect_db_salts(wx_dir)
    if not db_salts:
        return {}

    if memory is None:
        memory = _read_process_memory(pid)
    if not memory:
        return {}

    keys_by_salt = _match_raw_keys(memory, list(db_salts.values()))

    # Build the result from db_salts so that databases sharing a salt all
    # receive the key and the output order follows the salt mapping.
    return {
        db_path: (keys_by_salt[salt_hex], salt_hex)
        for db_path, salt_hex in db_salts.items()
        if salt_hex in keys_by_salt
    }
150
151
def extract_all_keys(wx_dir: str) -> Dict[str, Tuple[str, str]]:
    """
    Locate WeChat processes and pull the raw database keys from the first
    one whose memory scan yields any.

    Returns {db_path: (enc_key_hex, salt_hex)}.
    Raises RuntimeError if no WeChat process is found, or if no process
    yields a key.
    """
    candidate_pids = find_wechat_pids()
    if not candidate_pids:
        raise RuntimeError("WeChat 进程未运行,无法提取密钥")

    # Lazy generator: later PIDs are only scanned if earlier ones gave nothing.
    scans = (scan_memory_for_raw_keys(candidate, wx_dir) for candidate in candidate_pids)
    first_hit = next((found for found in scans if found), None)
    if first_hit is None:
        raise RuntimeError("未能从 WeChat 进程内存中提取到密钥")
    return first_hit
169
170
def _find_v2_sample(wx_dir: str) -> Optional[str]:
    """Return the path of the first file under ``wx_dir`` whose leading
    6 bytes equal ``V2_MAGIC``, or None when there is no such file.

    Two locations are searched, in this order: the Sns/Img cache tree (any
    file name) and msg/attach/.../Img (``*.dat`` only).
    """
    from .decode_image import V2_MAGIC

    def _starts_with_v2_magic(candidate: str) -> bool:
        with open(candidate, "rb") as handle:
            return handle.read(6) == V2_MAGIC

    glob_exprs = (
        os.path.join(wx_dir, "cache", "*", "Sns", "Img", "*", "*"),
        os.path.join(wx_dir, "msg", "attach", "*", "*", "Img", "*.dat"),
    )
    matches = (hit for expr in glob_exprs for hit in glob.iglob(expr))
    for candidate in matches:
        if not os.path.isfile(candidate):
            continue
        try:
            if _starts_with_v2_magic(candidate):
                return candidate
        except OSError:
            # Unreadable file: move on to the next candidate.
            continue
    return None
192
193
def scan_memory_for_image_key(pid: int, wx_dir: str) -> Optional[str]:
    """Scan process memory for the 16-byte AES key used to decrypt V2 images.

    Strategy: find a V2 .dat sample file, then scan memory for runs of
    lowercase hex characters.  Every 32-character window of every run is
    treated as a candidate key: the sample's header is decrypted with it via
    ``decrypt_image_header`` and the result is checked with
    ``is_valid_image_header``.

    All window offsets are tried, not only those at multiples of 32 from the
    start of a run, so a key that is directly preceded by other hex
    characters in memory is still found.

    Args:
        pid: Process whose memory is scanned.
        wx_dir: WeChat data directory searched for a V2 sample file.

    Returns:
        The key as a 32-character hex string, or None when no sample file
        exists, the memory cannot be read, or no candidate validates.
    """
    from .decode_image import decrypt_image_header, is_valid_image_header

    sample_path = _find_v2_sample(wx_dir)
    if not sample_path:
        return None

    with open(sample_path, "rb") as f:
        sample_data = f.read(64)  # Only need header + first AES block

    memory = _read_process_memory(pid)
    if not memory:
        return None

    # Let the regex engine locate hex runs instead of walking every byte in
    # Python; only runs long enough to hold a key are reported.
    hex_run = re.compile(rb"[0-9a-f]{32,}")

    for chunk in memory:
        for run in hex_run.finditer(chunk):
            text = run.group().decode("ascii")
            for offset in range(len(text) - 31):
                key_hex = text[offset:offset + 32]
                dec_header = decrypt_image_header(sample_data, bytes.fromhex(key_hex))
                if dec_header and is_valid_image_header(dec_header):
                    return key_hex

    return None
245
246
def extract_image_key(wx_dir: str) -> Optional[str]:
    """Try each running WeChat process in turn and return the first image
    AES key recovered from its memory.

    Returns the key as a 32-char hex string, or None when WeChat is not
    running or no process yields a key.
    """
    # Lazy generator: a process is only scanned if the earlier ones failed.
    attempts = (
        scan_memory_for_image_key(candidate, wx_dir)
        for candidate in find_wechat_pids()
    )
    return next((image_key for image_key in attempts if image_key), None)