Commit 70bb81d
src/history/d1.py
@@ -15,7 +15,7 @@ from pyrogram.types import Message
from config import DOWNLOAD_DIR, HISTORY, TZ, cache, cutter
from database.d1 import create_d1_table, insert_d1, query_d1
-from history.utils import CHAT_COLUMNS, MSG_COLUMNS, MSG_INDEXES, USER_COLUMNS, USER_INDEXES, check_save_history, fine_grained_check, get_chat
+from history.utils import CHAT_COLUMNS, MSG_COLUMNS, MSG_INDEXES, USER_COLUMNS, USER_INDEXES, can_delete_history, check_save_history, fine_grained_check, get_chat
from messages.parser import parse_chat, parse_msg
from utils import i_am_bot, nowdt, slim_cid, to_int, true
@@ -29,6 +29,9 @@ async def sync_history_to_d1(client: Client, message: Message) -> None:
"""
if not HISTORY.D1_ENABLE:
return
+ if isinstance(message, list): # this is deleted messages
+ await delete_messages(message)
+ return
info = parse_msg(message, silent=True, use_cache=False)
if not check_save_history(info["ctype"], info["cid"]) or not fine_grained_check(info) or message.service:
return
@@ -53,6 +56,29 @@ async def sync_history_to_d1(client: Client, message: Message) -> None:
await query_d1(**insert_d1(chatinfo["tablename"], records, update_on_conflict="mid"), db_name=HISTORY.D1_DATABASE, silent=True)
+async def delete_messages(messages: Message | list[Message]) -> None:
+ """Delete messages from D1 database."""
+ if not isinstance(messages, list):
+ messages = [messages]
+ for message in messages:
+ cid = glom(message, "chat.id", default=0) or 0
+ mid = glom(message, "id", default=0) or 0
+ ctype = glom(message, "chat.type.name", default="") or ""
+ if not check_save_history(ctype, cid) or message.service:
+ return
+ chatinfo = await get_d1_chatinfo(cid)
+ if not chatinfo:
+ continue
+ tablename = chatinfo["tablename"]
+ resp = await query_d1(f"SELECT * FROM '{tablename}' WHERE mid={mid};", db_name=HISTORY.D1_DATABASE, silent=True)
+ uid = glom(resp, "result.0.results.0.uid", default=0) or 0
+ if not uid:
+ continue
+ if can_delete_history(cid, uid):
+ logger.warning(f"Delete message Chat={cid}, ID={mid}: {glom(resp, 'result.0.results.0')}")
+ await query_d1(f"DELETE FROM '{tablename}' WHERE mid={mid};", db_name=HISTORY.D1_DATABASE, silent=True)
+
+
async def backup_chat_history_to_d1(
client: Client,
chat_id: str | int,
src/history/sync.py
@@ -16,7 +16,7 @@ from history.utils import TURSO_KWARGS
from utils import strings_list
-async def sync_chat_history(client: Client, message: Message) -> None:
+async def sync_chat_history(client: Client, message: Message | list[Message]) -> None:
if not HISTORY.ENABLE:
return
if "TURSO" in HISTORY.ENGINE.upper():
src/history/turso.py
@@ -14,7 +14,7 @@ from pyrogram.types import Message
from config import DOWNLOAD_DIR, HISTORY, TZ, cache, cutter
from database.turso import insert_statement, turso_create_table, turso_exec, turso_parse_resp
-from history.utils import CHAT_COLUMNS, MSG_COLUMNS, MSG_INDEXES, TURSO_KWARGS, USER_COLUMNS, USER_INDEXES, check_save_history, fine_grained_check, get_chat
+from history.utils import CHAT_COLUMNS, MSG_COLUMNS, MSG_INDEXES, TURSO_KWARGS, USER_COLUMNS, USER_INDEXES, can_delete_history, check_save_history, fine_grained_check, get_chat
from messages.parser import parse_chat, parse_msg
from utils import i_am_bot, nowdt, slim_cid, to_int, true
@@ -28,6 +28,9 @@ async def sync_history_to_turso(client: Client, message: Message) -> None:
"""
if not HISTORY.TURSO_ENABLE:
return
+ if isinstance(message, list): # this is deleted messages
+ await delete_messages(message)
+ return
info = parse_msg(message, silent=True, use_cache=False)
if not check_save_history(info["ctype"], info["cid"]) or not fine_grained_check(info) or message.service:
return
@@ -53,6 +56,31 @@ async def sync_history_to_turso(client: Client, message: Message) -> None:
await turso_exec([insert_statement(chatinfo["tablename"], records, update_on_conflict="mid")], silent=True, retry=2, **TURSO_KWARGS)
+async def delete_messages(messages: Message | list[Message]) -> None:
+ """Delete messages from Turso database."""
+ if not isinstance(messages, list):
+ messages = [messages]
+ for message in messages:
+ cid = glom(message, "chat.id", default=0) or 0
+ mid = glom(message, "id", default=0) or 0
+ ctype = glom(message, "chat.type.name", default="") or ""
+ if not check_save_history(ctype, cid) or message.service:
+ return
+ chatinfo = await get_turso_chatinfo(cid)
+ if not chatinfo:
+ continue
+ tablename = chatinfo["tablename"]
+ resp = await turso_exec([{"type": "execute", "stmt": {"sql": f"SELECT * FROM '{tablename}' WHERE mid={mid};"}}], silent=True, retry=2, **TURSO_KWARGS)
+ parsed = turso_parse_resp(resp)
+ if not parsed:
+ continue
+ uid = parsed[0]["uid"]
+ if can_delete_history(cid, uid):
+ logger.warning(f"Delete message Chat={cid}, ID={mid}: {parsed[0]}")
+ sql = f"DELETE FROM '{tablename}' WHERE mid={mid};"
+ await turso_exec([{"type": "execute", "stmt": {"sql": sql}}], silent=True, retry=2, **TURSO_KWARGS)
+
+
async def backup_chat_history_to_turso(
client: Client,
chat_id: str | int,
src/history/utils.py
@@ -64,6 +64,19 @@ def check_save_history(ctype: str, cid: int | str) -> bool:
return False
+@cache.memoize(ttl=0)
+def can_delete_history(cid: int | str, uid: int | str) -> bool:
+ # ruff: noqa: SIM103
+ cid = slim_cid(cid)
+ if true(os.getenv(f"HISTORY_CAN_DEL_C{cid}")):
+ return True
+ if true(os.getenv(f"HISTORY_CAN_DEL_U{uid}")):
+ return True
+ if true(os.getenv(f"HISTORY_CAN_DEL_C{cid}_U{uid}")):
+ return True
+ return False
+
+
def fine_grained_check(info: dict) -> bool:
"""由于有些对话不需要保存所有类型的聊天历史, 这里检查是否需要跳过.
src/main.py
@@ -91,7 +91,8 @@ async def main():
@app.on_message(group=1)
@app.on_edited_message(group=1)
- async def save_history(client: Client, message: Message):
+ @app.on_deleted_messages(group=1)
+ async def save_history(client: Client, message: Message | list[Message]):
await sync_chat_history(client, message)
if ENABLE.CRONTAB: