解锁并提取Linux客户端微信数据库 (vibe coded)
1# -*- coding: utf-8 -*-#
2# -------------------------------------------------------------------------------
3# Name: analyser.py
4# Description:
5# Author: xaoyaoo
6# Date: 2023/12/01
7# -------------------------------------------------------------------------------
8import sqlite3
9import time
10from collections import Counter
11
12from wxdump_linux.db.utils import xml2dict
13from wxdump_linux.db import db_msg
14
def date_chat_count(chat_data, interval="W"):
    """
    Count chat messages per time interval, broken down by message type.

    Each message is bucketed by its ``CreateTime`` shifted back by four
    hours, so a message sent before 04:00 is counted with the previous day.

    :param chat_data: records (anything ``pandas.DataFrame`` accepts) that
        provide the columns ``CreateTime`` (parseable by
        ``pandas.to_datetime``), ``type_name`` and ``IsSender``
        (1 = sent, 0 = received). The input is not modified.
    :param interval: bucket size, one of ``day``/``d``, ``week``/``W``,
        ``month``/``m`` or ``year``/``y``.
    :return: DataFrame indexed by interval label (``%Y-%m-%d``, ``%Y-%W``,
        ``%Y-%m`` or ``%Y``) with one integer column per distinct
        ``type_name`` plus the columns ``全部类型`` (all types), ``发送``
        (sent) and ``接收`` (received). Every interval between the first and
        the last message is present; intervals without messages hold 0.
        Empty input yields an empty frame with only the three summary columns.
    :raises ValueError: if ``interval`` is not one of the accepted values.
    """
    import pandas as pd

    interval_dict = {"day": "%Y-%m-%d", "month": "%Y-%m", "year": "%Y", "week": "%Y-%W",
                     "d": "%Y-%m-%d", "m": "%Y-%m", "y": "%Y", "W": "%Y-%W"}
    if interval not in interval_dict:
        raise ValueError("interval参数错误,可选值为day、month、year、week")
    label_format = interval_dict[interval]

    # Work on a private copy so the caller's data never gains helper columns.
    chat_data = pd.DataFrame(chat_data).copy()
    if chat_data.empty:
        return pd.DataFrame(columns=["全部类型", "发送", "接收"],
                            index=pd.Index([], name="interval"))

    # Keep the shifted timestamps as datetimes: the interval labels are
    # derived from them with .dt.strftime, which does not work on strings.
    adjusted = pd.to_datetime(chat_data["CreateTime"]) - pd.Timedelta(hours=4)
    chat_data["interval"] = adjusted.dt.strftime(label_format)

    # Enumerate every calendar day in the covered range and collapse the days
    # to their labels. This works for every supported interval (the option
    # names are not valid pandas frequency strings) and never yields an empty
    # range when at least one message exists.
    days = pd.date_range(adjusted.min().normalize(), adjusted.max().normalize(), freq="D")
    interval_index = pd.Index(days.strftime(label_format)).unique().rename("interval")

    type_data = pd.DataFrame(index=interval_index)
    for type_name in chat_data["type_name"].unique():
        of_type = chat_data[chat_data["type_name"] == type_name]
        type_data[type_name] = of_type.groupby("interval").size()
    # Sum the per-type columns before the sent/received columns are added.
    type_data["全部类型"] = type_data.sum(axis=1)
    type_data["发送"] = chat_data[chat_data["IsSender"] == 1].groupby("interval").size()
    type_data["接收"] = chat_data[chat_data["IsSender"] == 0].groupby("interval").size()

    # Intervals without a matching message come back as NaN after alignment.
    return type_data.fillna(0).astype(int)
