解锁并提取Linux客户端微信数据库 (vibe coded)
1# -*- coding: utf-8 -*-#
2# -------------------------------------------------------------------------------
3# Name: common_utils.py
4# Description:
5# Author: xaoyaoo
6# Date: 2024/04/15
7# -------------------------------------------------------------------------------
8import hashlib
9import os
10import re
11import time
12import wave
13
14import requests
15from io import BytesIO
16import pysilk
17import lxml.etree as ET # 这个模块更健壮些,微信XML格式有时有非标格式,会导致xml.etree.ElementTree处理失败
18from collections import defaultdict
19
20from ._loger import db_loger
21
22
def db_error(func):
    """
    Error-handling decorator.

    Calls ``func``; if it raises any ``Exception`` the error is logged through
    ``db_loger`` (with traceback) and ``None`` is returned instead of propagating.

    :param func: callable to protect
    :return: wrapper with the same call signature and metadata as ``func``
    """
    import functools  # local import so the block does not depend on file-level imports

    @functools.wraps(func)  # keep __name__/__doc__ of the decorated function
    def wrapper(*args, **kwargs):
        try:
            return func(*args, **kwargs)
        except Exception as e:
            db_loger.error(f"db_error: {e}", exc_info=True)
            return None

    return wrapper
38
39
def type_converter(type_id_or_name: [str, tuple]):
    """
    Convert between a message type ID and its display name.

    tuple ID  => name (str); IDs not in the table map to "未知"
    name (str) => first matching tuple ID in table order; (0, 0) when not found

    :param type_id_or_name: message type ID (tuple) or message type name (str)
    :return: the name for an ID, or the ID for a name
    :raises ValueError: if the argument is neither a tuple nor a str
    """
    names_by_id = {
        (1, 0): "文本",
        (3, 0): "图片",
        (34, 0): "语音",
        (37, 0): "添加好友",
        (42, 0): "推荐公众号",
        (43, 0): "视频",
        (47, 0): "动画表情",
        (48, 0): "位置",

        (49, 0): "文件",
        (49, 1): "粘贴的文本",
        (49, 3): "(分享)音乐",
        (49, 4): "(分享)卡片式链接",
        (49, 5): "(分享)卡片式链接",
        (49, 6): "文件",
        (49, 7): "游戏相关",
        (49, 8): "用户上传的GIF表情",
        (49, 15): "未知-49,15",
        (49, 17): "位置共享",
        (49, 19): "合并转发的聊天记录",
        (49, 24): "(分享)笔记",
        (49, 33): "(分享)小程序",
        (49, 36): "(分享)小程序",
        (49, 40): "(分享)收藏夹",
        (49, 44): "(分享)小说(猜)",
        (49, 50): "(分享)视频号名片",
        (49, 51): "(分享)视频号视频",
        (49, 53): "接龙",
        (49, 57): "引用回复",
        (49, 63): "视频号直播或直播回放",
        (49, 74): "文件(猜)",
        (49, 87): "群公告",
        (49, 88): "视频号直播或直播回放等",
        (49, 2000): "转账",
        (49, 2003): "赠送红包封面",

        (50, 0): "语音通话",
        (65, 0): "企业微信打招呼(猜)",
        (66, 0): "企业微信添加好友(猜)",

        (10000, 0): "系统通知",
        (10000, 1): "消息撤回1",
        (10000, 4): "拍一拍",
        (10000, 5): "消息撤回5",
        (10000, 6): "消息撤回6",
        (10000, 33): "消息撤回33",
        (10000, 36): "消息撤回36",
        (10000, 57): "消息撤回57",
        (10000, 8000): "邀请加群",
        (11000, 0): "未知-11000,0",
    }

    # ID -> name
    if isinstance(type_id_or_name, tuple):
        return names_by_id.get(type_id_or_name, "未知")

    # name -> ID (several IDs share a name; the first one in table order wins)
    if isinstance(type_id_or_name, str):
        for known_id, known_name in names_by_id.items():
            if known_name == type_id_or_name:
                return known_id
        return (0, 0)

    raise ValueError("Invalid input type")
