{"contents":"# -*- coding: utf-8 -*-#\n# -------------------------------------------------------------------------------\n# Name: export.py\n# Description: 导出接口\n# -------------------------------------------------------------------------------\nimport os\nimport time\nimport shutil\n\nfrom pydantic import BaseModel\nfrom fastapi import APIRouter, Body\n\nfrom wxdump_linux import decrypt_merge, get_core_db\nfrom ..export import export_csv, export_json, export_html\nfrom ..response import ReJson\nfrom ..decorators import error9999\nfrom ..config import gc\n\nexport_router = APIRouter()\n\n\nclass ExportEndbRequest(BaseModel):\n wx_path: str = \"\"\n outpath: str = \"\"\n key: str = \"\"\n\n\n@export_router.api_route('/export_endb', methods=[\"GET\", 'POST'])\ndef get_export_endb(request: ExportEndbRequest):\n \"\"\"\n 导出加密数据库\n :return:\n \"\"\"\n my_wxid = gc.get_conf(gc.at, \"last\")\n if not my_wxid: return ReJson(1001, body=\"my_wxid is required\")\n\n wx_path = request.wx_path\n if not wx_path:\n wx_path = gc.get_conf(my_wxid, \"wx_path\")\n if not os.path.exists(wx_path if wx_path else \"\"):\n return ReJson(1002, body=f\"wx_path is required: {wx_path}\")\n\n code, wxdbpaths = get_core_db(wx_path)\n if not code:\n return ReJson(2001, body=wxdbpaths)\n\n outpath = os.path.join(gc.work_path, \"export\", my_wxid, \"endb\")\n if not os.path.exists(outpath):\n os.makedirs(outpath)\n\n for wxdb in wxdbpaths:\n assert isinstance(outpath, str)\n wxdb_path = wxdb.get(\"db_path\")\n shutil.copy(wxdb_path, os.path.join(outpath, os.path.basename(wxdb_path)))\n return ReJson(0, body=outpath)\n\n\nclass ExportDedbRequest(BaseModel):\n wx_path: str = \"\"\n outpath: str = \"\"\n key: str = \"\"\n\n\n@export_router.api_route('/export_dedb', methods=[\"GET\", \"POST\"])\ndef get_export_dedb(request: ExportDedbRequest):\n \"\"\"\n 导出解密数据库\n :return:\n \"\"\"\n key = request.key\n wx_path = request.wx_path\n\n my_wxid = gc.get_conf(gc.at, \"last\")\n if not my_wxid: return ReJson(1001, body=\"my_wxid is required\")\n\n if not key:\n key = gc.get_conf(my_wxid, \"key\")\n if not wx_path:\n wx_path = gc.get_conf(my_wxid, \"wx_path\")\n\n if not key:\n return ReJson(1002, body=f\"key is required: {key}\")\n if not wx_path:\n return ReJson(1002, body=f\"wx_path is required: {wx_path}\")\n if not os.path.exists(wx_path):\n return ReJson(1001, body=f\"wx_path not exists: {wx_path}\")\n\n outpath = os.path.join(gc.work_path, \"export\", my_wxid, \"dedb\")\n if not os.path.exists(outpath):\n os.makedirs(outpath)\n assert isinstance(outpath, str)\n code, merge_save_path = decrypt_merge(wx_path=wx_path, key=key, outpath=outpath)\n time.sleep(1)\n if code:\n return ReJson(0, body=merge_save_path)\n else:\n return ReJson(2001, body=merge_save_path)\n\n\n@export_router.api_route('/export_csv', methods=[\"GET\", 'POST'])\ndef get_export_csv(wxid: str = Body(..., embed=True)):\n \"\"\"\n 导出csv\n :return:\n \"\"\"\n my_wxid = gc.get_conf(gc.at, \"last\")\n if not my_wxid: return ReJson(1001, body=\"my_wxid is required\")\n db_config = gc.get_conf(my_wxid, \"db_config\")\n\n if not wxid:\n return ReJson(1002, body=f\"username is required: {wxid}\")\n\n outpath = os.path.join(gc.work_path, \"export\", my_wxid, \"csv\", wxid)\n if not os.path.exists(outpath):\n os.makedirs(outpath)\n\n code, ret = export_csv(wxid, outpath, db_config, my_wxid=my_wxid)\n if code:\n return ReJson(0, ret)\n else:\n return ReJson(2001, body=ret)\n\n\n@export_router.api_route('/export_json', methods=[\"GET\", 'POST'])\ndef get_export_json(wxid: str = Body(..., embed=True)):\n \"\"\"\n 导出json\n :return:\n \"\"\"\n my_wxid = gc.get_conf(gc.at, \"last\")\n if not my_wxid: return ReJson(1001, body=\"my_wxid is required\")\n db_config = gc.get_conf(my_wxid, \"db_config\")\n\n if not wxid:\n return ReJson(1002, body=f\"username is required: {wxid}\")\n\n outpath = os.path.join(gc.work_path, \"export\", my_wxid, \"json\", wxid)\n if not os.path.exists(outpath):\n os.makedirs(outpath)\n\n code, ret = export_json(wxid, outpath, db_config, my_wxid=my_wxid)\n if code:\n return ReJson(0, ret)\n else:\n return ReJson(2001, body=ret)\n\n\nclass ExportHtmlRequest(BaseModel):\n wxid: str\n\n\n@export_router.api_route('/export_html', methods=[\"GET\", 'POST'])\ndef get_export_html(wxid: str = Body(..., embed=True)):\n \"\"\"\n 导出html\n :return:\n \"\"\"\n my_wxid = gc.get_conf(gc.at, \"last\")\n if not my_wxid: return ReJson(1001, body=\"my_wxid is required\")\n db_config = gc.get_conf(my_wxid, \"db_config\")\n\n if not wxid:\n return ReJson(1002, body=f\"username is required: {wxid}\")\n\n html_outpath = os.path.join(gc.work_path, \"export\", my_wxid, \"html\")\n if not os.path.exists(html_outpath):\n os.makedirs(html_outpath)\n assert isinstance(html_outpath, str)\n outpath = os.path.join(html_outpath, wxid)\n if os.path.exists(outpath):\n shutil.rmtree(outpath, ignore_errors=True)\n web_path = os.path.join(os.path.dirname(os.path.dirname(os.path.dirname(__file__))), \"ui\", \"web\")\n shutil.copytree(web_path, outpath)\n\n code, ret = export_html(wxid, outpath, db_config, my_wxid=my_wxid)\n\n if code:\n return ReJson(0, ret)\n else:\n return ReJson(2001, body=ret)\n","is_binary":false,"path":"wxdump_linux/api/routes/export.py","ref":""}