解锁并提取Linux客户端微信数据库 (vibe coded)
at 386 lines 12 kB view raw
# -*- coding: utf-8 -*-#
# -------------------------------------------------------------------------------
# Name:         common_utils.py
# Description:
# Author:       xaoyaoo
# Date:         2024/04/15
# -------------------------------------------------------------------------------
import functools
import hashlib
import os
import re
import time
import wave
from collections import defaultdict  # kept: other modules may import it from here
from io import BytesIO
from typing import Union

# lxml is more robust than xml.etree.ElementTree: WeChat XML is sometimes
# non-standard and makes the stdlib parser fail.
import lxml.etree as ET
import pysilk
import requests

from ._loger import db_loger


def db_error(func):
    """
    Error-handling decorator: log any exception raised by ``func`` and return None.

    :param func: function to wrap
    :return: wrapped function with the same name/docstring as ``func``
    """

    @functools.wraps(func)  # keep __name__/__doc__ of the wrapped function
    def wrapper(*args, **kwargs):
        try:
            return func(*args, **kwargs)
        except Exception as e:
            db_loger.error(f"db_error: {e}", exc_info=True)
            return None

    return wrapper


# Message type table: (type, sub_type) -> display name.
# A plain dict (not a defaultdict) on purpose: looking up an unknown key in a
# shared defaultdict would insert it and corrupt later name -> id lookups.
_TYPE_NAME_BY_ID = {
    (1, 0): "文本",
    (3, 0): "图片",
    (34, 0): "语音",
    (37, 0): "添加好友",
    (42, 0): "推荐公众号",
    (43, 0): "视频",
    (47, 0): "动画表情",
    (48, 0): "位置",

    (49, 0): "文件",
    (49, 1): "粘贴的文本",
    (49, 3): "(分享)音乐",
    (49, 4): "(分享)卡片式链接",
    (49, 5): "(分享)卡片式链接",
    (49, 6): "文件",
    (49, 7): "游戏相关",
    (49, 8): "用户上传的GIF表情",
    (49, 15): "未知-49,15",
    (49, 17): "位置共享",
    (49, 19): "合并转发的聊天记录",
    (49, 24): "(分享)笔记",
    (49, 33): "(分享)小程序",
    (49, 36): "(分享)小程序",
    (49, 40): "(分享)收藏夹",
    (49, 44): "(分享)小说(猜)",
    (49, 50): "(分享)视频号名片",
    (49, 51): "(分享)视频号视频",
    (49, 53): "接龙",
    (49, 57): "引用回复",
    (49, 63): "视频号直播或直播回放",
    (49, 74): "文件(猜)",
    (49, 87): "群公告",
    (49, 88): "视频号直播或直播回放等",
    (49, 2000): "转账",
    (49, 2003): "赠送红包封面",

    (50, 0): "语音通话",
    (65, 0): "企业微信打招呼(猜)",
    (66, 0): "企业微信添加好友(猜)",

    (10000, 0): "系统通知",
    (10000, 1): "消息撤回1",
    (10000, 4): "拍一拍",
    (10000, 5): "消息撤回5",
    (10000, 6): "消息撤回6",
    (10000, 33): "消息撤回33",
    (10000, 36): "消息撤回36",
    (10000, 57): "消息撤回57",
    (10000, 8000): "邀请加群",
    (11000, 0): "未知-11000,0",
}

# Name returned for a type id that is not in the table.
_UNKNOWN_TYPE_NAME = "未知"


def type_converter(type_id_or_name: Union[str, tuple]):
    """
    Convert between a message type id and its name.

    name (str)  => id (tuple)
    id (tuple)  => name (str)

    :param type_id_or_name: message type id tuple or message type name
    :return: the name for an id ("未知" when the id is not in the table), or the
             first id whose name matches ((0, 0) when no name matches)
    :raises ValueError: if the argument is neither a tuple nor a str
    """
    if isinstance(type_id_or_name, tuple):
        return _TYPE_NAME_BY_ID.get(type_id_or_name, _UNKNOWN_TYPE_NAME)
    if isinstance(type_id_or_name, str):
        # Several ids share one name; the first one in table order wins.
        return next((k for k, v in _TYPE_NAME_BY_ID.items() if v == type_id_or_name), (0, 0))
    raise ValueError("Invalid input type")


def typeid2name(type_id: tuple):
    """
    Get the message type name.

    :param type_id: message type id tuple, e.g. (1, 0)
    :return: message type name
    """
    return type_converter(type_id)


def name2typeid(type_name: str):
    """
    Get the message type id.

    :param type_name: message type name
    :return: message type id tuple
    """
    return type_converter(type_name)


def get_md5(data):
    """
    Return the hexadecimal MD5 digest of ``data``.

    :param data: bytes-like object
    :return: hex digest string
    """
    md5 = hashlib.md5()
    md5.update(data)
    return md5.hexdigest()


