解锁并提取Linux客户端微信数据库 (vibe coded)
at 113 lines 5.0 kB view raw
# -*- coding: utf-8 -*-#
# -------------------------------------------------------------------------------
# Name:         PublicMsg.py
# Description:  Handles the official-account (PublicMsg) message database
# Author:       xaoyaoo
# Date:         2024/07/03
# -------------------------------------------------------------------------------
from .db_msg import MsgHandler
from .utils import db_error


class PublicMsgHandler(MsgHandler):
    """Query helpers for the ``PublicMsg`` table, built on ``MsgHandler``."""

    _class_name = "PublicMSG"
    PublicMSG_required_tables = ["PublicMsg"]

    def PublicMsg_add_index(self):
        """Create indexes on ``PublicMsg`` to speed up queries.

        Does nothing when the ``PublicMsg`` table is absent. Every statement
        uses ``IF NOT EXISTS``, so calling this repeatedly is harmless.
        """
        if not self.tables_exist("PublicMsg"):
            return
        index_statements = (
            "CREATE INDEX IF NOT EXISTS idx_PublicMsg_StrTalker ON PublicMsg(StrTalker);",
            "CREATE INDEX IF NOT EXISTS idx_PublicMsg_CreateTime ON PublicMsg(CreateTime);",
            "CREATE INDEX IF NOT EXISTS idx_PublicMsg_StrTalker_CreateTime ON PublicMsg(StrTalker, CreateTime);",
        )
        for statement in index_statements:
            self.execute(statement)

    @db_error
    def get_plc_msg_count(self, wxids: "list | str" = ""):
        """Count ``PublicMsg`` rows per talker.

        :param wxids: a single wxid or a list of wxids to restrict the count
            to; when empty, every talker in ``PublicMsg`` is counted.
        :return: ``{"total": total, wxid: count, ...}``; an empty dict when the
            ``PublicMsg`` table is missing or the per-talker query returns
            nothing.
        """
        if not self.tables_exist("PublicMsg"):
            return {}
        if isinstance(wxids, str) and wxids:
            wxids = [wxids]

        # NOTE(review): the total is counted from MSG, not PublicMsg. This looks
        # like a copy-paste slip, but callers may rely on it, so it is left as
        # is -- confirm the intended table.
        sql_total = "SELECT COUNT(*) FROM MSG;"

        if wxids:
            # Bind the wxids as parameters instead of splicing them into the
            # SQL text, so a quote inside a wxid cannot break or inject SQL.
            placeholders = ", ".join("?" for _ in wxids)
            sql = (
                "SELECT StrTalker, COUNT(*) FROM PublicMsg "
                f"WHERE StrTalker IN ({placeholders}) "
                "GROUP BY StrTalker ORDER BY COUNT(*) DESC;"
            )
            result = self.execute(sql, tuple(wxids))
        else:
            sql = "SELECT StrTalker, COUNT(*) FROM PublicMsg GROUP BY StrTalker ORDER BY COUNT(*) DESC;"
            result = self.execute(sql)
        total_ret = self.execute(sql_total)

        if not result:
            return {}
        total = total_ret[0][0] if total_ret else 0

        msg_count = {"total": total}
        msg_count.update({row[0]: row[1] for row in result})
        return msg_count

    @db_error
    def get_plc_msg_list(self, wxids: "list | str" = "", start_index=0, page_size=500, msg_type: str = "",
                         msg_sub_type: str = "", start_createtime=None, end_createtime=None, my_talker=""):
        """Fetch one page of ``PublicMsg`` records, oldest first.

        :param wxids: a single wxid or a list of wxids to filter ``StrTalker``
            by; empty means no talker filter.
        :param start_index: row offset of the page (first ``LIMIT`` argument).
        :param page_size: maximum number of rows in the page.
        :param msg_type: value to match against ``Type``; empty means no filter.
        :param msg_sub_type: value to match against ``SubType``; only applied
            when ``msg_type`` is also given.
        :param start_createtime: lower bound (inclusive) on ``CreateTime``.
        :param end_createtime: upper bound (inclusive) on ``CreateTime``.
        :param my_talker: forwarded unchanged to ``get_msg_detail``.
        :return: a 2-tuple ``(records, talkers)``. ``records`` is the list of
            values produced by ``get_msg_detail`` for each row (per the
            original author's notes, dicts shaped like {"id": _id,
            "MsgSvrID": str(MsgSvrID), "type_name": type_name,
            "is_sender": IsSender, "talker": talker, "room_name": StrTalker,
            "msg": msg, "src": src, "extra": {}, "CreateTime": CreateTime});
            ``talkers`` is the de-duplicated list of each record's "talker"
            value in first-seen order. Returns ``([], [])`` when the table is
            missing or no row matches.
        """
        if not self.tables_exist("PublicMsg"):
            return [], []

        if isinstance(wxids, str) and wxids:
            wxids = [wxids]

        # Build the optional WHERE fragments and their bound values in step,
        # so the placeholders and the parameters always stay aligned.
        clauses = []
        params = []
        if wxids:
            placeholders = ", ".join("?" for _ in wxids)
            clauses.append(f"AND StrTalker in ({placeholders}) ")
            params.extend(wxids)
        if msg_type:
            clauses.append("AND Type=? ")
            params.append(msg_type)
            if msg_sub_type:
                clauses.append("AND SubType=? ")
                params.append(msg_sub_type)
        if start_createtime:
            clauses.append("AND CreateTime>=? ")
            params.append(start_createtime)
        if end_createtime:
            clauses.append("AND CreateTime<=? ")
            params.append(end_createtime)

        sql = (
            "SELECT localId,TalkerId,MsgSvrID,Type,SubType,CreateTime,IsSender,Sequence,StatusEx,FlagEx,Status,"
            "MsgSequence,StrContent,MsgServerSeq,StrTalker,DisplayContent,Reserved0,Reserved1,Reserved3,"
            "Reserved4,Reserved5,Reserved6,CompressContent,BytesExtra,BytesTrans,Reserved2,"
            "ROW_NUMBER() OVER (ORDER BY CreateTime ASC) AS id "
            "FROM PublicMsg WHERE 1=1 "
            + "".join(clauses)
            + "ORDER BY CreateTime ASC LIMIT ?,?"
        )
        params.extend((start_index, page_size))
        result = self.execute(sql, tuple(params))
        if not result:
            return [], []

        rdata = [self.get_msg_detail(row, my_talker=my_talker) for row in result]
        # dict.fromkeys de-duplicates while keeping first-seen order, so the
        # returned talker list is stable between calls.
        wxid_list = list(dict.fromkeys(d['talker'] for d in rdata))

        return rdata, wxid_list