{"contents":"# -*- coding: utf-8 -*-#\n# -------------------------------------------------------------------------------\n# Name: local_server.py\n# Description: \n# Author: xaoyaoo\n# Date: 2024/08/01\n# -------------------------------------------------------------------------------\nimport os\nimport time\nimport shutil\nfrom pydantic import BaseModel\nfrom fastapi import APIRouter, Body\n\nfrom wxdump_linux import get_wx_db\nfrom wxdump_linux import get_wx_info, batch_decrypt, merge_db, decrypt_merge\n\nfrom .response import ReJson, RqJson\nfrom .utils import error9999, ls_loger, random_str, gc\n\nls_api = APIRouter()\n\n\n# 以下为初始化相关 *******************************************************************************************************\n\n@ls_api.post('/init_last_local_wxid')\n@error9999\ndef init_last_local_wxid():\n \"\"\"\n 初始化,包括key\n :return:\n \"\"\"\n local_wxid = gc.get_local_wxids()\n local_wxid.remove(gc.at)\n if local_wxid:\n return ReJson(0, {\"local_wxids\": local_wxid})\n return ReJson(0, {\"local_wxids\": []})\n\n\n@ls_api.post('/init_last')\n@error9999\ndef init_last(my_wxid: str = Body(..., embed=True)):\n \"\"\"\n 是否初始化\n :return:\n \"\"\"\n my_wxid = my_wxid.strip().strip(\"'\").strip('\"') if isinstance(my_wxid, str) else \"\"\n ls_loger.info(f\"[+] init_last: {my_wxid}\")\n if not my_wxid:\n my_wxid = gc.get_conf(gc.at, \"last\")\n if not my_wxid: return ReJson(1001, body=\"my_wxid is required\")\n if my_wxid:\n gc.set_conf(gc.at, \"last\", my_wxid)\n merge_path = gc.get_conf(my_wxid, \"merge_path\")\n wx_path = gc.get_conf(my_wxid, \"wx_path\")\n key = gc.get_conf(my_wxid, \"key\")\n rdata = {\n \"merge_path\": merge_path,\n \"wx_path\": wx_path,\n \"key\": key,\n \"my_wxid\": my_wxid,\n \"is_init\": True,\n }\n if merge_path and wx_path:\n return ReJson(0, rdata)\n return ReJson(0, {\"is_init\": False, \"my_wxid\": \"\"})\n\n\nclass InitKeyRequest(BaseModel):\n wx_path: str\n my_wxid: str\n\n\n@ls_api.post('/init_key')\n@error9999\ndef init_key(request: InitKeyRequest):\n \"\"\"\n 初始化:从进程内存提取密钥,解密并合并数据库\n :param request:\n :return:\n \"\"\"\n wx_path = request.wx_path.strip().strip(\"'\").strip('\"')\n my_wxid = request.my_wxid.strip().strip(\"'\").strip('\"')\n if not wx_path:\n return ReJson(1002, body=f\"wx_path is required: {wx_path}\")\n if not os.path.exists(wx_path):\n return ReJson(1001, body=f\"wx_path not exists: {wx_path}\")\n if not my_wxid:\n return ReJson(1002, body=f\"my_wxid is required: {my_wxid}\")\n\n out_path = os.path.join(gc.work_path, \"decrypted\", my_wxid) if my_wxid else os.path.join(gc.work_path, \"decrypted\")\n # 检查文件夹中文件是否被占用\n if os.path.exists(out_path):\n try:\n shutil.rmtree(out_path)\n except PermissionError as e:\n ls_loger.error(f\"{e}\", exc_info=True)\n return ReJson(2001, body=str(e))\n\n code, merge_save_path = decrypt_merge(wx_path=wx_path, outpath=str(out_path))\n time.sleep(1)\n if code:\n # 移动merge_save_path到g.work_path/my_wxid\n if not os.path.exists(os.path.join(gc.work_path, my_wxid)):\n os.makedirs(os.path.join(gc.work_path, my_wxid))\n merge_save_path_new = os.path.join(gc.work_path, my_wxid, \"merge_all.db\")\n shutil.move(merge_save_path, str(merge_save_path_new))\n\n # 删除out_path\n if os.path.exists(out_path):\n try:\n shutil.rmtree(out_path)\n except PermissionError as e:\n ls_loger.error(f\"{e}\", exc_info=True)\n db_config = {\n \"key\": random_str(16),\n \"type\": \"sqlite\",\n \"path\": merge_save_path_new\n }\n gc.set_conf(my_wxid, \"db_config\", db_config)\n gc.set_conf(my_wxid, \"merge_path\", merge_save_path_new)\n gc.set_conf(my_wxid, \"wx_path\", wx_path)\n gc.set_conf(my_wxid, \"my_wxid\", my_wxid)\n gc.set_conf(gc.at, \"last\", my_wxid)\n rdata = {\n \"merge_path\": merge_save_path_new,\n \"wx_path\": wx_path,\n \"my_wxid\": my_wxid,\n \"is_init\": True,\n }\n return ReJson(0, rdata)\n else:\n return ReJson(2001, body=merge_save_path)\n\n\nclass InitNoKeyRequest(BaseModel):\n merge_path: str\n wx_path: str\n my_wxid: str\n\n\n@ls_api.post('/init_nokey')\n@error9999\ndef init_nokey(request: InitNoKeyRequest):\n \"\"\"\n 初始化,包括key\n :return:\n \"\"\"\n merge_path = request.merge_path.strip().strip(\"'\").strip('\"')\n wx_path = request.wx_path.strip().strip(\"'\").strip('\"')\n my_wxid = request.my_wxid.strip().strip(\"'\").strip('\"')\n\n if not wx_path:\n return ReJson(1002, body=f\"wx_path is required: {wx_path}\")\n if not os.path.exists(wx_path):\n return ReJson(1001, body=f\"wx_path not exists: {wx_path}\")\n if not merge_path:\n return ReJson(1002, body=f\"merge_path is required: {merge_path}\")\n if not my_wxid:\n return ReJson(1002, body=f\"my_wxid is required: {my_wxid}\")\n\n key = gc.get_conf(my_wxid, \"key\")\n db_config = {\n \"key\": random_str(16),\n \"type\": \"sqlite\",\n \"path\": merge_path\n }\n gc.set_conf(my_wxid, \"db_config\", db_config)\n gc.set_conf(my_wxid, \"merge_path\", merge_path)\n gc.set_conf(my_wxid, \"wx_path\", wx_path)\n gc.set_conf(my_wxid, \"key\", key)\n gc.set_conf(my_wxid, \"my_wxid\", my_wxid)\n gc.set_conf(gc.at, \"last\", my_wxid)\n rdata = {\n \"merge_path\": merge_path,\n \"wx_path\": wx_path,\n \"key\": \"\",\n \"my_wxid\": my_wxid,\n \"is_init\": True,\n }\n return ReJson(0, rdata)\n\n\n# END 以上为初始化相关 ***************************************************************************************************\n\n\n# start 这部分为专业工具的api *********************************************************************************************\n\n@ls_api.api_route('/wxinfo', methods=[\"GET\", 'POST'])\n@error9999\ndef get_wxinfo():\n \"\"\"\n 获取微信信息(Linux 版:从进程内存提取)\n :return:\n \"\"\"\n wxinfos = get_wx_info()\n return ReJson(0, wxinfos)\n\n\n@ls_api.api_route('/decrypt', methods=[\"GET\", 'POST'])\n@error9999\ndef get_decrypt(wxdbPath: str, outPath: str = \"\"):\n \"\"\"\n 解密(Linux 版:自动从进程内存提取密钥)\n :return:\n \"\"\"\n from wxdump_linux.linux.memscan import extract_all_keys\n if not outPath:\n outPath = gc.work_path\n db_keys = extract_all_keys(wxdbPath)\n wxinfos = batch_decrypt(db_keys, outPath)\n return ReJson(0, str(wxinfos))\n\n\nclass MergeRequest(BaseModel):\n dbPath: str\n outPath: str\n\n\n@ls_api.post('/merge')\n@error9999\ndef get_merge(request: MergeRequest):\n \"\"\"\n 合并\n :return:\n \"\"\"\n wxdb_path = request.dbPath\n out_path = request.outPath\n db_path = get_wx_db(wxdb_path)\n # for i in db_path:print(i)\n rdata = merge_db(db_path, out_path)\n return ReJson(0, str(rdata))\n\n# END 这部分为专业工具的api ***********************************************************************************************\n","is_binary":false,"path":"wxdump_linux/api/local_server.py","ref":""}