# -*- coding: utf-8 -*-#
# -------------------------------------------------------------------------------
# Name:         common_utils.py
# Path:         wxdump_linux/db/utils/common_utils.py
# Description:  Shared helpers: message-type mapping, hashing, timestamp
#               formatting, .dat image decoding, XML parsing, downloading,
#               bytes/dict utilities and silk audio decoding.
# Author:       xaoyaoo
# Date:         2024/04/15
# -------------------------------------------------------------------------------
import functools
import hashlib
import os
import re
import time
import wave
from collections import defaultdict
from io import BytesIO
from typing import Union

# lxml is more robust than xml.etree.ElementTree: WeChat XML is sometimes
# non-standard and makes ElementTree fail.
import lxml.etree as ET
import pysilk
import requests

from ._loger import db_loger


def db_error(func):
    """
    Error-handling decorator.

    Calls ``func``; if it raises any ``Exception`` the error is logged through
    ``db_loger`` (with traceback) and ``None`` is returned instead.

    :param func: function to wrap
    :return: wrapped function
    """

    @functools.wraps(func)  # keep the wrapped function's name/docstring
    def wrapper(*args, **kwargs):
        try:
            return func(*args, **kwargs)
        except Exception as e:
            db_loger.error(f"db_error: {e}", exc_info=True)
            return None

    return wrapper


# Name returned for an ID that is not in the table, and ID returned for a
# name that is not in the table.
_UNKNOWN_TYPE_NAME = "未知"
_UNKNOWN_TYPE_ID = (0, 0)

# Message type ID tuple -> display name.
_TYPE_ID_TO_NAME = {
    (1, 0): "文本",
    (3, 0): "图片",
    (34, 0): "语音",
    (37, 0): "添加好友",
    (42, 0): "推荐公众号",
    (43, 0): "视频",
    (47, 0): "动画表情",
    (48, 0): "位置",

    (49, 0): "文件",
    (49, 1): "粘贴的文本",
    (49, 3): "(分享)音乐",
    (49, 4): "(分享)卡片式链接",
    (49, 5): "(分享)卡片式链接",
    (49, 6): "文件",
    (49, 7): "游戏相关",
    (49, 8): "用户上传的GIF表情",
    (49, 15): "未知-49,15",
    (49, 17): "位置共享",
    (49, 19): "合并转发的聊天记录",
    (49, 24): "(分享)笔记",
    (49, 33): "(分享)小程序",
    (49, 36): "(分享)小程序",
    (49, 40): "(分享)收藏夹",
    (49, 44): "(分享)小说(猜)",
    (49, 50): "(分享)视频号名片",
    (49, 51): "(分享)视频号视频",
    (49, 53): "接龙",
    (49, 57): "引用回复",
    (49, 63): "视频号直播或直播回放",
    (49, 74): "文件(猜)",
    (49, 87): "群公告",
    (49, 88): "视频号直播或直播回放等",
    (49, 2000): "转账",
    (49, 2003): "赠送红包封面",

    (50, 0): "语音通话",
    (65, 0): "企业微信打招呼(猜)",
    (66, 0): "企业微信添加好友(猜)",

    (10000, 0): "系统通知",
    (10000, 1): "消息撤回1",
    (10000, 4): "拍一拍",
    (10000, 5): "消息撤回5",
    (10000, 6): "消息撤回6",
    (10000, 33): "消息撤回33",
    (10000, 36): "消息撤回36",
    (10000, 57): "消息撤回57",
    (10000, 8000): "邀请加群",
    (11000, 0): "未知-11000,0",
}

# Display name -> message type ID. Several IDs share one name; iterating the
# table in reverse makes the FIRST ID in table order win, which is what a
# front-to-back linear scan of the table returns.
_TYPE_NAME_TO_ID = {
    name: type_id for type_id, name in reversed(list(_TYPE_ID_TO_NAME.items()))
}


