# Extract SQLCipher raw keys from a running WeChat process by scanning /proc/<pid>/mem.
#
# WeChat 4.x (Linux) uses SQLCipher 4 (AES-256-CBC, HMAC-SHA512, 256K PBKDF2 iterations).
# WCDB caches the derived raw key in process memory as an ASCII string:
#     x'<64 hex enc_key><32 hex salt>'
# where the 32-hex salt matches the first 16 bytes of the corresponding .db file.
#
# This module reads /proc/<pid>/maps + /proc/<pid>/mem to find these patterns,
# then returns a mapping of {db_path: (raw_key_hex, salt_hex)}.

import glob
import os
import re
import subprocess
from typing import Dict, List, Optional, Tuple


# Lowercase hex alphabet. No longer used by the scanners below (they use the
# compiled patterns instead); kept in case other modules import it.
_HEX_SET = set("0123456789abcdef")

# x'<64 hex key><32 hex salt>' -- group 1 is the key, group 2 is the salt.
# 'x' is not a hex digit, so two matches can never overlap.
_RAW_KEY_RE = re.compile(rb"x'([0-9a-f]{64})([0-9a-f]{32})'")

# A window of 32 lowercase hex characters (a 16-byte key written as text).
_IMAGE_KEY_RE = re.compile(rb"[0-9a-f]{32}")


def _is_crashpad_process(pid: int) -> bool:
    """Return True only if /proc/<pid>/exe resolves to a path containing "crashpad"."""
    try:
        exe = os.readlink(f"/proc/{pid}/exe")
    except OSError:
        # Unreadable or vanished link: cannot tell, so do not exclude the PID.
        return False
    return "crashpad" in exe.lower()


def find_wechat_pids() -> List[int]:
    """Return PIDs of running WeChat main processes.

    ``pgrep -f`` is tried first; if it yields nothing, /proc is scanned and
    each process is matched on its ``exe`` link. On both paths the calling
    process and crashpad helper processes are left out.

    Returns:
        A possibly empty list of PIDs.
    """
    own_pid = os.getpid()
    pids: List[int] = []

    try:
        result = subprocess.run(
            ["pgrep", "-f", r"/wechat\b"],
            capture_output=True, text=True, check=False,
        )
        if result.returncode == 0:
            for line in result.stdout.splitlines():
                token = line.strip()
                if not token.isdigit():
                    continue
                pid = int(token)
                # -f matches whole command lines, so it can also hit this very
                # process (e.g. a script living under a ".../wechat/..." path)
                # and crashpad helpers that the /proc fallback excludes.
                if pid != own_pid and not _is_crashpad_process(pid):
                    pids.append(pid)
    except (OSError, ValueError, subprocess.SubprocessError):
        # pgrep is missing or unusable; the /proc scan below takes over.
        pass

    if not pids:
        # fallback: scan /proc manually
        try:
            entries = os.listdir("/proc")
        except OSError:
            entries = []
        for entry in entries:
            if not entry.isdigit():
                continue
            try:
                pid = int(entry)
                exe = os.readlink(f"/proc/{entry}/exe").lower()
            except (OSError, ValueError):
                continue
            if pid != own_pid and "wechat" in exe and "crashpad" not in exe:
                pids.append(pid)
    return pids


def _read_process_memory(pid: int, max_region_bytes: int = 200 * 1024 * 1024) -> List[bytes]:
    """Read all readable memory regions of a process via /proc.

    Args:
        pid: Target process ID.
        max_region_bytes: Mappings larger than this are skipped.

    Returns:
        One ``bytes`` object per region that could be read. The list is empty
        when /proc/<pid>/maps cannot be read or the process has disappeared.

    Raises:
        PermissionError: /proc/<pid>/mem exists but may not be opened. This is
            deliberately not swallowed so the caller can report it.
    """
    chunks: List[bytes] = []
    try:
        with open(f"/proc/{pid}/maps", "r") as f:
            maps = f.readlines()
    except (OSError, ValueError):
        return chunks

    try:
        mem = open(f"/proc/{pid}/mem", "rb")
    except (FileNotFoundError, ProcessLookupError):
        # The process exited between reading maps and opening mem.
        return chunks

    with mem:
        for line in maps:
            parts = line.split()
            # parts[0] is "start-end", parts[1] is the permission string.
            if len(parts) < 2 or "r" not in parts[1]:
                continue
            start_text, _, end_text = parts[0].partition("-")
            try:
                start = int(start_text, 16)
                end = int(end_text, 16)
            except ValueError:
                continue
            size = end - start
            # A non-positive size must be skipped: read() with a negative
            # argument would read to end of file.
            if size <= 0 or size > max_region_bytes:
                continue
            try:
                mem.seek(start)
                data = mem.read(size)
            except (OSError, ValueError, OverflowError):
                # Region not actually readable, or address out of seek range.
                continue
            if data:
                chunks.append(data)
    return chunks


