# -*- coding: utf-8 -*-#
# -------------------------------------------------------------------------------
# Name: MSG.py
# Description: Handles data stored in the message (MSG) database
# Author: xaoyaoo
# Date: 2024/04/15
# -------------------------------------------------------------------------------
import json
import os
import re
from typing import Union

import lz4.block
import blackboxprotobuf

from .db_base import DatabaseBase
from .utils import db_error, timestamp2str, xml2dict, match_BytesExtra, type_converter


class MsgHandler(DatabaseBase):
    """Query helpers for the ``MSG`` table of a message database.

    NOTE(review): every query method is wrapped in ``db_error``, which is
    defined outside this file; it presumably logs and swallows exceptions,
    returning ``None`` -- confirm against ``utils.db_error``.
    """

    _class_name = "MSG"
    MSG_required_tables = ["MSG"]

    def Msg_add_index(self):
        """Create the indexes that speed up per-talker and per-time queries.

        Does nothing when the ``MSG`` table is absent.
        """
        if not self.tables_exist("MSG"):
            return
        self.execute("CREATE INDEX IF NOT EXISTS idx_MSG_StrTalker ON MSG(StrTalker);")
        self.execute("CREATE INDEX IF NOT EXISTS idx_MSG_CreateTime ON MSG(CreateTime);")
        self.execute("CREATE INDEX IF NOT EXISTS idx_MSG_StrTalker_CreateTime ON MSG(StrTalker, CreateTime);")

    @db_error
    def get_m_msg_count(self, wxids: Union[list, str] = ""):
        """Count chat records per talker.

        :param wxids: a single wxid or a list of wxids to restrict the
            per-talker counts to; empty means every talker.
        :return: ``{wxid: chat_count, ..., "total": total_count}`` where
            ``total`` is always the row count of the whole table, or ``{}``
            when the table is missing or the query returns nothing.
        """
        if isinstance(wxids, str) and wxids:
            wxids = [wxids]

        params = ()
        if wxids:
            # Bind the ids instead of splicing them into the statement so a
            # quote inside a wxid can neither break nor alter the query.
            placeholders = ", ".join("?" for _ in wxids)
            params = tuple(wxids)
            sql = (f"SELECT StrTalker, COUNT(*) FROM MSG WHERE StrTalker IN ({placeholders}) "
                   "GROUP BY StrTalker ORDER BY COUNT(*) DESC;")
        else:
            sql = "SELECT StrTalker, COUNT(*) FROM MSG GROUP BY StrTalker ORDER BY COUNT(*) DESC;"
        sql_total = "SELECT COUNT(*) FROM MSG;"

        if not self.tables_exist("MSG"):
            return {}
        result = self.execute(sql, params)
        total_ret = self.execute(sql_total)

        if not result:
            return {}
        total = total_ret[0][0] if total_ret else 0

        msg_count = {"total": total}
        msg_count.update({row[0]: row[1] for row in result})
        return msg_count

    @db_error
    def get_msg_list(self, wxids: Union[list, str] = "", start_index=0, page_size=500, msg_type: str = "",
                     msg_sub_type: str = "", start_createtime=None, end_createtime=None, my_talker="我"):
        """Fetch one page of chat records, oldest first.

        :param wxids: a single wxid or a list of wxids; empty means all talkers
        :param start_index: offset of the first row (SQL ``LIMIT offset, count``)
        :param page_size: maximum number of rows to return
        :param msg_type: message ``Type`` filter
        :param msg_sub_type: message ``SubType`` filter; only applied together
            with ``msg_type``
        :param start_createtime: inclusive lower bound on ``CreateTime``
        :param end_createtime: inclusive upper bound on ``CreateTime``
        :param my_talker: name reported as talker for messages sent by the owner
        :return: ``(records, talkers)`` -- ``records`` is a list of dicts as
            produced by :meth:`get_msg_detail`, ``talkers`` the distinct
            ``talker`` values found in them. ``([], [])`` when the table is
            missing or nothing matches.
        """
        if not self.tables_exist("MSG"):
            return [], []

        if isinstance(wxids, str) and wxids:
            wxids = [wxids]

        # Each optional filter contributes one SQL fragment plus its bound values,
        # appended in the same order so placeholders and parameters line up.
        filters = []
        params = []
        if wxids:
            filters.append(f"AND StrTalker in ({', '.join('?' for _ in wxids)}) ")
            params.extend(wxids)
        if msg_type:
            filters.append("AND Type=? ")
            params.append(msg_type)
            if msg_sub_type:
                filters.append("AND SubType=? ")
                params.append(msg_sub_type)
        if start_createtime:
            filters.append("AND CreateTime>=? ")
            params.append(start_createtime)
        if end_createtime:
            filters.append("AND CreateTime<=? ")
            params.append(end_createtime)

        sql = (
            "SELECT localId,TalkerId,MsgSvrID,Type,SubType,CreateTime,IsSender,Sequence,StatusEx,FlagEx,Status,"
            "MsgSequence,StrContent,MsgServerSeq,StrTalker,DisplayContent,Reserved0,Reserved1,Reserved3,"
            "Reserved4,Reserved5,Reserved6,CompressContent,BytesExtra,BytesTrans,Reserved2,"
            "ROW_NUMBER() OVER (ORDER BY CreateTime ASC) AS id "
            "FROM MSG WHERE 1=1 "
            f"{''.join(filters)}"
            "ORDER BY CreateTime ASC LIMIT ?,?"
        )
        params.extend((start_index, page_size))
        result = self.execute(sql, tuple(params))
        if not result:
            return [], []

        details = (self.get_msg_detail(row, my_talker=my_talker) for row in result)
        # get_msg_detail is wrapped in db_error; skip rows it could not format
        # rather than failing the whole page on d['talker'].
        rdata = [d for d in details if d]
        wxid_list = {d['talker'] for d in rdata}  # distinct talkers on this page
        return rdata, list(wxid_list)

    @db_error
    def get_date_count(self, wxid='', start_time: int = 0, end_time: int = 0, time_format='%Y-%m-%d'):
        """Count chat records per date bucket, excluding chatrooms.

        :param wxid: restrict to one talker; empty means all non-chatroom talkers
        :param start_time: range start as a unix timestamp (int, float or digit string)
        :param end_time: range end as a unix timestamp; the range filter is
            applied only when both bounds are non-zero
        :param time_format: SQLite ``strftime`` format used to bucket rows
        :return: ``{date: {"sender_count", "receiver_count", "total_count"}}``,
            or ``{}`` when the table is missing or nothing matches.
        """
        if not self.tables_exist("MSG"):
            return {}
        start_time, end_time = _normalize_time_range(start_time, end_time)

        # time_format is bound rather than interpolated so a quote in the
        # format cannot terminate the SQL string literal.
        params = [time_format]

        sql_wxid = ""
        if wxid:
            sql_wxid = "AND StrTalker = ? "
            params.append(wxid)

        sql_time = ""
        if start_time and end_time:
            sql_time = "AND CreateTime BETWEEN ? AND ? "
            params.extend((start_time, end_time))

        sql = ("SELECT strftime(?, CreateTime, 'unixepoch', 'localtime') AS date, "
               "       COUNT(*) AS total_count ,"
               "       SUM(CASE WHEN IsSender = 1 THEN 1 ELSE 0 END) AS sender_count, "
               "       SUM(CASE WHEN IsSender = 0 THEN 1 ELSE 0 END) AS receiver_count "
               "FROM MSG "
               "WHERE StrTalker NOT LIKE '%chatroom%' "
               f"{sql_wxid} {sql_time} "
               "GROUP BY date ORDER BY date ASC;")
        result = self.execute(sql, tuple(params))

        if not result:
            return {}
        return {
            date: {
                "sender_count": sender_count,
                "receiver_count": receiver_count,
                "total_count": total_count,
            }
            for date, total_count, sender_count, receiver_count in result
        }

    @db_error
    def get_top_talker_count(self, top: int = 10, start_time: int = 0, end_time: int = 0):
        """Return the non-chatroom talkers with the most chat records.

        :param top: maximum number of talkers to return
        :param start_time: range start as a unix timestamp (int, float or digit string)
        :param end_time: range end as a unix timestamp; the range filter is
            applied only when both bounds are non-zero
        :return: ``{wxid: {"total_count", "sender_count", "receiver_count"}}``
            ordered by descending total, or ``{}`` when the table is missing
            or nothing matches.
        """
        if not self.tables_exist("MSG"):
            return {}
        start_time, end_time = _normalize_time_range(start_time, end_time)

        params = []
        sql_time = ""
        if start_time and end_time:
            sql_time = "AND CreateTime BETWEEN ? AND ? "
            params.extend((start_time, end_time))
        params.append(top)

        sql = (
            "SELECT StrTalker, COUNT(*) AS count,"
            "SUM(CASE WHEN IsSender = 1 THEN 1 ELSE 0 END) AS sender_count, "
            "SUM(CASE WHEN IsSender = 0 THEN 1 ELSE 0 END) AS receiver_count "
            "FROM MSG "
            "WHERE StrTalker NOT LIKE '%chatroom%' "
            f"{sql_time} "
            "GROUP BY StrTalker ORDER BY count DESC "
            "LIMIT ?;"
        )
        result = self.execute(sql, tuple(params))
        if not result:
            return {}
        return {
            row[0]: {"total_count": row[1], "sender_count": row[2], "receiver_count": row[3]}
            for row in result
        }

    # Single-message formatting
    @db_error
    def get_msg_detail(self, row, my_talker="我"):
        """Format one raw ``MSG`` row into a display-ready dict.

        :param row: a 27-column row in the column order selected by
            :meth:`get_msg_list` (the last column is the row number)
        :param my_talker: name reported as talker when ``IsSender == 1``
        :return: ``{"id", "MsgSvrID", "type_name", "is_sender", "talker",
            "room_name", "msg", "src", "extra", "CreateTime"}``
        """
        (localId, TalkerId, MsgSvrID, Type, SubType, CreateTime, IsSender, Sequence, StatusEx, FlagEx, Status,
         MsgSequence, StrContent, MsgServerSeq, StrTalker, DisplayContent, Reserved0, Reserved1, Reserved3,
         Reserved4, Reserved5, Reserved6, CompressContent, BytesExtra, BytesTrans, Reserved2, _id) = row

        CreateTime = timestamp2str(CreateTime)

        type_id = (Type, SubType)
        type_name = type_converter(type_id)

        # Defaults; also what unhandled types (e.g. plain text, system
        # messages) end up with.
        msg = StrContent
        src = ""
        extra = {}

        if type_id == (1, 0):  # text
            msg = StrContent

        elif type_id == (3, 0):  # image
            src = _pick_storage_path(get_BytesExtra(BytesExtra), "Image")
            msg = "图片"

        elif type_id == (34, 0):  # voice
            voice_xml = xml2dict(StrContent) or {}
            voicelength = voice_xml.get("voicemsg", {}).get("voicelength", "")
            transtext = voice_xml.get("voicetrans", {}).get("transtext", "")
            if voicelength.isdigit():
                # the value is divided by 1000 and shown as seconds
                voicelength = f"{int(voicelength) / 1000:.2f}"
            msg = f"语音时长:{voicelength}秒\n翻译结果:{transtext}" if transtext else f"语音时长:{voicelength}秒"
            src = os.path.join(f"{StrTalker}",
                               f"{CreateTime.replace(':', '-').replace(' ', '_')}_{IsSender}_{MsgSvrID}.wav")

        elif type_id == (43, 0):  # video
            src = _pick_storage_path(get_BytesExtra(BytesExtra), "mp4")
            msg = "视频"

        elif type_id == (47, 0):  # animated sticker
            content_tmp = xml2dict(StrContent) or {}
            cdnurl = content_tmp.get("emoji", {}).get("cdnurl", "")
            if not cdnurl:
                # fall back to whatever match_BytesExtra finds in BytesExtra
                cdnurl = match_BytesExtra(get_BytesExtra(BytesExtra))
            if cdnurl:
                msg, src = "表情", cdnurl

        elif type_id == (48, 0):  # location
            content_tmp = xml2dict(StrContent) or {}
            location = content_tmp.get("location", {})
            # pop with a default: a partial <location> must not raise KeyError;
            # whatever is left over is dumped as "other info".
            msg = (f"纬度:【{location.pop('x', '')}】 经度:【{location.pop('y', '')}】\n"
                   f"位置:{location.pop('label', '')} {location.pop('poiname', '')}\n"
                   f"其他信息:{json.dumps(location, ensure_ascii=False, indent=4)}"
                   )
            src = ""

        elif type_id == (49, 0):  # file
            url = match_BytesExtra(get_BytesExtra(BytesExtra))
            src = url
            # basename() raises on None; label with the type name when no path was found
            msg = os.path.basename(url) if url else type_name

        elif type_id == (49, 5):  # (shared) card-style link
            content_tmp = xml2dict(decompress_CompressContent(CompressContent)) or {}
            appmsg = content_tmp.get("appmsg", {})
            title = appmsg.get("title", "")
            des = appmsg.get("des", "")
            url = appmsg.get("url", "")
            msg = f'{title}\n{des}\n\n<a href="{url}" target="_blank">点击查看详情</a>'
            src = url
            extra = appmsg

        elif type_id == (49, 19):  # merged/forwarded chat history
            content_tmp = xml2dict(decompress_CompressContent(CompressContent)) or {}
            appmsg = content_tmp.get("appmsg", {})
            title = appmsg.get("title", "")
            des = appmsg.get("des", "")
            recorditem = xml2dict(appmsg.get("recorditem", ""))
            msg = f"{title}\n{des}"
            src = recorditem

        elif type_id == (49, 57):  # text message quoting another message
            content_tmp = xml2dict(decompress_CompressContent(CompressContent)) or {}
            appmsg = content_tmp.get("appmsg", {})

            title = appmsg.get("title", "")
            refermsg = appmsg.get("refermsg", {})

            displayname = refermsg.get("displayname", "")
            display_content = refermsg.get("content", "")
            display_createtime = refermsg.get("createtime", "")

            if display_createtime.isdigit():
                display_createtime = timestamp2str(int(display_createtime))

            if display_content and display_content.startswith("<?xml"):
                # the quoted message is itself XML: summarise it
                display_content = xml2dict(display_content)
                if "img" in display_content:
                    display_content = "图片"
                else:
                    quoted_title = display_content.get("appmsg", {}).get("title", "")
                    display_content = quoted_title if quoted_title else display_content
            msg = f"{title}\n\n[引用]({display_createtime}){displayname}:{display_content}"
            src = ""

        elif type_id == (49, 2000):  # money transfer
            content_tmp = xml2dict(decompress_CompressContent(CompressContent)) or {}
            wcpayinfo = content_tmp.get("appmsg", {}).get("wcpayinfo", {})
            paysubtype = wcpayinfo.get("paysubtype", "")  # transfer kind
            feedesc = wcpayinfo.get("feedesc", "")  # amount
            pay_memo = wcpayinfo.get("pay_memo", "")  # memo
            begintransfertime = wcpayinfo.get("begintransfertime", "")  # transfer start time
            msg = (f"{'已收款' if paysubtype == '3' else '转账'}:{feedesc}\n"
                   f"转账说明:{pay_memo if pay_memo else ''}\n"
                   f"转账时间:{timestamp2str(begintransfertime)}\n"
                   )
            src = ""

        elif type_id[0] == 49 and type_id[1] != 0:  # any other app message
            src = match_BytesExtra(get_BytesExtra(BytesExtra))
            msg = type_name

        elif type_id == (50, 0):  # voice/video call
            msg = "语音/视频通话[%s]" % DisplayContent

        # Types not listed above (e.g. (10000, *)) keep msg = StrContent.

        talker = "未知"
        if IsSender == 1:
            talker = my_talker
        elif StrTalker.endswith("@chatroom"):
            # In a chatroom StrTalker is the room; the sender is read from BytesExtra.
            bytes_extra = get_BytesExtra(BytesExtra)
            if bytes_extra:
                try:
                    sender = bytes_extra['3'][0]['2']
                except (KeyError, IndexError, TypeError):
                    sender = None  # unexpected layout: keep the "unknown" label
                # Only accept a string: anything else would be unusable as a
                # talker id (and unhashable values break the caller's set).
                if isinstance(sender, str):
                    talker = "系统" if "publisher-id" in sender else sender
        else:
            talker = StrTalker

        row_data = {"id": _id, "MsgSvrID": str(MsgSvrID), "type_name": type_name, "is_sender": IsSender,
                    "talker": talker, "room_name": StrTalker, "msg": msg, "src": src, "extra": extra,
                    "CreateTime": CreateTime, }
        return row_data


