解锁并提取Linux客户端微信数据库 (vibe coded)
at 260 lines 8.2 kB view raw
1# Extract SQLCipher raw keys from a running WeChat process by scanning /proc/<pid>/mem. 2# 3# WeChat 4.x (Linux) uses SQLCipher 4 (AES-256-CBC, HMAC-SHA512, 256K PBKDF2 iterations). 4# WCDB caches the derived raw key in process memory as an ASCII string: 5# x'<64 hex enc_key><32 hex salt>' 6# where the 32-hex salt matches the first 16 bytes of the corresponding .db file. 7# 8# This module reads /proc/<pid>/maps + /proc/<pid>/mem to find these patterns, 9# then returns a mapping of {db_path: (raw_key_hex, salt_hex)}. 10 11import glob 12import os 13import re 14import subprocess 15from typing import Dict, List, Optional, Tuple 16 17 18def find_wechat_pids() -> List[int]: 19 """Return PIDs of running WeChat main processes.""" 20 pids = [] 21 try: 22 result = subprocess.run( 23 ["pgrep", "-f", r"/wechat\b"], 24 capture_output=True, text=True, check=False, 25 ) 26 if result.returncode == 0: 27 for line in result.stdout.splitlines(): 28 pid = line.strip() 29 if pid.isdigit(): 30 pids.append(int(pid)) 31 except Exception: 32 pass 33 34 if not pids: 35 # fallback: scan /proc manually 36 for entry in os.listdir("/proc"): 37 if not entry.isdigit(): 38 continue 39 try: 40 exe = os.readlink(f"/proc/{entry}/exe") 41 if "wechat" in exe.lower() and "crashpad" not in exe.lower(): 42 pids.append(int(entry)) 43 except Exception: 44 continue 45 return pids 46 47 48def _read_process_memory(pid: int, max_region_bytes: int = 200 * 1024 * 1024) -> List[bytes]: 49 """Read all readable memory regions of a process via /proc.""" 50 chunks = [] 51 try: 52 with open(f"/proc/{pid}/maps", "r") as f: 53 maps = f.readlines() 54 except Exception: 55 return chunks 56 57 with open(f"/proc/{pid}/mem", "rb") as mem: 58 for line in maps: 59 parts = line.split() 60 if len(parts) < 2 or "r" not in parts[1]: 61 continue 62 addr_range = parts[0].split("-") 63 start = int(addr_range[0], 16) 64 end = int(addr_range[1], 16) 65 if end - start > max_region_bytes: 66 continue 67 try: 68 mem.seek(start) 69 chunks.append(mem.read(end - start)) 70 except (OSError, ValueError): 71 continue 72 return chunks 73 74 75def _collect_db_salts(wx_dir: str) -> Dict[str, str]: 76 """Walk wx_dir and return {db_path: salt_hex} for every .db file.""" 77 salts = {} 78 for root, _dirs, files in os.walk(wx_dir): 79 for name in files: 80 if not name.endswith(".db"): 81 continue 82 path = os.path.join(root, name) 83 try: 84 with open(path, "rb") as f: 85 header = f.read(16) 86 if len(header) == 16 and header != b"SQLite format 3\x00": 87 salts[path] = header.hex() 88 except Exception: 89 continue 90 return salts 91 92 93_HEX_SET = set("0123456789abcdef") 94 95 96def scan_memory_for_raw_keys( 97 pid: int, 98 wx_dir: str, 99) -> Dict[str, Tuple[str, str]]: 100 """ 101 Scan process memory for SQLCipher raw keys matching database salts. 102 103 Returns {db_path: (enc_key_hex, salt_hex)} for each database whose salt 104 was found in the WCDB raw-key cache pattern x'<64hex><32hex>'. 105 """ 106 db_salts = _collect_db_salts(wx_dir) 107 if not db_salts: 108 return {} 109 110 memory = _read_process_memory(pid) 111 if not memory: 112 return {} 113 114 results: Dict[str, Tuple[str, str]] = {} 115 116 for db_path, salt_hex in db_salts.items(): 117 if db_path in results: 118 continue 119 salt_bytes = salt_hex.encode("ascii") 120 found = False 121 for chunk in memory: 122 if found: 123 break 124 pos = 0 125 while True: 126 idx = chunk.find(salt_bytes, pos) 127 if idx == -1: 128 break 129 # Check for x'<64hex_key><32hex_salt>' pattern 130 if idx >= 66: 131 candidate = chunk[idx - 66 : idx + 32 + 1] 132 try: 133 s = candidate.decode("ascii") 134 except (UnicodeDecodeError, ValueError): 135 pos = idx + 1 136 continue 137 if ( 138 len(s) == 99 139 and s.startswith("x'") 140 and s.endswith("'") 141 and all(c in _HEX_SET for c in s[2:66]) 142 ): 143 enc_key = s[2:66] 144 results[db_path] = (enc_key, salt_hex) 145 found = True 146 break 147 pos = idx + 1 148 149 return results 150 151 152def extract_all_keys(wx_dir: str) -> Dict[str, Tuple[str, str]]: 153 """ 154 High-level: find WeChat PID, scan memory, return raw keys for all databases. 155 156 Returns {db_path: (enc_key_hex, salt_hex)}. 157 Raises RuntimeError if WeChat is not running. 158 """ 159 pids = find_wechat_pids() 160 if not pids: 161 raise RuntimeError("WeChat 进程未运行,无法提取密钥") 162 163 for pid in pids: 164 results = scan_memory_for_raw_keys(pid, wx_dir) 165 if results: 166 return results 167 168 raise RuntimeError("未能从 WeChat 进程内存中提取到密钥") 169 170 171def _find_v2_sample(wx_dir: str) -> Optional[str]: 172 """Find a V2-encrypted .dat file in wx_dir to use as a test sample.""" 173 from .decode_image import V2_MAGIC 174 175 # Search Sns/Img and msg/attach for V2 .dat files 176 search_patterns = [ 177 os.path.join(wx_dir, "cache", "*", "Sns", "Img", "*", "*"), 178 os.path.join(wx_dir, "msg", "attach", "*", "*", "Img", "*.dat"), 179 ] 180 for pattern in search_patterns: 181 for path in glob.iglob(pattern): 182 if not os.path.isfile(path): 183 continue 184 try: 185 with open(path, "rb") as f: 186 magic = f.read(6) 187 if magic == V2_MAGIC: 188 return path 189 except OSError: 190 continue 191 return None 192 193 194def scan_memory_for_image_key(pid: int, wx_dir: str) -> Optional[str]: 195 """Scan process memory for the 16-byte AES key used to decrypt V2 images. 196 197 Strategy: find a V2 .dat sample file, then scan memory for 16-byte 198 candidate keys (32 hex char sequences). For each candidate, try 199 decrypting the first AES block of the sample and check if the result 200 starts with valid image magic bytes. 201 202 Returns hex string (32 chars) or None. 203 """ 204 from .decode_image import decrypt_image_header, is_valid_image_header 205 206 sample_path = _find_v2_sample(wx_dir) 207 if not sample_path: 208 return None 209 210 with open(sample_path, "rb") as f: 211 sample_data = f.read(64) # Only need header + first AES block 212 213 memory = _read_process_memory(pid) 214 if not memory: 215 return None 216 217 hex_chars = set(b'0123456789abcdef') 218 219 for chunk in memory: 220 i = 0 221 while i <= len(chunk) - 32: 222 if chunk[i] not in hex_chars: 223 i += 1 224 continue 225 candidate = chunk[i:i + 32] 226 if not all(b in hex_chars for b in candidate): 227 i += 1 228 continue 229 230 # We have a 32-hex-char candidate 231 try: 232 key_hex = candidate.decode('ascii') 233 key_bytes = bytes.fromhex(key_hex) 234 except (UnicodeDecodeError, ValueError): 235 i += 1 236 continue 237 238 dec_header = decrypt_image_header(sample_data, key_bytes) 239 if dec_header and is_valid_image_header(dec_header): 240 return key_hex 241 242 i += 32 # Skip past this candidate 243 244 return None 245 246 247def extract_image_key(wx_dir: str) -> Optional[str]: 248 """High-level: find WeChat PID and extract the image AES key. 249 250 Returns 32-char hex string or None. 251 """ 252 pids = find_wechat_pids() 253 if not pids: 254 return None 255 256 for pid in pids: 257 key = scan_memory_for_image_key(pid, wx_dir) 258 if key: 259 return key 260 return None