解锁并提取Linux客户端微信数据库 (vibe coded)
1# -*- coding: utf-8 -*-#
2# -------------------------------------------------------------------------------
3# Name: MicroMsg.py
4# Description: 负责处理联系人数据库
5# Author: xaoyaoo
6# Date: 2024/04/15
7# -------------------------------------------------------------------------------
8import logging
9
10from .db_base import DatabaseBase
11from .utils import timestamp2str, bytes2str, db_loger, db_error
12
13import blackboxprotobuf
14
15
class MicroHandler(DatabaseBase):
    """Query helpers for the contact database (MicroMsg).

    Provides accessors for labels, sessions, recent chats, contacts and chat
    rooms. Every query goes through ``self.execute`` and every table check
    through ``self.tables_exist``; neither is defined in this class, so both
    are expected to come from the ``DatabaseBase`` parent.
    """

    _class_name = "MicroMsg"
    Micro_required_tables = ["ContactLabel", "Contact", "ContactHeadImgUrl", "Session", "ChatInfo", "ChatRoom",
                             "ChatRoomInfo"]

    @staticmethod
    def _sql_quote(value):
        """Return *value* as text that is safe inside a single-quoted SQL literal.

        Single quotes are doubled, which is how SQL escapes a quote inside a
        string literal. LIKE wildcards (``%`` and ``_``) are left untouched so
        keyword searches keep their existing matching behaviour.
        """
        return str(value).replace("'", "''")

    @staticmethod
    def _resolve_labels(raw_label_ids, id2label):
        """Turn a comma-separated label-id string into a tuple of label names.

        :param raw_label_ids: value of ``Contact.LabelIDList`` (may be None/empty)
        :param id2label: mapping of label id -> label name
        :return: tuple of label names; an id with no known name (or one that is
                 not an integer) is kept as the raw string instead of raising
        """
        resolved = []
        for label_id in (raw_label_ids.split(",") if raw_label_ids else []):
            if not label_id:
                continue
            try:
                resolved.append(id2label.get(int(label_id), label_id))
            except ValueError:
                # A malformed entry must not abort the whole listing.
                resolved.append(label_id)
        return tuple(resolved)

    def Micro_add_index(self):
        """
        Create indexes (if missing) to speed up the queries issued by this class.
        Indexes are only created for tables that exist.
        """
        index_plan = (
            ("Session", (
                "CREATE INDEX IF NOT EXISTS idx_Session_strUsrName_nTime ON Session(strUsrName, nTime);",
                "CREATE INDEX IF NOT EXISTS idx_Session_nOrder ON Session(nOrder);",
                "CREATE INDEX IF NOT EXISTS idx_Session_nTime ON Session(nTime);",
            )),
            ("Contact", (
                "CREATE INDEX IF NOT EXISTS idx_Contact_UserName ON Contact(UserName);",
            )),
            ('ContactHeadImgUrl', (
                "CREATE INDEX IF NOT EXISTS idx_ContactHeadImgUrl_usrName ON ContactHeadImgUrl(usrName);",
            )),
            ('ChatInfo', (
                "CREATE INDEX IF NOT EXISTS idx_ChatInfo_Username_LastReadedCreateTime "
                "ON ChatInfo(Username, LastReadedCreateTime);",
                "CREATE INDEX IF NOT EXISTS idx_ChatInfo_LastReadedCreateTime ON ChatInfo(LastReadedCreateTime);",
            )),
            # Composite index backing the keyword search in get_user_list.
            ('Contact', (
                "CREATE INDEX IF NOT EXISTS idx_Contact_search "
                "ON Contact(UserName, NickName, Remark, Alias, QuanPin, PYInitial, RemarkQuanPin, RemarkPYInitial);",
            )),
            # Both chat-room tables must exist before either index is created.
            (['ChatRoomInfo', "ChatRoom"], (
                "CREATE INDEX IF NOT EXISTS idx_ChatRoom_ChatRoomName ON ChatRoom(ChatRoomName);",
                "CREATE INDEX IF NOT EXISTS idx_ChatRoomInfo_ChatRoomName ON ChatRoomInfo(ChatRoomName);",
            )),
        )
        for tables, statements in index_plan:
            if not self.tables_exist(tables):
                continue
            for statement in statements:
                self.execute(statement)

    @db_error
    def get_labels(self, id_is_key=True):
        """
        Read the contact label list.
        :param id_is_key: True -> {LabelId: LabelName}; False -> {LabelName: LabelId}
        :return: dict of labels; empty when the table is missing or has no rows
        """
        labels = {}
        if not self.tables_exist("ContactLabel"):
            return labels
        sql = "SELECT LabelId, LabelName FROM ContactLabel ORDER BY LabelName ASC;"
        result = self.execute(sql)
        if not result:
            return labels
        if id_is_key:
            return {row[0]: row[1] for row in result}
        return {row[1]: row[0] for row in result}

    @db_error
    def get_session_list(self):
        """
        Get the session list, one entry per strUsrName (the row with the latest nTime),
        ordered by nTime descending and enriched with contact information.
        :return: dict keyed by wxid; empty when a required table is missing or there are no rows
        """
        sessions = {}
        if not self.tables_exist(["Session", "Contact", "ContactHeadImgUrl"]):
            return sessions
        sql = (
            "SELECT S.strUsrName,S.nOrder,S.nUnReadCount, S.strNickName, S.nStatus, S.nIsSend, S.strContent, "
            "S.nMsgLocalID, S.nMsgStatus, S.nTime, S.nMsgType, S.Reserved2 AS nMsgSubType, C.UserName, C.Alias, "
            "C.DelFlag, C.Type, C.VerifyFlag, C.Reserved1, C.Reserved2, C.Remark, C.NickName, C.LabelIDList, "
            "C.ChatRoomType, C.ChatRoomNotify, C.Reserved5, C.Reserved6 as describe, C.ExtraBuf, H.bigHeadImgUrl "
            "FROM (SELECT strUsrName, MAX(nTime) AS MaxnTime FROM Session GROUP BY strUsrName) AS SubQuery "
            "JOIN Session S ON S.strUsrName = SubQuery.strUsrName AND S.nTime = SubQuery.MaxnTime "
            "left join Contact C ON C.UserName = S.strUsrName "
            "LEFT JOIN ContactHeadImgUrl H ON C.UserName = H.usrName "
            "WHERE S.strUsrName!='@publicUser' "
            "ORDER BY S.nTime DESC;"
        )

        db_loger.info(f"get_session_list sql: {sql}")
        ret = self.execute(sql)
        if not ret:
            return sessions

        # NOTE(review): get_labels is wrapped by db_error, which may yield None on failure — confirm.
        id2label = self.get_labels() or {}
        for row in ret:
            (strUsrName, nOrder, nUnReadCount, strNickName, nStatus, nIsSend, strContent,
             nMsgLocalID, nMsgStatus, nTime, nMsgType, nMsgSubType,
             UserName, Alias, DelFlag, Type, VerifyFlag, Reserved1, Reserved2, Remark, NickName, LabelIDList,
             ChatRoomType, ChatRoomNotify, Reserved5, describe, ExtraBuf, bigHeadImgUrl) = row

            nTime = timestamp2str(nTime) if nTime else None

            sessions[strUsrName] = {
                "wxid": strUsrName, "nOrder": nOrder, "nUnReadCount": nUnReadCount, "strNickName": strNickName,
                "nStatus": nStatus, "nIsSend": nIsSend, "strContent": strContent, "nMsgLocalID": nMsgLocalID,
                "nMsgStatus": nMsgStatus, "nTime": nTime, "nMsgType": nMsgType, "nMsgSubType": nMsgSubType,
                # The session time is reported under this key as well.
                "LastReadedCreateTime": nTime,
                "nickname": NickName, "remark": Remark, "account": Alias,
                "describe": describe, "headImgUrl": bigHeadImgUrl if bigHeadImgUrl else "",
                "ExtraBuf": get_ExtraBuf(ExtraBuf),
                "LabelIDList": self._resolve_labels(LabelIDList, id2label),
            }
        return sessions

    @db_error
    def get_recent_chat_wxid(self):
        """
        Get recently chatted contacts from ChatInfo, newest first. Only rows whose
        LastReadedCreateTime is above the threshold hard-coded in the query are considered.
        :return: list of {"wxid", "LastReadedCreateTime", "LastReadedSvrId"} dicts
        """
        users = []
        if not self.tables_exist(["ChatInfo"]):
            return users
        sql = (
            "SELECT A.Username, LastReadedCreateTime, LastReadedSvrId "
            "FROM (   SELECT Username, MAX(LastReadedCreateTime) AS MaxLastReadedCreateTime FROM ChatInfo "
            "WHERE LastReadedCreateTime IS NOT NULL AND LastReadedCreateTime > 1007911408000   GROUP BY Username "
            ") AS SubQuery JOIN ChatInfo A "
            "ON A.Username = SubQuery.Username AND LastReadedCreateTime = SubQuery.MaxLastReadedCreateTime "
            "ORDER BY A.LastReadedCreateTime DESC;"
        ).replace("(   SELECT", "( SELECT").replace("1007911408000   GROUP", "1007911408000 GROUP")

        db_loger.info(f"get_recent_chat_wxid sql: {sql}")
        result = self.execute(sql)
        if not result:
            return []
        for username, LastReadedCreateTime, LastReadedSvrId in result:
            LastReadedCreateTime = timestamp2str(LastReadedCreateTime) if LastReadedCreateTime else None
            users.append(
                {"wxid": username, "LastReadedCreateTime": LastReadedCreateTime, "LastReadedSvrId": LastReadedSvrId})
        return users

    @db_error
    def get_user_list(self, word: str = None, wxids: list = None, label_ids: list = None):
        """
        Get the contact list.
        [ Note: if you modify this function, also update get_im_user_list in dbOpenIMContact.py ]
        :param word: search keyword, matched (substring) against UserName, NickName, Remark, Alias
                     and the pinyin columns
        :param wxids: wxid or list of wxids to restrict the result to
        :param label_ids: label id or list of label ids; a contact matches when it carries any of them
        :return: dict of contacts keyed by wxid; entries whose wxid contains "@" get their
                 chat-room details under "extra"
        """
        if isinstance(wxids, str):
            wxids = [wxids]
        if isinstance(label_ids, str):
            label_ids = [label_ids]

        users = {}
        if not self.tables_exist(["Contact", "ContactHeadImgUrl"]):
            return users

        # Filters are appended piece by piece (each ends with a space) and the terminating ";"
        # is added last, so a ";" inside a filter value can never be mistaken for the terminator.
        sql = (
            "SELECT A.UserName, A.Alias, A.DelFlag, A.Type, A.VerifyFlag, A.Reserved1, A.Reserved2,"
            "A.Remark, A.NickName, A.LabelIDList, A.ChatRoomType, A.ChatRoomNotify, A.Reserved5,"
            "A.Reserved6 as describe, A.ExtraBuf, B.bigHeadImgUrl "
            "FROM Contact A LEFT JOIN ContactHeadImgUrl B ON A.UserName = B.usrName WHERE 1==1 "
        )
        if word:
            kw = self._sql_quote(word)
            sql += (
                f"AND ( A.UserName LIKE '%{kw}%' "
                f"OR A.NickName LIKE '%{kw}%' "
                f"OR A.Remark LIKE '%{kw}%' "
                f"OR A.Alias LIKE '%{kw}%' "
                f"OR LOWER(A.QuanPin) LIKE LOWER('%{kw}%') "
                f"OR LOWER(A.PYInitial) LIKE LOWER('%{kw}%') "
                f"OR LOWER(A.RemarkQuanPin) LIKE LOWER('%{kw}%') "
                f"OR LOWER(A.RemarkPYInitial) LIKE LOWER('%{kw}%') "
                f") "
            )
        if wxids:
            sql += "AND A.UserName IN ('" + "','".join(self._sql_quote(w) for w in wxids) + "') "
        if label_ids:
            # LabelIDList is a comma-separated id list; wrapping both sides in commas makes the
            # match exact per id (id 1 must not match 11 or 21).
            sql_label = " OR ".join(
                f"(',' || A.LabelIDList || ',') LIKE '%,{self._sql_quote(i)},%' " for i in label_ids)
            sql += f"AND ({sql_label}) "
        sql += ";"

        db_loger.info(f"get_user_list sql: {sql}")
        result = self.execute(sql)
        if not result:
            return users
        # NOTE(review): get_labels is wrapped by db_error, which may yield None on failure — confirm.
        id2label = self.get_labels() or {}
        for row in result:
            (UserName, Alias, DelFlag, Type, VerifyFlag, Reserved1, Reserved2, Remark, NickName, LabelIDList,
             ChatRoomType, ChatRoomNotify, Reserved5, describe, ExtraBuf, bigHeadImgUrl) = row

            users[UserName] = {
                "wxid": UserName, "nickname": NickName, "remark": Remark, "account": Alias,
                "describe": describe, "headImgUrl": bigHeadImgUrl if bigHeadImgUrl else "",
                "ExtraBuf": get_ExtraBuf(ExtraBuf),
                "LabelIDList": self._resolve_labels(LabelIDList, id2label),
                "extra": None}

        # Only look up room details when there is at least one candidate id: an empty id list
        # would make get_room_list drop its filter and load every room.
        room_ids = [wxid for wxid in users if "@" in wxid]
        extras = (self.get_room_list(roomwxids=room_ids) or {}) if room_ids else {}
        for wxid in users:
            users[wxid]["extra"] = extras.get(wxid, None)
        return users

    @db_error
    def get_room_list(self, word=None, roomwxids: list = None):
        """
        Get the chat-room list.
        :param word: search keyword, matched (substring) against ChatRoomName
        :param roomwxids: chat-room wxid or iterable of chat-room wxids to restrict the result to
        :return: dict of chat rooms keyed by ChatRoomName, each with its member wxids,
                 per-member contact info (plus "roomNickname"), owner and announcement fields
        """
        if isinstance(roomwxids, str):
            roomwxids = [roomwxids]

        rooms = {}
        if not self.tables_exist(["ChatRoom", "ChatRoomInfo"]):
            return rooms
        sql = (
            "SELECT A.ChatRoomName,A.UserNameList,A.DisplayNameList,A.ChatRoomFlag,A.IsShowName,"
            "A.SelfDisplayName,A.Reserved2,A.RoomData, "
            "B.Announcement,B.AnnouncementEditor,B.AnnouncementPublishTime "
            "FROM ChatRoom A LEFT JOIN ChatRoomInfo B ON A.ChatRoomName==B.ChatRoomName "
            "WHERE 1==1 "
        )
        if word:
            sql += f"AND A.ChatRoomName LIKE '%{self._sql_quote(word)}%' "
        if roomwxids:
            sql += "AND A.ChatRoomName IN ('" + "','".join(self._sql_quote(w) for w in roomwxids) + "') "
        sql += ";"

        db_loger.info(f"get_room_list sql: {sql}")
        result = self.execute(sql)
        if not result:
            return rooms

        for row in result:
            (ChatRoomName, UserNameList, DisplayNameList, ChatRoomFlag, IsShowName, SelfDisplayName,
             Reserved2, RoomData,
             Announcement, AnnouncementEditor, AnnouncementPublishTime) = row

            # Member wxids are separated by the literal two-character sequence "^G".
            # A NULL column is treated like an empty string so one bad row cannot abort the listing.
            UserNameList = (UserNameList or "").split("^G")

            # Collect {member wxid: nickname inside the room} from the decoded RoomData.
            RoomData = ChatRoom_RoomData(RoomData)
            wxid2roomNickname = {}
            if RoomData:
                entries = []
                for value in RoomData.values():
                    if isinstance(value, list):
                        entries += value
                for i in entries:
                    try:
                        # Field '1' is used as the member wxid and field '2' as the in-room nickname.
                        if isinstance(i, dict) and isinstance(i.get('1'), str) and i.get('2'):
                            wxid2roomNickname[i['1']] = i["2"]
                    except Exception as e:
                        db_loger.error(f"wxid2remark: ChatRoomName:{ChatRoomName}, {i} error:{e}", exc_info=True)

            # NOTE(review): get_user_list is wrapped by db_error, which may yield None on failure — confirm.
            wxid2userinfo = self.get_user_list(wxids=UserNameList) or {}
            for wxid in wxid2userinfo:
                wxid2userinfo[wxid]["roomNickname"] = wxid2roomNickname.get(wxid, "")

            # Reserved2 is treated as the owner's wxid; fall back to the raw value when the
            # owner is not among the resolved members.
            owner = wxid2userinfo.get(Reserved2, Reserved2)

            rooms[ChatRoomName] = {
                "wxid": ChatRoomName, "roomWxids": UserNameList, "IsShowName": IsShowName,
                "ChatRoomFlag": ChatRoomFlag, "SelfDisplayName": SelfDisplayName,
                "owner": owner, "wxid2userinfo": wxid2userinfo,
                "Announcement": Announcement, "AnnouncementEditor": AnnouncementEditor,
                "AnnouncementPublishTime": AnnouncementPublishTime}
        return rooms
