解锁并提取Linux客户端微信数据库 (vibe coded)
at 365 lines 17 kB view raw
# -*- coding: utf-8 -*-#
# -------------------------------------------------------------------------------
# Name:         MicroMsg.py
# Description:  Handles the contact database
# Author:       xaoyaoo
# Date:         2024/04/15
# -------------------------------------------------------------------------------
import logging

from .db_base import DatabaseBase
from .utils import timestamp2str, bytes2str, db_loger, db_error

import blackboxprotobuf


def _sql_quote(value):
    """Return *value* as text that is safe inside a single-quoted SQL literal.

    Single quotes are doubled, which is the standard SQL escape. LIKE
    wildcards (``%`` and ``_``) are deliberately left untouched so callers can
    still use them in search words.
    """
    return str(value).replace("'", "''")


def _label_names(label_id_list, id2label):
    """Translate a comma-separated label-id string into a tuple of label names.

    :param label_id_list: raw ``LabelIDList`` column value (may be None/empty)
    :param id2label: mapping ``label id -> label name``
    :return: tuple of label names; an id that is unknown or not numeric is
             kept as the raw string instead of aborting the whole row
    """
    names = []
    for label_id in (label_id_list or "").split(","):
        if not label_id:
            continue
        try:
            names.append(id2label.get(int(label_id), label_id))
        except ValueError:
            names.append(label_id)
    return tuple(names)


