# -*- coding: utf-8 -*-#
# Linux-specific wx_info implementation for native WeChat (AppImage) builds.
# Path: wxdump_linux/wx_core/wx_info_linux.py

import json
import os
import re
from typing import List, Union, Optional, Dict, Tuple

from .utils import verify_raw_key, wx_core_error, wx_core_loger, CORE_DB_TYPE
from ..linux import find_wechat_files_path
from ..linux.memscan import find_wechat_pids, extract_all_keys

# Top-level entries whose presence marks ``msg_dir`` as a root that holds
# several per-account directories (compared case-sensitively, as listed).
_ROOT_MARKERS = ("all_users", "wmpf", "applet")

# Sub-directory names (lower-cased) that are never treated as an account.
_SHARED_DIRS = ("all users", "all_users", "applet", "wmpf")

# Strips a trailing numeric suffix plus ".db", e.g. "MSG12.db" -> "MSG".
_DB_SUFFIX_RE = re.compile(r"\d*\.db$")


def _normalize_filter(values) -> Optional[List[str]]:
    """Return a list of non-empty names, or ``None`` meaning "no filter".

    Accepts either a ``;``-separated string or a list. Empty items are
    dropped so that ``""`` or a trailing ``;`` never produces an entry that
    silently matches nothing.
    """
    if isinstance(values, str):
        values = values.split(";")
    if not isinstance(values, list):
        return None
    cleaned = [item for item in values if item]
    return cleaned or None


def _load_saved_infos(save_path: str) -> list:
    """Return the list previously stored at ``save_path``.

    A missing, unreadable or malformed file, or a file whose top-level JSON
    value is not a list, yields an empty list (best-effort read).
    """
    if not os.path.exists(save_path):
        return []
    try:
        with open(save_path, "r", encoding="utf-8") as f:
            infos = json.load(f)
    except (OSError, ValueError):
        # ValueError covers json.JSONDecodeError and UnicodeDecodeError.
        return []
    return infos if isinstance(infos, list) else []


@wx_core_error
def get_wx_dir_by_linux(wxid: str = "all") -> Optional[str]:
    """Locate the WeChat data directory, optionally narrowed to one account.

    Args:
        wxid: Account directory name to look for. ``"all"`` (the default) or
            an empty value returns the base directory itself.

    Returns:
        The path reported by ``find_wechat_files_path()``, the sub-directory
        named ``wxid`` below it, or ``None`` when nothing suitable exists.
    """
    base = find_wechat_files_path()
    if not base:
        return None

    if not wxid or wxid == "all":
        return base

    # The discovered path may already be an account directory; accept it.
    if os.path.basename(base).startswith("wxid_") and os.path.exists(base):
        return base

    candidate = os.path.join(base, wxid)
    return candidate if os.path.exists(candidate) else None


@wx_core_error
def get_wx_raw_keys(wx_dir: str = "") -> Dict[str, Tuple[str, str]]:
    """Extract the raw keys of all databases from the running WeChat process.

    Args:
        wx_dir: WeChat data directory. When empty, the default directory from
            ``get_wx_dir_by_linux("all")`` is used.

    Returns:
        A mapping ``{db_path: (enc_key_hex, salt_hex)}`` as produced by
        ``extract_all_keys``; an empty dict when the data directory cannot be
        found or ``extract_all_keys`` raises ``RuntimeError``.
    """
    if not wx_dir:
        wx_dir = get_wx_dir_by_linux("all")
    if not wx_dir:
        wx_core_loger.error("未找到微信数据目录")
        return {}
    try:
        return extract_all_keys(wx_dir)
    except RuntimeError as e:
        wx_core_loger.error(str(e))
        return {}


@wx_core_error
def get_wx_info(WX_OFFS: dict = None, is_print: bool = False, save_path: str = None):
    """Return minimal WeChat info on Linux (data dir + a raw key from memory).

    Args:
        WX_OFFS: Not used by this implementation; kept so the signature stays
            call-compatible with existing callers.
        is_print: When true, print every field of the result to stdout.
        save_path: Optional JSON file. The result is appended to the list
            already stored there (or to a new list) and written back.

    Returns:
        A one-element list holding a dict with the keys ``pid``, ``version``,
        ``account``, ``mobile``, ``nickname``, ``mail``, ``wxid``, ``key`` and
        ``wx_dir``. Only ``pid``, ``version``, ``key`` and ``wx_dir`` are
        populated; the remaining fields are always ``None``.
    """
    wx_dir = get_wx_dir_by_linux("all")
    raw_keys = get_wx_raw_keys(wx_dir or "")

    # Pick any key for backward compatibility with single-key consumers.
    key = None
    if isinstance(raw_keys, dict) and raw_keys:
        key, _ = next(iter(raw_keys.values()))

    pids = find_wechat_pids()

    result = [{
        "pid": pids[0] if pids else None,
        "version": "linux",
        "account": None,
        "mobile": None,
        "nickname": None,
        "mail": None,
        "wxid": None,
        "key": key,
        "wx_dir": wx_dir,
    }]

    if is_print:
        print("=" * 32)
        for rlt in result:
            for k, v in rlt.items():
                print(f"[+] {k:>8}: {v if v else 'None'}")
        print("=" * 32)

    if save_path:
        # Read the old content first (and close it) before truncating the
        # file, so a failed read can never leave a half-written file behind.
        infos = _load_saved_infos(save_path)
        with open(save_path, "w", encoding="utf-8") as f:
            json.dump(infos + result, f, ensure_ascii=False, indent=4)
    return result


