解锁并提取Linux客户端微信数据库 (vibe coded)
at 180 lines 5.4 kB view raw
# -*- coding: utf-8 -*-#
# -------------------------------------------------------------------------------
# Name:         export.py
# Description:  Export API endpoints
# -------------------------------------------------------------------------------
import os
import time
import shutil

from pydantic import BaseModel
from fastapi import APIRouter, Body

from wxdump_linux import decrypt_merge, get_core_db
from ..export import export_csv, export_json, export_html
from ..response import ReJson
from ..decorators import error9999
from ..config import gc

export_router = APIRouter()


def _is_safe_path_component(name: str) -> bool:
    """Return True when *name* is usable as a single directory name.

    Rejects empty names, ``.``/``..``, names containing a NUL byte, and
    anything with a path separator (including absolute paths).  This matters
    because ``os.path.join`` discards every earlier component when a later
    one is absolute, so an unchecked request value could point outside the
    export directory.
    """
    if not name or name in (".", "..") or "\x00" in name:
        return False
    return os.path.basename(name) == name


class ExportEndbRequest(BaseModel):
    """Request body for ``/export_endb``.

    Only ``wx_path`` is read by the handler in this file; ``outpath`` and
    ``key`` are accepted but not used there.
    """
    wx_path: str = ""
    outpath: str = ""
    key: str = ""


@export_router.api_route('/export_endb', methods=["GET", 'POST'])
def get_export_endb(request: ExportEndbRequest):
    """
    Export the encrypted databases.

    Copies every database reported by ``get_core_db`` into
    ``<work_path>/export/<my_wxid>/endb``.

    :param request: optional ``wx_path``; falls back to the stored config.
    :return: ``ReJson(0, body=<output dir>)`` on success; ``ReJson(1001)``
        when no current wxid is configured, ``ReJson(1002)`` when ``wx_path``
        does not exist, ``ReJson(2001)`` when ``get_core_db`` reports failure.
    """
    # "last" under gc.at is presumably the most recently used account -- confirm in config.
    my_wxid = gc.get_conf(gc.at, "last")
    if not my_wxid:
        return ReJson(1001, body="my_wxid is required")

    wx_path = request.wx_path
    if not wx_path:
        wx_path = gc.get_conf(my_wxid, "wx_path")
    if not os.path.exists(wx_path if wx_path else ""):
        return ReJson(1002, body=f"wx_path is required: {wx_path}")

    code, wxdbpaths = get_core_db(wx_path)
    if not code:
        return ReJson(2001, body=wxdbpaths)

    outpath = os.path.join(gc.work_path, "export", my_wxid, "endb")
    # exist_ok avoids the check-then-create race of exists() + makedirs().
    os.makedirs(outpath, exist_ok=True)
    assert isinstance(outpath, str)  # type narrowing only; loop-invariant

    for wxdb in wxdbpaths:
        wxdb_path = wxdb.get("db_path")
        if not wxdb_path:
            # An entry without a path cannot be copied; skip it rather than
            # crash in os.path.basename(None).
            continue
        shutil.copy(wxdb_path, os.path.join(outpath, os.path.basename(wxdb_path)))
    return ReJson(0, body=outpath)


class ExportDedbRequest(BaseModel):
    """Request body for ``/export_dedb``.

    ``wx_path`` and ``key`` are read by the handler; ``outpath`` is accepted
    but not used there.
    """
    wx_path: str = ""
    outpath: str = ""
    key: str = ""


@export_router.api_route('/export_dedb', methods=["GET", "POST"])
def get_export_dedb(request: ExportDedbRequest):
    """
    Export the decrypted databases.

    Calls ``decrypt_merge`` with the output directory
    ``<work_path>/export/<my_wxid>/dedb``.

    :param request: optional ``key`` and ``wx_path``; each falls back to the
        stored config for the current wxid.
    :return: ``ReJson(0, body=<merge result>)`` on success; ``ReJson(1001)``
        when no current wxid is configured or ``wx_path`` does not exist,
        ``ReJson(1002)`` when ``key`` or ``wx_path`` is missing,
        ``ReJson(2001)`` when ``decrypt_merge`` reports failure.
    """
    key = request.key
    wx_path = request.wx_path

    my_wxid = gc.get_conf(gc.at, "last")
    if not my_wxid:
        return ReJson(1001, body="my_wxid is required")

    if not key:
        key = gc.get_conf(my_wxid, "key")
    if not wx_path:
        wx_path = gc.get_conf(my_wxid, "wx_path")

    if not key:
        return ReJson(1002, body=f"key is required: {key}")
    if not wx_path:
        return ReJson(1002, body=f"wx_path is required: {wx_path}")
    if not os.path.exists(wx_path):
        return ReJson(1001, body=f"wx_path not exists: {wx_path}")

    outpath = os.path.join(gc.work_path, "export", my_wxid, "dedb")
    os.makedirs(outpath, exist_ok=True)
    assert isinstance(outpath, str)  # type narrowing only
    code, merge_save_path = decrypt_merge(wx_path=wx_path, key=key, outpath=outpath)
    # NOTE(review): purpose of this pause is not visible here -- presumably
    # lets the merged file settle before the client reads it; confirm.
    time.sleep(1)
    if code:
        return ReJson(0, body=merge_save_path)
    return ReJson(2001, body=merge_save_path)


