解锁并提取Linux客户端微信数据库 (vibe coded)
1# -*- coding: utf-8 -*-#
2# -------------------------------------------------------------------------------
3# Name: local_server.py
4# Description:
5# Author: xaoyaoo
6# Date: 2024/08/01
7# -------------------------------------------------------------------------------
8import os
9import time
10import shutil
11from pydantic import BaseModel
12from fastapi import APIRouter, Body
13
14from wxdump_linux import get_wx_db
15from wxdump_linux import get_wx_info, batch_decrypt, merge_db, decrypt_merge
16
17from .response import ReJson, RqJson
18from .utils import error9999, ls_loger, random_str, gc
19
20ls_api = APIRouter()
21
22
23# 以下为初始化相关 *******************************************************************************************************
24
25@ls_api.post('/init_last_local_wxid')
26@error9999
27def init_last_local_wxid():
28 """
29 初始化,包括key
30 :return:
31 """
32 local_wxid = gc.get_local_wxids()
33 local_wxid.remove(gc.at)
34 if local_wxid:
35 return ReJson(0, {"local_wxids": local_wxid})
36 return ReJson(0, {"local_wxids": []})
37
38
39@ls_api.post('/init_last')
40@error9999
41def init_last(my_wxid: str = Body(..., embed=True)):
42 """
43 是否初始化
44 :return:
45 """
46 my_wxid = my_wxid.strip().strip("'").strip('"') if isinstance(my_wxid, str) else ""
47 ls_loger.info(f"[+] init_last: {my_wxid}")
48 if not my_wxid:
49 my_wxid = gc.get_conf(gc.at, "last")
50 if not my_wxid: return ReJson(1001, body="my_wxid is required")
51 if my_wxid:
52 gc.set_conf(gc.at, "last", my_wxid)
53 merge_path = gc.get_conf(my_wxid, "merge_path")
54 wx_path = gc.get_conf(my_wxid, "wx_path")
55 key = gc.get_conf(my_wxid, "key")
56 rdata = {
57 "merge_path": merge_path,
58 "wx_path": wx_path,
59 "key": key,
60 "my_wxid": my_wxid,
61 "is_init": True,
62 }
63 if merge_path and wx_path:
64 return ReJson(0, rdata)
65 return ReJson(0, {"is_init": False, "my_wxid": ""})
66
67
68class InitKeyRequest(BaseModel):
69 wx_path: str
70 my_wxid: str
71
72
73@ls_api.post('/init_key')
74@error9999
75def init_key(request: InitKeyRequest):
76 """
77 初始化:从进程内存提取密钥,解密并合并数据库
78 :param request:
79 :return:
80 """
81 wx_path = request.wx_path.strip().strip("'").strip('"')
82 my_wxid = request.my_wxid.strip().strip("'").strip('"')
83 if not wx_path:
84 return ReJson(1002, body=f"wx_path is required: {wx_path}")
85 if not os.path.exists(wx_path):
86 return ReJson(1001, body=f"wx_path not exists: {wx_path}")
87 if not my_wxid:
88 return ReJson(1002, body=f"my_wxid is required: {my_wxid}")
89
90 out_path = os.path.join(gc.work_path, "decrypted", my_wxid) if my_wxid else os.path.join(gc.work_path, "decrypted")
91 # 检查文件夹中文件是否被占用
92 if os.path.exists(out_path):
93 try:
94 shutil.rmtree(out_path)
95 except PermissionError as e:
96 ls_loger.error(f"{e}", exc_info=True)
97 return ReJson(2001, body=str(e))
98
99 code, merge_save_path = decrypt_merge(wx_path=wx_path, outpath=str(out_path))
100 time.sleep(1)
101 if code:
102 # 移动merge_save_path到g.work_path/my_wxid
103 if not os.path.exists(os.path.join(gc.work_path, my_wxid)):
104 os.makedirs(os.path.join(gc.work_path, my_wxid))
105 merge_save_path_new = os.path.join(gc.work_path, my_wxid, "merge_all.db")
106 shutil.move(merge_save_path, str(merge_save_path_new))
107
108 # 删除out_path
109 if os.path.exists(out_path):
110 try:
111 shutil.rmtree(out_path)
112 except PermissionError as e:
113 ls_loger.error(f"{e}", exc_info=True)
114 db_config = {
115 "key": random_str(16),
116 "type": "sqlite",
117 "path": merge_save_path_new
118 }
119 gc.set_conf(my_wxid, "db_config", db_config)
120 gc.set_conf(my_wxid, "merge_path", merge_save_path_new)
121 gc.set_conf(my_wxid, "wx_path", wx_path)
122 gc.set_conf(my_wxid, "my_wxid", my_wxid)
123 gc.set_conf(gc.at, "last", my_wxid)
124 rdata = {
125 "merge_path": merge_save_path_new,
126 "wx_path": wx_path,
127 "my_wxid": my_wxid,
128 "is_init": True,
129 }
130 return ReJson(0, rdata)
131 else:
132 return ReJson(2001, body=merge_save_path)
133
134
135class InitNoKeyRequest(BaseModel):
136 merge_path: str
137 wx_path: str
138 my_wxid: str
139
140
141@ls_api.post('/init_nokey')
142@error9999
143def init_nokey(request: InitNoKeyRequest):
144 """
145 初始化,包括key
146 :return:
147 """
148 merge_path = request.merge_path.strip().strip("'").strip('"')
149 wx_path = request.wx_path.strip().strip("'").strip('"')
150 my_wxid = request.my_wxid.strip().strip("'").strip('"')
151
152 if not wx_path:
153 return ReJson(1002, body=f"wx_path is required: {wx_path}")
154 if not os.path.exists(wx_path):
155 return ReJson(1001, body=f"wx_path not exists: {wx_path}")
156 if not merge_path:
157 return ReJson(1002, body=f"merge_path is required: {merge_path}")
158 if not my_wxid:
159 return ReJson(1002, body=f"my_wxid is required: {my_wxid}")
160
161 key = gc.get_conf(my_wxid, "key")
162 db_config = {
163 "key": random_str(16),
164 "type": "sqlite",
165 "path": merge_path
166 }
167 gc.set_conf(my_wxid, "db_config", db_config)
168 gc.set_conf(my_wxid, "merge_path", merge_path)
169 gc.set_conf(my_wxid, "wx_path", wx_path)
170 gc.set_conf(my_wxid, "key", key)
171 gc.set_conf(my_wxid, "my_wxid", my_wxid)
172 gc.set_conf(gc.at, "last", my_wxid)
173 rdata = {
174 "merge_path": merge_path,
175 "wx_path": wx_path,
176 "key": "",
177 "my_wxid": my_wxid,
178 "is_init": True,
179 }
180 return ReJson(0, rdata)
181
182
183# END 以上为初始化相关 ***************************************************************************************************
184
185
186# start 这部分为专业工具的api *********************************************************************************************
187
188@ls_api.api_route('/wxinfo', methods=["GET", 'POST'])
189@error9999
190def get_wxinfo():
191 """
192 获取微信信息(Linux 版:从进程内存提取)
193 :return:
194 """
195 wxinfos = get_wx_info()
196 return ReJson(0, wxinfos)
197
198
199@ls_api.api_route('/decrypt', methods=["GET", 'POST'])
200@error9999
201def get_decrypt(wxdbPath: str, outPath: str = ""):
202 """
203 解密(Linux 版:自动从进程内存提取密钥)
204 :return:
205 """
206 from wxdump_linux.linux.memscan import extract_all_keys
207 if not outPath:
208 outPath = gc.work_path
209 db_keys = extract_all_keys(wxdbPath)
210 wxinfos = batch_decrypt(db_keys, outPath)
211 return ReJson(0, str(wxinfos))
212
213
214class MergeRequest(BaseModel):
215 dbPath: str
216 outPath: str
217
218
219@ls_api.post('/merge')
220@error9999
221def get_merge(request: MergeRequest):
222 """
223 合并
224 :return:
225 """
226 wxdb_path = request.dbPath
227 out_path = request.outPath
228 db_path = get_wx_db(wxdb_path)
229 # for i in db_path:print(i)
230 rdata = merge_db(db_path, out_path)
231 return ReJson(0, str(rdata))
232
233# END 这部分为专业工具的api ***********************************************************************************************