@wx_core_error
def get_wx_db(msg_dir: str = None,
              db_types: Union[List[str], str] = None,
              wxids: Union[List[str], str] = None) -> List[dict]:
    """Collect the WeChat database files found below ``msg_dir``.

    Args:
        msg_dir: WeChat files directory. When missing or non-existent, the
            default from ``get_wx_dir_by_linux("all")`` is used instead.
        db_types: Database types to keep, as a list or a ``;``-separated
            string. Empty or ``None`` keeps every type. A file's type is its
            name with a trailing numeric suffix and ``.db`` removed.
        wxids: Account directory names to keep, as a list or a
            ``;``-separated string. Empty or ``None`` keeps every account.

    Returns:
        One dict per ``*.db`` file with the keys ``wxid``, ``db_type``,
        ``db_path`` and ``wxid_dir``; an empty list when no usable directory
        is available.
    """
    result = []

    if not msg_dir or not os.path.exists(msg_dir):
        wx_core_loger.warning(f"[-] 微信文件目录不存在: {msg_dir}, 将使用默认路径")
        msg_dir = get_wx_dir_by_linux("all")

    # isdir (not just exists): the directory is listed below.
    if not msg_dir or not os.path.isdir(msg_dir):
        wx_core_loger.error(f"[-] 目录不存在: {msg_dir}")
        return result

    wxids = _normalize_filter(wxids)
    db_types = _normalize_filter(db_types)

    entries = os.listdir(msg_dir)
    wxid_dirs = {}
    if wxids or any(marker in entries for marker in _ROOT_MARKERS):
        # msg_dir is a root: every non-shared sub-directory is an account.
        for sub_dir in entries:
            sub_path = os.path.join(msg_dir, sub_dir)
            if os.path.isdir(sub_path) and sub_dir.lower() not in _SHARED_DIRS:
                wxid_dirs[sub_dir] = sub_path
    else:
        # msg_dir itself is the account directory; normpath keeps a trailing
        # separator from turning the account name into "".
        wxid_dirs[os.path.basename(os.path.normpath(msg_dir))] = msg_dir

    for wxid, wxid_dir in wxid_dirs.items():
        if wxids and wxid not in wxids:
            continue
        for root, _, files in os.walk(wxid_dir):
            for file_name in files:
                if not file_name.endswith(".db"):
                    continue
                db_type = _DB_SUFFIX_RE.sub("", file_name)
                if db_types and db_type not in db_types:
                    continue
                result.append({
                    "wxid": wxid,
                    "db_type": db_type,
                    "db_path": os.path.join(root, file_name),
                    "wxid_dir": wxid_dir,
                })
    return result


@wx_core_error
def get_core_db(wx_path: str, db_types: list = None):
    """Return the core database files of the account directory ``wx_path``.

    Args:
        wx_path: Path of one account directory; its last component is used
            as the wxid and its parent as the WeChat files directory.
        db_types: Database types to return. Entries not present in
            ``CORE_DB_TYPE`` are ignored; empty or ``None`` means all of
            ``CORE_DB_TYPE``.

    Returns:
        ``(True, db_infos)`` with the list produced by ``get_wx_db`` on
        success, otherwise ``(False, message)``.
    """
    if not wx_path or not os.path.exists(wx_path):
        return False, f"[-] 目录不存在: {wx_path}"

    if not db_types:
        db_types = CORE_DB_TYPE
    db_types = [dt for dt in db_types if dt in CORE_DB_TYPE]

    wxdbpaths = []
    # An empty type list would be read as "no filter" by get_wx_db and return
    # every database, so only query when at least one core type remains.
    if db_types:
        # abspath drops a trailing separator and resolves relative paths, so
        # the parent/name split below is always meaningful.
        msg_dir, my_wxid = os.path.split(os.path.abspath(wx_path))
        wxdbpaths = get_wx_db(msg_dir=msg_dir, db_types=db_types, wxids=[my_wxid])

    if not wxdbpaths:
        wx_core_loger.error("[-] get_core_db 未获取到数据库路径")
        return False, "未获取到数据库路径"
    return True, wxdbpaths