解锁并提取Linux客户端微信数据库 (vibe coded)
1# -*- coding: utf-8 -*-#
2# -------------------------------------------------------------------------------
3# Name: MSG.py
4# Description: 负责处理消息数据库数据
5# Author: xaoyaoo
6# Date: 2024/04/15
7# -------------------------------------------------------------------------------
8import json
9import os
10import re
11import lz4.block
12import blackboxprotobuf
13
14from .db_base import DatabaseBase
15from .utils import db_error, timestamp2str, xml2dict, match_BytesExtra, type_converter
16
17
18class MsgHandler(DatabaseBase):
19 _class_name = "MSG"
20 MSG_required_tables = ["MSG"]
21
22 def Msg_add_index(self):
23 """
24 添加索引,加快查询速度
25 """
26 # 检查是否存在索引
27 if not self.tables_exist("MSG"):
28 return
29 self.execute("CREATE INDEX IF NOT EXISTS idx_MSG_StrTalker ON MSG(StrTalker);")
30 self.execute("CREATE INDEX IF NOT EXISTS idx_MSG_CreateTime ON MSG(CreateTime);")
31 self.execute("CREATE INDEX IF NOT EXISTS idx_MSG_StrTalker_CreateTime ON MSG(StrTalker, CreateTime);")
32
33 @db_error
34 def get_m_msg_count(self, wxids: list = ""):
35 """
36 获取聊天记录数量,根据wxid获取单个联系人的聊天记录数量,不传wxid则获取所有联系人的聊天记录数量
37 :param wxids: wxid list
38 :return: 聊天记录数量列表 {wxid: chat_count, total: total_count}
39 """
40 if isinstance(wxids, str) and wxids:
41 wxids = [wxids]
42 if wxids:
43 wxids = "('" + "','".join(wxids) + "')"
44 sql = f"SELECT StrTalker, COUNT(*) FROM MSG WHERE StrTalker IN {wxids} GROUP BY StrTalker ORDER BY COUNT(*) DESC;"
45 else:
46 sql = f"SELECT StrTalker, COUNT(*) FROM MSG GROUP BY StrTalker ORDER BY COUNT(*) DESC;"
47 sql_total = f"SELECT COUNT(*) FROM MSG;"
48
49 if not self.tables_exist("MSG"):
50 return {}
51 result = self.execute(sql)
52 total_ret = self.execute(sql_total)
53
54 if not result:
55 return {}
56 total = 0
57 if total_ret and len(total_ret) > 0:
58 total = total_ret[0][0]
59
60 msg_count = {"total": total}
61 msg_count.update({row[0]: row[1] for row in result})
62 return msg_count
63
64 @db_error
65 def get_msg_list(self, wxids: list or str = "", start_index=0, page_size=500, msg_type: str = "",
66 msg_sub_type: str = "", start_createtime=None, end_createtime=None, my_talker="我"):
67 """
68 获取聊天记录列表
69 :param wxids: [wxid]
70 :param start_index: 起始索引
71 :param page_size: 页大小
72 :param msg_type: 消息类型
73 :param msg_sub_type: 消息子类型
74 :param start_createtime: 开始时间
75 :param end_createtime: 结束时间
76 :param my_talker: 我
77 :return: 聊天记录列表 {"id": _id, "MsgSvrID": str(MsgSvrID), "type_name": type_name, "is_sender": IsSender,
78 "talker": talker, "room_name": StrTalker, "msg": msg, "src": src, "extra": {},
79 "CreateTime": CreateTime, }
80 """
81 if not self.tables_exist("MSG"):
82 return [], []
83
84 if isinstance(wxids, str) and wxids:
85 wxids = [wxids]
86 param = ()
87 sql_wxid, param = (f"AND StrTalker in ({', '.join('?' for _ in wxids)}) ",
88 param + tuple(wxids)) if wxids else ("", param)
89 sql_type, param = ("AND Type=? ", param + (msg_type,)) if msg_type else ("", param)
90 sql_sub_type, param = ("AND SubType=? ", param + (msg_sub_type,)) if msg_type and msg_sub_type else ("", param)
91 sql_start_createtime, param = ("AND CreateTime>=? ", param + (start_createtime,)) if start_createtime else (
92 "", param)
93 sql_end_createtime, param = ("AND CreateTime<=? ", param + (end_createtime,)) if end_createtime else ("", param)
94
95 sql = (
96 "SELECT localId,TalkerId,MsgSvrID,Type,SubType,CreateTime,IsSender,Sequence,StatusEx,FlagEx,Status,"
97 "MsgSequence,StrContent,MsgServerSeq,StrTalker,DisplayContent,Reserved0,Reserved1,Reserved3,"
98 "Reserved4,Reserved5,Reserved6,CompressContent,BytesExtra,BytesTrans,Reserved2,"
99 "ROW_NUMBER() OVER (ORDER BY CreateTime ASC) AS id "
100 "FROM MSG WHERE 1=1 "
101 f"{sql_wxid}"
102 f"{sql_type}"
103 f"{sql_sub_type}"
104 f"{sql_start_createtime}"
105 f"{sql_end_createtime}"
106 f"ORDER BY CreateTime ASC LIMIT ?,?"
107 )
108 param = param + (start_index, page_size)
109 result = self.execute(sql, param)
110 if not result:
111 return [], []
112
113 result_data = (self.get_msg_detail(row, my_talker=my_talker) for row in result)
114 rdata = list(result_data) # 转为列表
115 wxid_list = {d['talker'] for d in rdata} # 创建一个无重复的 wxid 列表
116 return rdata, list(wxid_list)
117
118 @db_error
119 def get_date_count(self, wxid='', start_time: int = 0, end_time: int = 0, time_format='%Y-%m-%d'):
120 """
121 获取每日聊天记录数量,包括发送者数量、接收者数量和总数。
122 """
123 if not self.tables_exist("MSG"):
124 return {}
125 if isinstance(start_time, str) and start_time.isdigit():
126 start_time = int(start_time)
127 if isinstance(end_time, str) and end_time.isdigit():
128 end_time = int(end_time)
129
130 # if start_time or end_time is not an integer and not a float, set both to 0
131 if not (isinstance(start_time, (int, float)) and isinstance(end_time, (int, float))):
132 start_time = 0
133 end_time = 0
134 params = ()
135
136 sql_wxid = "AND StrTalker = ? " if wxid else ""
137 params = params + (wxid,) if wxid else params
138
139 sql_time = "AND CreateTime BETWEEN ? AND ? " if start_time and end_time else ""
140 params = params + (start_time, end_time) if start_time and end_time else params
141
142 sql = (f"SELECT strftime('{time_format}', CreateTime, 'unixepoch', 'localtime') AS date, "
143 " COUNT(*) AS total_count ,"
144 " SUM(CASE WHEN IsSender = 1 THEN 1 ELSE 0 END) AS sender_count, "
145 " SUM(CASE WHEN IsSender = 0 THEN 1 ELSE 0 END) AS receiver_count "
146 "FROM MSG "
147 "WHERE StrTalker NOT LIKE '%chatroom%' "
148 f"{sql_wxid} {sql_time} "
149 f"GROUP BY date ORDER BY date ASC;")
150 result = self.execute(sql, params)
151
152 if not result:
153 return {}
154 # 将查询结果转换为字典
155 result_dict = {}
156 for row in result:
157 date, total_count, sender_count, receiver_count = row
158 result_dict[date] = {
159 "sender_count": sender_count,
160 "receiver_count": receiver_count,
161 "total_count": total_count
162 }
163 return result_dict
164
165 @db_error
166 def get_top_talker_count(self, top: int = 10, start_time: int = 0, end_time: int = 0):
167 """
168 获取聊天记录数量最多的联系人,他们聊天记录数量
169 """
170 if not self.tables_exist("MSG"):
171 return {}
172 if isinstance(start_time, str) and start_time.isdigit():
173 start_time = int(start_time)
174 if isinstance(end_time, str) and end_time.isdigit():
175 end_time = int(end_time)
176
177 # if start_time or end_time is not an integer and not a float, set both to 0
178 if not (isinstance(start_time, (int, float)) and isinstance(end_time, (int, float))):
179 start_time = 0
180 end_time = 0
181
182 sql_time = f"AND CreateTime BETWEEN {start_time} AND {end_time} " if start_time and end_time else ""
183 sql = (
184 "SELECT StrTalker, COUNT(*) AS count,"
185 "SUM(CASE WHEN IsSender = 1 THEN 1 ELSE 0 END) AS sender_count, "
186 "SUM(CASE WHEN IsSender = 0 THEN 1 ELSE 0 END) AS receiver_count "
187 "FROM MSG "
188 "WHERE StrTalker NOT LIKE '%chatroom%' "
189 f"{sql_time} "
190 "GROUP BY StrTalker ORDER BY count DESC "
191 f"LIMIT {top};"
192 )
193 result = self.execute(sql)
194 if not result:
195 return {}
196 # 将查询结果转换为字典
197 result_dict = {row[0]: {"total_count": row[1], "sender_count": row[2], "receiver_count": row[3]} for row in
198 result}
199 return result_dict
200
201 # 单条消息处理
202 @db_error
203 def get_msg_detail(self, row, my_talker="我"):
204 """
205 获取单条消息详情,格式化输出
206 """
207 (localId, TalkerId, MsgSvrID, Type, SubType, CreateTime, IsSender, Sequence, StatusEx, FlagEx, Status,
208 MsgSequence, StrContent, MsgServerSeq, StrTalker, DisplayContent, Reserved0, Reserved1, Reserved3,
209 Reserved4, Reserved5, Reserved6, CompressContent, BytesExtra, BytesTrans, Reserved2, _id) = row
210
211 CreateTime = timestamp2str(CreateTime)
212
213 type_id = (Type, SubType)
214 type_name = type_converter(type_id)
215
216 msg = StrContent
217 src = ""
218 extra = {}
219
220 if type_id == (1, 0): # 文本
221 msg = StrContent
222
223 elif type_id == (3, 0): # 图片
224 DictExtra = get_BytesExtra(BytesExtra)
225 DictExtra_str = str(DictExtra)
226 img_paths = [i for i in re.findall(r"(FileStorage.*?)'", DictExtra_str)]
227 img_paths = sorted(img_paths, key=lambda p: "Image" in p, reverse=True)
228 if img_paths:
229 img_path = img_paths[0].replace("'", "")
230 img_path = [i for i in img_path.split("\\") if i]
231 img_path = os.path.join(*img_path)
232 src = img_path
233 else:
234 src = ""
235 msg = "图片"
236 elif type_id == (34, 0): # 语音
237 tmp_c = xml2dict(StrContent)
238 voicelength = tmp_c.get("voicemsg", {}).get("voicelength", "")
239 transtext = tmp_c.get("voicetrans", {}).get("transtext", "")
240 if voicelength.isdigit():
241 voicelength = int(voicelength) / 1000
242 voicelength = f"{voicelength:.2f}"
243 msg = f"语音时长:{voicelength}秒\n翻译结果:{transtext}" if transtext else f"语音时长:{voicelength}秒"
244 src = os.path.join(f"{StrTalker}",
245 f"{CreateTime.replace(':', '-').replace(' ', '_')}_{IsSender}_{MsgSvrID}.wav")
246 elif type_id == (43, 0): # 视频
247 DictExtra = get_BytesExtra(BytesExtra)
248 DictExtra = str(DictExtra)
249
250 DictExtra_str = str(DictExtra)
251 video_paths = [i for i in re.findall(r"(FileStorage.*?)'", DictExtra_str)]
252 video_paths = sorted(video_paths, key=lambda p: "mp4" in p, reverse=True)
253 if video_paths:
254 video_path = video_paths[0].replace("'", "")
255 video_path = [i for i in video_path.split("\\") if i]
256 video_path = os.path.join(*video_path)
257 src = video_path
258 else:
259 src = ""
260 msg = "视频"
261
262 elif type_id == (47, 0): # 动画表情
263 content_tmp = xml2dict(StrContent)
264 cdnurl = content_tmp.get("emoji", {}).get("cdnurl", "")
265 if not cdnurl:
266 DictExtra = get_BytesExtra(BytesExtra)
267 cdnurl = match_BytesExtra(DictExtra)
268 if cdnurl:
269 msg, src = "表情", cdnurl
270
271 elif type_id == (48, 0): # 地图信息
272 content_tmp = xml2dict(StrContent)
273 location = content_tmp.get("location", {})
274 msg = (f"纬度:【{location.pop('x')}】 经度:【{location.pop('y')}】\n"
275 f"位置:{location.pop('label')} {location.pop('poiname')}\n"
276 f"其他信息:{json.dumps(location, ensure_ascii=False, indent=4)}"
277 )
278 src = ""
279 elif type_id == (49, 0): # 文件
280 DictExtra = get_BytesExtra(BytesExtra)
281 url = match_BytesExtra(DictExtra)
282 src = url
283 file_name = os.path.basename(url)
284 msg = file_name
285
286 elif type_id == (49, 5): # (分享)卡片式链接
287 CompressContent = decompress_CompressContent(CompressContent)
288 CompressContent_tmp = xml2dict(CompressContent)
289 appmsg = CompressContent_tmp.get("appmsg", {})
290 title = appmsg.get("title", "")
291 des = appmsg.get("des", "")
292 url = appmsg.get("url", "")
293 msg = f'{title}\n{des}\n\n<a href="{url}" target="_blank">点击查看详情</a>'
294 src = url
295 extra = appmsg
296
297 elif type_id == (49, 19): # 合并转发的聊天记录
298 CompressContent = decompress_CompressContent(CompressContent)
299 content_tmp = xml2dict(CompressContent)
300 title = content_tmp.get("appmsg", {}).get("title", "")
301 des = content_tmp.get("appmsg", {}).get("des", "")
302 recorditem = content_tmp.get("appmsg", {}).get("recorditem", "")
303 recorditem = xml2dict(recorditem)
304 msg = f"{title}\n{des}"
305 src = recorditem
306
307 elif type_id == (49, 57): # 带有引用的文本消息
308 CompressContent = decompress_CompressContent(CompressContent)
309 content_tmp = xml2dict(CompressContent)
310 appmsg = content_tmp.get("appmsg", {})
311
312 title = appmsg.get("title", "")
313 refermsg = appmsg.get("refermsg", {})
314
315 type_id = appmsg.get("type", "1")
316
317 displayname = refermsg.get("displayname", "")
318 display_content = refermsg.get("content", "")
319 display_createtime = refermsg.get("createtime", "")
320
321 display_createtime = timestamp2str(
322 int(display_createtime)) if display_createtime.isdigit() else display_createtime
323
324 if display_content and display_content.startswith("<?xml"):
325 display_content = xml2dict(display_content)
326 if "img" in display_content:
327 display_content = "图片"
328 else:
329 appmsg1 = display_content.get("appmsg", {})
330 title1 = appmsg1.get("title", "")
331 display_content = title1 if title1 else display_content
332 msg = f"{title}\n\n[引用]({display_createtime}){displayname}:{display_content}"
333 src = ""
334
335 elif type_id == (49, 2000): # 转账消息
336 CompressContent = decompress_CompressContent(CompressContent)
337 content_tmp = xml2dict(CompressContent)
338 wcpayinfo = content_tmp.get("appmsg", {}).get("wcpayinfo", {})
339 paysubtype = wcpayinfo.get("paysubtype", "") # 转账类型
340 feedesc = wcpayinfo.get("feedesc", "") # 转账金额
341 pay_memo = wcpayinfo.get("pay_memo", "") # 转账备注
342 begintransfertime = wcpayinfo.get("begintransfertime", "") # 转账开始时间
343 msg = (f"{'已收款' if paysubtype == '3' else '转账'}:{feedesc}\n"
344 f"转账说明:{pay_memo if pay_memo else ''}\n"
345 f"转账时间:{timestamp2str(begintransfertime)}\n"
346 )
347 src = ""
348
349 elif type_id[0] == 49 and type_id[1] != 0:
350 DictExtra = get_BytesExtra(BytesExtra)
351 url = match_BytesExtra(DictExtra)
352 src = url
353 msg = type_name
354
355 elif type_id == (50, 0): # 语音通话
356 msg = "语音/视频通话[%s]" % DisplayContent
357
358 # elif type_id == (10000, 0):
359 # msg = StrContent
360 # elif type_id == (10000, 4):
361 # msg = StrContent
362 # elif type_id == (10000, 8000):
363 # msg = StrContent
364
365 talker = "未知"
366 if IsSender == 1:
367 talker = my_talker
368 else:
369 if StrTalker.endswith("@chatroom"):
370 bytes_extra = get_BytesExtra(BytesExtra)
371 if bytes_extra:
372 try:
373 talker = bytes_extra['3'][0]['2']
374 if "publisher-id" in talker:
375 talker = "系统"
376 except:
377 pass
378 else:
379 talker = StrTalker
380
381 row_data = {"id": _id, "MsgSvrID": str(MsgSvrID), "type_name": type_name, "is_sender": IsSender,
382 "talker": talker, "room_name": StrTalker, "msg": msg, "src": src, "extra": extra,
383 "CreateTime": CreateTime, }
384 return row_data
385
386
387@db_error
388def decompress_CompressContent(data):
389 """
390 解压缩Msg:CompressContent内容
391 :param data: CompressContent内容 bytes
392 :return:
393 """
394 if data is None or not isinstance(data, bytes):
395 return None
396 try:
397 dst = lz4.block.decompress(data, uncompressed_size=len(data) << 8)
398 dst = dst.replace(b'\x00', b'') # 已经解码完成后,还含有0x00的部分,要删掉,要不后面ET识别的时候会报错
399 uncompressed_data = dst.decode('utf-8', errors='ignore')
400 return uncompressed_data
401 except Exception as e:
402 return data.decode('utf-8', errors='ignore')
403
404
405@db_error
406def get_BytesExtra(BytesExtra):
407 BytesExtra_message_type = {
408 "1": {
409 "type": "message",
410 "message_typedef": {
411 "1": {
412 "type": "int",
413 "name": ""
414 },
415 "2": {
416 "type": "int",
417 "name": ""
418 }
419 },
420 "name": "1"
421 },
422 "3": {
423 "type": "message",
424 "message_typedef": {
425 "1": {
426 "type": "int",
427 "name": ""
428 },
429 "2": {
430 "type": "str",
431 "name": ""
432 }
433 },
434 "name": "3",
435 "alt_typedefs": {
436 "1": {
437 "1": {
438 "type": "int",
439 "name": ""
440 },
441 "2": {
442 "type": "message",
443 "message_typedef": {},
444 "name": ""
445 }
446 },
447 "2": {
448 "1": {
449 "type": "int",
450 "name": ""
451 },
452 "2": {
453 "type": "message",
454 "message_typedef": {
455 "13": {
456 "type": "fixed32",
457 "name": ""
458 },
459 "12": {
460 "type": "fixed32",
461 "name": ""
462 }
463 },
464 "name": ""
465 }
466 },
467 "3": {
468 "1": {
469 "type": "int",
470 "name": ""
471 },
472 "2": {
473 "type": "message",
474 "message_typedef": {
475 "15": {
476 "type": "fixed64",
477 "name": ""
478 }
479 },
480 "name": ""
481 }
482 },
483 "4": {
484 "1": {
485 "type": "int",
486 "name": ""
487 },
488 "2": {
489 "type": "message",
490 "message_typedef": {
491 "15": {
492 "type": "int",
493 "name": ""
494 },
495 "14": {
496 "type": "fixed32",
497 "name": ""
498 }
499 },
500 "name": ""
501 }
502 },
503 "5": {
504 "1": {
505 "type": "int",
506 "name": ""
507 },
508 "2": {
509 "type": "message",
510 "message_typedef": {
511 "12": {
512 "type": "fixed32",
513 "name": ""
514 },
515 "7": {
516 "type": "fixed64",
517 "name": ""
518 },
519 "6": {
520 "type": "fixed64",
521 "name": ""
522 }
523 },
524 "name": ""
525 }
526 },
527 "6": {
528 "1": {
529 "type": "int",
530 "name": ""
531 },
532 "2": {
533 "type": "message",
534 "message_typedef": {
535 "7": {
536 "type": "fixed64",
537 "name": ""
538 },
539 "6": {
540 "type": "fixed32",
541 "name": ""
542 }
543 },
544 "name": ""
545 }
546 },
547 "7": {
548 "1": {
549 "type": "int",
550 "name": ""
551 },
552 "2": {
553 "type": "message",
554 "message_typedef": {
555 "12": {
556 "type": "fixed64",
557 "name": ""
558 }
559 },
560 "name": ""
561 }
562 },
563 "8": {
564 "1": {
565 "type": "int",
566 "name": ""
567 },
568 "2": {
569 "type": "message",
570 "message_typedef": {
571 "6": {
572 "type": "fixed64",
573 "name": ""
574 },
575 "12": {
576 "type": "fixed32",
577 "name": ""
578 }
579 },
580 "name": ""
581 }
582 },
583 "9": {
584 "1": {
585 "type": "int",
586 "name": ""
587 },
588 "2": {
589 "type": "message",
590 "message_typedef": {
591 "15": {
592 "type": "int",
593 "name": ""
594 },
595 "12": {
596 "type": "fixed64",
597 "name": ""
598 },
599 "6": {
600 "type": "int",
601 "name": ""
602 }
603 },
604 "name": ""
605 }
606 },
607 "10": {
608 "1": {
609 "type": "int",
610 "name": ""
611 },
612 "2": {
613 "type": "message",
614 "message_typedef": {
615 "6": {
616 "type": "fixed32",
617 "name": ""
618 },
619 "12": {
620 "type": "fixed64",
621 "name": ""
622 }
623 },
624 "name": ""
625 }
626 },
627 }
628 }
629 }
630 if BytesExtra is None or not isinstance(BytesExtra, bytes):
631 return None
632 try:
633 deserialize_data, message_type = blackboxprotobuf.decode_message(BytesExtra, BytesExtra_message_type)
634 return deserialize_data
635 except Exception as e:
636 return None