# -*- coding: utf-8 -*-#
# -------------------------------------------------------------------------------
# Name:         merge_db.py
# Description:  Merge decrypted SQLite databases into a single database file.
# Author:       xaoyaoo
# Date:         2023/12/03
# -------------------------------------------------------------------------------
import os
import shutil
import sqlite3
import time
from typing import List, Optional, Tuple

from .decryption import batch_decrypt
from .utils import wx_core_loger, wx_core_error


@wx_core_error
def execute_sql(connection, sql, params=None):
    """
    Execute a SQL statement and return every result row.

    The statement is first run with the connection's current ``text_factory``.
    If that attempt raises, it is retried once with ``text_factory = bytes`` so
    that TEXT values are returned as raw bytes instead of being decoded.
    The connection's previous ``text_factory`` is always restored afterwards.

    :param connection: SQLite connection
    :param sql: SQL statement to execute
    :param params: optional parameters bound to the statement
    :return: list of rows (``cursor.fetchall()``), or None if both attempts fail
    """

    def _run():
        cursor = connection.cursor()
        try:
            if params:
                cursor.execute(sql, params)
            else:
                cursor.execute(sql)
            return cursor.fetchall()
        finally:
            cursor.close()

    try:
        return _run()
    except Exception:
        # Fall through to the bytes-mode retry below.
        pass

    try:
        previous_factory = connection.text_factory
        connection.text_factory = bytes
        try:
            return _run()
        finally:
            # Restore even when the retry fails, so later queries on this
            # connection do not silently start returning bytes.
            connection.text_factory = previous_factory
    except Exception as e:
        wx_core_loger.error(f"**********\nSQL: {sql}\nparams: {params}\n{e}\n**********", exc_info=True)
        return None


@wx_core_error
def check_create_sync_log(connection):
    """
    Make sure the ``sync_log`` table exists in the given database.

    ``sync_log`` records one row per (source database path, table name) with the
    source row count, the row count of the merged table, and timestamps.

    :param connection: SQLite connection
    :return: True if the table exists or was created, False if its existence
             could not be determined
    """
    sync_log_status = execute_sql(connection, "SELECT name FROM sqlite_master WHERE type='table' AND name='sync_log'")
    if sync_log_status is None:
        # The probe query itself failed (already logged by execute_sql).
        return False
    if len(sync_log_status) >= 1:
        return True

    out_cursor = connection.cursor()
    try:
        # db_path: source database path, tbl_name: table name,
        # src_count: row count in the source database,
        # current_count: row count of the corresponding table after merging
        sync_record_create_sql = ("CREATE TABLE sync_log ("
                                  "id INTEGER PRIMARY KEY AUTOINCREMENT,"
                                  "db_path TEXT NOT NULL,"
                                  "tbl_name TEXT NOT NULL,"
                                  "src_count INT,"
                                  "current_count INT,"
                                  "createTime INT DEFAULT (strftime('%s', 'now')), "
                                  "updateTime INT DEFAULT (strftime('%s', 'now'))"
                                  ");")
        out_cursor.execute(sync_record_create_sql)
        # Lookup indexes
        out_cursor.execute("CREATE INDEX idx_sync_log_db_path ON sync_log (db_path);")
        out_cursor.execute("CREATE INDEX idx_sync_log_tbl_name ON sync_log (tbl_name);")
        # Composite unique index to prevent duplicate (db_path, tbl_name) rows
        out_cursor.execute("CREATE UNIQUE INDEX idx_sync_log_db_tbl ON sync_log (db_path, tbl_name);")
        connection.commit()
    finally:
        out_cursor.close()
    return True


@wx_core_error
def check_create_file_md5(connection):
    """
    Placeholder: check whether a ``file_md5`` table exists.

    Intended to record file information for later de-duplication; currently
    does nothing.
    """
    pass


def _to_str(value):
    """Return *value* as ``str``; bytes are decoded and None becomes ''."""
    if value is None:
        return ""
    return value if isinstance(value, str) else value.decode()