107
108
def typeid2name(type_id: tuple):
    """
    Look up the display name of a message type.

    :param type_id: message type ID tuple, e.g. (1, 0)
    :return: the value type_converter returns for that tuple
    """
    type_name = type_converter(type_id)
    return type_name
116
117
def name2typeid(type_name: str):
    """
    Look up the ID of a message type by its display name.

    :param type_name: message type name
    :return: the value type_converter returns for that name
    """
    type_id = type_converter(type_name)
    return type_id
125
126
def get_md5(data):
    """
    Return the MD5 digest of ``data`` as a hexadecimal string.

    :param data: bytes-like object to hash
    :return: hex digest string
    """
    return hashlib.md5(data).hexdigest()
131
132
def timestamp2str(timestamp):
    """
    Convert a Unix timestamp to a local-time string "YYYY-MM-DD HH:MM:SS".

    Accepts an int, a float, or a str consisting only of digits. The number of
    digits of the integer part decides the unit: 10 digits are seconds,
    13 digits are milliseconds. Anything else is returned without formatting
    (a digit string is returned as the int it was converted to).

    :param timestamp: timestamp in seconds or milliseconds
    :return: formatted time string, or the (unformatted) input when it is not
             recognised as a 10- or 13-digit timestamp
    """
    if isinstance(timestamp, str):
        if not timestamp.isdigit():
            return timestamp
        try:
            timestamp = int(timestamp)
        except ValueError:  # e.g. "²" passes isdigit() but is not an int literal
            return timestamp
    elif not isinstance(timestamp, (int, float)):
        return timestamp

    # Count digits of the integer part only, so that a float's decimal point
    # and fraction do not distort the seconds/milliseconds detection.
    try:
        digits = len(str(int(timestamp)))
    except (ValueError, OverflowError):  # NaN or infinity
        return timestamp

    if digits == 13:
        timestamp = timestamp / 1000  # milliseconds -> seconds
    elif digits != 10:
        return timestamp

    return time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(timestamp))
154
155
def dat2img(input_data):
    """
    Decode an XOR-obfuscated image (.dat format).

    Every byte of the file is XOR-ed with one single-byte key. The key is
    recovered by XOR-ing the first input byte with the first byte of each known
    image header; a format is accepted only when ALL bytes of that header
    decode with the same key.

    :param input_data: path of the image file (str) or the raw file data (bytes-like)
    :return: tuple ``(ok, format, md5, data)``:
             on success ``(True, extension, md5 hex digest, decoded data)`` where the
             decoded data is a numpy uint8 array when numpy is installed and a
             bytearray otherwise; ``(False, False, False, False)`` when no known
             header matches (including empty or too-short input)
    """
    # File headers of common image formats
    img_head = {
        b"\xFF\xD8\xFF": ".jpg",
        b"\x89\x50\x4E\x47": ".png",
        b"\x47\x49\x46\x38": ".gif",
        b"\x42\x4D": ".BMP",
        b"\x49\x49": ".TIFF",
        b"\x4D\x4D": ".TIFF",
        b"\x00\x00\x01\x00": ".ICO",
        b"\x52\x49\x46\x46": ".WebP",
        b"\x00\x00\x00\x18\x66\x74\x79\x70\x68\x65\x69\x63": ".HEIC",
    }
    not_found = (False, False, False, False)

    if isinstance(input_data, str):
        with open(input_data, "rb") as f:
            input_bytes = f.read()
    else:
        input_bytes = input_data

    # Only the leading bytes are needed to recognise the format.
    head = bytes(input_bytes[:max(map(len, img_head))])
    if not head:
        return not_found

    for hcode, fomt in img_head.items():
        if len(head) < len(hcode):
            continue  # input too short to contain this header
        t = head[0] ^ hcode[0]  # candidate XOR key
        # Every header byte must decode with the same key, not just one of them.
        if any(head[i] ^ hcode[i] != t for i in range(1, len(hcode))):
            continue

        try:
            import numpy as np
        except ImportError:
            # 256-entry lookup table performs the XOR in a single C-level pass.
            xor_table = bytes(b ^ t for b in range(256))
            out_bytes = bytearray(bytes(input_bytes).translate(xor_table))
        else:
            # Vectorised XOR over the whole buffer.
            out_bytes = np.bitwise_xor(np.frombuffer(input_bytes, dtype=np.uint8), np.uint8(t))

        md5 = get_md5(out_bytes)
        return True, fomt, md5, out_bytes

    return not_found