53
54
55
def read_msgs(MSG_path, selected_talker=None, start_time=None, end_time=None):
    """
    Read messages from the ``MSG`` table of a MSG.db SQLite database.

    The query selects MsgSvrID, IsSender, StrContent, StrTalker, Type,
    SubType and CreateTime, ordered by CreateTime ascending.

    :param MSG_path: path of the MSG.db file.
    :param selected_talker: talker id to filter on; ``None`` or ``""`` reads
        every conversation.
    :param start_time: lower bound (inclusive), either a 10-digit epoch
        timestamp or a ``"%Y-%m-%d %H:%M:%S"`` string. ``None`` means one year
        before the moment of the call.
    :param end_time: upper bound (inclusive), same formats as ``start_time``.
        ``None`` means the moment of the call.
    :return: tuple ``(True, chat_data)``. ``chat_data`` is a DataFrame with
        the columns MsgSvrID, IsSender, StrTalker, type_name, content,
        content_len, CreateTime and AdjustedTime. ``CreateTime`` is the stored
        epoch-seconds value converted with ``pandas.to_datetime(unit="s")``,
        ``AdjustedTime`` is four hours earlier; both are formatted as
        ``"%Y-%m-%d %H:%M:%S"`` strings. ``content`` holds the message text
        for text messages, the ``cdnurl`` for animated stickers when it can
        be parsed, and ``""`` otherwise.
    """
    import pandas as pd

    type_name_dict = {
        1: {0: "文本"},
        3: {0: "图片"},
        34: {0: "语音"},
        43: {0: "视频"},
        47: {0: "动画表情"},
        49: {0: "文本", 1: "类文本消息", 5: "卡片式链接", 6: "文件", 8: "上传的GIF表情",
             19: "合并转发聊天记录", 33: "分享的小程序", 36: "分享的小程序", 57: "带有引用的文本",
             63: "视频号直播或回放等",
             87: "群公告", 88: "视频号直播或回放等", 2000: "转账消息", 2003: "红包封面"},
        50: {0: "语音通话"},
        10000: {0: "系统通知", 4: "拍一拍", 8000: "系统通知"}
    }
    time_format = "%Y-%m-%d %H:%M:%S"
    output_columns = ["MsgSvrID", "IsSender", "StrTalker", "type_name", "content", "content_len",
                      "CreateTime", "AdjustedTime"]

    # Resolve the time window at call time. The former signature defaults were
    # evaluated once at import and the start default multiplied instead of
    # subtracting, which put it far in the future so nothing ever matched.
    now = time.time()
    if end_time is None:
        end_time = now
    if start_time is None:
        start_time = now - 3600 * 24 * 365
    if isinstance(start_time, str):
        start_time = time.mktime(time.strptime(start_time, time_format))
    if isinstance(end_time, str):
        end_time = time.mktime(time.strptime(end_time, time_format))

    # Query the database; the connection is closed even when the query fails.
    db1 = sqlite3.connect(MSG_path)
    try:
        cursor1 = db1.cursor()
        if selected_talker is None or selected_talker == "":  # no talker given: read all conversations
            cursor1.execute(
                "SELECT MsgSvrID,IsSender, StrContent, StrTalker, Type, SubType,CreateTime FROM MSG WHERE CreateTime>=? AND CreateTime<=? ORDER BY CreateTime ASC",
                (start_time, end_time))
        else:
            cursor1.execute(
                "SELECT MsgSvrID,IsSender, StrContent, StrTalker, Type, SubType,CreateTime FROM MSG WHERE StrTalker=? AND CreateTime>=? AND CreateTime<=? ORDER BY CreateTime ASC",
                (selected_talker, start_time, end_time))
        result1 = cursor1.fetchall()
        cursor1.close()
    finally:
        db1.close()

    if not result1:
        # Nothing matched: hand back an empty frame with the usual columns.
        return True, pd.DataFrame(columns=output_columns)

    def message_content(type_name, raw_content):
        # Text keeps its body (NULL becomes ""), stickers expose their CDN url,
        # every other message type has no textual content.
        if type_name == "文本":
            return raw_content if raw_content is not None else ""
        if type_name == "动画表情":
            parsed_content = xml2dict(raw_content)
            if isinstance(parsed_content, dict) and "emoji" in parsed_content:
                return parsed_content["emoji"].get("cdnurl", "")
        return ""

    init_data = pd.DataFrame(result1, columns=["MsgSvrID", "IsSender", "StrContent", "StrTalker", "Type", "SubType",
                                               "CreateTime"])
    created = pd.to_datetime(init_data["CreateTime"], unit="s")
    init_data["AdjustedTime"] = (created - pd.Timedelta(hours=4)).dt.strftime(time_format)
    init_data["CreateTime"] = created.dt.strftime(time_format)
    init_data["type_name"] = [type_name_dict.get(msg_type, {}).get(sub_type, "未知")
                              for msg_type, sub_type in zip(init_data["Type"], init_data["SubType"])]
    init_data["content"] = [message_content(type_name, raw_content)
                            for type_name, raw_content in zip(init_data["type_name"], init_data["StrContent"])]
    # Only text messages contribute to the character count.
    init_data["content_len"] = [len(content) if type_name == "文本" else 0
                                for type_name, content in zip(init_data["type_name"], init_data["content"])]

    # Copy so callers can add columns without touching init_data.
    chat_data = init_data[output_columns].copy()

    return True, chat_data
124
125
126# 绘制直方图
def draw_hist_all_count(chat_data, out_path="", is_show=False):
    """
    Draw a bar chart of how many messages exist per message type.

    :param chat_data: chat records; ``chat_data["type_name"]`` is counted.
    :param out_path: file to save the figure to; ``""`` skips saving.
    :param is_show: display the figure with ``plt.show()`` when true.
    :raises ImportError: if matplotlib cannot be imported.
    """
    try:
        import matplotlib.pyplot as plt
    except ImportError as e:
        print("error", e)
        raise ImportError("请安装matplotlib库")
    # A CJK-capable font is configured for the Chinese labels below.
    plt.rcParams['font.sans-serif'] = ['SimHei']
    plt.rcParams['axes.unicode_minus'] = False

    # most_common() orders the types by descending count; ties keep the
    # order in which the types were first seen.
    ranked = Counter(chat_data["type_name"]).most_common()
    labels = [name for name, _ in ranked]
    counts = [count for _, count in ranked]

    plt.figure(figsize=(12, 8))
    plt.bar(range(len(counts)), counts, tick_label=labels)
    plt.title("消息类型分布图")
    plt.xlabel("消息类型")
    plt.ylabel("数量")

    # Rotate the x tick labels by -45 degrees.
    plt.xticks(rotation=-45)

    # Write each count just above its bar.
    for position, count in enumerate(counts):
        plt.text(position, count, str(count), ha='center', va='bottom')

    if out_path != "":
        plt.savefig(out_path)
    if is_show:
        plt.show()
    plt.close()
159
160
161# 按照interval绘制折线图
def draw_line_type_name(chat_data, interval="W", type_name_list=None, out_path="", is_show=False):
    """
    Draw one line per requested series: x axis is the time interval, y axis
    the message (or character) count in that interval.

    Messages are bucketed by their ``AdjustedTime`` column.

    :param chat_data: chat records (anything ``pandas.DataFrame`` accepts)
        providing the columns ``AdjustedTime``, ``type_name``, ``IsSender``
        and ``content_len``. The input is not modified.
    :param interval: bucket size, one of ``day``/``d``, ``week``/``W``,
        ``month``/``m`` or ``year``/``y``.
    :param type_name_list: series to plot, drawn in list order. Accepts any
        message type name present in the data plus ``全部类型`` (all types),
        ``发送`` (sent), ``接收`` (received), ``总字数`` (total characters),
        ``发送字数`` (characters sent) and ``接收字数`` (characters received).
        Names that do not occur in the data are drawn as a flat zero line.
        ``None`` plots the six aggregate series.
    :param out_path: file to save the figure to; ``""`` skips saving.
    :param is_show: display the figure with ``plt.show()`` when true.
    :raises ImportError: if matplotlib or pandas cannot be imported.
    :raises ValueError: if ``interval`` is not accepted or ``chat_data`` is empty.
    """
    if type_name_list is None:
        type_name_list = ["全部类型", "发送", "接收"] + ["总字数", "发送字数", "接收字数"]

    try:
        import matplotlib.pyplot as plt
        import pandas as pd
    except ImportError as e:
        print("error", e)
        raise ImportError("请安装matplotlib库") from e
    plt.rcParams['font.sans-serif'] = ['SimHei']
    plt.rcParams['axes.unicode_minus'] = False

    interval_dict = {"day": "%Y-%m-%d", "month": "%Y-%m", "year": "%Y", "week": "%Y-%W",
                     "d": "%Y-%m-%d", "m": "%Y-%m", "y": "%Y", "W": "%Y-%W"}
    if interval not in interval_dict:
        raise ValueError("interval参数错误,可选值为day、month、year、week")
    label_format = interval_dict[interval]

    # Work on a private copy: the helper "interval" column and the datetime
    # conversion must not leak into the caller's data.
    data = pd.DataFrame(chat_data).copy()
    if data.empty:
        raise ValueError("chat_data为空,无法绘制折线图")

    adjusted = pd.to_datetime(data["AdjustedTime"])
    data["interval"] = adjusted.dt.strftime(label_format)

    # Enumerate every calendar day in the covered range and collapse the days
    # to their labels. The option names are not valid pandas frequency
    # strings, and a coarse frequency can yield an empty range for short spans.
    days = pd.date_range(adjusted.min().normalize(), adjusted.max().normalize(), freq="D")
    interval_index = pd.Index(days.strftime(label_format)).unique().rename("interval")

    # One row per interval, one column per message type / aggregate.
    type_data = pd.DataFrame(index=interval_index)
    for type_name in data["type_name"].unique():
        type_data[type_name] = data[data["type_name"] == type_name].groupby("interval").size()
    # Sum the per-type columns before any aggregate column is added.
    type_data["全部类型"] = type_data.sum(axis=1)

    sent = data[data["IsSender"] == 1].groupby("interval")
    received = data[data["IsSender"] == 0].groupby("interval")
    type_data["发送"] = sent.size()
    type_data["接收"] = received.size()
    type_data["总字数"] = data.groupby("interval")["content_len"].sum()
    type_data["发送字数"] = sent["content_len"].sum()
    type_data["接收字数"] = received["content_len"].sum()

    # Empty intervals become 0; requested series missing from the data are
    # added as zero columns instead of raising KeyError.
    type_data = type_data.fillna(0).reindex(columns=list(type_name_list), fill_value=0)

    plt.figure(figsize=(12, 8))

    for position, type_name in enumerate(type_data.columns):
        plt.plot(type_data.index, type_data.iloc[:, position], label=type_name)

    # Rotate the x tick labels by -45 degrees.
    plt.xticks(rotation=-45)
    plt.title("消息类型分布图")
    plt.xlabel("时间")
    plt.ylabel("数量")

    plt.legend(loc="upper right")

    # Lay the figure out before saving so the saved file benefits as well.
    plt.tight_layout()
    if out_path != "":
        plt.savefig(out_path)
    if is_show:
        plt.show()
    plt.close()