def _merge_table(outdb, out_cursor, alias, db_path, de_path, table,
                 is_merge_data, startCreateTime, endCreateTime):
    """Create *table* in the merged database if missing and copy rows from ``alias.table``."""
    # Fetch the column names of the table.
    # NOTE(review): this PRAGMA is not schema-qualified, so when a table of the
    # same name already exists in the merged database SQLite may describe that
    # table rather than the attached source table - confirm this is intended.
    columns_info = execute_sql(outdb, f"PRAGMA table_info({table})")
    if not columns_info:
        return
    # ChatInfo in bizChat has a different set of columns than ChatInfo in MicroMsg.
    if table == "ChatInfo" and len(columns_info) > 12:
        return
    # Keep the order reported by SQLite so the generated SQL is deterministic.
    columns = [_to_str(col[1]) for col in columns_info]
    if not columns:
        return
    column_list = ','.join(columns)

    # Create the destination table with the same columns (no rows are copied here).
    sql_create_tbl = f"CREATE TABLE IF NOT EXISTS {table} AS SELECT * FROM {alias}.{table} WHERE 0 = 1;"
    out_cursor.execute(sql_create_tbl)
    # UNIQUE index over every column; COALESCE makes NULLs compare as equal so
    # that fully identical rows are rejected by INSERT OR IGNORE.
    index_name = f"{table}_unique_index"
    coalesce_columns = ','.join(f"COALESCE({column}, '')" for column in columns)
    out_cursor.execute(f"CREATE UNIQUE INDEX IF NOT EXISTS {index_name} ON {table} ({coalesce_columns})")

    # Make sure a sync_log row exists for this (source database, table) pair.
    sql_query_sync_log = "SELECT src_count FROM sync_log WHERE db_path=? AND tbl_name=?"
    sync_log = execute_sql(outdb, sql_query_sync_log, (db_path, table))
    if not sync_log:
        sql_insert_sync_log = "INSERT INTO sync_log (db_path, tbl_name, src_count, current_count) VALUES (?, ?, ?, ?)"
        out_cursor.execute(sql_insert_sync_log, (db_path, table, 0, 0))
    outdb.commit()

    if not is_merge_data:
        return

    # Compare the source row count with the count recorded at the last merge.
    log_rows = execute_sql(outdb, sql_query_sync_log, (db_path, table))
    count_rows = execute_sql(outdb, f"SELECT COUNT(*) FROM {alias}.{table}")
    if not log_rows or not count_rows:
        # One of the queries failed (already logged by execute_sql).
        return
    log_src_count = log_rows[0][0] or 0
    src_count = count_rows[0][0]
    if src_count <= log_src_count:
        wx_core_loger.info(f"忽略 {db_path} {de_path} {table} {src_count} {log_src_count}")
        return

    # Build the query that reads the source rows.
    sql_base = f"SELECT {column_list} FROM {alias}.{table} "
    where_clauses, params = [], []
    if "CreateTime" in columns:
        if startCreateTime > 0:
            where_clauses.append("CreateTime > ?")
            params.append(startCreateTime)
        if endCreateTime > 0:
            where_clauses.append("CreateTime < ?")
            params.append(endCreateTime)
    # With a time filter, add the WHERE clause and order by CreateTime.
    sql = f"{sql_base} WHERE {' AND '.join(where_clauses)} ORDER BY CreateTime" if where_clauses else sql_base
    src_data = execute_sql(outdb, sql, tuple(params))
    if not src_data:
        return

    # Insert the rows; duplicates are dropped by the unique index.
    insert_sql = f"INSERT OR IGNORE INTO {table} ({column_list}) VALUES ({','.join(['?'] * len(columns))})"
    try:
        out_cursor.executemany(insert_sql, src_data)

        # NOTE(review): src_count is the full source row count even when a time
        # filter limited what was merged, so a later run with a wider range is
        # skipped by the count comparison above - confirm this is intended.
        sql_update_sync_log = ("UPDATE sync_log "
                               "SET src_count = ? ,"
                               f"current_count=(SELECT COUNT(*) FROM {table}) "
                               "WHERE db_path=? AND tbl_name=?")
        out_cursor.execute(sql_update_sync_log, (src_count, db_path, table))
    except Exception as e:
        # Log the row count only; dumping every row would flood the log.
        wx_core_loger.error(
            f"error: {db_path}\n{de_path}\n{table}\n{insert_sql}\n{len(src_data)}\n{e}\n",
            exc_info=True)