def _normalize_time_range(start_time, end_time):
    """Coerce digit strings to int; return ``(0, 0)`` unless both bounds are numeric."""
    if isinstance(start_time, str) and start_time.isdigit():
        start_time = int(start_time)
    if isinstance(end_time, str) and end_time.isdigit():
        end_time = int(end_time)
    if not (isinstance(start_time, (int, float)) and isinstance(end_time, (int, float))):
        return 0, 0
    return start_time, end_time


def _pick_storage_path(bytes_extra, marker):
    """Return the first ``FileStorage...`` path found in ``bytes_extra``, preferring ones containing ``marker``.

    The decoded structure is searched through its ``str()`` form; backslash
    separators are re-joined with ``os.path.join``. Returns ``""`` when no
    path is present.
    """
    candidates = re.findall(r"(FileStorage.*?)'", str(bytes_extra))
    if not candidates:
        return ""
    # Stable sort: paths containing the marker first, original order otherwise.
    candidates.sort(key=lambda p: marker in p, reverse=True)
    parts = [seg for seg in candidates[0].replace("'", "").split("\\") if seg]
    return os.path.join(*parts)


@db_error
def decompress_CompressContent(data):
    """Decompress the ``CompressContent`` column of a MSG row.

    :param data: raw column value (LZ4 block compressed bytes)
    :return: the decoded text; ``None`` when ``data`` is not ``bytes``. If
        decompression fails the raw bytes are decoded as UTF-8 instead.
    """
    if data is None or not isinstance(data, bytes):
        return None
    try:
        # The block carries no size header, so hand lz4 a generous upper bound.
        dst = lz4.block.decompress(data, uncompressed_size=len(data) << 8)
        # Strip the NUL bytes left in the output; the XML parser downstream rejects them.
        dst = dst.replace(b'\x00', b'')
        return dst.decode('utf-8', errors='ignore')
    except Exception:
        # Best effort: treat the payload as plain text.
        return data.decode('utf-8', errors='ignore')