250
251
252
def wordcloud_generator(chat_data, interval="m", stopwords=None, out_path="", is_show=False, bg_img=None,
                        font=r"C:\Windows\Fonts\simhei.ttf"):
    """
    Render one word-cloud image per time interval from the text messages.

    Text messages are grouped by their ``AdjustedTime`` interval, bracketed
    emoticon codes such as ``[xx]`` are stripped, the text is segmented with
    jieba, and a word cloud of the word frequencies is saved as
    ``out/img_<interval>.png``. Intervals without any usable word are skipped.

    :param chat_data: chat records (anything ``pandas.DataFrame`` accepts)
        providing the columns ``AdjustedTime``, ``type_name`` and ``content``.
        The input is not modified.
    :param interval: bucket size, one of ``day``/``d``, ``week``/``W``,
        ``month``/``m`` or ``year``/``y``.
    :param stopwords: iterable of words to leave out; ``None`` means none.
    :param out_path: accepted for signature compatibility but not used;
        images are always written to the ``out`` directory.
        NOTE(review): decide whether this should select the output directory.
    :param is_show: display every generated figure with ``plt.show()``.
    :param bg_img: optional path of an image used as the word-cloud mask.
    :param font: path of the font file to render with. When the file does not
        exist, ``simhei.ttf`` is used, as before.
    :raises ImportError: if one of the required third-party packages is missing.
    :raises ValueError: if ``interval`` is not one of the accepted values.
    """
    import os
    import re

    try:
        from wordcloud import WordCloud
        import jieba
        import matplotlib.pyplot as plt
        import pandas as pd
        from imageio import imread
    except ImportError as e:
        print("error", e)
        raise ImportError("请安装wordcloud,jieba,numpy,matplotlib,pillow库") from e

    plt.rcParams['font.sans-serif'] = ['SimHei']
    plt.rcParams['axes.unicode_minus'] = False

    interval_dict = {"day": "%Y-%m-%d", "month": "%Y-%m", "year": "%Y", "week": "%Y-%W",
                     "d": "%Y-%m-%d", "m": "%Y-%m", "y": "%Y", "W": "%Y-%W"}
    if interval not in interval_dict:
        raise ValueError("interval参数错误,可选值为day、month、year、week")

    # A set gives O(1) membership tests and makes the None default usable.
    stopwords = set(stopwords) if stopwords else set()
    # Honour the requested font when it exists, else keep the former fallback.
    font_path = font if font and os.path.isfile(font) else 'simhei.ttf'
    emoticon_pattern = re.compile(r"(\[.+?\])")  # bracketed emoticon codes

    # Work on a private copy so the caller's data is left untouched.
    data = pd.DataFrame(chat_data).copy()
    if data.empty:
        return
    adjusted = pd.to_datetime(data["AdjustedTime"])
    data["interval"] = adjusted.dt.strftime(interval_dict[interval])

    # Only text messages with an actual string body feed the word cloud.
    is_text = (data["type_name"] == "文本") & data["content"].map(lambda value: isinstance(value, str))
    text_rows = data[is_text]

    # Read the mask once instead of once per interval, and close the file.
    mask = None
    if bg_img:
        with open(bg_img, 'rb') as mask_file:
            mask = imread(mask_file)

    def merge_text(contents):
        # Join with newlines first so an emoticon match cannot span two
        # messages, then flatten the result to a single line.
        merged = "\n".join(contents)
        return emoticon_pattern.sub('', merged).replace("\n", " ")

    def gen_img(texts, out_path, is_show, title=""):
        words = jieba.lcut(texts)
        res = [word for word in words if word not in stopwords and word.replace(" ", "") != "" and len(word) > 1]
        count_dict = dict(Counter(res))
        if not count_dict:
            # WordCloud cannot lay out an empty frequency table.
            return

        if mask is not None:
            wc = WordCloud(background_color='white', mask=mask, font_path=font_path, mode='RGBA',
                           include_numbers=False, random_state=0)
        else:
            # A font with CJK glyphs is required, otherwise the words render as boxes.
            wc = WordCloud(background_color='white', mode='RGBA', font_path=font_path, include_numbers=False,
                           random_state=0, width=500, height=500)
        wc = wc.fit_words(count_dict)

        fig = plt.figure(figsize=(8, 8))
        fig.suptitle(title, fontsize=26)
        ax = fig.subplots()

        ax.imshow(wc)
        ax.axis('off')

        if out_path != "":
            plt.savefig(out_path)
        if is_show:
            plt.show()
        plt.close()

    # savefig does not create missing directories.
    os.makedirs("out", exist_ok=True)
    # groupby yields the labels in sorted order, which is chronological for
    # every supported label format.
    for label, contents in text_rows.groupby("interval")["content"]:
        gen_img(merge_text(contents), out_path=f"out/img_{label}.png", is_show=is_show, title=f"全部({label})")