class MicroHandler(DatabaseBase):
    _class_name = "MicroMsg"
    Micro_required_tables = ["ContactLabel", "Contact", "ContactHeadImgUrl", "Session", "ChatInfo", "ChatRoom",
                             "ChatRoomInfo"]

    def Micro_add_index(self):
        """
        Create indexes (if missing) on the tables used by the queries below
        to speed them up. Tables that do not exist are skipped.
        """
        # Indexes for the Session table
        if self.tables_exist("Session"):
            self.execute("CREATE INDEX IF NOT EXISTS idx_Session_strUsrName_nTime ON Session(strUsrName, nTime);")
            self.execute("CREATE INDEX IF NOT EXISTS idx_Session_nOrder ON Session(nOrder);")
            self.execute("CREATE INDEX IF NOT EXISTS idx_Session_nTime ON Session(nTime);")

        # Index for the Contact table
        if self.tables_exist("Contact"):
            self.execute("CREATE INDEX IF NOT EXISTS idx_Contact_UserName ON Contact(UserName);")

        # Index for the ContactHeadImgUrl table
        if self.tables_exist('ContactHeadImgUrl'):
            self.execute("CREATE INDEX IF NOT EXISTS idx_ContactHeadImgUrl_usrName ON ContactHeadImgUrl(usrName);")

        # Indexes for the ChatInfo table
        if self.tables_exist('ChatInfo'):
            self.execute("CREATE INDEX IF NOT EXISTS idx_ChatInfo_Username_LastReadedCreateTime "
                         "ON ChatInfo(Username, LastReadedCreateTime);")
            self.execute(
                "CREATE INDEX IF NOT EXISTS idx_ChatInfo_LastReadedCreateTime ON ChatInfo(LastReadedCreateTime);")

        # Composite index on Contact covering the columns used by keyword search
        if self.tables_exist('Contact'):
            self.execute("CREATE INDEX IF NOT EXISTS idx_Contact_search "
                         "ON Contact(UserName, NickName, Remark, Alias, QuanPin, PYInitial, RemarkQuanPin, RemarkPYInitial);")

        # Indexes for the ChatRoom and ChatRoomInfo tables
        if self.tables_exist(['ChatRoomInfo', "ChatRoom"]):
            self.execute("CREATE INDEX IF NOT EXISTS idx_ChatRoom_ChatRoomName ON ChatRoom(ChatRoomName);")
            self.execute("CREATE INDEX IF NOT EXISTS idx_ChatRoomInfo_ChatRoomName ON ChatRoomInfo(ChatRoomName);")

    @db_error
    def get_labels(self, id_is_key=True):
        """
        Read the contact-label list.
        :param id_is_key: True: map label id -> label name; False: map label name -> label id
        :return: dict of labels (empty when the table is missing or has no rows)
        """
        if not self.tables_exist("ContactLabel"):
            return {}
        rows = self.execute("SELECT LabelId, LabelName FROM ContactLabel ORDER BY LabelName ASC;")
        if not rows:
            return {}
        if id_is_key:
            return {row[0]: row[1] for row in rows}
        return {row[1]: row[0] for row in rows}

    @db_error
    def get_session_list(self):
        """
        Get the session (conversation) list, newest first.
        :return: dict keyed by wxid; each value holds the latest Session row
                 joined with the matching Contact / ContactHeadImgUrl data
        """
        sessions = {}
        if not self.tables_exist(["Session", "Contact", "ContactHeadImgUrl"]):
            return sessions
        sql = (
            "SELECT S.strUsrName,S.nOrder,S.nUnReadCount, S.strNickName, S.nStatus, S.nIsSend, S.strContent, "
            "S.nMsgLocalID, S.nMsgStatus, S.nTime, S.nMsgType, S.Reserved2 AS nMsgSubType, C.UserName, C.Alias, "
            "C.DelFlag, C.Type, C.VerifyFlag, C.Reserved1, C.Reserved2, C.Remark, C.NickName, C.LabelIDList, "
            "C.ChatRoomType, C.ChatRoomNotify, C.Reserved5, C.Reserved6 as describe, C.ExtraBuf, H.bigHeadImgUrl "
            "FROM (SELECT strUsrName, MAX(nTime) AS MaxnTime FROM Session GROUP BY strUsrName) AS SubQuery "
            "JOIN Session S ON S.strUsrName = SubQuery.strUsrName AND S.nTime = SubQuery.MaxnTime "
            "left join Contact C ON C.UserName = S.strUsrName "
            "LEFT JOIN ContactHeadImgUrl H ON C.UserName = H.usrName "
            "WHERE S.strUsrName!='@publicUser' "
            "ORDER BY S.nTime DESC;"
        )

        db_loger.info(f"get_session_list sql: {sql}")
        ret = self.execute(sql)
        if not ret:
            return sessions

        # get_labels is wrapped by db_error; fall back to an empty mapping if
        # it yields a falsy value.
        id2label = self.get_labels() or {}
        for row in ret:
            (strUsrName, nOrder, nUnReadCount, strNickName, nStatus, nIsSend, strContent,
             nMsgLocalID, nMsgStatus, nTime, nMsgType, nMsgSubType,
             UserName, Alias, DelFlag, Type, VerifyFlag, Reserved1, Reserved2, Remark, NickName, LabelIDList,
             ChatRoomType, ChatRoomNotify, Reserved5, describe, ExtraBuf, bigHeadImgUrl) = row

            ExtraBuf = get_ExtraBuf(ExtraBuf)
            labels = _label_names(LabelIDList, id2label)
            nTime = timestamp2str(nTime) if nTime else None

            sessions[strUsrName] = {
                "wxid": strUsrName, "nOrder": nOrder, "nUnReadCount": nUnReadCount, "strNickName": strNickName,
                "nStatus": nStatus, "nIsSend": nIsSend, "strContent": strContent, "nMsgLocalID": nMsgLocalID,
                "nMsgStatus": nMsgStatus, "nTime": nTime, "nMsgType": nMsgType, "nMsgSubType": nMsgSubType,
                "LastReadedCreateTime": nTime,
                "nickname": NickName, "remark": Remark, "account": Alias,
                "describe": describe, "headImgUrl": bigHeadImgUrl if bigHeadImgUrl else "",
                "ExtraBuf": ExtraBuf, "LabelIDList": labels
            }
        return sessions

    @db_error
    def get_recent_chat_wxid(self):
        """
        Get the contacts that were chatted with recently, newest first.
        :return: list of dicts with keys wxid, LastReadedCreateTime, LastReadedSvrId
        """
        users = []
        if not self.tables_exist(["ChatInfo"]):
            return users
        # NOTE(review): 1007911408000 looks like a lower bound in epoch
        # milliseconds used to drop bogus timestamps - confirm.
        sql = (
            "SELECT A.Username, LastReadedCreateTime, LastReadedSvrId "
            "FROM (   SELECT Username, MAX(LastReadedCreateTime) AS MaxLastReadedCreateTime FROM ChatInfo "
            "WHERE LastReadedCreateTime IS NOT NULL AND LastReadedCreateTime > 1007911408000   GROUP BY Username "
            ") AS SubQuery JOIN ChatInfo A "
            "ON A.Username = SubQuery.Username AND LastReadedCreateTime = SubQuery.MaxLastReadedCreateTime "
            "ORDER BY A.LastReadedCreateTime DESC;"
        )

        db_loger.info(f"get_recent_chat_wxid sql: {sql}")
        result = self.execute(sql)
        if not result:
            return users
        for username, LastReadedCreateTime, LastReadedSvrId in result:
            LastReadedCreateTime = timestamp2str(LastReadedCreateTime) if LastReadedCreateTime else None
            users.append(
                {"wxid": username, "LastReadedCreateTime": LastReadedCreateTime, "LastReadedSvrId": LastReadedSvrId})
        return users

    @db_error
    def get_user_list(self, word: str = None, wxids: list = None, label_ids: list = None):
        """
        Get the contact list.
        [ NOTE: when changing this function, also update get_im_user_list in dbOpenIMContact.py ]
        :param word: search keyword; matched against wxid, nickname, remark, alias and the pinyin columns
        :param wxids: list of wxids (a single str is accepted too)
        :param label_ids: list of label ids (a single str is accepted too)
        :return: dict of contacts keyed by wxid
        """
        if isinstance(wxids, str):
            wxids = [wxids]
        if isinstance(label_ids, str):
            label_ids = [label_ids]

        users = {}
        if not self.tables_exist(["Contact", "ContactHeadImgUrl"]):
            return users

        # The WHERE clause is assembled from a list of conditions instead of
        # repeatedly replacing ";" in the statement: the old approach corrupted
        # the SQL whenever a user-supplied value itself contained ";".
        conditions = []
        if word:
            w = _sql_quote(word)
            conditions.append(
                f"( A.UserName LIKE '%{w}%' "
                f"OR A.NickName LIKE '%{w}%' "
                f"OR A.Remark LIKE '%{w}%' "
                f"OR A.Alias LIKE '%{w}%' "
                f"OR LOWER(A.QuanPin) LIKE LOWER('%{w}%') "
                f"OR LOWER(A.PYInitial) LIKE LOWER('%{w}%') "
                f"OR LOWER(A.RemarkQuanPin) LIKE LOWER('%{w}%') "
                f"OR LOWER(A.RemarkPYInitial) LIKE LOWER('%{w}%') "
                f")")
        if wxids:
            conditions.append("A.UserName IN ('" + "','".join(_sql_quote(i) for i in wxids) + "')")
        if label_ids:
            # Match whole ids inside the comma-separated LabelIDList; a plain
            # LIKE '%1%' would also match labels 10, 11, 21, ...
            label_sql = " OR ".join(
                f"(',' || REPLACE(A.LabelIDList, ' ', '') || ',') LIKE '%,{_sql_quote(i)},%'"
                for i in label_ids)
            conditions.append(f"({label_sql})")

        sql = (
            "SELECT A.UserName, A.Alias, A.DelFlag, A.Type, A.VerifyFlag, A.Reserved1, A.Reserved2,"
            "A.Remark, A.NickName, A.LabelIDList, A.ChatRoomType, A.ChatRoomNotify, A.Reserved5,"
            "A.Reserved6 as describe, A.ExtraBuf, B.bigHeadImgUrl "
            "FROM Contact A LEFT JOIN ContactHeadImgUrl B ON A.UserName = B.usrName WHERE 1==1 "
        )
        sql += "".join(f"AND {condition} " for condition in conditions) + ";"

        db_loger.info(f"get_user_list sql: {sql}")
        result = self.execute(sql)
        if not result:
            return users

        id2label = self.get_labels() or {}
        for row in result:
            # wxid, nickname, remark, description, avatar, labels
            (UserName, Alias, DelFlag, Type, VerifyFlag, Reserved1, Reserved2, Remark, NickName, LabelIDList,
             ChatRoomType, ChatRoomNotify, Reserved5, describe, ExtraBuf, bigHeadImgUrl) = row

            users[UserName] = {
                "wxid": UserName, "nickname": NickName, "remark": Remark, "account": Alias,
                "describe": describe, "headImgUrl": bigHeadImgUrl if bigHeadImgUrl else "",
                "ExtraBuf": get_ExtraBuf(ExtraBuf), "LabelIDList": _label_names(LabelIDList, id2label),
                "extra": None}

        # Attach chat-room details to entries whose wxid contains "@".
        # The ids are materialised into a list and the lookup is skipped when
        # there are none: a filter() object is always truthy, and an empty id
        # list would make get_room_list drop its filter and return every room.
        room_ids = [wxid for wxid in users if "@" in wxid]
        extras = (self.get_room_list(roomwxids=room_ids) if room_ids else None) or {}
        for UserName in users:
            users[UserName]["extra"] = extras.get(UserName, None)
        return users

    @db_error
    def get_room_list(self, word=None, roomwxids: list = None):
        """
        Get the chat-room list.
        :param word: search keyword matched against the chat-room wxid
        :param roomwxids: list of chat-room wxids (a single str is accepted too)
        :return: dict of chat rooms keyed by chat-room wxid
        """
        if isinstance(roomwxids, str):
            roomwxids = [roomwxids]

        rooms = {}
        if not self.tables_exist(["ChatRoom", "ChatRoomInfo"]):
            return rooms

        # Same condition-list approach (and quoting) as get_user_list.
        conditions = []
        if word:
            conditions.append(f"A.ChatRoomName LIKE '%{_sql_quote(word)}%'")
        if roomwxids:
            conditions.append("A.ChatRoomName IN ('" + "','".join(_sql_quote(i) for i in roomwxids) + "')")

        sql = (
            "SELECT A.ChatRoomName,A.UserNameList,A.DisplayNameList,A.ChatRoomFlag,A.IsShowName,"
            "A.SelfDisplayName,A.Reserved2,A.RoomData, "
            "B.Announcement,B.AnnouncementEditor,B.AnnouncementPublishTime "
            "FROM ChatRoom A LEFT JOIN ChatRoomInfo B ON A.ChatRoomName==B.ChatRoomName "
            "WHERE 1==1 "
        )
        sql += "".join(f"AND {condition} " for condition in conditions) + ";"

        db_loger.info(f"get_room_list sql: {sql}")
        result = self.execute(sql)
        if not result:
            return rooms

        for row in result:
            (ChatRoomName, UserNameList, DisplayNameList, ChatRoomFlag, IsShowName, SelfDisplayName,
             Reserved2, RoomData,
             Announcement, AnnouncementEditor, AnnouncementPublishTime) = row

            # A NULL/empty member column yields an empty list instead of raising.
            UserNameList = UserNameList.split("^G") if UserNameList else []

            # Collect wxid -> in-room nickname from the decoded RoomData:
            # every list value is scanned for dicts whose field '1' is a str
            # (used as the wxid) and whose field '2' is truthy (the nickname).
            RoomData = ChatRoom_RoomData(RoomData)
            wxid2roomNickname = {}
            if RoomData:
                rd = []
                for v in RoomData.values():
                    if isinstance(v, list):
                        rd += v
                for i in rd:
                    try:
                        if isinstance(i, dict) and isinstance(i.get('1'), str) and i.get('2'):
                            wxid2roomNickname[i['1']] = i["2"]
                    except Exception as e:
                        db_loger.error(f"wxid2remark: ChatRoomName:{ChatRoomName}, {i} error:{e}", exc_info=True)

            # Only look members up when there are any: get_user_list(wxids=[])
            # applies no filter and would return every contact.
            wxid2userinfo = (self.get_user_list(wxids=UserNameList) if UserNameList else None) or {}
            for i in wxid2userinfo:
                wxid2userinfo[i]["roomNickname"] = wxid2roomNickname.get(i, "")

            # Reserved2 is used as the owner's wxid; fall back to the raw value
            # when the owner is not among the resolved members.
            owner = wxid2userinfo.get(Reserved2, Reserved2)

            rooms[ChatRoomName] = {
                "wxid": ChatRoomName, "roomWxids": UserNameList, "IsShowName": IsShowName,
                "ChatRoomFlag": ChatRoomFlag, "SelfDisplayName": SelfDisplayName,
                "owner": owner, "wxid2userinfo": wxid2userinfo,
                "Announcement": Announcement, "AnnouncementEditor": AnnouncementEditor,
                "AnnouncementPublishTime": AnnouncementPublishTime}
        return rooms


