解锁并提取Linux客户端微信数据库 (vibe coded)
1# -*- coding: utf-8 -*-#
2# -------------------------------------------------------------------------------
3# Name: export.py
4# Description: 导出接口
5# -------------------------------------------------------------------------------
6import os
7import time
8import shutil
9
10from pydantic import BaseModel
11from fastapi import APIRouter, Body
12
13from wxdump_linux import decrypt_merge, get_core_db
14from ..export import export_csv, export_json, export_html
15from ..response import ReJson
16from ..decorators import error9999
17from ..config import gc
18
19export_router = APIRouter()
20
21
class ExportEndbRequest(BaseModel):
    # Request body for the /export_endb endpoint.
    # Every field is optional and defaults to an empty string.

    # Path handed to get_core_db; when empty, get_export_endb falls back to
    # the "wx_path" value stored in the configuration.
    wx_path: str = ""

    # Accepted in the payload, but get_export_endb does not read it.
    outpath: str = ""

    # Accepted in the payload, but get_export_endb does not read it.
    key: str = ""
26
27
@export_router.api_route('/export_endb', methods=["GET", 'POST'])
def get_export_endb(request: ExportEndbRequest):
    """
    Export the encrypted database files.

    Copies every database reported by ``get_core_db(wx_path)`` into
    ``<gc.work_path>/export/<my_wxid>/endb``, where ``my_wxid`` is the value
    stored under ``"last"`` in the ``gc.at`` configuration section.

    :param request: request body; only ``wx_path`` is read. When it is empty,
        the ``wx_path`` saved in the configuration for ``my_wxid`` is used.
    :return: ``ReJson(0, body=<output directory>)`` on success;
        ``ReJson(1001, ...)`` when no ``my_wxid`` is configured;
        ``ReJson(1002, ...)`` when ``wx_path`` is empty or does not exist;
        ``ReJson(2001, ...)`` when ``get_core_db`` reports failure (its second
        return value is passed through as the body).
    """
    my_wxid = gc.get_conf(gc.at, "last")
    if not my_wxid:
        return ReJson(1001, body="my_wxid is required")

    # Fall back to the configured path when the caller did not send one.
    wx_path = request.wx_path or gc.get_conf(my_wxid, "wx_path")
    if not wx_path or not os.path.exists(wx_path):
        return ReJson(1002, body=f"wx_path is required: {wx_path}")

    code, wxdbpaths = get_core_db(wx_path)
    if not code:
        return ReJson(2001, body=wxdbpaths)

    outpath = os.path.join(gc.work_path, "export", my_wxid, "endb")
    # exist_ok=True replaces the exists()/makedirs() pair, which could raise
    # FileExistsError when two requests created the directory concurrently.
    os.makedirs(outpath, exist_ok=True)

    for wxdb in wxdbpaths:
        wxdb_path = wxdb.get("db_path")
        # Files keep their original base name inside the export directory.
        shutil.copy(wxdb_path, os.path.join(outpath, os.path.basename(wxdb_path)))
    return ReJson(0, body=outpath)
56
57
class ExportDedbRequest(BaseModel):
    # Request body for the /export_dedb endpoint.
    # Every field is optional and defaults to an empty string.

    # Path passed to decrypt_merge; when empty, get_export_dedb falls back to
    # the "wx_path" value stored in the configuration.
    wx_path: str = ""

    # Accepted in the payload, but get_export_dedb does not read it (the
    # handler always builds its own output directory).
    outpath: str = ""

    # Key passed to decrypt_merge; when empty, get_export_dedb falls back to
    # the "key" value stored in the configuration.
    key: str = ""
