解锁并提取Linux客户端微信数据库 (vibe coded)
1# -*- coding: utf-8 -*-#
2# -------------------------------------------------------------------------------
3# Name: PublicMsg.py
4# Description: 负责处理公众号数据库信息
5# Author: xaoyaoo
6# Date: 2024/07/03
7# -------------------------------------------------------------------------------
8from .db_msg import MsgHandler
9from .utils import db_error
10
11
12class PublicMsgHandler(MsgHandler):
13 _class_name = "PublicMSG"
14 PublicMSG_required_tables = ["PublicMsg"]
15
16 def PublicMsg_add_index(self):
17 """
18 添加索引,加快查询速度
19 """
20 # 检查是否存在索引
21 if not self.tables_exist("PublicMsg"):
22 return
23 sql = "CREATE INDEX IF NOT EXISTS idx_PublicMsg_StrTalker ON PublicMsg(StrTalker);"
24 self.execute(sql)
25 sql = "CREATE INDEX IF NOT EXISTS idx_PublicMsg_CreateTime ON PublicMsg(CreateTime);"
26 self.execute(sql)
27 sql = "CREATE INDEX IF NOT EXISTS idx_PublicMsg_StrTalker_CreateTime ON PublicMsg(StrTalker, CreateTime);"
28 self.execute(sql)
29
30 @db_error
31 def get_plc_msg_count(self, wxids: list = ""):
32 """
33 获取聊天记录数量,根据wxid获取单个联系人的聊天记录数量,不传wxid则获取所有联系人的聊天记录数量
34 :param wxids: wxid list
35 :return: 聊天记录数量列表 {wxid: chat_count}
36 """
37 if not self.tables_exist("PublicMsg"):
38 return {}
39 if isinstance(wxids, str) and wxids:
40 wxids = [wxids]
41 if wxids:
42 wxids = "('" + "','".join(wxids) + "')"
43 sql = f"SELECT StrTalker, COUNT(*) FROM PublicMsg WHERE StrTalker IN {wxids} GROUP BY StrTalker ORDER BY COUNT(*) DESC;"
44 else:
45 sql = f"SELECT StrTalker, COUNT(*) FROM PublicMsg GROUP BY StrTalker ORDER BY COUNT(*) DESC;"
46 sql_total = f"SELECT COUNT(*) FROM MSG;"
47
48 result = self.execute(sql)
49 total_ret = self.execute(sql_total)
50
51 if not result:
52 return {}
53 total = 0
54 if total_ret and len(total_ret) > 0:
55 total = total_ret[0][0]
56
57 msg_count = {"total": total}
58 msg_count.update({row[0]: row[1] for row in result})
59 return msg_count
60
61 @db_error
62 def get_plc_msg_list(self, wxids: list or str = "", start_index=0, page_size=500, msg_type: str = "",
63 msg_sub_type: str = "", start_createtime=None, end_createtime=None, my_talker="我"):
64 """
65 获取聊天记录列表
66 :param wxids: [wxid]
67 :param start_index: 起始索引
68 :param page_size: 页大小
69 :param msg_type: 消息类型
70 :param msg_sub_type: 消息子类型
71 :param start_createtime: 开始时间
72 :param end_createtime: 结束时间
73 :return: 聊天记录列表 {"id": _id, "MsgSvrID": str(MsgSvrID), "type_name": type_name, "is_sender": IsSender,
74 "talker": talker, "room_name": StrTalker, "msg": msg, "src": src, "extra": {},
75 "CreateTime": CreateTime, }
76 """
77 if not self.tables_exist("PublicMsg"):
78 return [], []
79
80 if isinstance(wxids, str) and wxids:
81 wxids = [wxids]
82 param = ()
83 sql_wxid, param = (f"AND StrTalker in ({', '.join('?' for _ in wxids)}) ",
84 param + tuple(wxids)) if wxids else ("", param)
85 sql_type, param = ("AND Type=? ", param + (msg_type,)) if msg_type else ("", param)
86 sql_sub_type, param = ("AND SubType=? ", param + (msg_sub_type,)) if msg_type and msg_sub_type else ("", param)
87 sql_start_createtime, param = ("AND CreateTime>=? ", param + (start_createtime,)) if start_createtime else (
88 "", param)
89 sql_end_createtime, param = ("AND CreateTime<=? ", param + (end_createtime,)) if end_createtime else ("", param)
90
91 sql = (
92 "SELECT localId,TalkerId,MsgSvrID,Type,SubType,CreateTime,IsSender,Sequence,StatusEx,FlagEx,Status,"
93 "MsgSequence,StrContent,MsgServerSeq,StrTalker,DisplayContent,Reserved0,Reserved1,Reserved3,"
94 "Reserved4,Reserved5,Reserved6,CompressContent,BytesExtra,BytesTrans,Reserved2,"
95 "ROW_NUMBER() OVER (ORDER BY CreateTime ASC) AS id "
96 "FROM PublicMsg WHERE 1=1 "
97 f"{sql_wxid}"
98 f"{sql_type}"
99 f"{sql_sub_type}"
100 f"{sql_start_createtime}"
101 f"{sql_end_createtime}"
102 f"ORDER BY CreateTime ASC LIMIT ?,?"
103 )
104 param = param + (start_index, page_size)
105 result = self.execute(sql, param)
106 if not result:
107 return [], []
108
109 result_data = (self.get_msg_detail(row, my_talker=my_talker) for row in result)
110 rdata = list(result_data) # 转为列表
111 wxid_list = {d['talker'] for d in rdata} # 创建一个无重复的 wxid 列表
112
113 return rdata, list(wxid_list)