def timestamp2str(timestamp):
    """
    Convert a timestamp to a local-time string "%Y-%m-%d %H:%M:%S".

    :param timestamp: int/float timestamp or an all-digit string; 10 digits are
                      treated as seconds, 13 digits as milliseconds
    :return: time string; any other input (or digit count) is returned as-is
             (an all-digit string is returned converted to int)
    """
    if isinstance(timestamp, str) and timestamp.isdigit():
        timestamp = int(timestamp)
    elif not isinstance(timestamp, (int, float)):
        return timestamp

    # Count digits of the integer part only, so that a float such as
    # 1713168000.0 is still recognised as a 10-digit timestamp.
    try:
        digits = len(str(int(timestamp)))
    except (ValueError, OverflowError):  # NaN / infinity
        return timestamp

    if digits == 13:
        timestamp = timestamp / 1000
    elif digits != 10:
        return timestamp

    return time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(timestamp))


def dat2img(input_data):
    """
    Decode a single-byte-XOR obfuscated image (.dat).

    The XOR key is guessed by XOR-ing the first input byte with the first byte
    of each known image header and checking that the same key reproduces the
    rest of that header.

    :param input_data: path of the file (str) or the file content (bytes-like)
    :return: tuple (ok, extension, md5, data); (False, False, False, False)
             when no known header matches. ``data`` is a numpy uint8 array when
             numpy is installed, otherwise a bytearray.
    """
    # File headers of common image formats
    img_head = {
        b"\xFF\xD8\xFF": ".jpg",
        b"\x89\x50\x4E\x47": ".png",
        b"\x47\x49\x46\x38": ".gif",
        b"\x42\x4D": ".BMP",
        b"\x49\x49": ".TIFF",
        b"\x4D\x4D": ".TIFF",
        b"\x00\x00\x01\x00": ".ICO",
        b"\x52\x49\x46\x46": ".WebP",
        b"\x00\x00\x00\x18\x66\x74\x79\x70\x68\x65\x69\x63": ".HEIC",
    }

    if isinstance(input_data, str):
        with open(input_data, "rb") as f:
            input_bytes = f.read()
    else:
        input_bytes = input_data

    if len(input_bytes) == 0:  # nothing to inspect, and [0] below would fail
        return False, False, False, False

    try:
        import numpy as np
    except ImportError:
        np = None

    if np is not None:
        data = np.frombuffer(input_bytes, dtype=np.uint8)
        for hcode, fomt in img_head.items():
            if len(data) < len(hcode):  # too short to carry this header
                continue
            key = np.uint8(int(data[0]) ^ hcode[0])  # candidate XOR key
            head = np.frombuffer(hcode, dtype=np.uint8)
            # Every header byte must decode with the same key.
            if np.all(np.bitwise_xor(data[:len(hcode)], head) == key):
                out_bytes = np.bitwise_xor(data, key)  # vectorised decryption
                return True, fomt, get_md5(out_bytes), out_bytes
        return False, False, False, False

    # Pure-Python fallback
    raw = bytes(input_bytes)
    for hcode, fomt in img_head.items():
        if len(raw) < len(hcode):
            continue
        key = raw[0] ^ hcode[0]
        # Require ALL header bytes to match, not just the first one checked.
        if all(raw[i] ^ hcode[i] == key for i in range(1, len(hcode))):
            table = bytes(b ^ key for b in range(256))
            out_bytes = bytearray(raw.translate(table))  # XOR every byte with key
            return True, fomt, get_md5(out_bytes), out_bytes
    return False, False, False, False


def xml2dict(xml_string):
    """
    Parse an XML string into nested dicts.

    :param xml_string: XML string to parse
    :return: parsed dict; None if the input is not a str; the input string
             itself if the parser raises
    """

    def parse_xml(element):
        """
        Recursively convert an XML element.

        :param element: XML element
        :return: dict of attributes and children, or the element text when the
                 element has neither attributes nor children
        """
        result = {}
        # Elements without attributes (or a missing root) show up sometimes
        if element is None or element.attrib is None:
            return result
        # Attributes of the current element
        for key, value in element.attrib.items():
            result[key] = value
        # Children of the current element
        for child in element:
            child_result = parse_xml(child)
            # A repeated tag is collected into a list
            if child.tag in result:
                if not isinstance(result[child.tag], list):
                    result[child.tag] = [result[child.tag]]
                result[child.tag].append(child_result)
            else:
                result[child.tag] = child_result
        # Leaf element: keep its text content as the value
        if not result and element.text:
            result = element.text
        return result

    if xml_string is None or not isinstance(xml_string, str):
        return None
    try:
        # WeChat chat records occasionally contain malformed XML, so the
        # parser is told to recover from errors instead of failing.
        parser = ET.XMLParser(recover=True)
        root = ET.fromstring(xml_string, parser)
    except Exception:
        return xml_string
    return parse_xml(root)


