解锁并提取Linux客户端微信数据库 (vibe coded)
at 233 lines 7.2 kB view raw
1# -*- coding: utf-8 -*-# 2# ------------------------------------------------------------------------------- 3# Name: local_server.py 4# Description: 5# Author: xaoyaoo 6# Date: 2024/08/01 7# ------------------------------------------------------------------------------- 8import os 9import time 10import shutil 11from pydantic import BaseModel 12from fastapi import APIRouter, Body 13 14from wxdump_linux import get_wx_db 15from wxdump_linux import get_wx_info, batch_decrypt, merge_db, decrypt_merge 16 17from .response import ReJson, RqJson 18from .utils import error9999, ls_loger, random_str, gc 19 20ls_api = APIRouter() 21 22 23# 以下为初始化相关 ******************************************************************************************************* 24 25@ls_api.post('/init_last_local_wxid') 26@error9999 27def init_last_local_wxid(): 28 """ 29 初始化,包括key 30 :return: 31 """ 32 local_wxid = gc.get_local_wxids() 33 local_wxid.remove(gc.at) 34 if local_wxid: 35 return ReJson(0, {"local_wxids": local_wxid}) 36 return ReJson(0, {"local_wxids": []}) 37 38 39@ls_api.post('/init_last') 40@error9999 41def init_last(my_wxid: str = Body(..., embed=True)): 42 """ 43 是否初始化 44 :return: 45 """ 46 my_wxid = my_wxid.strip().strip("'").strip('"') if isinstance(my_wxid, str) else "" 47 ls_loger.info(f"[+] init_last: {my_wxid}") 48 if not my_wxid: 49 my_wxid = gc.get_conf(gc.at, "last") 50 if not my_wxid: return ReJson(1001, body="my_wxid is required") 51 if my_wxid: 52 gc.set_conf(gc.at, "last", my_wxid) 53 merge_path = gc.get_conf(my_wxid, "merge_path") 54 wx_path = gc.get_conf(my_wxid, "wx_path") 55 key = gc.get_conf(my_wxid, "key") 56 rdata = { 57 "merge_path": merge_path, 58 "wx_path": wx_path, 59 "key": key, 60 "my_wxid": my_wxid, 61 "is_init": True, 62 } 63 if merge_path and wx_path: 64 return ReJson(0, rdata) 65 return ReJson(0, {"is_init": False, "my_wxid": ""}) 66 67 68class InitKeyRequest(BaseModel): 69 wx_path: str 70 my_wxid: str 71 72 73@ls_api.post('/init_key') 74@error9999 75def init_key(request: InitKeyRequest): 76 """ 77 初始化:从进程内存提取密钥,解密并合并数据库 78 :param request: 79 :return: 80 """ 81 wx_path = request.wx_path.strip().strip("'").strip('"') 82 my_wxid = request.my_wxid.strip().strip("'").strip('"') 83 if not wx_path: 84 return ReJson(1002, body=f"wx_path is required: {wx_path}") 85 if not os.path.exists(wx_path): 86 return ReJson(1001, body=f"wx_path not exists: {wx_path}") 87 if not my_wxid: 88 return ReJson(1002, body=f"my_wxid is required: {my_wxid}") 89 90 out_path = os.path.join(gc.work_path, "decrypted", my_wxid) if my_wxid else os.path.join(gc.work_path, "decrypted") 91 # 检查文件夹中文件是否被占用 92 if os.path.exists(out_path): 93 try: 94 shutil.rmtree(out_path) 95 except PermissionError as e: 96 ls_loger.error(f"{e}", exc_info=True) 97 return ReJson(2001, body=str(e)) 98 99 code, merge_save_path = decrypt_merge(wx_path=wx_path, outpath=str(out_path)) 100 time.sleep(1) 101 if code: 102 # 移动merge_save_path到g.work_path/my_wxid 103 if not os.path.exists(os.path.join(gc.work_path, my_wxid)): 104 os.makedirs(os.path.join(gc.work_path, my_wxid)) 105 merge_save_path_new = os.path.join(gc.work_path, my_wxid, "merge_all.db") 106 shutil.move(merge_save_path, str(merge_save_path_new)) 107 108 # 删除out_path 109 if os.path.exists(out_path): 110 try: 111 shutil.rmtree(out_path) 112 except PermissionError as e: 113 ls_loger.error(f"{e}", exc_info=True) 114 db_config = { 115 "key": random_str(16), 116 "type": "sqlite", 117 "path": merge_save_path_new 118 } 119 gc.set_conf(my_wxid, "db_config", db_config) 120 gc.set_conf(my_wxid, "merge_path", merge_save_path_new) 121 gc.set_conf(my_wxid, "wx_path", wx_path) 122 gc.set_conf(my_wxid, "my_wxid", my_wxid) 123 gc.set_conf(gc.at, "last", my_wxid) 124 rdata = { 125 "merge_path": merge_save_path_new, 126 "wx_path": wx_path, 127 "my_wxid": my_wxid, 128 "is_init": True, 129 } 130 return ReJson(0, rdata) 131 else: 132 return ReJson(2001, body=merge_save_path) 133 134 135class InitNoKeyRequest(BaseModel): 136 merge_path: str 137 wx_path: str 138 my_wxid: str 139 140 141@ls_api.post('/init_nokey') 142@error9999 143def init_nokey(request: InitNoKeyRequest): 144 """ 145 初始化,包括key 146 :return: 147 """ 148 merge_path = request.merge_path.strip().strip("'").strip('"') 149 wx_path = request.wx_path.strip().strip("'").strip('"') 150 my_wxid = request.my_wxid.strip().strip("'").strip('"') 151 152 if not wx_path: 153 return ReJson(1002, body=f"wx_path is required: {wx_path}") 154 if not os.path.exists(wx_path): 155 return ReJson(1001, body=f"wx_path not exists: {wx_path}") 156 if not merge_path: 157 return ReJson(1002, body=f"merge_path is required: {merge_path}") 158 if not my_wxid: 159 return ReJson(1002, body=f"my_wxid is required: {my_wxid}") 160 161 key = gc.get_conf(my_wxid, "key") 162 db_config = { 163 "key": random_str(16), 164 "type": "sqlite", 165 "path": merge_path 166 } 167 gc.set_conf(my_wxid, "db_config", db_config) 168 gc.set_conf(my_wxid, "merge_path", merge_path) 169 gc.set_conf(my_wxid, "wx_path", wx_path) 170 gc.set_conf(my_wxid, "key", key) 171 gc.set_conf(my_wxid, "my_wxid", my_wxid) 172 gc.set_conf(gc.at, "last", my_wxid) 173 rdata = { 174 "merge_path": merge_path, 175 "wx_path": wx_path, 176 "key": "", 177 "my_wxid": my_wxid, 178 "is_init": True, 179 } 180 return ReJson(0, rdata) 181 182 183# END 以上为初始化相关 *************************************************************************************************** 184 185 186# start 这部分为专业工具的api ********************************************************************************************* 187 188@ls_api.api_route('/wxinfo', methods=["GET", 'POST']) 189@error9999 190def get_wxinfo(): 191 """ 192 获取微信信息(Linux 版:从进程内存提取) 193 :return: 194 """ 195 wxinfos = get_wx_info() 196 return ReJson(0, wxinfos) 197 198 199@ls_api.api_route('/decrypt', methods=["GET", 'POST']) 200@error9999 201def get_decrypt(wxdbPath: str, outPath: str = ""): 202 """ 203 解密(Linux 版:自动从进程内存提取密钥) 204 :return: 205 """ 206 from wxdump_linux.linux.memscan import extract_all_keys 207 if not outPath: 208 outPath = gc.work_path 209 db_keys = extract_all_keys(wxdbPath) 210 wxinfos = batch_decrypt(db_keys, outPath) 211 return ReJson(0, str(wxinfos)) 212 213 214class MergeRequest(BaseModel): 215 dbPath: str 216 outPath: str 217 218 219@ls_api.post('/merge') 220@error9999 221def get_merge(request: MergeRequest): 222 """ 223 合并 224 :return: 225 """ 226 wxdb_path = request.dbPath 227 out_path = request.outPath 228 db_path = get_wx_db(wxdb_path) 229 # for i in db_path:print(i) 230 rdata = merge_db(db_path, out_path) 231 return ReJson(0, str(rdata)) 232 233# END 这部分为专业工具的api ***********************************************************************************************