# -*- coding: utf-8 -*-#
# -------------------------------------------------------------------------------
# Name: cli.py (wxdump_linux/cli.py)
# Description: Command-line entry point for wxdump-linux.
# Author: xaoyaoo
# Date: 2023/10/14
# -------------------------------------------------------------------------------
"""Command-line interface for wxdump-linux.

Every sub-command is a subclass of ``BaseSubMainClass``.  Defining such a
subclass is enough to register it: the metaclass instantiates the class, stores
the instance in ``models`` under its ``mode`` name and lets it add its own
arguments to a dedicated argparse sub-parser.  ``console_run`` then parses
``sys.argv`` and dispatches to the selected instance.
"""
import argparse
import os
import sys

# NOTE(review): the star import is what brings get_wx_info, get_wx_db,
# batch_decrypt, merge_db and start_server into scope - confirm against
# wxdump_linux/__init__.py before replacing it with explicit names.
from wxdump_linux import *
import wxdump_linux
from wxdump_linux.linux import find_wechat_files_path, extract_all_keys, extract_image_key

wxdump_ascii = r"""
██╗ ██╗██╗ ██╗██████╗ ██╗ ██╗███╗ ███╗██████╗ ██╗ ██╗███╗ ██╗██╗ ██╗██╗ ██╗
██║ ██║╚██╗██╔╝██╔══██╗██║ ██║████╗ ████║██╔══██╗ ██║ ██║████╗ ██║██║ ██║╚██╗██╔╝
██║ █╗ ██║ ╚███╔╝ ██║ ██║██║ ██║██╔████╔██║██████╔╝█████╗██║ ██║██╔██╗ ██║██║ ██║ ╚███╔╝
██║███╗██║ ██╔██╗ ██║ ██║██║ ██║██║╚██╔╝██║██╔═══╝ ╚════╝██║ ██║██║╚██╗██║██║ ██║ ██╔██╗
╚███╔███╔╝██╔╝ ██╗██████╔╝╚██████╔╝██║ ╚═╝ ██║██║ ███████╗██║██║ ╚████║╚██████╔╝██╔╝ ██╗
 ╚══╝╚══╝ ╚═╝ ╚═╝╚═════╝ ╚═════╝ ╚═╝ ╚═╝╚═╝ ╚══════╝╚═╝╚═╝ ╚═══╝ ╚═════╝ ╚═╝ ╚═╝
"""
WXDUMP_VERSION = wxdump_linux.__version__

# Registry of sub-command instances, keyed by their ``mode`` name.  Filled in
# by SubMainMetaclass as each sub-command class is defined.
models = {}

# Options handled by the top-level parser itself; console_run must hand these
# to argparse instead of treating them as an unknown mode.
_TOP_LEVEL_FLAGS = ("-h", "--help", "-V", "--version")


def create_parser():
    """Build the top-level argument parser and its sub-command container.

    Returns:
        A ``(parser, subparsers)`` tuple: the top-level parser and the object
        returned by ``add_subparsers`` on which sub-commands are registered.
    """

    class CustomArgumentParser(argparse.ArgumentParser):
        """ArgumentParser whose help output is framed by the project banner."""

        def format_help(self):
            """Return the standard help text wrapped in the cyan banner."""
            line_len = 70
            # Centre every non-empty banner line and colour it cyan.
            wxdump_line = '\n'.join(
                [f'\033[36m{line:^{line_len}}\033[0m' for line in wxdump_ascii.split('\n') if line])
            first_line = f'\033[36m{" wxdump-linux v" + WXDUMP_VERSION + " ":=^{line_len}}\033[0m'
            brief = 'wxdump-linux: Linux 版微信数据库解密工具(密钥提取、解密、合并、Web 查看)'
            other = '基于 PyWxDump,感谢原作者 xaoyaoo'

            separator = f'\033[36m{" options ":-^{line_len}}\033[0m'

            # The regular argparse help goes between the two separators.
            help_text = super().format_help().strip()

            return (f'\n{wxdump_line}\n\n{first_line}\n{brief}\n{separator}\n{help_text}\n'
                    f'{separator}\n{other}\n{first_line}\n')

    parser = CustomArgumentParser(formatter_class=argparse.RawTextHelpFormatter)
    parser.add_argument('-V', '--version', action='version', version=f"wxdump-linux v{WXDUMP_VERSION}")

    # Container for the sub-commands; the chosen name is stored in ``args.mode``.
    subparsers = parser.add_subparsers(dest="mode", help="""运行模式:""", required=True, metavar="mode")

    return parser, subparsers


# Created before the sub-command classes because their metaclass registers a
# sub-parser on ``sub_parsers`` at class-definition time.
main_parser, sub_parsers = create_parser()