209
210
def xml2dict(xml_string):
    """
    Parse an XML string into nested Python data.

    :param xml_string: XML text to parse
    :return: None when the argument is not a str; the original string when the
             parser raises; otherwise the converted root element (see _convert)
    """

    def _convert(node):
        """
        Recursively convert one XML element.

        Attributes and child elements are stored under their names; a child tag
        that occurs more than once (or collides with an attribute name) becomes
        a list. An element with no attributes and no children is represented by
        its text content when that text is non-empty.

        :param node: XML element to convert
        :return: dict for the element, or its text when it has nothing else
        """
        # Some nodes come without attributes; treat them as empty.
        if node is None or node.attrib is None:
            return {}

        converted = dict(node.attrib.items())

        for sub in node:
            sub_converted = _convert(sub)
            tag = sub.tag
            if tag not in converted:
                converted[tag] = sub_converted
                continue
            # Repeated key: collect all values in a list.
            if not isinstance(converted[tag], list):
                converted[tag] = [converted[tag]]
            converted[tag].append(sub_converted)

        # Leaf element: fall back to its text content.
        if not converted and node.text:
            return node.text
        return converted

    if not isinstance(xml_string, str):
        return None

    try:
        # WeChat chat records sometimes contain malformed XML, so the parser is
        # created with recover=True to tolerate errors.
        tolerant_parser = ET.XMLParser(recover=True)
        root = ET.fromstring(xml_string, tolerant_parser)
    except Exception:
        return xml_string

    return _convert(root)
253
254
def download_file(url, save_path=None, proxies=None, timeout=30):
    """
    Download a file over HTTP(S).

    :param url: download URL
    :param save_path: optional path to also write the downloaded data to;
                      missing parent directories are created
    :param proxies: proxies mapping passed to requests
    :param timeout: timeout in seconds passed to requests, so a stalled server
                    cannot block the caller forever; pass None to wait indefinitely
    :return: the downloaded content (bytes), or None when the response status
             code is not 200
    """
    headers = {
        "User-Agent": "Mozilla/5.0 (Linux; Android 10; Redmi K40 Pro) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/92.0.4515.159 Mobile Safari/537.36"

    }
    r = requests.get(url, headers=headers, proxies=proxies, timeout=timeout)
    if r.status_code != 200:
        return None
    data = r.content
    if save_path and isinstance(save_path, str):
        # Create the target folder. A bare file name has an empty dirname,
        # which os.makedirs would reject, so only create when there is one.
        parent_dir = os.path.dirname(save_path)
        if parent_dir:
            os.makedirs(parent_dir, exist_ok=True)
        with open(save_path, "wb") as f:
            f.write(data)
    return data
278
279
def bytes2str(d):
    """
    Walk a dict and decode every bytes value to str (UTF-8), in place.

    Nested dicts are processed recursively. For list values, dict items are
    processed recursively and bytes items are replaced by their decoded string.

    :param d: dict to convert; it is modified in place
    :return: None
    """
    for k, v in d.items():
        if isinstance(v, dict):
            bytes2str(v)
        elif isinstance(v, list):
            for idx, item in enumerate(v):
                if isinstance(item, dict):
                    bytes2str(item)
                elif isinstance(item, bytes):
                    # Assign back by index; rebinding the loop variable would
                    # leave the list unchanged.
                    v[idx] = item.decode('utf-8')
        elif isinstance(v, bytes):
            d[k] = v.decode('utf-8')