296
297
@db_error
def ChatRoom_RoomData(RoomData):
    """
    Decode the RoomData blob of a chat room (mainly member wxids and their in-room nicknames).
    :param RoomData: raw blob; anything that is not ``bytes`` yields None
    :return: the structure produced by get_BytesExtra, or None when the input is unusable
    """
    if not isinstance(RoomData, bytes):
        return None
    decoded = get_BytesExtra(RoomData)
    if decoded:
        # The return value is not used; presumably bytes2str converts bytes values
        # in place — TODO confirm against utils.bytes2str.
        bytes2str(decoded)
    return decoded
306
307
@db_error
def get_BytesExtra(BytesExtra):
    """
    Decode a protobuf blob with blackboxprotobuf.
    :param BytesExtra: raw blob; anything that is not ``bytes`` yields None
    :return: the decoded message, or None when decoding fails (the failure is logged as a warning)
    """
    if not isinstance(BytesExtra, bytes):
        return None
    try:
        decoded, _message_type = blackboxprotobuf.decode_message(BytesExtra)
    except Exception as err:
        db_loger.warning(f"\nget_BytesExtra: {err}\n{BytesExtra}", exc_info=True)
        return None
    return decoded
318
319
@db_error
def get_ExtraBuf(ExtraBuf: bytes):
    """
    Parse the ExtraBuf blob of the Contact table.

    Each known field is located by searching for its marker bytes. The byte after
    the marker selects how the value is read:
      * 0x04 - 4-byte little-endian integer
      * 0x18 - 4-byte little-endian length followed by UTF-16 text
      * 0x17 - 4-byte little-endian length followed by UTF-8 text
      * 0x05 - 8 raw bytes, returned as a "0x..." hex string

    :param ExtraBuf: raw blob from Contact.ExtraBuf
    :return: dict mapping every known field name to its value ("" when the marker is
             absent or the type byte is not recognised); None when the input is
             empty or not bytes
    """
    if not ExtraBuf or not isinstance(ExtraBuf, (bytes, bytearray)):
        return None
    # marker (hex) -> output field name; numeric names are fields whose meaning is not identified here.
    buf_dict = {
        '74752C06': '性别[1男2女]', '46CF10C4': '个性签名', 'A4D9024A': '国', 'E2EAA8D1': '省', '1D025BBF': '市',
        'F917BCC0': '公司名称', '759378AD': '手机号', '4EB96D85': '企微属性', '81AE19B4': '朋友圈背景',
        '0E719F13': '备注图片', '945f3190': '备注图片2',
        'DDF32683': '0', '88E28FCE': '1', '761A1D2D': '2', '0263A0CB': '3', '0451FF12': '4', '228C66A8': '5',
        '4D6C4570': '6', '4335DFDD': '7', 'DE4CDAEB': '8', 'A72BC20A': '9', '069FED52': '10', '9B0F4299': '11',
        '3D641E22': '12', '1249822C': '13', 'B4F73ACB': '14', '0959EB92': '15', '3CF4A315': '16',
        'C9477AC60201E44CD0E8': '17', 'B7ACF0F5': '18', '57A7B5A8': '19', '695F3170': '20', 'FB083DD9': '21',
        '0240E37F': '22', '315D02A3': '23', '7DEC0BC3': '24', '16791C90': '25'
    }

    rdata = {}
    for marker_hex, field_name in buf_dict.items():
        marker = bytes.fromhex(marker_hex)
        pos = ExtraBuf.find(marker)
        if pos == -1:
            rdata[field_name] = ""
            continue
        pos += len(marker)
        type_id = ExtraBuf[pos: pos + 1]
        pos += 1

        if type_id == b"\x04":
            rdata[field_name] = int.from_bytes(ExtraBuf[pos: pos + 4], "little")

        elif type_id == b"\x18":
            length = int.from_bytes(ExtraBuf[pos: pos + 4], "little")
            # Malformed bytes are dropped (as in the UTF-8 branch) so one bad field
            # does not discard every other field of the contact.
            rdata[field_name] = ExtraBuf[pos + 4: pos + 4 + length].decode(
                "utf-16", errors="ignore").rstrip("\x00")

        elif type_id == b"\x17":
            length = int.from_bytes(ExtraBuf[pos: pos + 4], "little")
            rdata[field_name] = ExtraBuf[pos + 4: pos + 4 + length].decode(
                "utf-8", errors="ignore").rstrip("\x00")

        elif type_id == b"\x05":
            rdata[field_name] = f"0x{ExtraBuf[pos: pos + 8].hex()}"

        else:
            # Unrecognised type byte: report the field as empty, matching the
            # "marker not found" case, so every known key is always present.
            rdata[field_name] = ""
    return rdata
365 return rdata