class SubMainMetaclass(type):
    """Metaclass that auto-registers every sub-command class on definition."""

    def is_implemented_method(cls, name: str, method: str):
        """Raise NotImplementedError unless ``cls`` has a callable ``method``.

        Args:
            name: Class name, used only in the error message.
            method: Attribute name that must exist on ``cls`` and be callable.
        """
        if not hasattr(cls, method) or not callable(getattr(cls, method)):
            raise NotImplementedError("{} NotImplemented [{}]".format(name, method))

    def __init__(cls, name, bases, kwargs):
        """Instantiate the new class and register it under its ``mode``.

        Raises:
            NotImplementedError: ``mode`` is not a non-empty string, or a
                required method is missing.
            TypeError: another sub-command already uses the same ``mode``.
        """
        super(SubMainMetaclass, cls).__init__(name, bases, kwargs)

        # The abstract base itself is not a sub-command.
        if name in ["BaseSubMainClass"]:
            return

        mode = getattr(cls, "mode")
        # A subclass that does not override ``mode`` inherits the base-class
        # property object; reject it instead of registering under that key.
        if not isinstance(mode, str) or not mode:
            raise NotImplementedError("{} NotImplemented [{}]".format(name, "mode"))
        if mode in models:
            raise TypeError("mode[{}] is used...".format(mode))

        cls.is_implemented_method(name, "init_parses")
        cls.is_implemented_method(name, "run")

        c = cls()
        models[mode] = c
        c.init_parses(sub_parsers.add_parser(mode, **getattr(c, "parser_kwargs")))


class BaseSubMainClass(metaclass=SubMainMetaclass):
    """Abstract base for sub-commands.

    Subclasses set ``mode`` (the sub-command name) and optionally
    ``parser_kwargs`` (keyword arguments forwarded to ``add_parser``), and
    implement ``init_parses`` and ``run``.
    """
    parser_kwargs = {}

    @property
    def mode(self) -> str:
        raise NotImplementedError()

    def init_parses(self, parser):
        """Add this sub-command's arguments to ``parser``."""
        raise NotImplementedError()

    def run(self, args: argparse.Namespace):
        """Execute the sub-command with the parsed ``args``."""
        raise NotImplementedError()


class MainWxInfo(BaseSubMainClass):
    """``info``: print WeChat account information via ``get_wx_info``."""
    mode = "info"
    parser_kwargs = {"help": "获取微信信息(Linux 版:从进程内存提取)"}

    def init_parses(self, parser):
        parser.add_argument("-s", '--save_path', metavar="", type=str, help="(可选)保存路径【json文件】")
        return parser

    def run(self, args):
        print(f"[*] wxdump-linux v{wxdump_linux.__version__}")
        return get_wx_info(is_print=True, save_path=args.save_path)


class MainWxDbPath(BaseSubMainClass):
    """``wx_path``: print the database paths returned by ``get_wx_db``."""
    mode = "wx_path"
    parser_kwargs = {"help": "获取微信文件夹路径"}

    def init_parses(self, parser):
        parser.add_argument("-r", "--db_types", type=str,
                            help="(可选)需要的数据库名称(eg: -r MediaMSG;MicroMsg;FTSMSG;MSG;Sns;Emotion )",
                            default=None, metavar="")
        parser.add_argument("-wf", "--wx_files", type=str, help="(可选)'WeChat Files'路径", default=None,
                            metavar="")
        parser.add_argument("-id", "--wxid", type=str, help="(可选)wxid_,用于确认用户文件夹",
                            default=None, metavar="")
        return parser

    def run(self, args):
        print(f"[*] wxdump-linux v{wxdump_linux.__version__}")
        ret = get_wx_db(msg_dir=args.wx_files, db_types=args.db_types, wxids=args.wxid)
        for item in ret:
            print(item)
        return ret


class MainLinuxKey(BaseSubMainClass):
    """``linux_key``: extract database keys and the image AES key."""
    mode = "linux_key"
    parser_kwargs = {"help": "Linux 版微信: 从进程内存提取数据库密钥"}

    def init_parses(self, parser):
        parser.add_argument("--wx-dir", type=str, default=None, metavar="",
                            help="(可选)微信数据目录,默认自动检测")
        return parser

    def run(self, args):
        """Print the extracted keys.

        Returns:
            ``None`` when no data directory is found or key extraction raises
            ``RuntimeError``; otherwise a dict with ``wx_dir``, ``db_keys``
            and ``image_key``.
        """
        print(f"[*] wxdump-linux v{wxdump_linux.__version__}")
        # Fall back to auto-detection when no directory was given.
        wx_dir = args.wx_dir or find_wechat_files_path()
        print(f"[+] wx_dir: {wx_dir if wx_dir else 'None'}")
        if not wx_dir:
            print("[-] 未找到微信数据目录")
            return None
        try:
            db_keys = extract_all_keys(wx_dir)
        except RuntimeError as e:
            print(f"[-] {e}")
            return None
        print(f"[+] 从进程内存提取到 {len(db_keys)} 个数据库密钥:")
        # Each value unpacks as (key, salt); only the key is displayed.
        for db_path, (key, _salt) in sorted(db_keys.items()):
            rel = os.path.relpath(db_path, wx_dir)
            print(f" {rel}: key={key}")
        image_key = extract_image_key(wx_dir)
        if image_key:
            print(f"[+] 图片 AES key: {image_key}")
        else:
            print("[-] 未找到图片 AES key(可能没有 V2 格式图片)")
        return {"wx_dir": wx_dir, "db_keys": db_keys, "image_key": image_key}


