# -*- coding: utf-8 -*-#
# -------------------------------------------------------------------------------
# Name:         PublicMsg.py
# Description:  Handles the official-account (PublicMsg) message database
# Author:       xaoyaoo
# Date:         2024/07/03
# -------------------------------------------------------------------------------
from .db_msg import MsgHandler
from .utils import db_error


class PublicMsgHandler(MsgHandler):
    """Query helpers for the ``PublicMsg`` table.

    Every method first checks that the ``PublicMsg`` table exists and returns
    an empty result when it does not.
    """
    _class_name = "PublicMSG"
    PublicMSG_required_tables = ["PublicMsg"]

    def PublicMsg_add_index(self):
        """
        Create indexes on ``PublicMsg`` to speed up queries.

        Does nothing when the ``PublicMsg`` table is missing. The statements
        use ``IF NOT EXISTS``, so calling this repeatedly is harmless.
        """
        # Guard on the table (not on the indexes): CREATE INDEX would fail
        # if the table itself is absent.
        if not self.tables_exist("PublicMsg"):
            return
        index_statements = (
            "CREATE INDEX IF NOT EXISTS idx_PublicMsg_StrTalker ON PublicMsg(StrTalker);",
            "CREATE INDEX IF NOT EXISTS idx_PublicMsg_CreateTime ON PublicMsg(CreateTime);",
            "CREATE INDEX IF NOT EXISTS idx_PublicMsg_StrTalker_CreateTime ON PublicMsg(StrTalker, CreateTime);",
        )
        for sql in index_statements:
            self.execute(sql)

    @db_error
    def get_plc_msg_count(self, wxids: "list | str" = ""):
        """
        Count messages per talker in ``PublicMsg``.

        :param wxids: a single wxid or a list of wxids to restrict the count to;
                      empty (the default) counts every talker in the table
        :return: ``{"total": <row count of the whole PublicMsg table>,
                 wxid: message_count, ...}``; an empty dict when the table is
                 missing or the grouped query returns nothing
        """
        if not self.tables_exist("PublicMsg"):
            return {}
        if isinstance(wxids, str) and wxids:
            wxids = [wxids]

        if wxids:
            # Bind the wxids as parameters instead of splicing them into the
            # SQL text, so quotes in a wxid cannot break or alter the query.
            wxid_params = tuple(wxids)
            placeholders = ", ".join("?" for _ in wxid_params)
            sql = (
                "SELECT StrTalker, COUNT(*) FROM PublicMsg "
                f"WHERE StrTalker IN ({placeholders}) "
                "GROUP BY StrTalker ORDER BY COUNT(*) DESC;"
            )
            result = self.execute(sql, wxid_params)
        else:
            sql = "SELECT StrTalker, COUNT(*) FROM PublicMsg GROUP BY StrTalker ORDER BY COUNT(*) DESC;"
            result = self.execute(sql)

        if not result:
            return {}

        # The total is taken over PublicMsg (the table this handler owns),
        # regardless of any wxid filter applied above.
        total_ret = self.execute("SELECT COUNT(*) FROM PublicMsg;")
        total = total_ret[0][0] if total_ret else 0

        msg_count = {"total": total}
        msg_count.update({row[0]: row[1] for row in result})
        return msg_count

    @db_error
    def get_plc_msg_list(self, wxids: "list | str" = "", start_index=0, page_size=500, msg_type: str = "",
                         msg_sub_type: str = "", start_createtime=None, end_createtime=None, my_talker="我"):
        """
        Fetch one page of ``PublicMsg`` rows, oldest first.

        :param wxids: a single wxid or a list of wxids to filter on ``StrTalker``;
                      empty means no talker filter
        :param start_index: offset of the first row to return (SQL ``LIMIT`` offset)
        :param page_size: maximum number of rows to return
        :param msg_type: value matched against the ``Type`` column; empty means no filter
        :param msg_sub_type: value matched against the ``SubType`` column; only
                             applied when ``msg_type`` is also given
        :param start_createtime: inclusive lower bound on ``CreateTime``; falsy means no bound
        :param end_createtime: inclusive upper bound on ``CreateTime``; falsy means no bound
        :param my_talker: passed through to ``get_msg_detail`` for every row
        :return: ``(messages, talkers)`` where ``messages`` is the list of values
                 returned by ``get_msg_detail`` for each row and ``talkers`` is the
                 de-duplicated list of their ``"talker"`` entries (order not
                 guaranteed); ``([], [])`` when the table is missing or no row matches.
                 Per the original author's note each message looks like
                 {"id": _id, "MsgSvrID": str(MsgSvrID), "type_name": type_name,
                 "is_sender": IsSender, "talker": talker, "room_name": StrTalker,
                 "msg": msg, "src": src, "extra": {}, "CreateTime": CreateTime}
        """
        if not self.tables_exist("PublicMsg"):
            return [], []

        if isinstance(wxids, str) and wxids:
            wxids = [wxids]

        # Collect the optional WHERE fragments and their bound values in the
        # same order, so placeholders and parameters always line up.
        conditions = []
        param = ()
        if wxids:
            wxid_params = tuple(wxids)
            conditions.append(f"AND StrTalker in ({', '.join('?' for _ in wxid_params)}) ")
            param += wxid_params
        if msg_type:
            conditions.append("AND Type=? ")
            param += (msg_type,)
            # SubType is only filtered within an explicit Type.
            if msg_sub_type:
                conditions.append("AND SubType=? ")
                param += (msg_sub_type,)
        if start_createtime:
            conditions.append("AND CreateTime>=? ")
            param += (start_createtime,)
        if end_createtime:
            conditions.append("AND CreateTime<=? ")
            param += (end_createtime,)

        sql = (
            "SELECT localId,TalkerId,MsgSvrID,Type,SubType,CreateTime,IsSender,Sequence,StatusEx,FlagEx,Status,"
            "MsgSequence,StrContent,MsgServerSeq,StrTalker,DisplayContent,Reserved0,Reserved1,Reserved3,"
            "Reserved4,Reserved5,Reserved6,CompressContent,BytesExtra,BytesTrans,Reserved2,"
            "ROW_NUMBER() OVER (ORDER BY CreateTime ASC) AS id "
            "FROM PublicMsg WHERE 1=1 "
            f"{''.join(conditions)}"
            "ORDER BY CreateTime ASC LIMIT ?,?"
        )
        param += (start_index, page_size)
        result = self.execute(sql, param)
        if not result:
            return [], []

        rdata = [self.get_msg_detail(row, my_talker=my_talker) for row in result]
        wxid_list = {d['talker'] for d in rdata}  # unique talkers seen on this page

        return rdata, list(wxid_list)