def _collect_db_salts(wx_dir: str) -> Dict[str, str]:
    """Walk wx_dir and return {db_path: salt_hex} for every .db file.

    The salt is the first 16 bytes of the file, hex-encoded. Files shorter
    than 16 bytes, unreadable files, and files starting with the plain SQLite
    header are left out.
    """
    salts: Dict[str, str] = {}
    for root, _dirs, files in os.walk(wx_dir):
        for name in files:
            if not name.endswith(".db"):
                continue
            path = os.path.join(root, name)
            try:
                with open(path, "rb") as f:
                    header = f.read(16)
            except OSError:
                continue
            if len(header) == 16 and header != b"SQLite format 3\x00":
                salts[path] = header.hex()
    return salts


def scan_memory_for_raw_keys(
    pid: int,
    wx_dir: str,
) -> Dict[str, Tuple[str, str]]:
    """
    Scan process memory for SQLCipher raw keys matching database salts.

    Each memory chunk is scanned once for x'<64hex><32hex>'; a match is kept
    when its salt part equals the salt of a database under wx_dir. For a given
    salt the first match in memory order wins.

    Returns {db_path: (enc_key_hex, salt_hex)} for each database whose salt
    was found in the WCDB raw-key cache pattern x'<64hex><32hex>'.
    """
    db_salts = _collect_db_salts(wx_dir)
    if not db_salts:
        return {}

    memory = _read_process_memory(pid)
    if not memory:
        return {}

    wanted = set(db_salts.values())
    key_by_salt: Dict[str, str] = {}

    for chunk in memory:
        for match in _RAW_KEY_RE.finditer(chunk):
            salt_hex = match.group(2).decode("ascii")
            if salt_hex in wanted and salt_hex not in key_by_salt:
                key_by_salt[salt_hex] = match.group(1).decode("ascii")
        if len(key_by_salt) == len(wanted):
            break  # every salt resolved; no need to scan further

    # Several files may share one salt, so resolve per path.
    return {
        db_path: (key_by_salt[salt_hex], salt_hex)
        for db_path, salt_hex in db_salts.items()
        if salt_hex in key_by_salt
    }


def extract_all_keys(wx_dir: str) -> Dict[str, Tuple[str, str]]:
    """
    High-level: find WeChat PID, scan memory, return raw keys for all databases.

    The result of the first process that yields any key is returned.

    Returns {db_path: (enc_key_hex, salt_hex)}.
    Raises RuntimeError if WeChat is not running or no key could be found.
    """
    pids = find_wechat_pids()
    if not pids:
        raise RuntimeError("WeChat 进程未运行,无法提取密钥")

    for pid in pids:
        results = scan_memory_for_raw_keys(pid, wx_dir)
        if results:
            return results

    raise RuntimeError("未能从 WeChat 进程内存中提取到密钥")


def _find_v2_sample(wx_dir: str) -> Optional[str]:
    """Find a V2-encrypted .dat file in wx_dir to use as a test sample.

    Returns the path of the first regular file whose first 6 bytes equal
    ``V2_MAGIC``, or None when no such file is found.
    """
    from .decode_image import V2_MAGIC

    # Search Sns/Img and msg/attach for V2 .dat files
    search_patterns = [
        os.path.join(wx_dir, "cache", "*", "Sns", "Img", "*", "*"),
        os.path.join(wx_dir, "msg", "attach", "*", "*", "Img", "*.dat"),
    ]
    for pattern in search_patterns:
        for path in glob.iglob(pattern):
            if not os.path.isfile(path):
                continue
            try:
                with open(path, "rb") as f:
                    magic = f.read(6)
            except OSError:
                continue
            if magic == V2_MAGIC:
                return path
    return None


def scan_memory_for_image_key(pid: int, wx_dir: str) -> Optional[str]:
    """Scan process memory for the 16-byte AES key used to decrypt V2 images.

    Strategy: find a V2 .dat sample file, then scan memory for 16-byte
    candidate keys (32 hex char sequences). For each candidate, try
    decrypting the first AES block of the sample and check if the result
    starts with valid image magic bytes.

    Candidates are non-overlapping: after a window of 32 hex characters has
    been tried, the scan resumes right behind it.

    Returns hex string (32 chars) or None.
    """
    from .decode_image import decrypt_image_header, is_valid_image_header

    sample_path = _find_v2_sample(wx_dir)
    if not sample_path:
        return None

    with open(sample_path, "rb") as f:
        sample_data = f.read(64)  # Only need header + first AES block

    memory = _read_process_memory(pid)
    if not memory:
        return None

    for chunk in memory:
        for match in _IMAGE_KEY_RE.finditer(chunk):
            key_hex = match.group().decode("ascii")
            dec_header = decrypt_image_header(sample_data, bytes.fromhex(key_hex))
            if dec_header and is_valid_image_header(dec_header):
                return key_hex

    return None


def extract_image_key(wx_dir: str) -> Optional[str]:
    """High-level: find WeChat PID and extract the image AES key.

    Returns 32-char hex string or None.
    """
    for pid in find_wechat_pids():
        key = scan_memory_for_image_key(pid, wx_dir)
        if key:
            return key
    return None