class MainDecrypt(BaseSubMainClass):
    """``decrypt``: extract keys and decrypt the databases into a directory."""
    mode = "decrypt"
    parser_kwargs = {"help": "解密微信数据库"}

    def init_parses(self, parser):
        parser.add_argument("-i", "--db_path", type=str, help="数据库路径(目录)", required=False, metavar="")
        parser.add_argument("-o", "--out_path", type=str, default=os.path.join(os.getcwd(), "decrypted"),
                            help="输出路径(目录)[默认为当前路径下decrypted文件夹]", required=False,
                            metavar="")
        return parser

    def run(self, args):
        """Decrypt the databases found under ``--db_path`` into ``--out_path``.

        Returns:
            The value returned by ``batch_decrypt``, or ``None`` when the input
            path is missing or a ``RuntimeError`` is raised.
        """
        print(f"[*] wxdump-linux v{wxdump_linux.__version__}")
        db_path = args.db_path or find_wechat_files_path()
        out_path = args.out_path

        # Validate the input first so a bad path does not leave an empty
        # output directory behind.
        if not db_path or not os.path.exists(db_path):
            print(f"[-] 数据库路径不存在:{db_path}")
            return None

        if not os.path.exists(out_path):
            os.makedirs(out_path)
            print(f"[+] 创建输出文件夹:{out_path}")

        try:
            print("[*] 从进程内存提取密钥...")
            db_keys = extract_all_keys(db_path)
            print(f"[+] 提取到 {len(db_keys)} 个数据库密钥,开始解密...")
            return batch_decrypt(db_keys, out_path, True)
        except RuntimeError as e:
            print(f"[-] {e}")
            return None


class MainMerge(BaseSubMainClass):
    """``merge``: merge several decrypted databases via ``merge_db``."""
    mode = "merge"
    parser_kwargs = {"help": "[测试功能]合并微信数据库(MSG.db or MediaMSG.db)"}

    def init_parses(self, parser):
        parser.add_argument("-i", "--db_path", type=str, help="数据库路径(文件路径,使用英文[,]分割)", required=True,
                            metavar="")
        parser.add_argument("-o", "--out_path", type=str, default=os.path.join(os.getcwd(), "decrypted"),
                            help="输出路径(目录或文件名)[默认为当前路径下decrypted文件夹下merge_***.db]",
                            required=False,
                            metavar="")
        return parser

    def run(self, args):
        """Merge the comma-separated files/directories given by ``--db_path``.

        Returns:
            The value returned by ``merge_db``, or ``None`` if any listed path
            does not exist.
        """
        print(f"[*] wxdump-linux v{wxdump_linux.__version__}")
        out_path = args.out_path

        dbpaths = []
        for entry in (part.strip() for part in args.db_path.split(",")):
            if not os.path.exists(entry):
                print(f"[-] 数据库路径不存在:{entry}")
                return None
            if os.path.isdir(entry):
                # A directory contributes every *.db file directly inside it.
                dbpaths.extend(os.path.join(entry, j) for j in os.listdir(entry) if j.endswith(".db"))
            else:
                dbpaths.append(entry)

        # An out_path ending in ".db" is treated as a file name, anything else
        # as a directory that must exist.
        if (not out_path.endswith(".db")) and (not os.path.exists(out_path)):
            os.makedirs(out_path)
            print(f"[+] 创建输出文件夹:{out_path}")

        print("[*] 合并中...(用时较久,耐心等待)")
        # Drop paths that do not exist.
        dbpaths = [{"db_path": i} for i in dbpaths if os.path.exists(i)]
        result = merge_db(dbpaths, out_path)

        print(f"[+] 合并完成:{result}")
        return result