350 # time.sleep(1)
351
352# 情感分析
def sentiment_analysis(chat_data, stopwords="", out_path="", is_show=False, bg_img=None):
    """
    Score every non-empty text message with SnowNLP and plot the distribution
    of the sentiment scores as a histogram with a KDE curve.

    :param chat_data: chat records providing ``type_name`` and ``content``,
        either as a ``pandas.DataFrame`` or as an iterable of mappings.
    :param stopwords: not used by this function.
    :param out_path: file to save the figure to; ``""`` skips saving.
    :param is_show: display the figure with ``plt.show()`` when true.
    :param bg_img: not used by this function.
    :raises ImportError: if one of the required third-party packages is missing.
    """
    try:
        from snownlp import SnowNLP
        import pandas as pd
        import matplotlib.pyplot as plt
        import seaborn as sns

    except ImportError as e:
        print("error", e)
        raise ImportError("请安装snownlp,pandas,matplotlib,seaborn库") from e

    sns.set_style('white', {'font.sans-serif': ['simhei', 'FangSong']})

    # Iterating a DataFrame yields its column names, not its rows, so convert
    # it to one dict per message before filtering.
    if isinstance(chat_data, pd.DataFrame):
        rows = chat_data.to_dict("records")
    else:
        rows = chat_data

    # Only text messages with a non-empty string body can be scored.
    scores = [
        SnowNLP(row["content"]).sentiments
        for row in rows
        if row["type_name"] == "文本" and isinstance(row["content"], str) and row["content"] != ""
    ]

    df = pd.DataFrame({'Sentiment Score': scores})
    plt.figure(figsize=(8, 6))
    sns.histplot(data=df, x='Sentiment Score', kde=True)
    plt.title("Sentiment Analysis")
    plt.xlabel("Sentiment Score")
    plt.ylabel("Frequency")

    if out_path != "":
        plt.savefig(out_path)
    if is_show:
        plt.show()
    plt.close()
391 draw(scores)
392
393
if __name__ == '__main__':
    # Manual driver: fill in the database path and the talker id, then run
    # the module directly.
    MSG_PATH = r""
    selected_talker = "wxid_"

    # Query window: the last 50000 days up to now.
    start_time = time.time() - 50000 * 24 * 3600
    end_time = time.time()
    code, chat_data = read_msgs(MSG_PATH, selected_talker, start_time, end_time)

    # Other analyses that can be run on the same data:
    #   draw_hist_all_count(chat_data, is_show=True)   # message type bar chart
    #   draw_line_type_name(chat_data, is_show=True)   # message counts over time
    #   sentiment_analysis(chat_data)

    # Common function words that would otherwise dominate the word cloud.
    stopwords = [
        '的', '了', '是', '在', '我', '有', '和', '就', '不', '人',
        '都', '一', '一个', '上', '也', '很', '到', '说', '要', '去',
        '你', '会', '着', '没有', '看', '好', '自己', '这',
    ]
    wordcloud_generator(chat_data, stopwords=stopwords, out_path="", is_show=True)
410 # sentiment_analysis(chat_data)