# -*- coding: utf-8 -*-#
# -------------------------------------------------------------------------------
# Name: MicroMsg.py
# Description: Handles the contact database (MicroMsg)
# Author: xaoyaoo
# Date: 2024/04/15
# -------------------------------------------------------------------------------
import logging

from .db_base import DatabaseBase
from .utils import timestamp2str, bytes2str, db_loger, db_error

import blackboxprotobuf

# Hex marker -> output key for every field get_ExtraBuf() searches for in Contact.ExtraBuf.
# The values become dictionary keys of the returned data, so they are kept verbatim.
_EXTRA_BUF_FIELDS = {
    '74752C06': '性别[1男2女]', '46CF10C4': '个性签名', 'A4D9024A': '国', 'E2EAA8D1': '省', '1D025BBF': '市',
    'F917BCC0': '公司名称', '759378AD': '手机号', '4EB96D85': '企微属性', '81AE19B4': '朋友圈背景',
    '0E719F13': '备注图片', '945f3190': '备注图片2',
    'DDF32683': '0', '88E28FCE': '1', '761A1D2D': '2', '0263A0CB': '3', '0451FF12': '4', '228C66A8': '5',
    '4D6C4570': '6', '4335DFDD': '7', 'DE4CDAEB': '8', 'A72BC20A': '9', '069FED52': '10', '9B0F4299': '11',
    '3D641E22': '12', '1249822C': '13', 'B4F73ACB': '14', '0959EB92': '15', '3CF4A315': '16',
    'C9477AC60201E44CD0E8': '17', 'B7ACF0F5': '18', '57A7B5A8': '19', '695F3170': '20', 'FB083DD9': '21',
    '0240E37F': '22', '315D02A3': '23', '7DEC0BC3': '24', '16791C90': '25',
}


def _sql_quote(value):
    """Return *value* as text that is safe inside a single-quoted SQLite string literal."""
    # SQLite escapes a single quote inside a literal by doubling it.
    return str(value).replace("'", "''")


def _sql_in_list(values):
    """Render *values* as the parenthesised, quoted operand of an SQL ``IN`` clause."""
    return "('" + "','".join(_sql_quote(value) for value in values) + "')"


def _label_names(label_id_list, id2label):
    """
    Translate a comma-separated label-id string into a tuple of label names.

    Ids that are unknown, or that are not integers at all, are kept verbatim
    instead of aborting the whole lookup.
    :param label_id_list: comma-separated label ids (may be None or empty)
    :param id2label: mapping of label id -> label name
    :return: tuple of label names (or raw ids when no name is known)
    """
    names = []
    for label_id in (label_id_list.split(",") if label_id_list else []):
        if not label_id:
            continue
        try:
            names.append(id2label.get(int(label_id), label_id))
        except ValueError:
            names.append(label_id)
    return tuple(names)