def type_converter(type_id_or_name: Union[str, tuple]):
    """
    Convert between a message type ID and its name.

    name (str)  => ID (tuple)
    ID (tuple)  => name (str)

    :param type_id_or_name: message type ID tuple, e.g. ``(1, 0)``, or name
    :return: the name for an ID ("未知" when the ID is not in the table), or
             the ID for a name (``(0, 0)`` when the name is not in the table)
    :raises ValueError: if the argument is neither a tuple nor a str
    """
    if isinstance(type_id_or_name, tuple):
        return _TYPE_ID_TO_NAME.get(type_id_or_name, _UNKNOWN_TYPE_NAME)
    if isinstance(type_id_or_name, str):
        return _TYPE_NAME_TO_ID.get(type_id_or_name, _UNKNOWN_TYPE_ID)
    raise ValueError("Invalid input type")


def typeid2name(type_id: tuple):
    """
    Get the message type name.

    :param type_id: message type ID tuple, e.g. ``(1, 0)``
    :return: message type name
    """
    return type_converter(type_id)


def name2typeid(type_name: str):
    """
    Get the message type ID.

    :param type_name: message type name
    :return: message type ID tuple
    """
    return type_converter(type_name)


def get_md5(data):
    """
    Return the hexadecimal MD5 digest of ``data``.

    :param data: bytes-like object (anything ``hashlib`` can ``update`` with)
    :return: hex digest string
    """
    md5 = hashlib.md5()
    md5.update(data)
    return md5.hexdigest()


def timestamp2str(timestamp):
    """
    Convert a timestamp to a local-time string ``YYYY-mm-dd HH:MM:SS``.

    A value whose integer part has 13 digits is treated as milliseconds and
    one with 10 digits as seconds. Anything else (other types, other lengths,
    non-numeric strings, inf/nan) is returned unchanged.

    :param timestamp: int, float or all-decimal-digit str
    :return: formatted time string, or the input unchanged
    """
    # isdecimal() (not isdigit()) so that every accepted string is int()-safe.
    if isinstance(timestamp, str) and timestamp.isdecimal():
        timestamp = int(timestamp)
    elif not isinstance(timestamp, (int, float)):
        return timestamp

    # Count digits of the integer part only, so a float such as 1713150000.0
    # is recognised as a 10-digit value instead of a 12-character string.
    try:
        digits = len(str(int(timestamp)))
    except (OverflowError, ValueError):  # inf / nan
        return timestamp

    if digits == 13:
        timestamp = timestamp / 1000
    elif digits != 10:
        return timestamp

    return time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(timestamp))


# File signatures (leading bytes) of common image formats -> extension.
_IMG_HEAD = {
    b"\xFF\xD8\xFF": ".jpg",
    b"\x89\x50\x4E\x47": ".png",
    b"\x47\x49\x46\x38": ".gif",
    b"\x42\x4D": ".BMP",
    b"\x49\x49": ".TIFF",
    b"\x4D\x4D": ".TIFF",
    b"\x00\x00\x01\x00": ".ICO",
    b"\x52\x49\x46\x46": ".WebP",
    b"\x00\x00\x00\x18\x66\x74\x79\x70\x68\x65\x69\x63": ".HEIC",
}


def _find_xor_key(data):
    """Return ``(key, extension)`` for the first signature in ``_IMG_HEAD``
    that ``data`` matches under a single-byte XOR key, or ``None``."""
    for hcode, fomt in _IMG_HEAD.items():
        if len(data) < len(hcode):
            continue  # too short to carry this signature
        key = data[0] ^ hcode[0]
        # Every signature byte must decode with the same key.
        if all(b ^ h == key for b, h in zip(data, hcode)):
            return key, fomt
    return None