@export_router.api_route('/export_csv', methods=["GET", 'POST'])
def get_export_csv(wxid: str = Body(..., embed=True)):
    """
    Export as CSV.

    Output goes to ``<work_path>/export/<my_wxid>/csv/<wxid>``.

    :param wxid: identifier to export; must be a single path component.
    :return: ``ReJson(0, <result>)`` on success; ``ReJson(1001)`` when no
        current wxid is configured, ``ReJson(1002)`` when ``wxid`` is empty
        or not a plain directory name, ``ReJson(2001)`` when ``export_csv``
        reports failure.
    """
    my_wxid = gc.get_conf(gc.at, "last")
    if not my_wxid:
        return ReJson(1001, body="my_wxid is required")
    db_config = gc.get_conf(my_wxid, "db_config")

    if not wxid:
        return ReJson(1002, body=f"username is required: {wxid}")
    if not _is_safe_path_component(wxid):
        # wxid comes from the request body and is used as a directory name.
        return ReJson(1002, body=f"invalid wxid: {wxid}")

    outpath = os.path.join(gc.work_path, "export", my_wxid, "csv", wxid)
    os.makedirs(outpath, exist_ok=True)

    code, ret = export_csv(wxid, outpath, db_config, my_wxid=my_wxid)
    if code:
        return ReJson(0, ret)
    return ReJson(2001, body=ret)


@export_router.api_route('/export_json', methods=["GET", 'POST'])
def get_export_json(wxid: str = Body(..., embed=True)):
    """
    Export as JSON.

    Output goes to ``<work_path>/export/<my_wxid>/json/<wxid>``.

    :param wxid: identifier to export; must be a single path component.
    :return: ``ReJson(0, <result>)`` on success; ``ReJson(1001)`` when no
        current wxid is configured, ``ReJson(1002)`` when ``wxid`` is empty
        or not a plain directory name, ``ReJson(2001)`` when ``export_json``
        reports failure.
    """
    my_wxid = gc.get_conf(gc.at, "last")
    if not my_wxid:
        return ReJson(1001, body="my_wxid is required")
    db_config = gc.get_conf(my_wxid, "db_config")

    if not wxid:
        return ReJson(1002, body=f"username is required: {wxid}")
    if not _is_safe_path_component(wxid):
        # wxid comes from the request body and is used as a directory name.
        return ReJson(1002, body=f"invalid wxid: {wxid}")

    outpath = os.path.join(gc.work_path, "export", my_wxid, "json", wxid)
    os.makedirs(outpath, exist_ok=True)

    code, ret = export_json(wxid, outpath, db_config, my_wxid=my_wxid)
    if code:
        return ReJson(0, ret)
    return ReJson(2001, body=ret)


class ExportHtmlRequest(BaseModel):
    """Request model carrying a ``wxid``.

    Not referenced by any handler visible in this file; kept for
    compatibility with possible external imports.
    """
    wxid: str


@export_router.api_route('/export_html', methods=["GET", 'POST'])
def get_export_html(wxid: str = Body(..., embed=True)):
    """
    Export as HTML.

    Replaces ``<work_path>/export/<my_wxid>/html/<wxid>`` with a fresh copy
    of the bundled ``ui/web`` directory, then calls ``export_html`` on it.

    :param wxid: identifier to export; must be a single path component,
        because the resulting directory is deleted before being recreated.
    :return: ``ReJson(0, <result>)`` on success; ``ReJson(1001)`` when no
        current wxid is configured, ``ReJson(1002)`` when ``wxid`` is empty
        or not a plain directory name, ``ReJson(2001)`` when ``export_html``
        reports failure.
    """
    my_wxid = gc.get_conf(gc.at, "last")
    if not my_wxid:
        return ReJson(1001, body="my_wxid is required")
    db_config = gc.get_conf(my_wxid, "db_config")

    if not wxid:
        return ReJson(1002, body=f"username is required: {wxid}")
    if not _is_safe_path_component(wxid):
        # Without this check an absolute or ".." wxid would make the rmtree
        # below delete a directory outside the export tree.
        return ReJson(1002, body=f"invalid wxid: {wxid}")

    html_outpath = os.path.join(gc.work_path, "export", my_wxid, "html")
    os.makedirs(html_outpath, exist_ok=True)
    assert isinstance(html_outpath, str)  # type narrowing only
    outpath = os.path.join(html_outpath, wxid)
    if os.path.exists(outpath):
        # copytree requires that the destination does not exist yet.
        shutil.rmtree(outpath, ignore_errors=True)
    # The web assets live in "ui/web", three directory levels above this file.
    web_path = os.path.join(os.path.dirname(os.path.dirname(os.path.dirname(__file__))), "ui", "web")
    shutil.copytree(web_path, outpath)

    code, ret = export_html(wxid, outpath, db_config, my_wxid=my_wxid)

    if code:
        return ReJson(0, ret)
    return ReJson(2001, body=ret)