{"contents":"# -*- coding: utf-8 -*-#\n# -------------------------------------------------------------------------------\n# Name: analyser.py\n# Description: \n# Author: xaoyaoo\n# Date: 2023/12/01\n# -------------------------------------------------------------------------------\nimport sqlite3\nimport time\nfrom collections import Counter\n\nfrom wxdump_linux.db.utils import xml2dict\nfrom wxdump_linux.db import db_msg\n\ndef date_chat_count(chat_data, interval=\"W\"):\n \"\"\"\n 获取每个时间段的聊天数量\n :param chat_data: 聊天数据 json {\"CreateTime\":时间,\"Type\":消息类型,\"SubType\":消息子类型,\"StrContent\":消息内容,\"StrTalker\":聊天对象,\"IsSender\":是否发送者}\n :param interval: 时间间隔 可选值:day、month、year、week\n \"\"\"\n import pandas as pd\n chat_data = pd.DataFrame(chat_data)\n chat_data[\"CreateTime\"] = pd.to_datetime(chat_data[\"CreateTime\"])\n chat_data[\"AdjustedTime\"] = pd.to_datetime(chat_data[\"CreateTime\"]) - pd.Timedelta(hours=4)\n chat_data[\"AdjustedTime\"] = chat_data[\"AdjustedTime\"].dt.strftime(\"%Y-%m-%d %H:%M:%S\")\n chat_data[\"CreateTime\"] = chat_data[\"CreateTime\"].dt.strftime(\"%Y-%m-%d %H:%M:%S\")\n\n interval_dict = {\"day\": \"%Y-%m-%d\", \"month\": \"%Y-%m\", \"year\": \"%Y\", \"week\": \"%Y-%W\",\n \"d\": \"%Y-%m-%d\", \"m\": \"%Y-%m\", \"y\": \"%Y\", \"W\": \"%Y-%W\"\n }\n if interval not in interval_dict:\n raise ValueError(\"interval参数错误,可选值为day、month、year、week\")\n chat_data[\"interval\"] = chat_data[\"AdjustedTime\"].dt.strftime(interval_dict[interval])\n\n # 根据chat_data[\"interval\"]最大值和最小值,生成一个时间间隔列表\n interval_list = pd.date_range(chat_data[\"AdjustedTime\"].min(), chat_data[\"AdjustedTime\"].max(), freq=interval)\n interval_list = interval_list.append(pd.Index([interval_list[-1] + pd.Timedelta(days=1)])) # 最后一天加一天\n\n # 构建数据集\n # interval type_name1 type_name2 type_name3\n # 2021-01 文本数量 其他类型数量 其他类型数量\n # 2021-02 文本数量 其他类型数量 其他类型数量\n type_data = pd.DataFrame(columns=[\"interval\"] + list(chat_data[\"type_name\"].unique()))\n type_data[\"interval\"] = interval_list.strftime(interval_dict[interval])\n type_data = type_data.set_index(\"interval\")\n for type_name in chat_data[\"type_name\"].unique():\n type_data[type_name] = chat_data[chat_data[\"type_name\"] == type_name].groupby(\"interval\").size()\n type_data[\"全部类型\"] = type_data.sum(axis=1)\n type_data[\"发送\"] = chat_data[chat_data[\"IsSender\"] == 1].groupby(\"interval\").size()\n type_data[\"接收\"] = chat_data[chat_data[\"IsSender\"] == 0].groupby(\"interval\").size()\n\n return type_data\n\n\n\ndef read_msgs(MSG_path, selected_talker=None, start_time=time.time() * 3600 * 24 * 365, end_time=time.time()):\n \"\"\"\n 读取消息内容-MSG.db 包含IsSender,StrContent,StrTalker,ype,SubType,CreateTime,MsgSvrID\n :param MSG_path: MSG.db 路径\n :param selected_talker: 选中的聊天对象\n :param start_time: 开始时间 时间戳10位\n :param end_time: 结束时间 时间戳10位\n :return:\n \"\"\"\n type_name_dict = {\n 1: {0: \"文本\"},\n 3: {0: \"图片\"},\n 34: {0: \"语音\"},\n 43: {0: \"视频\"},\n 47: {0: \"动画表情\"},\n 49: {0: \"文本\", 1: \"类文本消息\", 5: \"卡片式链接\", 6: \"文件\", 8: \"上传的GIF表情\",\n 19: \"合并转发聊天记录\", 33: \"分享的小程序\", 36: \"分享的小程序\", 57: \"带有引用的文本\",\n 63: \"视频号直播或回放等\",\n 87: \"群公告\", 88: \"视频号直播或回放等\", 2000: \"转账消息\", 2003: \"红包封面\"},\n 50: {0: \"语音通话\"},\n 10000: {0: \"系统通知\", 4: \"拍一拍\", 8000: \"系统通知\"}\n }\n\n # 连接 MSG_ALL.db 数据库,并执行查询\n db1 = sqlite3.connect(MSG_path)\n cursor1 = db1.cursor()\n\n if isinstance(start_time, str):\n start_time = time.mktime(time.strptime(start_time, \"%Y-%m-%d %H:%M:%S\"))\n if isinstance(end_time, str):\n end_time = time.mktime(time.strptime(end_time, \"%Y-%m-%d %H:%M:%S\"))\n\n if selected_talker is None or selected_talker == \"\": # 如果 selected_talker 为 None,则查询全部对话\n cursor1.execute(\n \"SELECT MsgSvrID,IsSender, StrContent, StrTalker, Type, SubType,CreateTime FROM MSG WHERE CreateTime\u003e=? AND CreateTime\u003c=? ORDER BY CreateTime ASC\",\n (start_time, end_time))\n else:\n cursor1.execute(\n \"SELECT MsgSvrID,IsSender, StrContent, StrTalker, Type, SubType,CreateTime FROM MSG WHERE StrTalker=? AND CreateTime\u003e=? AND CreateTime\u003c=? ORDER BY CreateTime ASC\",\n (selected_talker, start_time, end_time))\n result1 = cursor1.fetchall()\n cursor1.close()\n db1.close()\n\n def get_emoji_cdnurl(row):\n if row[\"type_name\"] == \"动画表情\":\n parsed_content = xml2dict(row[\"StrContent\"])\n if isinstance(parsed_content, dict) and \"emoji\" in parsed_content:\n return parsed_content[\"emoji\"].get(\"cdnurl\", \"\")\n return row[\"content\"]\n\n init_data = pd.DataFrame(result1, columns=[\"MsgSvrID\", \"IsSender\", \"StrContent\", \"StrTalker\", \"Type\", \"SubType\",\n \"CreateTime\"])\n init_data[\"CreateTime\"] = pd.to_datetime(init_data[\"CreateTime\"], unit=\"s\")\n init_data[\"AdjustedTime\"] = init_data[\"CreateTime\"] - pd.Timedelta(hours=4)\n init_data[\"AdjustedTime\"] = init_data[\"AdjustedTime\"].dt.strftime(\"%Y-%m-%d %H:%M:%S\")\n init_data[\"CreateTime\"] = init_data[\"CreateTime\"].dt.strftime(\"%Y-%m-%d %H:%M:%S\")\n init_data[\"type_name\"] = init_data.apply(lambda x: type_name_dict.get(x[\"Type\"], {}).get(x[\"SubType\"], \"未知\"),\n axis=1)\n init_data[\"content\"] = init_data.apply(lambda x: x[\"StrContent\"] if x[\"type_name\"] == \"文本\" else \"\", axis=1)\n init_data[\"content\"] = init_data.apply(get_emoji_cdnurl, axis=1)\n\n init_data[\"content_len\"] = init_data.apply(lambda x: len(x[\"content\"]) if x[\"type_name\"] == \"文本\" else 0, axis=1)\n\n chat_data = init_data[\n [\"MsgSvrID\", \"IsSender\", \"StrTalker\", \"type_name\", \"content\", \"content_len\", \"CreateTime\", \"AdjustedTime\"]]\n\n return True, chat_data\n\n\n# 绘制直方图\ndef draw_hist_all_count(chat_data, out_path=\"\", is_show=False):\n try:\n import matplotlib.pyplot as plt\n except ImportError as e:\n print(\"error\", e)\n raise ImportError(\"请安装matplotlib库\")\n plt.rcParams['font.sans-serif'] = ['SimHei']\n plt.rcParams['axes.unicode_minus'] = False\n\n type_count = Counter(chat_data[\"type_name\"])\n\n # 对type_count按值进行排序,并返回排序后的结果\n sorted_type_count = dict(sorted(type_count.items(), key=lambda item: item[1], reverse=True))\n\n plt.figure(figsize=(12, 8))\n plt.bar(range(len(sorted_type_count)), list(sorted_type_count.values()), tick_label=list(sorted_type_count.keys()))\n plt.title(\"消息类型分布图\")\n plt.xlabel(\"消息类型\")\n plt.ylabel(\"数量\")\n\n # 设置x轴标签的旋转角度为45度\n plt.xticks(rotation=-45)\n\n # 在每个柱上添加数字标签\n for i, v in enumerate(list(sorted_type_count.values())):\n plt.text(i, v, str(v), ha='center', va='bottom')\n\n if out_path != \"\":\n plt.savefig(out_path)\n if is_show:\n plt.show()\n plt.close()\n\n\n# 按照interval绘制折线图\ndef draw_line_type_name(chat_data, interval=\"W\", type_name_list=None, out_path=\"\", is_show=False):\n \"\"\"\n 绘制折线图,横轴为时间,纵轴为消息数量,不同类型的消息用不同的颜色表示\n :param chat_data:\n :param interval:\n :param type_name_list: 消息类型列表,按照列表中的顺序绘制折线图 可选:全部类型、发送、接收、总字数、发送字数、接收字数、其他类型\n :param out_path:\n :param is_show:\n :return:\n \"\"\"\n if type_name_list is None:\n type_name_list = [\"全部类型\", \"发送\", \"接收\"] + [\"总字数\", \"发送字数\", \"接收字数\"]\n # type_name_list = [\"总字数\", \"发送字数\", \"接收字数\"]\n\n try:\n import matplotlib.pyplot as plt\n import pandas as pd\n except ImportError as e:\n print(\"error\", e)\n raise ImportError(\"请安装matplotlib库\")\n plt.rcParams['font.sans-serif'] = ['SimHei']\n plt.rcParams['axes.unicode_minus'] = False\n\n chat_data[\"CreateTime\"] = pd.to_datetime(chat_data[\"CreateTime\"])\n chat_data[\"AdjustedTime\"] = pd.to_datetime(chat_data[\"AdjustedTime\"])\n\n # interval = interval.lower()\n interval_dict = {\"day\": \"%Y-%m-%d\", \"month\": \"%Y-%m\", \"year\": \"%Y\", \"week\": \"%Y-%W\",\n \"d\": \"%Y-%m-%d\", \"m\": \"%Y-%m\", \"y\": \"%Y\", \"W\": \"%Y-%W\"\n }\n if interval not in interval_dict:\n raise ValueError(\"interval参数错误,可选值为day、month、year、week\")\n chat_data[\"interval\"] = chat_data[\"AdjustedTime\"].dt.strftime(interval_dict[interval])\n\n # 根据chat_data[\"interval\"]最大值和最小值,生成一个时间间隔列表\n interval_list = pd.date_range(chat_data[\"AdjustedTime\"].min(), chat_data[\"AdjustedTime\"].max(), freq=interval)\n interval_list = interval_list.append(pd.Index([interval_list[-1] + pd.Timedelta(days=1)])) # 最后一天加一天\n\n # 构建数据集\n # interval type_name1 type_name2 type_name3\n # 2021-01 文本数量 其他类型数量 其他类型数量\n # 2021-02 文本数量 其他类型数量 其他类型数量\n type_data = pd.DataFrame(columns=[\"interval\"] + list(chat_data[\"type_name\"].unique()))\n type_data[\"interval\"] = interval_list.strftime(interval_dict[interval])\n type_data = type_data.set_index(\"interval\")\n for type_name in chat_data[\"type_name\"].unique():\n type_data[type_name] = chat_data[chat_data[\"type_name\"] == type_name].groupby(\"interval\").size()\n type_data[\"全部类型\"] = type_data.sum(axis=1)\n type_data[\"发送\"] = chat_data[chat_data[\"IsSender\"] == 1].groupby(\"interval\").size()\n type_data[\"接收\"] = chat_data[chat_data[\"IsSender\"] == 0].groupby(\"interval\").size()\n\n type_data[\"总字数\"] = chat_data.groupby(\"interval\")[\"content_len\"].sum()\n type_data[\"发送字数\"] = chat_data[chat_data[\"IsSender\"] == 1].groupby(\"interval\")[\"content_len\"].sum()\n type_data[\"接收字数\"] = chat_data[chat_data[\"IsSender\"] == 0].groupby(\"interval\")[\"content_len\"].sum()\n\n type_data = type_data.fillna(0)\n # 调整typename顺序,使其按照总数量排序,只要最大的5个\n type_data = type_data.reindex(type_data.sum().sort_values(ascending=False).index, axis=1)\n if type_name_list is not None:\n type_data = type_data[type_name_list]\n else:\n type_data = type_data.iloc[:, :5]\n\n # if interval == \"W\" or interval == \"week\": # 改为当前周的周一的日期\n # #\n\n plt.figure(figsize=(12, 8))\n\n # 绘制折线图\n for type_name in type_data.columns:\n plt.plot(type_data.index, type_data[type_name], label=type_name)\n\n # 设置x轴标签的旋转角度为45度\n plt.xticks(rotation=-45)\n # 设置标题、坐标轴标签、图例等信息\n plt.title(\"消息类型分布图\")\n plt.xlabel(\"时间\")\n plt.ylabel(\"数量\")\n\n plt.legend(loc=\"upper right\") # 设置图例位置\n\n # 显示图形\n if out_path != \"\":\n plt.savefig(out_path)\n if is_show:\n plt.tight_layout()\n plt.show()\n plt.close()\n\n\n\ndef wordcloud_generator(chat_data, interval=\"m\", stopwords=None, out_path=\"\", is_show=False, bg_img=None,\n font=\"C:\\Windows\\Fonts\\simhei.ttf\"):\n \"\"\"\n 词云\n :param is_show: 是否显示\n :param img_path: 背景图片路径\n :param text: 文本\n :param font: 字体路径\n :return:\n \"\"\"\n try:\n from wordcloud import WordCloud, ImageColorGenerator\n import wordcloud\n import jieba\n import numpy as np\n import matplotlib.pyplot as plt\n from matplotlib.font_manager import fontManager\n import pandas as pd\n import codecs\n import re\n from imageio import imread\n except ImportError as e:\n print(\"error\", e)\n raise ImportError(\"请安装wordcloud,jieba,numpy,matplotlib,pillow库\")\n\n plt.rcParams['font.sans-serif'] = ['SimHei']\n plt.rcParams['axes.unicode_minus'] = False\n\n chat_data[\"CreateTime\"] = pd.to_datetime(chat_data[\"CreateTime\"])\n chat_data[\"AdjustedTime\"] = pd.to_datetime(chat_data[\"AdjustedTime\"])\n\n # interval = interval.lower()\n interval_dict = {\"day\": \"%Y-%m-%d\", \"month\": \"%Y-%m\", \"year\": \"%Y\", \"week\": \"%Y-%W\",\n \"d\": \"%Y-%m-%d\", \"m\": \"%Y-%m\", \"y\": \"%Y\", \"W\": \"%Y-%W\"\n }\n if interval not in interval_dict:\n raise ValueError(\"interval参数错误,可选值为day、month、year、week\")\n chat_data[\"interval\"] = chat_data[\"AdjustedTime\"].dt.strftime(interval_dict[interval])\n\n # 根据chat_data[\"interval\"]最大值和最小值,生成一个时间间隔列表\n interval_list = pd.date_range(chat_data[\"AdjustedTime\"].min(), chat_data[\"AdjustedTime\"].max(), freq=interval)\n interval_list = interval_list.append(pd.Index([interval_list[-1] + pd.Timedelta(days=1)])) # 最后一天加一天\n\n # 构建数据集\n # interval text_all text_sender text_receiver\n # 2021-01 文本\\n合并 聊天记录\\n文本\\n合并 聊天记录\\n文本\\n合并 聊天记录\\n\n def merage_text(x):\n pattern = re.compile(\"(\\[.+?\\])\") # 匹配表情\n rt = \"\\n\".join(x)\n rt = pattern.sub('', rt).replace(\"\\n\", \" \")\n return rt\n\n chat_data[\"content\"] = chat_data.apply(lambda x: x[\"content\"] if x[\"type_name\"] == \"文本\" else \"\", axis=1)\n\n text_data = pd.DataFrame(columns=[\"interval\", \"text_all\", \"text_sender\", \"text_receiver\"])\n text_data[\"interval\"] = interval_list.strftime(interval_dict[interval])\n text_data = text_data.set_index(\"interval\")\n # 使用“\\n”合并\n text_data[\"text_all\"] = chat_data.groupby(\"interval\")[\"content\"].apply(merage_text)\n text_data[\"text_sender\"] = chat_data[chat_data[\"IsSender\"] == 1].groupby(\"interval\")[\"content\"].apply(merage_text)\n text_data[\"text_receiver\"] = chat_data[chat_data[\"IsSender\"] == 0].groupby(\"interval\")[\"content\"].apply(merage_text)\n\n def gen_img(texts,out_path,is_show,bg_img,title=\"\"):\n words = jieba.lcut(texts)\n res = [word for word in words if word not in stopwords and word.replace(\" \", \"\") != \"\" and len(word) \u003e 1]\n count_dict = dict(Counter(res))\n\n if bg_img:\n bgimg = imread(open(bg_img, 'rb'))\n # 获得词云对象,设定词云背景颜色及其图片和字体\n wc = WordCloud(background_color='white', mask=bgimg, font_path='simhei.ttf', mode='RGBA', include_numbers=False,\n random_state=0)\n else:\n # 如果你的背景色是透明的,请用这两条语句替换上面两条\n bgimg = None\n wc = WordCloud(background_color='white', mode='RGBA', font_path='simhei.ttf', include_numbers=False,\n random_state=0,width=500, height=500) # 如果不指定中文字体路径,词云会乱码\n wc = wc.fit_words(count_dict)\n\n fig = plt.figure(figsize=(8, 8))\n fig.suptitle(title, fontsize=26)\n ax = fig.subplots()\n\n ax.imshow(wc)\n ax.axis('off')\n\n if out_path != \"\":\n plt.savefig(out_path)\n if is_show:\n plt.show()\n plt.close()\n\n for i in text_data.index:\n out_path = f\"out/img_{i}.png\"\n gen_img(text_data[\"text_all\"][i], out_path=out_path, is_show=False, bg_img=bg_img, title=f\"全部({i})\")\n # gen_img(text_data[\"text_sender\"][i], out_path=\"\", is_show=is_show, bg_img=bg_img, title=f\"发送_{i}\")\n # gen_img(text_data[\"text_receiver\"][i], out_path=\"\", is_show=is_show, bg_img=bg_img, title=f\"接收_{i}\")\n # time.sleep(1)\n\n# 情感分析\ndef sentiment_analysis(chat_data, stopwords=\"\", out_path=\"\", is_show=False, bg_img=None):\n try:\n from snownlp import SnowNLP\n import pandas as pd\n import matplotlib.pyplot as plt\n import seaborn as sns\n\n except ImportError as e:\n print(\"error\", e)\n raise ImportError(\"请安装snownlp,pandas,matplotlib,seaborn库\")\n\n sns.set_style('white', {'font.sans-serif': ['simhei', 'FangSong']})\n\n chats = []\n for row in chat_data:\n if row[\"type_name\"] != \"文本\" or row[\"content\"] == \"\":\n continue\n chats.append(row)\n\n scores = []\n for row in chats:\n s = SnowNLP(row[\"content\"])\n scores.append(s.sentiments)\n\n def draw(data):\n df = pd.DataFrame({'Sentiment Score': data})\n plt.figure(figsize=(8, 6))\n sns.histplot(data=df, x='Sentiment Score', kde=True)\n plt.title(\"Sentiment Analysis\")\n plt.xlabel(\"Sentiment Score\")\n plt.ylabel(\"Frequency\")\n\n if out_path != \"\":\n plt.savefig(out_path)\n if is_show:\n plt.show()\n plt.close()\n\n draw(scores)\n\n\nif __name__ == '__main__':\n MSG_PATH = r\"\"\n selected_talker = \"wxid_\"\n start_time = time.time() - 3600 * 24 * 50000\n end_time = time.time()\n code, chat_data = read_msgs(MSG_PATH, selected_talker, start_time, end_time)\n # print(chat_data)\n # code, data, classify_count, all_type_count = merge_chat_data(chat_data, interval=\"month\")\n # draw_hist_all_count(chat_data, is_show=True) # 绘制直方图 消息类型分布图\n # draw_line_type_name(chat_data, is_show=True) # 绘制折线图 消息类型分布图\n\n # bg_img = 'img.png'\n stopwords = ['的', '了', '是', '在', '我', '有', '和', '就', '不', '人', '都', '一', '一个', '上', '也', '很', '到',\n '说', '要',\n '去', '你', '会', '着', '没有', '看', '好', '自己', '这']\n wordcloud_generator(chat_data, stopwords=stopwords, out_path=\"\", is_show=True)\n # sentiment_analysis(chat_data)\n","is_binary":false,"path":"wxdump_linux/analyzer/chat_analysis.py","ref":""}