class MainShowChatRecords(BaseSubMainClass):
    """``dbshow``: start the viewer server on a merged database."""
    mode = "dbshow"
    parser_kwargs = {"help": "聊天记录查看"}

    def init_parses(self, parser):
        parser.add_argument("-merge", "--merge_path", type=str, help="解密并合并后的 merge_all.db 的路径",
                            required=False, metavar="")
        parser.add_argument("-wid", "--wx_path", type=str,
                            help="(可选)微信文件夹的路径(用于显示图片)", required=False,
                            metavar="")
        parser.add_argument("-myid", "--my_wxid", type=str, help="(可选)微信账号(本人微信id)", required=False,
                            default="", metavar="")
        parser.add_argument("--online", action='store_true', help="(可选)是否在线查看(局域网查看)", required=False,
                            default=False)
        return parser

    def run(self, args):
        print(f"[*] wxdump-linux v{wxdump_linux.__version__}")

        # Only a merged database is supported for now.
        if not args.merge_path:
            print("[-] 请输入数据库路径([merge_path])")
            return

        merge_path = args.merge_path
        if not os.path.exists(merge_path):
            print("[-] 输入数据库路径不存在")
            return

        start_server(merge_path=merge_path, wx_path=args.wx_path, my_wxid=args.my_wxid, online=args.online)


class MainExportChatRecords(BaseSubMainClass):
    """``export``: deprecated; only prints a pointer to ``ui`` / ``api``."""
    mode = "export"
    parser_kwargs = {"help": "[已废弃]聊天记录导出为html"}

    def init_parses(self, parser):
        return parser

    def run(self, args):
        print(f"[*] wxdump-linux v{wxdump_linux.__version__}")
        print("[+] export命令已废弃,请使用ui命令[wxdump ui]或api命令[wxdump api]启动服务")


class MainAll(BaseSubMainClass):
    """``all``: deprecated; only prints a pointer to ``ui`` / ``api``."""
    mode = "all"
    parser_kwargs = {"help": "[已废弃]获取微信信息,解密微信数据库,查看聊天记录"}

    def init_parses(self, parser):
        return parser

    def run(self, args):
        print(f"[*] wxdump-linux v{wxdump_linux.__version__}")
        print("[+] all命令已废弃,请使用ui命令[wxdump ui]或api命令[wxdump api]启动服务")


class MainUi(BaseSubMainClass):
    """``ui``: start the server; the browser opens unless ``--noOpenBrowser``."""
    mode = "ui"
    parser_kwargs = {"help": "启动UI界面"}

    def init_parses(self, parser):
        parser.add_argument("-p", '--port', metavar="", type=int, help="(可选)端口号", default=5000)
        parser.add_argument("--online", help="(可选)是否在线查看(局域网查看)", default=False, action='store_true')
        parser.add_argument("--debug", help="(可选)是否开启debug模式", default=False, action='store_true')
        parser.add_argument("--noOpenBrowser", dest='isOpenBrowser', default=True, action='store_false',
                            help="(可选)用于禁用自动打开浏览器")
        return parser

    def run(self, args):
        print(f"[*] wxdump-linux v{wxdump_linux.__version__}")
        start_server(port=args.port, online=args.online, debug=args.debug, isopenBrowser=args.isOpenBrowser)


class MainApi(BaseSubMainClass):
    """``api``: start the server without opening a browser."""
    mode = "api"
    parser_kwargs = {"help": "启动api,不打开浏览器"}

    def init_parses(self, parser):
        parser.add_argument("-p", '--port', metavar="", type=int, help="(可选)端口号", default=5000)
        parser.add_argument("--online", help="(可选)是否在线查看(局域网查看)", default=False, action='store_true')
        parser.add_argument("--debug", action='store_true', help="(可选)是否开启debug模式", default=False)
        return parser

    def run(self, args):
        print(f"[*] wxdump-linux v{wxdump_linux.__version__}")
        start_server(port=args.port, online=args.online, debug=args.debug, isopenBrowser=False)


def console_run():
    """Parse ``sys.argv`` and run the selected sub-command.

    With no arguments the ``ui`` sub-command is run.  A single argument that
    is neither a registered mode nor one of the top-level flags prints the
    help and returns.  ``-h`` / ``--help`` and ``-V`` / ``--version`` are
    handled by argparse; previously they were caught by the unknown-mode
    branch, so the version was never printed.

    The sub-command's result is deliberately not returned: a console-script
    wrapper would pass it to ``sys.exit`` and turn it into an exit status.
    """
    argv = sys.argv
    if len(argv) == 1:
        argv.append(MainUi.mode)
    elif len(argv) == 2 and argv[1] not in models and argv[1] not in _TOP_LEVEL_FLAGS:
        main_parser.print_help()
        return

    args = main_parser.parse_args()

    # ``mode`` is a required sub-command, so it is always a key of ``models``.
    models[args.mode].run(args)


if __name__ == '__main__':
    console_run()