def dat2img(input_data):
    """
    Decode a ``.dat`` image that is XOR-ed with a single-byte key.

    The key is derived by XOR-ing the first input byte with the first byte of
    each known image signature and checking that the whole signature decodes
    with that key.

    :param input_data: path of the image file (str) or the raw image bytes
    :return: ``(True, extension, md5, decoded_data)`` on success, otherwise
             ``(False, False, False, False)``. ``decoded_data`` is a NumPy
             ``uint8`` array when NumPy is importable, else a ``bytearray``.
    """
    if isinstance(input_data, str):
        with open(input_data, "rb") as f:
            input_bytes = f.read()
    else:
        input_bytes = input_data

    if not input_bytes:  # empty input cannot match any signature
        return False, False, False, False

    found = _find_xor_key(input_bytes)
    if found is None:
        return False, False, False, False
    key, fomt = found

    try:
        import numpy as np
    except ImportError:
        np = None

    if np is not None:
        # Vectorised XOR over the whole payload.
        out_bytes = np.bitwise_xor(np.frombuffer(input_bytes, dtype=np.uint8), np.uint8(key))
    else:
        # One C-level pass via a 256-entry translation table.
        table = bytes(b ^ key for b in range(256))
        out_bytes = bytearray(bytes(input_bytes).translate(table))

    md5 = get_md5(out_bytes)
    return True, fomt, md5, out_bytes


def _xml_element_to_dict(element):
    """
    Recursively convert an XML element to a dict.

    Attributes become keys; each child is stored under its tag (repeated tags
    are collected into a list). An element with no attributes and no children
    collapses to its text when it has any.

    :param element: XML element to convert (may be None)
    :return: dict, or the element's text for a leaf element
    """
    result = {}
    # Some elements come without attributes; nothing to convert then.
    if element is None or element.attrib is None:
        return result
    for key, value in element.attrib.items():
        result[key] = value
    for child in element:
        child_result = _xml_element_to_dict(child)
        if child.tag in result:
            # Tag already seen: turn the entry into a list and append.
            if not isinstance(result[child.tag], list):
                result[child.tag] = [result[child.tag]]
            result[child.tag].append(child_result)
        else:
            result[child.tag] = child_result
    # Leaf element: use its text content as the value.
    if not result and element.text:
        result = element.text
    return result


def xml2dict(xml_string):
    """
    Parse an XML string into a dict.

    :param xml_string: XML string to parse
    :return: parsed result; ``None`` if the input is not a str; the input
             string unchanged if parsing raises
    """
    if xml_string is None or not isinstance(xml_string, str):
        return None
    try:
        # WeChat chat records occasionally contain malformed XML, so let the
        # parser recover from errors instead of failing.
        parser = ET.XMLParser(recover=True)
        root = ET.fromstring(xml_string, parser)
    except Exception:
        # Deliberate best-effort: hand the raw string back to the caller.
        return xml_string
    return _xml_element_to_dict(root)


def download_file(url, save_path=None, proxies=None, timeout=None):
    """
    Download a file.

    :param url: download URL
    :param save_path: optional path to also write the content to; missing
                      parent directories are created
    :param proxies: proxies passed to ``requests``
    :param timeout: timeout passed to ``requests.get`` (``None`` = no timeout)
    :return: the response body as bytes, or ``None`` if the status is not 200
    """
    headers = {
        "User-Agent": "Mozilla/5.0 (Linux; Android 10; Redmi K40 Pro) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/92.0.4515.159 Mobile Safari/537.36"

    }
    r = requests.get(url, headers=headers, proxies=proxies, timeout=timeout)
    if r.status_code != 200:
        return None
    data = r.content
    if save_path and isinstance(save_path, str):
        # dirname is "" for a bare file name; makedirs("") would raise.
        target_dir = os.path.dirname(save_path)
        if target_dir:
            os.makedirs(target_dir, exist_ok=True)
        with open(save_path, "wb") as f:
            f.write(data)
    return data