@wx_core_error
def merge_db(db_paths: List[dict], save_path: str = "merge.db", is_merge_data: bool = True,
             startCreateTime: int = 0, endCreateTime: int = 0):
    """
    Merge several SQLite databases into one, ignoring rows that are exact duplicates.

    Each source database is attached to the output database, every ordinary
    table is created in the output if missing, and its rows are copied with
    ``INSERT OR IGNORE`` against a unique index spanning all columns.

    :param db_paths: [{"db_path": "xxx", "de_path": "xxx"}, ...]
        ``db_path`` is the original path (used as the key in ``sync_log``);
        ``de_path`` is the decrypted path the data is read from and defaults
        to ``db_path`` when absent.
    :param save_path: output file path; if it is an existing directory, a file
        named ``merge_<timestamp>.db`` is created inside it
    :param is_merge_data: if False, only create the tables and do not insert rows
    :param startCreateTime: lower bound (exclusive) on ``CreateTime``; 0 disables it.
        Only applied to tables that have a ``CreateTime`` column.
    :param endCreateTime: upper bound (exclusive) on ``CreateTime``; 0 disables it
    :return: path of the merged database
    :raises TypeError: if ``db_paths`` is not a list
    """
    if os.path.isdir(save_path):
        save_path = os.path.join(save_path, f"merge_{int(time.time())}.db")

    if not isinstance(db_paths, list):
        raise TypeError("db_paths 类型错误")
    # alias -> (original path, decrypted path)
    databases = {
        f"dbi_{i}": (db['db_path'], db.get('de_path', db['db_path']))
        for i, db in enumerate(db_paths)
    }

    outdb = sqlite3.connect(save_path)
    out_cursor = outdb.cursor()
    try:
        if not check_create_sync_log(outdb):
            wx_core_loger.warning("创建同步记录表失败")

        # Merge every source database into the output database.
        for alias, (db_path, de_path) in databases.items():
            # Attach the source database; the path is bound as a parameter so
            # quotes in the file name cannot break the statement.
            out_cursor.execute(f"ATTACH DATABASE ? AS {alias}", (str(de_path),))
            outdb.commit()

            sql_query_tbl_name = f"SELECT tbl_name, sql FROM {alias}.sqlite_master WHERE type='table' ORDER BY tbl_name;"
            tables = execute_sql(outdb, sql_query_tbl_name) or []
            for tbl_row in tables:
                table = _to_str(tbl_row[0])
                init_create_sql = _to_str(tbl_row[1])
                if table == "sqlite_sequence":
                    continue
                if "create table" not in init_create_sql.lower():
                    continue
                _merge_table(outdb, out_cursor, alias, db_path, de_path, table,
                             is_merge_data, startCreateTime, endCreateTime)

            # Finish any open transaction before detaching; SQLite refuses to
            # detach a database that is still part of a transaction.
            outdb.commit()
            out_cursor.execute(f"DETACH DATABASE {alias}")
            outdb.commit()
    finally:
        out_cursor.close()
        outdb.close()
    return save_path


@wx_core_error
def decrypt_merge(wx_path: str, outpath: str = "",
                  merge_save_path: Optional[str] = None,
                  is_merge_data=True, is_del_decrypted: bool = True,
                  startCreateTime: int = 0, endCreateTime: int = 0,
                  db_type=None) -> Tuple[bool, str]:
    """
    Decrypt the databases and merge them into one (Linux version).

    Keys are obtained with ``extract_all_keys(wx_path)``, the databases are
    decrypted into ``<outpath>/decrypted`` and then merged with ``merge_db``.

    :param wx_path: WeChat data directory
    :param outpath: output directory; defaults to ``decrypt_merge_tmp``
    :param merge_save_path: path of the merged database; defaults to
        ``<outpath>/merge_<timestamp>.db``
    :param is_merge_data: if False, only decrypt and create tables, do not insert rows
    :param is_del_decrypted: delete the decrypted databases afterwards
        (the merged database is kept)
    :param startCreateTime: start timestamp, passed through to ``merge_db``
    :param endCreateTime: end timestamp, passed through to ``merge_db``
    :param db_type: database types to select; accepted for compatibility but
        not used by this function
    :return: (True, merged database path) or (False, error message)
    """
    from ..linux.memscan import extract_all_keys

    outpath = outpath if outpath else "decrypt_merge_tmp"
    if merge_save_path is None:
        merge_save_path = os.path.join(outpath, f"merge_{int(time.time())}.db")
    decrypted_path = os.path.join(outpath, "decrypted")

    if not wx_path or not os.path.exists(wx_path):
        wx_core_loger.error("参数错误: wx_path 不存在")
        return False, "参数错误: wx_path 不存在"

    # Extract the keys.
    try:
        db_keys = extract_all_keys(wx_path)
    except RuntimeError as e:
        wx_core_loger.error(f"提取密钥失败: {e}", exc_info=True)
        return False, f"提取密钥失败: {e}"

    if not db_keys:
        wx_core_loger.error("未提取到任何数据库密钥")
        return False, "未提取到任何数据库密钥"

    # Start from an empty "decrypted" directory.
    if os.path.isdir(decrypted_path):
        shutil.rmtree(decrypted_path)
    os.makedirs(decrypted_path, exist_ok=True)

    # Decrypt.
    code, ret = batch_decrypt(db_keys, decrypted_path)
    if not code:
        wx_core_loger.error(f"解密失败{ret}")
        return False, ret

    # Keep only the databases that were decrypted successfully.
    out_dbs = [ret1 for code1, ret1 in ret if code1]
    parpare_merge_db_path = [{"db_path": db_path, "de_path": out_path}
                             for db_path, out_path, _ in out_dbs]

    merge_save_path = merge_db(parpare_merge_db_path, merge_save_path, is_merge_data=is_merge_data,
                               startCreateTime=startCreateTime, endCreateTime=endCreateTime)
    if is_del_decrypted:
        shutil.rmtree(decrypted_path, True)
    if isinstance(merge_save_path, str):
        return True, merge_save_path
    return False, "未知错误"