def download_file(url, save_path=None, proxies=None, timeout=30):
    """
    Download a file.

    :param url: download URL
    :param save_path: optional path to write the content to
    :param proxies: requests proxies
    :param timeout: requests timeout in seconds, so a stalled server cannot
                    block the caller forever
    :return: the downloaded content (bytes), or None if the HTTP status is not 200
    """
    headers = {
        "User-Agent": "Mozilla/5.0 (Linux; Android 10; Redmi K40 Pro) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/92.0.4515.159 Mobile Safari/537.36"

    }
    r = requests.get(url, headers=headers, proxies=proxies, timeout=timeout)
    if r.status_code != 200:
        return None
    data = r.content
    if save_path and isinstance(save_path, str):
        # Create the target folder; a bare file name has no folder to create
        folder = os.path.dirname(save_path)
        if folder:
            os.makedirs(folder, exist_ok=True)
        with open(save_path, "wb") as f:
            f.write(data)
    return data


def bytes2str(d):
    """
    Walk a dict and decode bytes values to str (UTF-8), in place.

    Nested dicts, and dicts/bytes directly inside list values, are handled.

    :param d: dict to convert in place
    :return: None
    """
    for k, v in d.items():
        if isinstance(v, dict):
            bytes2str(v)
        elif isinstance(v, list):
            for i, item in enumerate(v):
                if isinstance(item, dict):
                    bytes2str(item)
                elif isinstance(item, bytes):
                    # write back by index; rebinding the loop variable has no effect
                    v[i] = item.decode('utf-8')
        elif isinstance(v, bytes):
            d[k] = v.decode('utf-8')


def read_dict_all_values(data):
    """
    Collect every leaf value of a nested dict/list structure into a flat list.

    :param data: dict, list or scalar
    :return: list of leaf values; bytes are decoded as UTF-8 and ints are
             converted to str, other values are kept unchanged
    """
    result = []
    if isinstance(data, list):
        for item in data:
            result.extend(read_dict_all_values(item))
    elif isinstance(data, dict):
        for value in data.values():
            result.extend(read_dict_all_values(value))
    elif isinstance(data, bytes):
        result.append(data.decode("utf-8"))
    else:
        result.append(str(data) if isinstance(data, int) else data)
    return result


def match_BytesExtra(BytesExtra, pattern=r"FileStorage(.*?)'"):
    """
    Search the flattened values of BytesExtra for ``pattern``.

    :param BytesExtra: BytesExtra structure (nested dict/list)
    :param pattern: regular expression; the values are joined with "'" and
                    wrapped in "'" before searching
    :return: the whole match with "'" removed; "" when nothing matches;
             False when BytesExtra is empty
    """
    if not BytesExtra:
        return False
    values = read_dict_all_values(BytesExtra)
    # str() so that non-string leaves (None, float, ...) cannot break the join
    joined = "'" + "'".join(map(str, values)) + "'"

    match = re.search(pattern, joined)
    if match:
        return match.group(0).replace("'", "")
    return ""


def _play_pcm(pcm_data, rate):
    """Play 16-bit mono PCM data through pyaudio, always releasing the device."""
    try:
        import pyaudio
    except ImportError:
        raise ImportError("请先安装pyaudio库[ pip install pyaudio ]")

    p = pyaudio.PyAudio()
    try:
        stream = p.open(format=pyaudio.paInt16, channels=1, rate=rate, output=True)
        try:
            stream.write(pcm_data)
            stream.stop_stream()
        finally:
            stream.close()
    finally:
        p.terminate()


def silk2audio(buf_data, is_play=False, is_wave=False, save_path=None, rate=24000):
    """
    Decode silk audio data to PCM, optionally playing it and/or wrapping it as WAV.

    :param buf_data: silk file content (bytes)
    :param is_play: play the decoded audio (needs pyaudio)
    :param is_wave: return WAV data (mono, 16-bit) instead of raw PCM
    :param save_path: when ``is_wave`` is set, also write the WAV data to this path
    :param rate: sample rate passed to the decoder and used for playback/WAV
    :return: WAV bytes if ``is_wave`` else PCM bytes
    :raises ImportError: if ``is_play`` is set and pyaudio is not installed
    """
    with BytesIO(buf_data) as silk_file, BytesIO() as pcm_file:
        pysilk.decode(silk_file, pcm_file, rate)  # silk -> pcm
        pcm_data = pcm_file.getvalue()

    if is_play:
        _play_pcm(pcm_data, rate)

    if is_wave:
        with BytesIO() as wave_file:
            with wave.open(wave_file, 'wb') as wf:
                wf.setparams((1, 2, rate, 0, 'NONE', 'NONE'))  # mono, 2 bytes/sample
                wf.writeframes(pcm_data)
            rdata = wave_file.getvalue()
        if save_path and isinstance(save_path, str):
            with open(save_path, "wb") as f:
                f.write(rdata)
            print('saved wav file')
        return rdata

    return pcm_data