@db_error
def ChatRoom_RoomData(RoomData):
    """
    Decode the RoomData blob of a chat room (mainly member wxids and their
    in-room nicknames).
    :param RoomData: raw bytes from ChatRoom.RoomData
    :return: decoded dict, or None when the input is not bytes or cannot be decoded
    """
    if RoomData is None or not isinstance(RoomData, bytes):
        return None
    data = get_BytesExtra(RoomData)
    if data:
        # Return value intentionally ignored, as before.
        # NOTE(review): presumably bytes2str converts bytes values in place - confirm.
        bytes2str(data)
    return data


@db_error
def get_BytesExtra(BytesExtra):
    """
    Decode a protobuf blob without a schema using blackboxprotobuf.
    :param BytesExtra: raw bytes
    :return: decoded message dict, or None when the input is not bytes or decoding fails
    """
    if BytesExtra is None or not isinstance(BytesExtra, bytes):
        return None
    try:
        deserialize_data, message_type = blackboxprotobuf.decode_message(BytesExtra)
        return deserialize_data
    except Exception as e:
        db_loger.warning(f"\nget_BytesExtra: {e}\n{BytesExtra}", exc_info=True)
        return None


@db_error
def get_ExtraBuf(ExtraBuf: bytes):
    """
    Parse the ExtraBuf blob of the Contact table.

    Each known field is located by searching for its tag bytes; the byte after
    the tag selects how the value is read:
    0x04 -> 4-byte little-endian int; 0x18 -> length-prefixed UTF-16 text;
    0x17 -> length-prefixed UTF-8 text; 0x05 -> 8 raw bytes rendered as hex.

    :param ExtraBuf: raw bytes from Contact.ExtraBuf
    :return: dict field name -> value ("" when the tag is absent; the key is
             omitted when the type byte is unknown), or None for empty or
             non-bytes input
    """
    if not ExtraBuf or not isinstance(ExtraBuf, (bytes, bytearray)):
        return None
    buf_dict = {
        '74752C06': '性别[1男2女]', '46CF10C4': '个性签名', 'A4D9024A': '', 'E2EAA8D1': '', '1D025BBF': '',
        'F917BCC0': '公司名称', '759378AD': '手机号', '4EB96D85': '企微属性', '81AE19B4': '朋友圈背景',
        '0E719F13': '备注图片', '945f3190': '备注图片2',
        'DDF32683': '0', '88E28FCE': '1', '761A1D2D': '2', '0263A0CB': '3', '0451FF12': '4', '228C66A8': '5',
        '4D6C4570': '6', '4335DFDD': '7', 'DE4CDAEB': '8', 'A72BC20A': '9', '069FED52': '10', '9B0F4299': '11',
        '3D641E22': '12', '1249822C': '13', 'B4F73ACB': '14', '0959EB92': '15', '3CF4A315': '16',
        'C9477AC60201E44CD0E8': '17', 'B7ACF0F5': '18', '57A7B5A8': '19', '695F3170': '20', 'FB083DD9': '21',
        '0240E37F': '22', '315D02A3': '23', '7DEC0BC3': '24', '16791C90': '25'
    }

    rdata = {}
    for hex_tag, rdata_name in buf_dict.items():
        tag = bytes.fromhex(hex_tag)
        offset = ExtraBuf.find(tag)
        if offset == -1:
            rdata[rdata_name] = ""
            continue
        offset += len(tag)
        type_id = ExtraBuf[offset: offset + 1]
        offset += 1

        if type_id == b"\x04":
            rdata[rdata_name] = int.from_bytes(ExtraBuf[offset: offset + 4], "little")

        elif type_id == b"\x18":
            length = int.from_bytes(ExtraBuf[offset: offset + 4], "little")
            # errors="ignore" mirrors the UTF-8 branch so that one malformed
            # field does not discard the whole record.
            rdata[rdata_name] = ExtraBuf[offset + 4: offset + 4 + length].decode(
                "utf-16", errors="ignore").rstrip("\x00")

        elif type_id == b"\x17":
            length = int.from_bytes(ExtraBuf[offset: offset + 4], "little")
            rdata[rdata_name] = ExtraBuf[offset + 4: offset + 4 + length].decode("utf-8", errors="ignore").rstrip(
                "\x00")
        elif type_id == b"\x05":
            rdata[rdata_name] = f"0x{ExtraBuf[offset: offset + 8].hex()}"
    return rdata