def _bytes_extra_typedef():
    """Build the blackboxprotobuf type definition used to decode ``BytesExtra``.

    A fresh dict is built on every call so the decoder never shares state
    between messages.
    """

    def field(type_name):
        return {"type": type_name, "name": ""}

    def alt(*sub_fields):
        # Alternative layout of field 3: an int tag plus a nested message
        # whose fields are given as (field_number, type) pairs.
        return {
            "1": field("int"),
            "2": {
                "type": "message",
                "message_typedef": {num: field(type_name) for num, type_name in sub_fields},
                "name": "",
            },
        }

    return {
        "1": {
            "type": "message",
            "message_typedef": {"1": field("int"), "2": field("int")},
            "name": "1",
        },
        "3": {
            "type": "message",
            "message_typedef": {"1": field("int"), "2": field("str")},
            "name": "3",
            "alt_typedefs": {
                "1": alt(),
                "2": alt(("13", "fixed32"), ("12", "fixed32")),
                "3": alt(("15", "fixed64")),
                "4": alt(("15", "int"), ("14", "fixed32")),
                "5": alt(("12", "fixed32"), ("7", "fixed64"), ("6", "fixed64")),
                "6": alt(("7", "fixed64"), ("6", "fixed32")),
                "7": alt(("12", "fixed64")),
                "8": alt(("6", "fixed64"), ("12", "fixed32")),
                "9": alt(("15", "int"), ("12", "fixed64"), ("6", "int")),
                "10": alt(("6", "fixed32"), ("12", "fixed64")),
            },
        },
    }


@db_error
def get_BytesExtra(BytesExtra):
    """Decode the protobuf-encoded ``BytesExtra`` column of a MSG row.

    :param BytesExtra: raw column value
    :return: the decoded message as returned by
        ``blackboxprotobuf.decode_message``, or ``None`` when the input is
        not ``bytes`` or cannot be decoded.
    """
    if BytesExtra is None or not isinstance(BytesExtra, bytes):
        return None
    try:
        deserialize_data, _ = blackboxprotobuf.decode_message(BytesExtra, _bytes_extra_typedef())
        return deserialize_data
    except Exception:
        # Undecodable payloads are expected; callers treat None as "no extra data".
        return None