解锁并提取Linux客户端微信数据库 (vibe coded)
at 88 lines 2.6 kB view raw
# -*- coding: utf-8 -*-#
# -------------------------------------------------------------------------------
# Name:         contacts.py
# Description:  Contact-related API endpoints
# -------------------------------------------------------------------------------
from typing import List

from fastapi import APIRouter, Body

from wxdump_linux.db import DBHandler
from ..response import ReJson
from ..decorators import error9999
from ..config import gc

contacts_router = APIRouter()


@contacts_router.api_route('/is_init', methods=["GET", 'POST'])
@error9999
def is_init():
    """
    Report whether the application counts as initialized.

    The flag is true when ``gc.get_local_wxids()`` yields more than one entry.

    :return: ``ReJson(0, <bool>)``
    """
    known_wxids = gc.get_local_wxids()
    initialized = len(known_wxids) > 1
    return ReJson(0, initialized)


@contacts_router.api_route('/mywxid', methods=["GET", 'POST'])
@error9999
def mywxid():
    """
    Return the wxid stored under the ``"last"`` key of the ``gc.at`` config section.

    :return: ``ReJson(0, {"my_wxid": ...})``, or ``ReJson(1001, ...)`` when no
        such value is configured.
    """
    current_wxid = gc.get_conf(gc.at, "last")
    if current_wxid:
        return ReJson(0, {"my_wxid": current_wxid})
    return ReJson(1001, body="my_wxid is required")


@contacts_router.api_route('/user_session_list', methods=["GET", 'POST'])
@error9999
def user_session_list():
    """
    Return the session list of the last-used account.

    :return: ``ReJson(0, [...])`` holding the values of the mapping produced by
        ``DBHandler.get_session_list()``, or ``ReJson(1001, ...)`` when no
        account is configured.
    """
    current_wxid = gc.get_conf(gc.at, "last")
    if not current_wxid:
        return ReJson(1001, body="my_wxid is required")

    handler = DBHandler(gc.get_conf(current_wxid, "db_config"), my_wxid=current_wxid)
    sessions = handler.get_session_list()
    # Only the values of the returned mapping are sent to the client.
    return ReJson(0, list(sessions.values()))


@contacts_router.api_route('/user_labels_dict', methods=["GET", 'POST'])
@error9999
def user_labels_dict():
    """
    Return the label data of the last-used account.

    :return: ``ReJson(0, <result of DBHandler.get_labels()>)``, or
        ``ReJson(1001, ...)`` when no account is configured.
    """
    current_wxid = gc.get_conf(gc.at, "last")
    if not current_wxid:
        return ReJson(1001, body="my_wxid is required")

    handler = DBHandler(gc.get_conf(current_wxid, "db_config"), my_wxid=current_wxid)
    label_map = handler.get_labels()
    return ReJson(0, label_map)


@contacts_router.post('/user_list')
@error9999
def user_list(word: str = Body("", embed=True), wxids: List[str] = Body(None), labels: List[str] = Body(None)):
    """
    Return contacts of the last-used account; usable as a search endpoint.

    :param word: passed through to ``DBHandler.get_user`` as ``word``.
    :param wxids: passed through as ``wxids``; ``None`` or ``''`` becomes ``[]``.
    :param labels: passed through as ``labels``; ``None`` or ``''`` becomes ``[]``.
    :return: ``ReJson(0, <result of DBHandler.get_user()>)``, or
        ``ReJson(1001, ...)`` when no account is configured.
    """
    # Normalize "nothing supplied" (None or an empty string) to an empty list.
    if wxids is None or (isinstance(wxids, str) and wxids == ''):
        wxids = []
    if labels is None or (isinstance(labels, str) and labels == ''):
        labels = []

    current_wxid = gc.get_conf(gc.at, "last")
    if not current_wxid:
        return ReJson(1001, body="my_wxid is required")

    handler = DBHandler(gc.get_conf(current_wxid, "db_config"), my_wxid=current_wxid)
    matches = handler.get_user(word=word, wxids=wxids, labels=labels)
    return ReJson(0, matches)