解锁并提取Linux客户端微信数据库 (vibe coded)
1# -*- coding: utf-8 -*-#
2# -------------------------------------------------------------------------------
3# Name: contacts.py
4# Description: 联系人相关接口
5# -------------------------------------------------------------------------------
6from typing import List
7
8from fastapi import APIRouter, Body
9
10from wxdump_linux.db import DBHandler
11from ..response import ReJson
12from ..decorators import error9999
13from ..config import gc
14
# Router that the endpoint functions below register themselves on through
# its `api_route` / `post` decorators.
contacts_router = APIRouter()
16
17
@contacts_router.api_route('/is_init', methods=["GET", 'POST'])
@error9999
def is_init():
    """
    Report whether the application has been initialised.

    The check passes only when ``gc.get_local_wxids()`` yields more than
    one entry.

    :return: ``ReJson(0, True)`` when initialised, ``ReJson(0, False)``
        otherwise.
    """
    known_wxids = gc.get_local_wxids()
    # NOTE(review): the threshold is "> 1" rather than "> 0" -- presumably the
    # list always carries one non-account entry; confirm against `gc`.
    initialised = len(known_wxids) > 1
    return ReJson(0, initialised)
29
30
@contacts_router.api_route('/mywxid', methods=["GET", 'POST'])
@error9999
def mywxid():
    """
    Return the wxid stored under the ``"last"`` key of the ``gc.at`` config.

    :return: ``ReJson(0, {"my_wxid": <wxid>})`` when a wxid is stored,
        otherwise ``ReJson(1001, body="my_wxid is required")``.
    """
    current_wxid = gc.get_conf(gc.at, "last")
    if current_wxid:
        return ReJson(0, {"my_wxid": current_wxid})
    # Nothing stored (or an empty value): report the missing wxid.
    return ReJson(1001, body="my_wxid is required")
41
42
@contacts_router.api_route('/user_session_list', methods=["GET", 'POST'])
@error9999
def user_session_list():
    """
    Return the session list for the wxid stored under ``gc.at`` / ``"last"``.

    A ``DBHandler`` is built from that wxid's ``"db_config"`` entry and its
    ``get_session_list()`` result is flattened to a list of its values.

    :return: ``ReJson(1001, body="my_wxid is required")`` when no wxid is
        stored, otherwise ``ReJson(0, <list of session values>)``.
    """
    account = gc.get_conf(gc.at, "last")
    if not account:
        return ReJson(1001, body="my_wxid is required")

    handler = DBHandler(gc.get_conf(account, "db_config"), my_wxid=account)
    sessions = handler.get_session_list()
    # Only the values of the returned mapping are sent back, not its keys.
    return ReJson(0, [*sessions.values()])
56
57
@contacts_router.api_route('/user_labels_dict', methods=["GET", 'POST'])
@error9999
def user_labels_dict():
    """
    Return the labels for the wxid stored under ``gc.at`` / ``"last"``.

    A ``DBHandler`` is built from that wxid's ``"db_config"`` entry and the
    result of its ``get_labels()`` call is returned unchanged.

    :return: ``ReJson(1001, body="my_wxid is required")`` when no wxid is
        stored, otherwise ``ReJson(0, <get_labels() result>)``.
    """
    account = gc.get_conf(gc.at, "last")
    if not account:
        return ReJson(1001, body="my_wxid is required")

    handler = DBHandler(gc.get_conf(account, "db_config"), my_wxid=account)
    return ReJson(0, handler.get_labels())
71
72
@contacts_router.post('/user_list')
@error9999
def user_list(word: str = Body("", embed=True), wxids: List[str] = Body(None), labels: List[str] = Body(None)):
    """
    Return contacts for the stored wxid; can be used for searching.

    :param word: forwarded to ``DBHandler.get_user`` as ``word``.
    :param wxids: forwarded to ``DBHandler.get_user`` as ``wxids``; ``None``
        or an empty string is replaced by an empty list first.
    :param labels: forwarded to ``DBHandler.get_user`` as ``labels``; ``None``
        or an empty string is replaced by an empty list first.
    :return: ``ReJson(1001, body="my_wxid is required")`` when no wxid is
        stored under ``gc.at`` / ``"last"``, otherwise
        ``ReJson(0, <get_user() result>)``.
    """
    # Normalise "nothing supplied" (None or '') to an empty list so that
    # get_user always receives a list for both arguments.
    if wxids is None or (isinstance(wxids, str) and wxids == ''):
        wxids = []
    if labels is None or (isinstance(labels, str) and labels == ''):
        labels = []

    account = gc.get_conf(gc.at, "last")
    if not account:
        return ReJson(1001, body="my_wxid is required")

    handler = DBHandler(gc.get_conf(account, "db_config"), my_wxid=account)
    matches = handler.get_user(word=word, wxids=wxids, labels=labels)
    return ReJson(0, matches)