297
298
def read_dict_all_values(data):
    """
    Collect every leaf value of a nested dict/list structure into a flat list.

    Dicts and lists are traversed recursively (dict keys are ignored). Leaf
    values are converted as follows: bytes are decoded as UTF-8, ints become
    their string form, everything else is kept unchanged.

    :param data: dict, list, or a single leaf value
    :return: list of all leaf values, in traversal order
    """
    if isinstance(data, list):
        children = data
    elif isinstance(data, dict):
        children = data.values()
    else:
        # Leaf value
        if isinstance(data, bytes):
            return [data.decode("utf-8")]
        if isinstance(data, int):
            return [str(data)]
        return [data]

    collected = []
    for child in children:
        collected.extend(read_dict_all_values(child))
    return collected
323
324
def match_BytesExtra(BytesExtra, pattern=r"FileStorage(.*?)'"):
    """
    Search the flattened values of BytesExtra for a pattern.

    All leaf values are collected with read_dict_all_values, joined with single
    quotes (and wrapped in single quotes), then searched with ``pattern``.

    :param BytesExtra: nested dict/list structure to search
    :param pattern: regular expression to search for
    :return: False when BytesExtra is empty/falsy; the whole matched text with
             single quotes removed when the pattern is found; "" otherwise
    """
    if not BytesExtra:
        return False

    flat_values = read_dict_all_values(BytesExtra)
    joined = "'".join(flat_values)
    haystack = f"'{joined}'"

    found = re.search(pattern, haystack)
    if not found:
        return ""
    return found.group(0).replace("'", "")
344
345
def silk2audio(buf_data, is_play=False, is_wave=False, save_path=None, rate=24000):
    """
    Decode SILK audio data to PCM, optionally play it and/or wrap it as WAV.

    :param buf_data: SILK-encoded audio data (bytes)
    :param is_play: when True, play the decoded audio through pyaudio
                    (16-bit, mono, at ``rate``)
    :param is_wave: when True, return WAV file data instead of raw PCM
    :param save_path: file path to write the WAV data to; only used when
                      ``is_wave`` is True
    :param rate: sample rate passed to the decoder, the player and the WAV header
    :return: WAV file bytes when ``is_wave`` is True, otherwise the raw PCM bytes
    :raises ImportError: when ``is_play`` is True and pyaudio is not installed
    """
    # Decode silk -> pcm using in-memory files; the buffers are closed on exit.
    with BytesIO(buf_data) as silk_file, BytesIO() as pcm_file:
        pysilk.decode(silk_file, pcm_file, rate)
        pcm_data = pcm_file.getvalue()

    if is_play:  # play the audio
        try:
            import pyaudio
        except ImportError:
            raise ImportError("请先安装pyaudio库[ pip install pyaudio ]")

        p = pyaudio.PyAudio()
        try:
            stream = p.open(format=pyaudio.paInt16, channels=1, rate=rate, output=True)
            try:
                stream.write(pcm_data)
                stream.stop_stream()
            finally:
                stream.close()  # release the stream even if playback failed
        finally:
            p.terminate()  # always release the PyAudio instance

    if is_wave:  # convert to a wav file
        with BytesIO() as wave_file:
            with wave.open(wave_file, 'wb') as wf:
                # (nchannels, sampwidth, framerate, nframes, comptype, compname)
                wf.setparams((1, 2, rate, 0, 'NONE', 'NONE'))
                wf.writeframes(pcm_data)
            rdata = wave_file.getvalue()
        if save_path and isinstance(save_path, str):
            with open(save_path, "wb") as f:
                f.write(rdata)
            print('saved wav file')
        return rdata

    return pcm_data
386 return pcm_data