解锁并提取Linux客户端微信数据库 (vibe coded)
at 386 lines 16 kB view raw
1# -*- coding: utf-8 -*-# 2# ------------------------------------------------------------------------------- 3# Name: main.py.py 4# Description: 5# Author: xaoyaoo 6# Date: 2023/10/14 7# ------------------------------------------------------------------------------- 8import argparse 9import os 10import sys 11 12from wxdump_linux import * 13import wxdump_linux 14from wxdump_linux.linux import find_wechat_files_path, extract_all_keys, extract_image_key 15 16wxdump_ascii = r""" 17██╗ ██╗██╗ ██╗██████╗ ██╗ ██╗███╗ ███╗██████╗ ██╗ ██╗███╗ ██╗██╗ ██╗██╗ ██╗ 18██║ ██║╚██╗██╔╝██╔══██╗██║ ██║████╗ ████║██╔══██╗ ██║ ██║████╗ ██║██║ ██║╚██╗██╔╝ 19██║ █╗ ██║ ╚███╔╝ ██║ ██║██║ ██║██╔████╔██║██████╔╝█████╗██║ ██║██╔██╗ ██║██║ ██║ ╚███╔╝ 20██║███╗██║ ██╔██╗ ██║ ██║██║ ██║██║╚██╔╝██║██╔═══╝ ╚════╝██║ ██║██║╚██╗██║██║ ██║ ██╔██╗ 21╚███╔███╔╝██╔╝ ██╗██████╔╝╚██████╔╝██║ ╚═╝ ██║██║ ███████╗██║██║ ╚████║╚██████╔╝██╔╝ ██╗ 22 ╚══╝╚══╝ ╚═╝ ╚═╝╚═════╝ ╚═════╝ ╚═╝ ╚═╝╚═╝ ╚══════╝╚═╝╚═╝ ╚═══╝ ╚═════╝ ╚═╝ ╚═╝ 23""" 24WXDUMP_VERSION = wxdump_linux.__version__ 25 26models = {} 27 28 29def create_parser(): 30 class CustomArgumentParser(argparse.ArgumentParser): 31 def format_help(self): 32 # 首先显示软件简介 33 # 定义软件简介文本并进行格式化 34 line_len = 70 35 WXDUMP_VERSION = wxdump_linux.__version__ 36 wxdump_line = '\n'.join([f'\033[36m{line:^{line_len}}\033[0m' for line in wxdump_ascii.split('\n') if line]) 37 first_line = f'\033[36m{" wxdump-linux v" + WXDUMP_VERSION + " ":=^{line_len}}\033[0m' 38 brief = 'wxdump-linux: Linux 版微信数据库解密工具(密钥提取、解密、合并、Web 查看)' 39 other = '基于 PyWxDump,感谢原作者 xaoyaoo' 40 41 separator = f'\033[36m{" options ":-^{line_len}}\033[0m' 42 43 # 获取帮助信息并添加到软件简介下方 44 help_text = super().format_help().strip() 45 46 return f'\n{wxdump_line}\n\n{first_line}\n{brief}\n{separator}\n{help_text}\n{separator}\n{other}\n{first_line}\n' 47 48 # 创建命令行参数解析器 49 parser = CustomArgumentParser(formatter_class=argparse.RawTextHelpFormatter) 50 parser.add_argument('-V', '--version', action='version', version=f"wxdump-linux v{WXDUMP_VERSION}") 51 52 # 添加子命令解析器 53 subparsers = parser.add_subparsers(dest="mode", help="""运行模式:""", required=True, metavar="mode") 54 55 return parser, subparsers 56 57 58main_parser, sub_parsers = create_parser() 59 60 61class SubMainMetaclass(type): 62 63 def is_implemented_method(cls, name: str, method: str): 64 if not hasattr(cls, method) or not callable(getattr(cls, method)): 65 raise NotImplementedError("{} NotImplemented [{}]".format(name, method)) 66 67 def __init__(cls, name, bases, kwargs): 68 super(SubMainMetaclass, cls).__init__(name, bases, kwargs) 69 70 if name in ["BaseSubMainClass"]: 71 return 72 73 mode = getattr(cls, "mode") 74 if mode in models: 75 raise TypeError("mode[{}] is used...".format(mode)) 76 77 cls.is_implemented_method(name, "init_parses") 78 cls.is_implemented_method(name, "run") 79 80 c = cls() 81 models[mode] = c 82 c.init_parses(sub_parsers.add_parser(mode, **getattr(c, "parser_kwargs"))) 83 84 85class BaseSubMainClass(metaclass=SubMainMetaclass): 86 parser_kwargs = {} 87 88 @property 89 def mode(self) -> str: 90 raise NotImplementedError() 91 92 def init_parses(self, parser): 93 raise NotImplementedError() 94 95 def run(self, args: argparse.Namespace): 96 raise NotImplementedError() 97 98 99class MainWxInfo(BaseSubMainClass): 100 mode = "info" 101 parser_kwargs = {"help": "获取微信信息(Linux 版:从进程内存提取)"} 102 103 def init_parses(self, parser): 104 parser.add_argument("-s", '--save_path', metavar="", type=str, help="(可选)保存路径【json文件】") 105 return parser 106 107 def run(self, args): 108 print(f"[*] wxdump-linux v{wxdump_linux.__version__}") 109 save_path = args.save_path 110 result = get_wx_info(is_print=True, save_path=save_path) 111 return result 112 113 114class MainWxDbPath(BaseSubMainClass): 115 mode = "wx_path" 116 parser_kwargs = {"help": "获取微信文件夹路径"} 117 118 def init_parses(self, parser): 119 # 添加 'wx_db_path' 子命令解析器 120 parser.add_argument("-r", "--db_types", type=str, 121 help="(可选)需要的数据库名称(eg: -r MediaMSG;MicroMsg;FTSMSG;MSG;Sns;Emotion )", 122 default=None, metavar="") 123 parser.add_argument("-wf", "--wx_files", type=str, help="(可选)'WeChat Files'路径", default=None, 124 metavar="") 125 parser.add_argument("-id", "--wxid", type=str, help="(可选)wxid_,用于确认用户文件夹", 126 default=None, metavar="") 127 return parser 128 129 def run(self, args): 130 print(f"[*] wxdump-linux v{wxdump_linux.__version__}") 131 # 从命令行参数获取值 132 db_types = args.db_types 133 msg_dir = args.wx_files 134 wxid = args.wxid 135 ret = get_wx_db(msg_dir=msg_dir, db_types=db_types, wxids=wxid) 136 for i in ret: print(i) 137 return ret 138 139 140class MainLinuxKey(BaseSubMainClass): 141 mode = "linux_key" 142 parser_kwargs = {"help": "Linux 版微信: 从进程内存提取数据库密钥"} 143 144 def init_parses(self, parser): 145 parser.add_argument("--wx-dir", type=str, default=None, metavar="", 146 help="(可选)微信数据目录,默认自动检测") 147 return parser 148 149 def run(self, args): 150 print(f"[*] wxdump-linux v{wxdump_linux.__version__}") 151 wx_dir = args.wx_dir or find_wechat_files_path() 152 print(f"[+] wx_dir: {wx_dir if wx_dir else 'None'}") 153 if not wx_dir: 154 print("[-] 未找到微信数据目录") 155 return None 156 try: 157 db_keys = extract_all_keys(wx_dir) 158 except RuntimeError as e: 159 print(f"[-] {e}") 160 return None 161 print(f"[+] 从进程内存提取到 {len(db_keys)} 个数据库密钥:") 162 for db_path, (key, salt) in sorted(db_keys.items()): 163 rel = os.path.relpath(db_path, wx_dir) 164 print(f" {rel}: key={key}") 165 # Extract image AES key 166 image_key = extract_image_key(wx_dir) 167 if image_key: 168 print(f"[+] 图片 AES key: {image_key}") 169 else: 170 print("[-] 未找到图片 AES key(可能没有 V2 格式图片)") 171 return {"wx_dir": wx_dir, "db_keys": db_keys, "image_key": image_key} 172 173 174class MainDecrypt(BaseSubMainClass): 175 mode = "decrypt" 176 parser_kwargs = {"help": "解密微信数据库"} 177 178 def init_parses(self, parser): 179 parser.add_argument("-i", "--db_path", type=str, help="数据库路径(目录)", required=False, metavar="") 180 parser.add_argument("-o", "--out_path", type=str, default=os.path.join(os.getcwd(), "decrypted"), 181 help="输出路径(目录)[默认为当前路径下decrypted文件夹]", required=False, 182 metavar="") 183 return parser 184 185 def run(self, args): 186 print(f"[*] wxdump-linux v{wxdump_linux.__version__}") 187 db_path = args.db_path or find_wechat_files_path() 188 out_path = args.out_path 189 190 if not os.path.exists(out_path): 191 os.makedirs(out_path) 192 print(f"[+] 创建输出文件夹:{out_path}") 193 194 if not db_path or not os.path.exists(db_path): 195 print(f"[-] 数据库路径不存在:{db_path}") 196 return 197 198 try: 199 print("[*] 从进程内存提取密钥...") 200 db_keys = extract_all_keys(db_path) 201 print(f"[+] 提取到 {len(db_keys)} 个数据库密钥,开始解密...") 202 result = batch_decrypt(db_keys, out_path, True) 203 return result 204 except RuntimeError as e: 205 print(f"[-] {e}") 206 return 207 208 209class MainMerge(BaseSubMainClass): 210 mode = "merge" 211 parser_kwargs = {"help": "[测试功能]合并微信数据库(MSG.db or MediaMSG.db)"} 212 213 def init_parses(self, parser): 214 # 添加 'merge' 子命令解析器 215 parser.add_argument("-i", "--db_path", type=str, help="数据库路径(文件路径,使用英文[,]分割)", required=True, 216 metavar="") 217 parser.add_argument("-o", "--out_path", type=str, default=os.path.join(os.getcwd(), "decrypted"), 218 help="输出路径(目录或文件名)[默认为当前路径下decrypted文件夹下merge_***.db]", 219 required=False, 220 metavar="") 221 return parser 222 223 def run(self, args): 224 print(f"[*] wxdump-linux v{wxdump_linux.__version__}") 225 # 从命令行参数获取值 226 db_path = args.db_path 227 out_path = args.out_path 228 229 db_path = db_path.split(",") 230 db_path = [i.strip() for i in db_path] 231 dbpaths = [] 232 for i in db_path: 233 if not os.path.exists(i): # 判断路径是否存在 234 print(f"[-] 数据库路径不存在:{i}") 235 return 236 if os.path.isdir(i): # 如果是文件夹,则获取文件夹下所有的db文件 237 dbpaths += [os.path.join(i, j) for j in os.listdir(i) if j.endswith(".db")] 238 else: # 如果是文件,则直接添加 239 dbpaths.append(i) 240 241 if (not out_path.endswith(".db")) and (not os.path.exists(out_path)): 242 os.makedirs(out_path) 243 print(f"[+] 创建输出文件夹:{out_path}") 244 245 print(f"[*] 合并中...(用时较久,耐心等待)") 246 dbpaths = [{"db_path": i} for i in dbpaths if os.path.exists(i)] # 去除不存在的路径 247 result = merge_db(dbpaths, out_path) 248 249 print(f"[+] 合并完成:{result}") 250 return result 251 252 253class MainShowChatRecords(BaseSubMainClass): 254 mode = "dbshow" 255 parser_kwargs = {"help": "聊天记录查看"} 256 257 def init_parses(self, parser): 258 # 添加 'dbshow' 子命令解析器 259 parser.add_argument("-merge", "--merge_path", type=str, help="解密并合并后的 merge_all.db 的路径", 260 required=False, metavar="") 261 parser.add_argument("-wid", "--wx_path", type=str, 262 help="(可选)微信文件夹的路径(用于显示图片)", required=False, 263 metavar="") 264 parser.add_argument("-myid", "--my_wxid", type=str, help="(可选)微信账号(本人微信id)", required=False, 265 default="", metavar="") 266 parser.add_argument("--online", action='store_true', help="(可选)是否在线查看(局域网查看)", required=False, 267 default=False) 268 # parser.add_argument("-k", "--key", type=str, help="(可选)密钥", required=False, metavar="") 269 return parser 270 271 def run(self, args): 272 print(f"[*] wxdump-linux v{wxdump_linux.__version__}") 273 # (merge)和(msg_path,micro_path,media_path) 二选一 274 # if not args.merge_path and not (args.msg_path and args.micro_path and args.media_path): 275 # print("[-] 请输入数据库路径([merge_path] or [msg_path, micro_path, media_path])") 276 # return 277 278 # 目前仅能支持merge database 279 if not args.merge_path: 280 print("[-] 请输入数据库路径([merge_path])") 281 return 282 283 # 从命令行参数获取值 284 merge_path = args.merge_path 285 286 online = args.online 287 288 if not os.path.exists(merge_path): 289 print("[-] 输入数据库路径不存在") 290 return 291 292 start_server(merge_path=merge_path, wx_path=args.wx_path, my_wxid=args.my_wxid, online=online) 293 294 295class MainExportChatRecords(BaseSubMainClass): 296 mode = "export" 297 parser_kwargs = {"help": "[已废弃]聊天记录导出为html"} 298 299 def init_parses(self, parser): 300 # 添加 'export' 子命令解析器 301 return parser 302 303 def run(self, args): 304 print(f"[*] wxdump-linux v{wxdump_linux.__version__}") 305 print("[+] export命令已废弃,请使用ui命令[wxdump ui]或api命令[wxdump api]启动服务") 306 307 308class MainAll(BaseSubMainClass): 309 mode = "all" 310 parser_kwargs = {"help": "[已废弃]获取微信信息,解密微信数据库,查看聊天记录"} 311 312 def init_parses(self, parser): 313 # 添加 'all' 子命令解析器 314 return parser 315 316 def run(self, args): 317 print(f"[*] wxdump-linux v{wxdump_linux.__version__}") 318 print("[+] all命令已废弃,请使用ui命令[wxdump ui]或api命令[wxdump api]启动服务") 319 320 321class MainUi(BaseSubMainClass): 322 mode = "ui" 323 parser_kwargs = {"help": "启动UI界面"} 324 325 def init_parses(self, parser): 326 # 添加 'ui' 子命令解析器 327 parser.add_argument("-p", '--port', metavar="", type=int, help="(可选)端口号", default=5000) 328 parser.add_argument("--online", help="(可选)是否在线查看(局域网查看)", default=False, action='store_true') 329 parser.add_argument("--debug", help="(可选)是否开启debug模式", default=False, action='store_true') 330 parser.add_argument("--noOpenBrowser", dest='isOpenBrowser', default=True, action='store_false', 331 help="(可选)用于禁用自动打开浏览器") 332 return parser 333 334 def run(self, args): 335 print(f"[*] wxdump-linux v{wxdump_linux.__version__}") 336 # 从命令行参数获取值 337 online = args.online 338 port = args.port 339 debug = args.debug 340 isopenBrowser = args.isOpenBrowser 341 342 start_server(port=port, online=online, debug=debug, isopenBrowser=isopenBrowser) 343 344 345class MainApi(BaseSubMainClass): 346 mode = "api" 347 parser_kwargs = {"help": "启动api,不打开浏览器"} 348 349 def init_parses(self, parser): 350 # 添加 'api' 子命令解析器 351 parser.add_argument("-p", '--port', metavar="", type=int, help="(可选)端口号", default=5000) 352 parser.add_argument("--online", help="(可选)是否在线查看(局域网查看)", default=False, action='store_true') 353 parser.add_argument("--debug", action='store_true', help="(可选)是否开启debug模式", default=False) 354 return parser 355 356 def run(self, args): 357 print(f"[*] wxdump-linux v{wxdump_linux.__version__}") 358 # 从命令行参数获取值 359 online = args.online 360 port = args.port 361 debug = args.debug 362 363 start_server(port=port, online=online, debug=debug, isopenBrowser=False) 364 365 366def console_run(): 367 # 检查是否需要显示帮助信息 368 if len(sys.argv) == 1: 369 sys.argv.append(MainUi.mode) 370 elif len(sys.argv) == 2 and sys.argv[1] not in models.keys(): 371 sys.argv.append('-h') 372 main_parser.print_help() 373 return 374 375 args = main_parser.parse_args() # 解析命令行参数 376 377 if not any(vars(args).values()): 378 main_parser.print_help() 379 return 380 381 # 根据不同的 'mode' 参数,执行不同的操作 382 models[args.mode].run(args) 383 384 385if __name__ == '__main__': 386 console_run()