62
63
@export_router.api_route('/export_dedb', methods=["GET", "POST"])
def get_export_dedb(request: ExportDedbRequest):
    """
    Export the decrypted database.

    Calls ``decrypt_merge`` with the resolved ``wx_path`` and ``key`` and
    ``<gc.work_path>/export/<my_wxid>/dedb`` as the output directory, where
    ``my_wxid`` is the value stored under ``"last"`` in the ``gc.at``
    configuration section.

    :param request: request body; ``key`` and ``wx_path`` are read. Either one
        falls back to the value saved in the configuration for ``my_wxid``
        when it is empty.
    :return: ``ReJson(0, body=<decrypt_merge result>)`` on success;
        ``ReJson(1001, ...)`` when no ``my_wxid`` is configured or when
        ``wx_path`` does not exist on disk;
        ``ReJson(1002, ...)`` when ``key`` or ``wx_path`` is still empty after
        the fallback;
        ``ReJson(2001, body=<decrypt_merge result>)`` when ``decrypt_merge``
        reports failure.
    """
    my_wxid = gc.get_conf(gc.at, "last")
    if not my_wxid:
        return ReJson(1001, body="my_wxid is required")

    # Request values win; otherwise use what the configuration has stored.
    key = request.key or gc.get_conf(my_wxid, "key")
    wx_path = request.wx_path or gc.get_conf(my_wxid, "wx_path")

    if not key:
        return ReJson(1002, body=f"key is required: {key}")
    if not wx_path:
        return ReJson(1002, body=f"wx_path is required: {wx_path}")
    if not os.path.exists(wx_path):
        return ReJson(1001, body=f"wx_path not exists: {wx_path}")

    outpath = os.path.join(gc.work_path, "export", my_wxid, "dedb")
    # exist_ok=True replaces the exists()/makedirs() pair, which could raise
    # FileExistsError when two requests created the directory concurrently.
    os.makedirs(outpath, exist_ok=True)

    code, merge_save_path = decrypt_merge(wx_path=wx_path, key=key, outpath=outpath)
    # NOTE(review): the reason for this one-second pause is not evident from
    # this file; kept as-is -- confirm before removing.
    time.sleep(1)

    # decrypt_merge's second value is returned either way: it is the body of
    # the success response and of the failure response.
    return ReJson(0 if code else 2001, body=merge_save_path)
98
99
@export_router.api_route('/export_csv', methods=["GET", 'POST'])
def get_export_csv(wxid: str = Body(..., embed=True)):
    """
    Export data for ``wxid`` as CSV.

    Delegates to ``export_csv`` with
    ``<gc.work_path>/export/<my_wxid>/csv/<wxid>`` as the output directory,
    where ``my_wxid`` is the value stored under ``"last"`` in the ``gc.at``
    configuration section.

    :param wxid: identifier taken from the JSON body (``{"wxid": ...}``); it
        is used as a directory name, so it must be a plain name without path
        separators.
    :return: ``ReJson(0, <export_csv result>)`` on success;
        ``ReJson(1001, ...)`` when no ``my_wxid`` is configured;
        ``ReJson(1002, ...)`` when ``wxid`` is empty or not a plain name;
        ``ReJson(2001, body=<export_csv result>)`` when ``export_csv`` reports
        failure.
    """
    my_wxid = gc.get_conf(gc.at, "last")
    if not my_wxid:
        return ReJson(1001, body="my_wxid is required")
    db_config = gc.get_conf(my_wxid, "db_config")

    if not wxid:
        return ReJson(1002, body=f"username is required: {wxid}")
    # wxid comes straight from the request and becomes a path component.
    # An absolute path or ".." would make os.path.join escape the export
    # directory, so only a bare name is accepted.
    if wxid in (".", "..") or os.path.basename(wxid) != wxid:
        return ReJson(1002, body=f"invalid wxid: {wxid}")

    outpath = os.path.join(gc.work_path, "export", my_wxid, "csv", wxid)
    # exist_ok=True replaces the exists()/makedirs() pair, which could raise
    # FileExistsError when two requests created the directory concurrently.
    os.makedirs(outpath, exist_ok=True)

    code, ret = export_csv(wxid, outpath, db_config, my_wxid=my_wxid)
    if not code:
        return ReJson(2001, body=ret)
    return ReJson(0, ret)
