解锁并提取Linux客户端微信数据库 (vibe coded)
1# -*- coding: utf-8 -*-#
2# -------------------------------------------------------------------------------
3# Name: analytics.py
4# Description: 统计分析(wordcloud 等)
5# -------------------------------------------------------------------------------
6from collections import Counter
7
8from pydantic import BaseModel
9from fastapi import APIRouter
10
11from wxdump_linux.db import DBHandler
12from ..response import ReJson
13from ..decorators import error9999
14from ..config import gc
15
# Router object on which the analytics endpoints defined in this module are registered.
16analytics_router = APIRouter()
17
18
# Request payload for the /date_count endpoint.
# Every field has a default, so a request may omit any of them.
class DateCountRequest(BaseModel):
    # Passed to DBHandler.get_date_count as ``wxid``; defaults to the empty string.
    wxid: str = ""
    # Passed through as ``start_time`` / ``end_time``.
    # NOTE(review): presumably timestamps, with 0 meaning "no bound" -- confirm in DBHandler.
    start_time: int = 0
    end_time: int = 0
    # Passed through as ``time_format``; the default looks like a strftime-style day pattern.
    time_format: str = "%Y-%m-%d"
24
25
@analytics_router.api_route('/date_count', methods=["GET", 'POST'])
@error9999
def get_date_count(request: DateCountRequest):
    """
    Return the per-date statistics computed by ``DBHandler.get_date_count``.

    The request fields (``wxid``, ``start_time``, ``end_time``,
    ``time_format``) are forwarded unchanged to the database handler.

    :param request: parsed request payload.
    :return: ``ReJson(1001, ...)`` when no wxid is stored under the ``"last"``
        key of the ``gc.at`` config section, otherwise ``ReJson(0, result)``.
    """
    # Wrapped with error9999, exactly like the /wordcloud endpoint of this
    # module, so that a failing config lookup or database call is reported
    # through the same mechanism instead of escaping as an unhandled error.
    my_wxid = gc.get_conf(gc.at, "last")
    if not my_wxid:
        return ReJson(1001, body="my_wxid is required")

    db_config = gc.get_conf(my_wxid, "db_config")
    db = DBHandler(db_config, my_wxid=my_wxid)
    date_count = db.get_date_count(
        wxid=request.wxid,
        start_time=request.start_time,
        end_time=request.end_time,
        time_format=request.time_format,
    )
    return ReJson(0, date_count)
43
44
# Request payload for the /top_talker_count endpoint.
# Every field has a default, so a request may omit any of them.
class TopTalkerCountRequest(BaseModel):
    # Passed to DBHandler.get_top_talker_count as ``top``; defaults to 10.
    top: int = 10
    # Passed through as ``start_time`` / ``end_time``.
    # NOTE(review): presumably timestamps, with 0 meaning "no bound" -- confirm in DBHandler.
    start_time: int = 0
    end_time: int = 0
49
50
@analytics_router.api_route('/top_talker_count', methods=["GET", 'POST'])
@error9999
def get_top_talker_count(request: TopTalkerCountRequest):
    """
    Return the result of ``DBHandler.get_top_talker_count`` for the request.

    The request fields (``top``, ``start_time``, ``end_time``) are forwarded
    unchanged to the database handler.

    :param request: parsed request payload.
    :return: ``ReJson(1001, ...)`` when no wxid is stored under the ``"last"``
        key of the ``gc.at`` config section, otherwise ``ReJson(0, result)``.
    """
    # Wrapped with error9999, exactly like the /wordcloud endpoint of this
    # module, so that a failing config lookup or database call is reported
    # through the same mechanism instead of escaping as an unhandled error.
    my_wxid = gc.get_conf(gc.at, "last")
    if not my_wxid:
        return ReJson(1001, body="my_wxid is required")

    db_config = gc.get_conf(my_wxid, "db_config")
    db = DBHandler(db_config, my_wxid=my_wxid)
    top_talkers = db.get_top_talker_count(
        top=request.top,
        start_time=request.start_time,
        end_time=request.end_time,
    )
    return ReJson(0, top_talkers)
67
68
# Request payload for the /wordcloud endpoint.
class WordCloudRequest(BaseModel):
    # Selects the text source; the endpoint handles "signature" (the default)
    # and "nickname".
    target: str = "signature"
71
72
def _count_words(texts, tokenize):
    """
    Count the words found in a collection of short texts.

    :param texts: iterable of strings; falsy entries (empty string, None) are skipped.
    :param tokenize: callable turning one string into a list of tokens
        (``jieba.lcut`` when called from ``get_wordcloud``).
    :return: dict mapping every kept token to its number of occurrences,
        in order of first appearance.
    """
    joined = " ".join(text for text in texts if text)
    # Only tokens longer than one character are kept. The length is measured
    # after stripping so that whitespace-only tokens (a CRLF pair is two
    # characters long) are never reported as words.
    return dict(Counter(word for word in tokenize(joined) if len(word.strip()) > 1))


@analytics_router.api_route('/wordcloud', methods=["GET", 'POST'])
@error9999
def get_wordcloud(request: WordCloudRequest):
    """
    Build word-frequency data for a word cloud.

    ``request.target`` selects the text source: ``"signature"`` reads the
    "个性签名" entry of each user's ``ExtraBuf``; ``"nickname"`` reads each
    user's ``nickname``. The texts are joined, tokenized with ``jieba.lcut``
    and counted.

    :param request: parsed request payload.
    :return: ``ReJson(0, {word: count})`` on success;
        ``ReJson(9999, ...)`` when jieba cannot be imported;
        ``ReJson(1002, ...)`` when the target is empty or not supported;
        ``ReJson(1001, ...)`` when no wxid is stored under the ``"last"`` key
        of the ``gc.at`` config section.
    """
    # jieba is an optional dependency, so it is imported lazily here.
    try:
        import jieba
    except ImportError:
        return ReJson(9999, body="jieba is required")

    target = request.target
    if not target:
        return ReJson(1002, body="target is required")

    my_wxid = gc.get_conf(gc.at, "last")
    if not my_wxid:
        return ReJson(1001, body="my_wxid is required")

    # Reject unsupported targets before the database is opened; the status
    # code and message are the ones callers already receive for this case.
    if target not in ("signature", "nickname"):
        return ReJson(1002, body="target is required")

    db_config = gc.get_conf(my_wxid, "db_config")
    db = DBHandler(db_config, my_wxid=my_wxid)
    # A falsy result is treated as "no users" and yields an empty count dict.
    users = db.get_user() or {}

    if target == "signature":
        # ExtraBuf may be missing or empty; such users contribute no text.
        texts = [(user.get("ExtraBuf") or {}).get("个性签名", "") for user in users.values()]
    else:
        texts = [user.get("nickname", "") for user in users.values()]

    return ReJson(0, _count_words(texts, jieba.lcut))