class MicroHandler(DatabaseBase):
    """Queries over the contact database: labels, sessions, contacts and chat rooms."""

    _class_name = "MicroMsg"
    Micro_required_tables = ["ContactLabel", "Contact", "ContactHeadImgUrl", "Session", "ChatInfo", "ChatRoom",
                             "ChatRoomInfo"]

    def Micro_add_index(self):
        """
        Add indexes to speed up the queries issued by this class.

        Every statement uses ``CREATE INDEX IF NOT EXISTS`` and a group is only
        executed when ``tables_exist`` reports its table(s) as present.
        """
        index_plan = (
            ("Session", (
                "CREATE INDEX IF NOT EXISTS idx_Session_strUsrName_nTime ON Session(strUsrName, nTime);",
                "CREATE INDEX IF NOT EXISTS idx_Session_nOrder ON Session(nOrder);",
                "CREATE INDEX IF NOT EXISTS idx_Session_nTime ON Session(nTime);",
            )),
            ("Contact", (
                "CREATE INDEX IF NOT EXISTS idx_Contact_UserName ON Contact(UserName);",
                # Composite index covering the columns searched by get_user_list(word=...).
                "CREATE INDEX IF NOT EXISTS idx_Contact_search "
                "ON Contact(UserName, NickName, Remark, Alias, QuanPin, PYInitial, RemarkQuanPin, RemarkPYInitial);",
            )),
            ('ContactHeadImgUrl', (
                "CREATE INDEX IF NOT EXISTS idx_ContactHeadImgUrl_usrName ON ContactHeadImgUrl(usrName);",
            )),
            ('ChatInfo', (
                "CREATE INDEX IF NOT EXISTS idx_ChatInfo_Username_LastReadedCreateTime "
                "ON ChatInfo(Username, LastReadedCreateTime);",
                "CREATE INDEX IF NOT EXISTS idx_ChatInfo_LastReadedCreateTime ON ChatInfo(LastReadedCreateTime);",
            )),
            (['ChatRoomInfo', "ChatRoom"], (
                "CREATE INDEX IF NOT EXISTS idx_ChatRoom_ChatRoomName ON ChatRoom(ChatRoomName);",
                "CREATE INDEX IF NOT EXISTS idx_ChatRoomInfo_ChatRoomName ON ChatRoomInfo(ChatRoomName);",
            )),
        )
        for tables, statements in index_plan:
            if not self.tables_exist(tables):
                continue
            for statement in statements:
                self.execute(statement)

    @db_error
    def get_labels(self, id_is_key=True):
        """
        Read the contact label list.
        :param id_is_key: True: map label id -> label name; False: map label name -> label id
        :return: dict of labels (empty when the table is missing or has no rows)
        """
        labels = {}
        if not self.tables_exist("ContactLabel"):
            return labels
        sql = "SELECT LabelId, LabelName FROM ContactLabel ORDER BY LabelName ASC;"
        result = self.execute(sql)
        if not result:
            return labels
        if id_is_key:
            return {row[0]: row[1] for row in result}
        return {row[1]: row[0] for row in result}

    @db_error
    def get_session_list(self):
        """
        Get the session list, newest first, one entry per strUsrName.
        :return: dict mapping strUsrName -> session/contact info
        """
        sessions = {}
        if not self.tables_exist(["Session", "Contact", "ContactHeadImgUrl"]):
            return sessions
        sql = (
            "SELECT S.strUsrName,S.nOrder,S.nUnReadCount, S.strNickName, S.nStatus, S.nIsSend, S.strContent, "
            "S.nMsgLocalID, S.nMsgStatus, S.nTime, S.nMsgType, S.Reserved2 AS nMsgSubType, C.UserName, C.Alias, "
            "C.DelFlag, C.Type, C.VerifyFlag, C.Reserved1, C.Reserved2, C.Remark, C.NickName, C.LabelIDList, "
            "C.ChatRoomType, C.ChatRoomNotify, C.Reserved5, C.Reserved6 as describe, C.ExtraBuf, H.bigHeadImgUrl "
            "FROM (SELECT strUsrName, MAX(nTime) AS MaxnTime FROM Session GROUP BY strUsrName) AS SubQuery "
            "JOIN Session S ON S.strUsrName = SubQuery.strUsrName AND S.nTime = SubQuery.MaxnTime "
            "left join Contact C ON C.UserName = S.strUsrName "
            "LEFT JOIN ContactHeadImgUrl H ON C.UserName = H.usrName "
            "WHERE S.strUsrName!='@publicUser' "
            "ORDER BY S.nTime DESC;"
        )

        db_loger.info(f"get_session_list sql: {sql}")
        ret = self.execute(sql)
        if not ret:
            return sessions

        # `or {}`: guards against a None result, assuming db_error may return None on failure.
        id2label = self.get_labels() or {}
        for row in ret:
            (strUsrName, nOrder, nUnReadCount, strNickName, nStatus, nIsSend, strContent,
             nMsgLocalID, nMsgStatus, nTime, nMsgType, nMsgSubType,
             UserName, Alias, DelFlag, Type, VerifyFlag, Reserved1, Reserved2, Remark, NickName, LabelIDList,
             ChatRoomType, ChatRoomNotify, Reserved5, describe, ExtraBuf, bigHeadImgUrl) = row

            ExtraBuf = get_ExtraBuf(ExtraBuf)
            nTime = timestamp2str(nTime) if nTime else None

            sessions[strUsrName] = {
                "wxid": strUsrName, "nOrder": nOrder, "nUnReadCount": nUnReadCount, "strNickName": strNickName,
                "nStatus": nStatus, "nIsSend": nIsSend, "strContent": strContent, "nMsgLocalID": nMsgLocalID,
                "nMsgStatus": nMsgStatus, "nTime": nTime, "nMsgType": nMsgType, "nMsgSubType": nMsgSubType,
                "LastReadedCreateTime": nTime,
                "nickname": NickName, "remark": Remark, "account": Alias,
                "describe": describe, "headImgUrl": bigHeadImgUrl if bigHeadImgUrl else "",
                "ExtraBuf": ExtraBuf, "LabelIDList": _label_names(LabelIDList, id2label)
            }
        return sessions

    @db_error
    def get_recent_chat_wxid(self):
        """
        Get the recently chatted contacts, newest first.
        :return: list of dicts with keys wxid, LastReadedCreateTime, LastReadedSvrId
        """
        users = []
        if not self.tables_exist(["ChatInfo"]):
            return users
        sql = (
            "SELECT A.Username, LastReadedCreateTime, LastReadedSvrId "
            "FROM ( SELECT Username, MAX(LastReadedCreateTime) AS MaxLastReadedCreateTime FROM ChatInfo "
            "WHERE LastReadedCreateTime IS NOT NULL AND LastReadedCreateTime > 1007911408000 GROUP BY Username "
            ") AS SubQuery JOIN ChatInfo A "
            "ON A.Username = SubQuery.Username AND LastReadedCreateTime = SubQuery.MaxLastReadedCreateTime "
            "ORDER BY A.LastReadedCreateTime DESC;"
        )

        db_loger.info(f"get_recent_chat_wxid sql: {sql}")
        result = self.execute(sql)
        if not result:
            return users
        for username, LastReadedCreateTime, LastReadedSvrId in result:
            LastReadedCreateTime = timestamp2str(LastReadedCreateTime) if LastReadedCreateTime else None
            users.append(
                {"wxid": username, "LastReadedCreateTime": LastReadedCreateTime, "LastReadedSvrId": LastReadedSvrId})
        return users

    @db_error
    def get_user_list(self, word: str = None, wxids: list = None, label_ids: list = None):
        """
        Get the contact list.
        [ Note: when changing this function, also change get_im_user_list in dbOpenIMContact.py ]
        :param word: search keyword matched against wxid, nickname, remark, alias and their pinyin forms
        :param wxids: list of wxids (a single str is accepted too)
        :param label_ids: label ids (a single str or int is accepted too)
        :return: dict mapping wxid -> contact info
        """
        if isinstance(wxids, str):
            wxids = [wxids]
        if isinstance(label_ids, (str, int)):
            label_ids = [label_ids]

        users = {}
        if not self.tables_exist(["Contact", "ContactHeadImgUrl"]):
            return users

        # The WHERE clause is assembled from independent conditions; every caller-supplied
        # value is quote-escaped so that a "'" or ";" in it cannot alter the statement.
        conditions = ["1==1"]
        if word:
            kw = _sql_quote(word)
            conditions.append(
                f"( A.UserName LIKE '%{kw}%' "
                f"OR A.NickName LIKE '%{kw}%' "
                f"OR A.Remark LIKE '%{kw}%' "
                f"OR A.Alias LIKE '%{kw}%' "
                f"OR LOWER(A.QuanPin) LIKE LOWER('%{kw}%') "
                f"OR LOWER(A.PYInitial) LIKE LOWER('%{kw}%') "
                f"OR LOWER(A.RemarkQuanPin) LIKE LOWER('%{kw}%') "
                f"OR LOWER(A.RemarkPYInitial) LIKE LOWER('%{kw}%') "
                f")"
            )
        if wxids:
            conditions.append(f"A.UserName IN {_sql_in_list(wxids)}")
        if label_ids:
            # LabelIDList is comma-separated; wrapping both sides in commas matches whole
            # ids only, so that id 1 does not also match 10, 11, 21, ...
            label_conditions = [
                f"(',' || A.LabelIDList || ',') LIKE '%,{_sql_quote(label_id)},%'" for label_id in label_ids
            ]
            conditions.append("(" + " OR ".join(label_conditions) + ")")

        sql = (
            "SELECT A.UserName, A.Alias, A.DelFlag, A.Type, A.VerifyFlag, A.Reserved1, A.Reserved2,"
            "A.Remark, A.NickName, A.LabelIDList, A.ChatRoomType, A.ChatRoomNotify, A.Reserved5,"
            "A.Reserved6 as describe, A.ExtraBuf, B.bigHeadImgUrl "
            "FROM Contact A LEFT JOIN ContactHeadImgUrl B ON A.UserName = B.usrName "
            "WHERE " + " AND ".join(conditions) + " ;"
        )

        db_loger.info(f"get_user_list sql: {sql}")
        result = self.execute(sql)
        if not result:
            return users

        # `or {}`: guards against a None result, assuming db_error may return None on failure.
        id2label = self.get_labels() or {}
        for row in result:
            (UserName, Alias, DelFlag, Type, VerifyFlag, Reserved1, Reserved2, Remark, NickName, LabelIDList,
             ChatRoomType, ChatRoomNotify, Reserved5, describe, ExtraBuf, bigHeadImgUrl) = row

            users[UserName] = {
                "wxid": UserName, "nickname": NickName, "remark": Remark, "account": Alias,
                "describe": describe, "headImgUrl": bigHeadImgUrl if bigHeadImgUrl else "",
                "ExtraBuf": get_ExtraBuf(ExtraBuf), "LabelIDList": _label_names(LabelIDList, id2label),
                "extra": None}

        # Only names containing "@" are looked up as rooms. The ids are materialised so
        # that an empty selection skips the query instead of hitting the database.
        room_ids = [wxid for wxid in users if wxid and "@" in wxid]
        extras = (self.get_room_list(roomwxids=room_ids) or {}) if room_ids else {}
        for wxid, info in users.items():
            info["extra"] = extras.get(wxid, None)
        return users

    @db_error
    def get_room_list(self, word=None, roomwxids: list = None):
        """
        Get the chat room list.
        :param word: search keyword matched against the chat room name
        :param roomwxids: list of chat room wxids (a single str is accepted too)
        :return: dict mapping chat room wxid -> room info
        """
        if isinstance(roomwxids, str):
            roomwxids = [roomwxids]

        rooms = {}
        if not self.tables_exist(["ChatRoom", "ChatRoomInfo"]):
            return rooms

        # Caller-supplied values are quote-escaped before being placed in the statement.
        conditions = ["1==1"]
        if word:
            conditions.append(f"A.ChatRoomName LIKE '%{_sql_quote(word)}%'")
        if roomwxids:
            conditions.append(f"A.ChatRoomName IN {_sql_in_list(roomwxids)}")

        sql = (
            "SELECT A.ChatRoomName,A.UserNameList,A.DisplayNameList,A.ChatRoomFlag,A.IsShowName,"
            "A.SelfDisplayName,A.Reserved2,A.RoomData, "
            "B.Announcement,B.AnnouncementEditor,B.AnnouncementPublishTime "
            "FROM ChatRoom A LEFT JOIN ChatRoomInfo B ON A.ChatRoomName==B.ChatRoomName "
            "WHERE " + " AND ".join(conditions) + " ;")

        db_loger.info(f"get_room_list sql: {sql}")
        result = self.execute(sql)
        if not result:
            return rooms

        for row in result:
            (ChatRoomName, UserNameList, DisplayNameList, ChatRoomFlag, IsShowName, SelfDisplayName,
             Reserved2, RoomData,
             Announcement, AnnouncementEditor, AnnouncementPublishTime) = row

            # Member wxids are stored in one string separated by the literal text "^G".
            UserNameList = UserNameList.split("^G") if UserNameList else []

            # Collect wxid -> in-room nickname from the decoded RoomData: every list-valued
            # field is scanned for dict entries carrying a str under '1' and a value under '2'.
            RoomData = ChatRoom_RoomData(RoomData)
            wxid2roomNickname = {}
            if RoomData:
                entries = []
                for value in RoomData.values():
                    if isinstance(value, list):
                        entries += value
                for i in entries:
                    try:
                        if isinstance(i, dict) and isinstance(i.get('1'), str) and i.get('2'):
                            wxid2roomNickname[i['1']] = i["2"]
                    except Exception as e:
                        db_loger.error(f"wxid2remark: ChatRoomName:{ChatRoomName}, {i} error:{e}", exc_info=True)

            # An empty wxid list would make get_user_list() return every contact, so the
            # lookup is skipped when the room has no recorded members.
            wxid2userinfo = (self.get_user_list(wxids=UserNameList) or {}) if UserNameList else {}
            for wxid, info in wxid2userinfo.items():
                info["roomNickname"] = wxid2roomNickname.get(wxid, "")

            # Reserved2 is used as the owner's wxid; fall back to the raw value when that
            # wxid is not among the resolved members.
            owner = wxid2userinfo.get(Reserved2, Reserved2)

            rooms[ChatRoomName] = {
                "wxid": ChatRoomName, "roomWxids": UserNameList, "IsShowName": IsShowName,
                "ChatRoomFlag": ChatRoomFlag, "SelfDisplayName": SelfDisplayName,
                "owner": owner, "wxid2userinfo": wxid2userinfo,
                "Announcement": Announcement, "AnnouncementEditor": AnnouncementEditor,
                "AnnouncementPublishTime": AnnouncementPublishTime}
        return rooms