def bytes2str(d):
    """
    Walk a dict in place and decode every bytes value as UTF-8.

    Nested dicts, and dicts/bytes directly inside list values, are handled.

    :param d: dict to convert (modified in place)
    :return: None
    """
    for k, v in d.items():
        if isinstance(v, dict):
            bytes2str(v)
        elif isinstance(v, list):
            for i, item in enumerate(v):
                if isinstance(item, dict):
                    bytes2str(item)
                elif isinstance(item, bytes):
                    # Assign by index: rebinding `item` would not change the list.
                    v[i] = item.decode('utf-8')
        elif isinstance(v, bytes):
            d[k] = v.decode('utf-8')


def read_dict_all_values(data):
    """
    Collect every leaf value of a nested dict/list structure.

    bytes leaves are decoded as UTF-8, int leaves are converted to str, all
    other leaves are kept as they are.

    :param data: dict, list or a single value
    :return: flat list of all leaf values
    """
    if isinstance(data, list):
        result = []
        for item in data:
            result.extend(read_dict_all_values(item))
        return result
    if isinstance(data, dict):
        result = []
        for value in data.values():
            result.extend(read_dict_all_values(value))
        return result
    if isinstance(data, bytes):
        return [data.decode("utf-8")]
    if isinstance(data, int):
        return [str(data)]
    return [data]


def match_BytesExtra(BytesExtra, pattern=r"FileStorage(.*?)'"):
    """
    Search the values of BytesExtra for a pattern.

    All leaf values are joined into one string, each wrapped in single
    quotes, and ``pattern`` is searched in it.

    :param BytesExtra: BytesExtra data (dict / list / value)
    :param pattern: regular expression to search for
    :return: ``False`` if BytesExtra is empty, the whole match with single
             quotes removed if found, otherwise ``""``
    """
    if not BytesExtra:
        return False
    values = read_dict_all_values(BytesExtra)
    # str() so that non-string leaves (None, float, ...) cannot break join().
    joined = "'" + "'".join(map(str, values)) + "'"

    match = re.search(pattern, joined)
    if match:
        return match.group(0).replace("'", "")
    return ""


def _play_pcm(pcm_data, rate):
    """Play mono 16-bit PCM data through pyaudio, always releasing the device."""
    try:
        import pyaudio
    except ImportError as err:
        raise ImportError("请先安装pyaudio库[ pip install pyaudio ]") from err

    p = pyaudio.PyAudio()
    try:
        stream = p.open(format=pyaudio.paInt16, channels=1, rate=rate, output=True)
        try:
            stream.write(pcm_data)
            stream.stop_stream()
        finally:
            stream.close()
    finally:
        p.terminate()


def silk2audio(buf_data, is_play=False, is_wave=False, save_path=None, rate=24000):
    """
    Decode silk audio data to PCM, optionally playing it and/or wrapping it
    as WAV.

    :param buf_data: silk-encoded audio bytes
    :param is_play: play the decoded audio through pyaudio
    :param is_wave: return WAV data (mono, 2 bytes per sample) instead of PCM
    :param save_path: when ``is_wave`` is set, also write the WAV data here
    :param rate: sample rate used for decoding, playback and the WAV header
    :return: WAV bytes if ``is_wave`` else raw PCM bytes
    :raises ImportError: if ``is_play`` is set and pyaudio is not installed
    """
    # silk -> pcm; the buffers are released even if decoding raises.
    with BytesIO(buf_data) as silk_file, BytesIO() as pcm_file:
        pysilk.decode(silk_file, pcm_file, rate)
        pcm_data = pcm_file.getvalue()

    if is_play:
        _play_pcm(pcm_data, rate)

    if is_wave:
        with BytesIO() as wave_file:
            with wave.open(wave_file, 'wb') as wf:
                wf.setparams((1, 2, rate, 0, 'NONE', 'NONE'))
                wf.writeframes(pcm_data)
            # Read after the wave writer is closed so the header is final.
            rdata = wave_file.getvalue()
        if save_path and isinstance(save_path, str):
            with open(save_path, "wb") as f:
                f.write(rdata)
            print('saved wav file')
        return rdata

    return pcm_data