解锁并提取Linux客户端微信数据库 (vibe coded)
at 116 lines 3.8 kB view raw
# -*- coding: utf-8 -*-#
# -------------------------------------------------------------------------------
# Name:         analytics.py
# Description:  Statistical analysis (wordcloud, etc.)
# -------------------------------------------------------------------------------
from collections import Counter

from pydantic import BaseModel
from fastapi import APIRouter

from wxdump_linux.db import DBHandler
from ..response import ReJson
from ..decorators import error9999
from ..config import gc

analytics_router = APIRouter()


class DateCountRequest(BaseModel):
    """Request payload for the ``/date_count`` endpoint.

    Every field is forwarded unchanged to ``DBHandler.get_date_count``.
    """

    wxid: str = ""
    start_time: int = 0
    end_time: int = 0
    time_format: str = "%Y-%m-%d"


@analytics_router.api_route('/date_count', methods=["GET", 'POST'])
@error9999
def get_date_count(request: DateCountRequest):
    """Get date statistics.

    Looks up the account stored under ``gc.at`` / ``"last"`` in the config,
    opens its database and returns the result of ``get_date_count`` for the
    requested filters.

    :param request: filters forwarded to ``DBHandler.get_date_count``.
    :return: ``ReJson(0, <result>)`` on success, or
        ``ReJson(1001, body="my_wxid is required")`` when the config holds no
        account id.
    """
    # NOTE(review): error9999 is applied here to match get_wordcloud; it
    # presumably turns unhandled exceptions into an error response -- confirm
    # against ..decorators.
    my_wxid = gc.get_conf(gc.at, "last")
    if not my_wxid:
        return ReJson(1001, body="my_wxid is required")

    db_config = gc.get_conf(my_wxid, "db_config")
    db = DBHandler(db_config, my_wxid=my_wxid)
    date_count = db.get_date_count(
        wxid=request.wxid,
        start_time=request.start_time,
        end_time=request.end_time,
        time_format=request.time_format,
    )
    return ReJson(0, date_count)


class TopTalkerCountRequest(BaseModel):
    """Request payload for the ``/top_talker_count`` endpoint.

    Every field is forwarded unchanged to ``DBHandler.get_top_talker_count``.
    """

    top: int = 10
    start_time: int = 0
    end_time: int = 0


@analytics_router.api_route('/top_talker_count', methods=["GET", 'POST'])
@error9999
def get_top_talker_count(request: TopTalkerCountRequest):
    """Get the people chatted with the most.

    Looks up the account stored under ``gc.at`` / ``"last"`` in the config,
    opens its database and returns the result of ``get_top_talker_count``.

    :param request: limit and time range forwarded to
        ``DBHandler.get_top_talker_count``.
    :return: ``ReJson(0, <result>)`` on success, or
        ``ReJson(1001, body="my_wxid is required")`` when the config holds no
        account id.
    """
    # NOTE(review): error9999 is applied here to match get_wordcloud; it
    # presumably turns unhandled exceptions into an error response -- confirm
    # against ..decorators.
    my_wxid = gc.get_conf(gc.at, "last")
    if not my_wxid:
        return ReJson(1001, body="my_wxid is required")

    db_config = gc.get_conf(my_wxid, "db_config")
    db = DBHandler(db_config, my_wxid=my_wxid)
    top_talkers = db.get_top_talker_count(
        top=request.top,
        start_time=request.start_time,
        end_time=request.end_time,
    )
    return ReJson(0, top_talkers)
class WordCloudRequest(BaseModel):
    """Request payload for the ``/wordcloud`` endpoint."""

    # User field the word cloud is built from: "signature" or "nickname".
    target: str = "signature"


def _signature_of(user):
    """Return the "个性签名" entry of the user's ``ExtraBuf``, or "" if absent."""
    extra_buf = user.get("ExtraBuf", {})
    return extra_buf.get("个性签名", "") if extra_buf else ""


def _nickname_of(user):
    """Return the user's ``nickname`` field, or "" if absent."""
    return user.get("nickname", "")


# Maps each supported ``target`` value to the function that extracts the
# corresponding text from a single user record.
_TEXT_EXTRACTORS = {
    "signature": _signature_of,
    "nickname": _nickname_of,
}


def _count_words(texts, cut):
    """Tokenise the space-joined ``texts`` with ``cut`` and count the tokens.

    Tokens of length 1 or less are dropped, which also removes the single
    spaces used as separators between texts.
    """
    words = cut(" ".join(texts))
    return dict(Counter(word for word in words if len(word) > 1))


@analytics_router.api_route('/wordcloud', methods=["GET", 'POST'])
@error9999
def get_wordcloud(request: WordCloudRequest):
    """Return word frequencies for the signatures or nicknames of all users.

    The texts are read from ``DBHandler.get_user()`` for the account stored
    under ``gc.at`` / ``"last"`` in the config, tokenised with ``jieba.lcut``
    and counted.

    :param request: selects the text source via ``target``.
    :return: ``ReJson(0, {word: count})`` on success;
        ``ReJson(9999, ...)`` when jieba cannot be imported;
        ``ReJson(1002, ...)`` when ``target`` is empty or unsupported;
        ``ReJson(1001, ...)`` when the config holds no account id.
    """
    # jieba is an optional dependency, so it is imported lazily.
    try:
        import jieba
    except ImportError:
        return ReJson(9999, body="jieba is required")

    target = request.target
    if not target:
        return ReJson(1002, body="target is required")

    my_wxid = gc.get_conf(gc.at, "last")
    if not my_wxid:
        return ReJson(1001, body="my_wxid is required")

    # Reject unsupported targets before opening the database.
    # NOTE(review): the message is the same one used for an empty target.
    extract = _TEXT_EXTRACTORS.get(target)
    if extract is None:
        return ReJson(1002, body="target is required")

    db_config = gc.get_conf(my_wxid, "db_config")
    db = DBHandler(db_config, my_wxid=my_wxid)

    users = db.get_user()
    texts = [text for text in map(extract, users.values()) if text]
    return ReJson(0, _count_words(texts, jieba.lcut))