122
123
@export_router.api_route('/export_json', methods=["GET", 'POST'])
def get_export_json(wxid: str = Body(..., embed=True)):
    """
    Export data for ``wxid`` as JSON.

    Delegates to ``export_json`` with
    ``<gc.work_path>/export/<my_wxid>/json/<wxid>`` as the output directory,
    where ``my_wxid`` is the value stored under ``"last"`` in the ``gc.at``
    configuration section.

    :param wxid: identifier taken from the JSON body (``{"wxid": ...}``); it
        is used as a directory name, so it must be a plain name without path
        separators.
    :return: ``ReJson(0, <export_json result>)`` on success;
        ``ReJson(1001, ...)`` when no ``my_wxid`` is configured;
        ``ReJson(1002, ...)`` when ``wxid`` is empty or not a plain name;
        ``ReJson(2001, body=<export_json result>)`` when ``export_json``
        reports failure.
    """
    my_wxid = gc.get_conf(gc.at, "last")
    if not my_wxid:
        return ReJson(1001, body="my_wxid is required")
    db_config = gc.get_conf(my_wxid, "db_config")

    if not wxid:
        return ReJson(1002, body=f"username is required: {wxid}")
    # wxid comes straight from the request and becomes a path component.
    # An absolute path or ".." would make os.path.join escape the export
    # directory, so only a bare name is accepted.
    if wxid in (".", "..") or os.path.basename(wxid) != wxid:
        return ReJson(1002, body=f"invalid wxid: {wxid}")

    outpath = os.path.join(gc.work_path, "export", my_wxid, "json", wxid)
    # exist_ok=True replaces the exists()/makedirs() pair, which could raise
    # FileExistsError when two requests created the directory concurrently.
    os.makedirs(outpath, exist_ok=True)

    code, ret = export_json(wxid, outpath, db_config, my_wxid=my_wxid)
    if not code:
        return ReJson(2001, body=ret)
    return ReJson(0, ret)
146
147
class ExportHtmlRequest(BaseModel):
    # Request model carrying a single required "wxid" string.
    # NOTE(review): get_export_html in this file takes wxid through
    # Body(..., embed=True) and does not reference this model -- confirm
    # whether it is still used elsewhere.
    wxid: str
150
151
@export_router.api_route('/export_html', methods=["GET", 'POST'])
def get_export_html(wxid: str = Body(..., embed=True)):
    """
    Export data for ``wxid`` as HTML.

    Recreates ``<gc.work_path>/export/<my_wxid>/html/<wxid>`` from a fresh
    copy of the ``ui/web`` directory (located via three ``dirname`` steps up
    from this file) and then calls ``export_html`` on it. ``my_wxid`` is the
    value stored under ``"last"`` in the ``gc.at`` configuration section.

    :param wxid: identifier taken from the JSON body (``{"wxid": ...}``); it
        is used as a directory name, so it must be a plain name without path
        separators.
    :return: ``ReJson(0, <export_html result>)`` on success;
        ``ReJson(1001, ...)`` when no ``my_wxid`` is configured;
        ``ReJson(1002, ...)`` when ``wxid`` is empty or not a plain name;
        ``ReJson(2001, body=<export_html result>)`` when ``export_html``
        reports failure.
    """
    my_wxid = gc.get_conf(gc.at, "last")
    if not my_wxid:
        return ReJson(1001, body="my_wxid is required")
    db_config = gc.get_conf(my_wxid, "db_config")

    if not wxid:
        return ReJson(1002, body=f"username is required: {wxid}")
    # wxid comes straight from the request and the resulting directory is
    # deleted below. An absolute path or ".." would let a caller point
    # shutil.rmtree at an arbitrary directory, so only a bare name is accepted.
    if wxid in (".", "..") or os.path.basename(wxid) != wxid:
        return ReJson(1002, body=f"invalid wxid: {wxid}")

    html_outpath = os.path.join(gc.work_path, "export", my_wxid, "html")
    # exist_ok=True replaces the exists()/makedirs() pair, which could raise
    # FileExistsError when two requests created the directory concurrently.
    os.makedirs(html_outpath, exist_ok=True)

    outpath = os.path.join(html_outpath, wxid)
    # Remove any earlier export for this wxid so copytree starts from a clean
    # destination.
    if os.path.exists(outpath):
        shutil.rmtree(outpath, ignore_errors=True)

    # Seed the export directory with the files from ui/web.
    web_path = os.path.join(os.path.dirname(os.path.dirname(os.path.dirname(__file__))), "ui", "web")
    shutil.copytree(web_path, outpath)

    code, ret = export_html(wxid, outpath, db_config, my_wxid=my_wxid)
    if not code:
        return ReJson(2001, body=ret)
    return ReJson(0, ret)