解锁并提取Linux客户端微信数据库 (vibe coded)
1# -*- coding: utf-8 -*-#
2# -------------------------------------------------------------------------------
3# Name: main.py.py
4# Description:
5# Author: xaoyaoo
6# Date: 2023/10/14
7# -------------------------------------------------------------------------------
8import argparse
9import os
10import sys
11
12from wxdump_linux import *
13import wxdump_linux
14from wxdump_linux.linux import find_wechat_files_path, extract_all_keys, extract_image_key
15
16wxdump_ascii = r"""
17██╗ ██╗██╗ ██╗██████╗ ██╗ ██╗███╗ ███╗██████╗ ██╗ ██╗███╗ ██╗██╗ ██╗██╗ ██╗
18██║ ██║╚██╗██╔╝██╔══██╗██║ ██║████╗ ████║██╔══██╗ ██║ ██║████╗ ██║██║ ██║╚██╗██╔╝
19██║ █╗ ██║ ╚███╔╝ ██║ ██║██║ ██║██╔████╔██║██████╔╝█████╗██║ ██║██╔██╗ ██║██║ ██║ ╚███╔╝
20██║███╗██║ ██╔██╗ ██║ ██║██║ ██║██║╚██╔╝██║██╔═══╝ ╚════╝██║ ██║██║╚██╗██║██║ ██║ ██╔██╗
21╚███╔███╔╝██╔╝ ██╗██████╔╝╚██████╔╝██║ ╚═╝ ██║██║ ███████╗██║██║ ╚████║╚██████╔╝██╔╝ ██╗
22 ╚══╝╚══╝ ╚═╝ ╚═╝╚═════╝ ╚═════╝ ╚═╝ ╚═╝╚═╝ ╚══════╝╚═╝╚═╝ ╚═══╝ ╚═════╝ ╚═╝ ╚═╝
23"""
24WXDUMP_VERSION = wxdump_linux.__version__
25
26models = {}
27
28
29def create_parser():
30 class CustomArgumentParser(argparse.ArgumentParser):
31 def format_help(self):
32 # 首先显示软件简介
33 # 定义软件简介文本并进行格式化
34 line_len = 70
35 WXDUMP_VERSION = wxdump_linux.__version__
36 wxdump_line = '\n'.join([f'\033[36m{line:^{line_len}}\033[0m' for line in wxdump_ascii.split('\n') if line])
37 first_line = f'\033[36m{" wxdump-linux v" + WXDUMP_VERSION + " ":=^{line_len}}\033[0m'
38 brief = 'wxdump-linux: Linux 版微信数据库解密工具(密钥提取、解密、合并、Web 查看)'
39 other = '基于 PyWxDump,感谢原作者 xaoyaoo'
40
41 separator = f'\033[36m{" options ":-^{line_len}}\033[0m'
42
43 # 获取帮助信息并添加到软件简介下方
44 help_text = super().format_help().strip()
45
46 return f'\n{wxdump_line}\n\n{first_line}\n{brief}\n{separator}\n{help_text}\n{separator}\n{other}\n{first_line}\n'
47
48 # 创建命令行参数解析器
49 parser = CustomArgumentParser(formatter_class=argparse.RawTextHelpFormatter)
50 parser.add_argument('-V', '--version', action='version', version=f"wxdump-linux v{WXDUMP_VERSION}")
51
52 # 添加子命令解析器
53 subparsers = parser.add_subparsers(dest="mode", help="""运行模式:""", required=True, metavar="mode")
54
55 return parser, subparsers
56
57
58main_parser, sub_parsers = create_parser()
59
60
61class SubMainMetaclass(type):
62
63 def is_implemented_method(cls, name: str, method: str):
64 if not hasattr(cls, method) or not callable(getattr(cls, method)):
65 raise NotImplementedError("{} NotImplemented [{}]".format(name, method))
66
67 def __init__(cls, name, bases, kwargs):
68 super(SubMainMetaclass, cls).__init__(name, bases, kwargs)
69
70 if name in ["BaseSubMainClass"]:
71 return
72
73 mode = getattr(cls, "mode")
74 if mode in models:
75 raise TypeError("mode[{}] is used...".format(mode))
76
77 cls.is_implemented_method(name, "init_parses")
78 cls.is_implemented_method(name, "run")
79
80 c = cls()
81 models[mode] = c
82 c.init_parses(sub_parsers.add_parser(mode, **getattr(c, "parser_kwargs")))
83
84
85class BaseSubMainClass(metaclass=SubMainMetaclass):
86 parser_kwargs = {}
87
88 @property
89 def mode(self) -> str:
90 raise NotImplementedError()
91
92 def init_parses(self, parser):
93 raise NotImplementedError()
94
95 def run(self, args: argparse.Namespace):
96 raise NotImplementedError()
97
98
99class MainWxInfo(BaseSubMainClass):
100 mode = "info"
101 parser_kwargs = {"help": "获取微信信息(Linux 版:从进程内存提取)"}
102
103 def init_parses(self, parser):
104 parser.add_argument("-s", '--save_path', metavar="", type=str, help="(可选)保存路径【json文件】")
105 return parser
106
107 def run(self, args):
108 print(f"[*] wxdump-linux v{wxdump_linux.__version__}")
109 save_path = args.save_path
110 result = get_wx_info(is_print=True, save_path=save_path)
111 return result
112
113
114class MainWxDbPath(BaseSubMainClass):
115 mode = "wx_path"
116 parser_kwargs = {"help": "获取微信文件夹路径"}
117
118 def init_parses(self, parser):
119 # 添加 'wx_db_path' 子命令解析器
120 parser.add_argument("-r", "--db_types", type=str,
121 help="(可选)需要的数据库名称(eg: -r MediaMSG;MicroMsg;FTSMSG;MSG;Sns;Emotion )",
122 default=None, metavar="")
123 parser.add_argument("-wf", "--wx_files", type=str, help="(可选)'WeChat Files'路径", default=None,
124 metavar="")
125 parser.add_argument("-id", "--wxid", type=str, help="(可选)wxid_,用于确认用户文件夹",
126 default=None, metavar="")
127 return parser
128
129 def run(self, args):
130 print(f"[*] wxdump-linux v{wxdump_linux.__version__}")
131 # 从命令行参数获取值
132 db_types = args.db_types
133 msg_dir = args.wx_files
134 wxid = args.wxid
135 ret = get_wx_db(msg_dir=msg_dir, db_types=db_types, wxids=wxid)
136 for i in ret: print(i)
137 return ret
138
139
140class MainLinuxKey(BaseSubMainClass):
141 mode = "linux_key"
142 parser_kwargs = {"help": "Linux 版微信: 从进程内存提取数据库密钥"}
143
144 def init_parses(self, parser):
145 parser.add_argument("--wx-dir", type=str, default=None, metavar="",
146 help="(可选)微信数据目录,默认自动检测")
147 return parser
148
149 def run(self, args):
150 print(f"[*] wxdump-linux v{wxdump_linux.__version__}")
151 wx_dir = args.wx_dir or find_wechat_files_path()
152 print(f"[+] wx_dir: {wx_dir if wx_dir else 'None'}")
153 if not wx_dir:
154 print("[-] 未找到微信数据目录")
155 return None
156 try:
157 db_keys = extract_all_keys(wx_dir)
158 except RuntimeError as e:
159 print(f"[-] {e}")
160 return None
161 print(f"[+] 从进程内存提取到 {len(db_keys)} 个数据库密钥:")
162 for db_path, (key, salt) in sorted(db_keys.items()):
163 rel = os.path.relpath(db_path, wx_dir)
164 print(f" {rel}: key={key}")
165 # Extract image AES key
166 image_key = extract_image_key(wx_dir)
167 if image_key:
168 print(f"[+] 图片 AES key: {image_key}")
169 else:
170 print("[-] 未找到图片 AES key(可能没有 V2 格式图片)")
171 return {"wx_dir": wx_dir, "db_keys": db_keys, "image_key": image_key}
172
173
174class MainDecrypt(BaseSubMainClass):
175 mode = "decrypt"
176 parser_kwargs = {"help": "解密微信数据库"}
177
178 def init_parses(self, parser):
179 parser.add_argument("-i", "--db_path", type=str, help="数据库路径(目录)", required=False, metavar="")
180 parser.add_argument("-o", "--out_path", type=str, default=os.path.join(os.getcwd(), "decrypted"),
181 help="输出路径(目录)[默认为当前路径下decrypted文件夹]", required=False,
182 metavar="")
183 return parser
184
185 def run(self, args):
186 print(f"[*] wxdump-linux v{wxdump_linux.__version__}")
187 db_path = args.db_path or find_wechat_files_path()
188 out_path = args.out_path
189
190 if not os.path.exists(out_path):
191 os.makedirs(out_path)
192 print(f"[+] 创建输出文件夹:{out_path}")
193
194 if not db_path or not os.path.exists(db_path):
195 print(f"[-] 数据库路径不存在:{db_path}")
196 return
197
198 try:
199 print("[*] 从进程内存提取密钥...")
200 db_keys = extract_all_keys(db_path)
201 print(f"[+] 提取到 {len(db_keys)} 个数据库密钥,开始解密...")
202 result = batch_decrypt(db_keys, out_path, True)
203 return result
204 except RuntimeError as e:
205 print(f"[-] {e}")
206 return
207
208
209class MainMerge(BaseSubMainClass):
210 mode = "merge"
211 parser_kwargs = {"help": "[测试功能]合并微信数据库(MSG.db or MediaMSG.db)"}
212
213 def init_parses(self, parser):
214 # 添加 'merge' 子命令解析器
215 parser.add_argument("-i", "--db_path", type=str, help="数据库路径(文件路径,使用英文[,]分割)", required=True,
216 metavar="")
217 parser.add_argument("-o", "--out_path", type=str, default=os.path.join(os.getcwd(), "decrypted"),
218 help="输出路径(目录或文件名)[默认为当前路径下decrypted文件夹下merge_***.db]",
219 required=False,
220 metavar="")
221 return parser
222
223 def run(self, args):
224 print(f"[*] wxdump-linux v{wxdump_linux.__version__}")
225 # 从命令行参数获取值
226 db_path = args.db_path
227 out_path = args.out_path
228
229 db_path = db_path.split(",")
230 db_path = [i.strip() for i in db_path]
231 dbpaths = []
232 for i in db_path:
233 if not os.path.exists(i): # 判断路径是否存在
234 print(f"[-] 数据库路径不存在:{i}")
235 return
236 if os.path.isdir(i): # 如果是文件夹,则获取文件夹下所有的db文件
237 dbpaths += [os.path.join(i, j) for j in os.listdir(i) if j.endswith(".db")]
238 else: # 如果是文件,则直接添加
239 dbpaths.append(i)
240
241 if (not out_path.endswith(".db")) and (not os.path.exists(out_path)):
242 os.makedirs(out_path)
243 print(f"[+] 创建输出文件夹:{out_path}")
244
245 print(f"[*] 合并中...(用时较久,耐心等待)")
246 dbpaths = [{"db_path": i} for i in dbpaths if os.path.exists(i)] # 去除不存在的路径
247 result = merge_db(dbpaths, out_path)
248
249 print(f"[+] 合并完成:{result}")
250 return result
251
252
253class MainShowChatRecords(BaseSubMainClass):
254 mode = "dbshow"
255 parser_kwargs = {"help": "聊天记录查看"}
256
257 def init_parses(self, parser):
258 # 添加 'dbshow' 子命令解析器
259 parser.add_argument("-merge", "--merge_path", type=str, help="解密并合并后的 merge_all.db 的路径",
260 required=False, metavar="")
261 parser.add_argument("-wid", "--wx_path", type=str,
262 help="(可选)微信文件夹的路径(用于显示图片)", required=False,
263 metavar="")
264 parser.add_argument("-myid", "--my_wxid", type=str, help="(可选)微信账号(本人微信id)", required=False,
265 default="", metavar="")
266 parser.add_argument("--online", action='store_true', help="(可选)是否在线查看(局域网查看)", required=False,
267 default=False)
268 # parser.add_argument("-k", "--key", type=str, help="(可选)密钥", required=False, metavar="")
269 return parser
270
271 def run(self, args):
272 print(f"[*] wxdump-linux v{wxdump_linux.__version__}")
273 # (merge)和(msg_path,micro_path,media_path) 二选一
274 # if not args.merge_path and not (args.msg_path and args.micro_path and args.media_path):
275 # print("[-] 请输入数据库路径([merge_path] or [msg_path, micro_path, media_path])")
276 # return
277
278 # 目前仅能支持merge database
279 if not args.merge_path:
280 print("[-] 请输入数据库路径([merge_path])")
281 return
282
283 # 从命令行参数获取值
284 merge_path = args.merge_path
285
286 online = args.online
287
288 if not os.path.exists(merge_path):
289 print("[-] 输入数据库路径不存在")
290 return
291
292 start_server(merge_path=merge_path, wx_path=args.wx_path, my_wxid=args.my_wxid, online=online)
293
294
295class MainExportChatRecords(BaseSubMainClass):
296 mode = "export"
297 parser_kwargs = {"help": "[已废弃]聊天记录导出为html"}
298
299 def init_parses(self, parser):
300 # 添加 'export' 子命令解析器
301 return parser
302
303 def run(self, args):
304 print(f"[*] wxdump-linux v{wxdump_linux.__version__}")
305 print("[+] export命令已废弃,请使用ui命令[wxdump ui]或api命令[wxdump api]启动服务")
306
307
308class MainAll(BaseSubMainClass):
309 mode = "all"
310 parser_kwargs = {"help": "[已废弃]获取微信信息,解密微信数据库,查看聊天记录"}
311
312 def init_parses(self, parser):
313 # 添加 'all' 子命令解析器
314 return parser
315
316 def run(self, args):
317 print(f"[*] wxdump-linux v{wxdump_linux.__version__}")
318 print("[+] all命令已废弃,请使用ui命令[wxdump ui]或api命令[wxdump api]启动服务")
319
320
321class MainUi(BaseSubMainClass):
322 mode = "ui"
323 parser_kwargs = {"help": "启动UI界面"}
324
325 def init_parses(self, parser):
326 # 添加 'ui' 子命令解析器
327 parser.add_argument("-p", '--port', metavar="", type=int, help="(可选)端口号", default=5000)
328 parser.add_argument("--online", help="(可选)是否在线查看(局域网查看)", default=False, action='store_true')
329 parser.add_argument("--debug", help="(可选)是否开启debug模式", default=False, action='store_true')
330 parser.add_argument("--noOpenBrowser", dest='isOpenBrowser', default=True, action='store_false',
331 help="(可选)用于禁用自动打开浏览器")
332 return parser
333
334 def run(self, args):
335 print(f"[*] wxdump-linux v{wxdump_linux.__version__}")
336 # 从命令行参数获取值
337 online = args.online
338 port = args.port
339 debug = args.debug
340 isopenBrowser = args.isOpenBrowser
341
342 start_server(port=port, online=online, debug=debug, isopenBrowser=isopenBrowser)
343
344
345class MainApi(BaseSubMainClass):
346 mode = "api"
347 parser_kwargs = {"help": "启动api,不打开浏览器"}
348
349 def init_parses(self, parser):
350 # 添加 'api' 子命令解析器
351 parser.add_argument("-p", '--port', metavar="", type=int, help="(可选)端口号", default=5000)
352 parser.add_argument("--online", help="(可选)是否在线查看(局域网查看)", default=False, action='store_true')
353 parser.add_argument("--debug", action='store_true', help="(可选)是否开启debug模式", default=False)
354 return parser
355
356 def run(self, args):
357 print(f"[*] wxdump-linux v{wxdump_linux.__version__}")
358 # 从命令行参数获取值
359 online = args.online
360 port = args.port
361 debug = args.debug
362
363 start_server(port=port, online=online, debug=debug, isopenBrowser=False)
364
365
366def console_run():
367 # 检查是否需要显示帮助信息
368 if len(sys.argv) == 1:
369 sys.argv.append(MainUi.mode)
370 elif len(sys.argv) == 2 and sys.argv[1] not in models.keys():
371 sys.argv.append('-h')
372 main_parser.print_help()
373 return
374
375 args = main_parser.parse_args() # 解析命令行参数
376
377 if not any(vars(args).values()):
378 main_parser.print_help()
379 return
380
381 # 根据不同的 'mode' 参数,执行不同的操作
382 models[args.mode].run(args)
383
384
385if __name__ == '__main__':
386 console_run()