@db_error
def ChatRoom_RoomData(RoomData):
    """
    Decode the RoomData column of the ChatRoom table.
    :param RoomData: raw protobuf bytes
    :return: the decoded message, or None when the input is not bytes or cannot be decoded
    """
    if RoomData is None or not isinstance(RoomData, bytes):
        return None
    data = get_BytesExtra(RoomData)
    if data:
        # The return value is deliberately ignored, exactly as before.
        # NOTE(review): presumably bytes2str converts nested bytes in place - confirm in utils.
        bytes2str(data)
    return data


@db_error
def get_BytesExtra(BytesExtra):
    """
    Decode a protobuf blob without a schema.
    :param BytesExtra: raw protobuf bytes
    :return: the decoded message, or None when the input is not bytes or decoding fails
    """
    if BytesExtra is None or not isinstance(BytesExtra, bytes):
        return None
    try:
        deserialize_data, _ = blackboxprotobuf.decode_message(BytesExtra)
        return deserialize_data
    except Exception as e:
        db_loger.warning(f"\nget_BytesExtra: {e}\n{BytesExtra}", exc_info=True)
        return None


@db_error
def get_ExtraBuf(ExtraBuf: bytes):
    """
    Parse the ExtraBuf column of the Contact table.

    For every known marker the buffer is searched for its first occurrence; the byte
    after the marker selects how the value that follows is read.
    :param ExtraBuf: raw ExtraBuf bytes
    :return: dict of field name -> value ("" when the marker is absent; the key is omitted
             when the type byte is not recognised), or None for empty / non-bytes input
    """
    if not ExtraBuf or not isinstance(ExtraBuf, (bytes, bytearray)):
        return None

    rdata = {}
    for marker_hex, rdata_name in _EXTRA_BUF_FIELDS.items():
        marker = bytes.fromhex(marker_hex)
        offset = ExtraBuf.find(marker)
        if offset == -1:
            rdata[rdata_name] = ""
            continue
        offset += len(marker)
        type_id = ExtraBuf[offset: offset + 1]
        offset += 1

        if type_id == b"\x04":
            # 4-byte little-endian integer
            rdata[rdata_name] = int.from_bytes(ExtraBuf[offset: offset + 4], "little")

        elif type_id == b"\x18":
            # 4-byte little-endian length followed by UTF-16 text; undecodable bytes are
            # dropped so that one damaged field does not discard every other field.
            length = int.from_bytes(ExtraBuf[offset: offset + 4], "little")
            rdata[rdata_name] = ExtraBuf[offset + 4: offset + 4 + length].decode(
                "utf-16", errors="ignore").rstrip("\x00")

        elif type_id == b"\x17":
            # 4-byte little-endian length followed by UTF-8 text
            length = int.from_bytes(ExtraBuf[offset: offset + 4], "little")
            rdata[rdata_name] = ExtraBuf[offset + 4: offset + 4 + length].decode(
                "utf-8", errors="ignore").rstrip("\x00")

        elif type_id == b"\x05":
            # 8 raw bytes, reported as a hex string
            rdata[rdata_name] = f"0x{ExtraBuf[offset: offset + 8].hex()}"
    return rdata