main
1#!/usr/bin/env python
2# -*- coding: utf-8 -*-
3import os
4from typing import Literal
5
6from glom import glom
7from loguru import logger
8from pyrogram.client import Client
9from pyrogram.types import Message
10
11from config import HISTORY, cache
12from database.turso import turso_exec, turso_parse_resp
13from history.d1 import backup_chat_history_to_d1, query_d1, sync_history_to_d1
14from history.turso import backup_chat_history_to_turso, sync_history_to_turso
15from history.utils import TURSO_KWARGS
16from utils import strings_list
17
18
async def sync_chat_history(client: Client, message: Message | list[Message]) -> None:
    """Send ``message`` to every history backend named in ``HISTORY.ENGINE``.

    Nothing happens when ``HISTORY.ENABLE`` is falsy. The engine setting is
    matched case-insensitively by substring, so a value naming both "TURSO"
    and "D1" syncs to both backends, Turso first.

    Args:
        client: Client handed through to the backend sync function.
        message: A single message or a list of messages, handed through as-is.
    """
    if HISTORY.ENABLE:
        backends = (
            ("TURSO", sync_history_to_turso),
            ("D1", sync_history_to_d1),
        )
        for marker, push in backends:
            # The engine setting is re-read for each backend on purpose.
            if marker in HISTORY.ENGINE.upper():
                await push(client, message)
26
27
def _claim_backup_slot(cid: object, claimed: set[str], default_interval: int) -> bool:
    """Decide whether chat ``cid`` should be backed up during the current run.

    The throttle marker ``backup_chat_history_<cid>`` lives in the shared
    ``cache`` and is not engine-specific. ``claimed`` remembers the markers
    written earlier in the same run, so that a second engine still processes a
    chat whose marker was just set by the first engine.

    Args:
        cid: Chat id taken from a ``chatinfo`` row.
        claimed: Markers set during the current run; updated in place.
        default_interval: Marker TTL used when the environment variable
            ``HISTORY_BACKUP_CHATS_INTERVAL_<cid>`` is not set.

    Returns:
        True if the chat is due (the marker was absent, or was set during this
        run), False if the marker was already present before this run.

    Raises:
        ValueError: If the environment override is not an integer string.
    """
    key = f"backup_chat_history_{cid}"
    if key in claimed:
        # Marker was written moments ago by another engine in this same run.
        return True
    if cache.get(key):
        return False
    # os.getenv returns the default unchanged, so this is either a str or an int.
    interval = os.getenv(f"HISTORY_BACKUP_CHATS_INTERVAL_{cid}", default_interval)
    cache.set(key, 1, ttl=int(interval))
    claimed.add(key)
    return True


def _backup_hours_for(cid: object, hours: float) -> float:
    """Return the ``hours`` value to use for chat ``cid``.

    ``HISTORY_BACKUP_CHATS_DURATION_<cid>`` overrides ``hours`` when set; the
    override is divided by 3600 before use, i.e. it is read as seconds.
    """
    return float(os.getenv(f"HISTORY_BACKUP_CHATS_DURATION_{cid}", hours * 3600)) / 3600


async def backup_chat_history(
    client: Client,
    chats: str = HISTORY.PERIODICALLY_BACKUP_CHATS,
    hours: float = HISTORY.BACKUP_CHATS_HOURS,
    *,
    start_from: Literal["latest", "oldest"] = "latest",
) -> None:
    """Back up chat history to every backend named in ``HISTORY.ENGINE``.

    Nothing happens when ``HISTORY.ENABLE`` is falsy.

    When ``chats`` equals ``"full_table"``, every row of the ``chatinfo`` table
    of each enabled engine is backed up. Each chat is throttled through the
    cache marker ``backup_chat_history_<cid>`` (default TTL 12 hours), and the
    interval and duration can be overridden per chat through environment
    variables. The marker is shared by all engines: a chat that is due is
    backed up to every enabled engine in the same run, instead of the first
    engine's marker causing later engines to skip it.

    Otherwise ``chats`` is split with ``strings_list`` and each entry is backed
    up to every enabled engine without throttling.

    Args:
        client: Client handed through to the engine backup functions.
        chats: ``"full_table"`` or a string of chat identifiers.
        hours: Handed to the engine backup functions (per-chat overridable in
            ``"full_table"`` mode).
        start_from: Handed through to the engine backup functions.
    """
    if not HISTORY.ENABLE:
        return

    engine = HISTORY.ENGINE.upper()
    use_turso = "TURSO" in engine
    use_d1 = "D1" in engine

    if chats != "full_table":
        chat_ids = [x.strip() for x in strings_list(chats)]
        for cid in chat_ids:
            logger.info(f"Backup chat history: {cid}")
            if use_turso:
                await backup_chat_history_to_turso(client, cid, hours, start_from=start_from)
            if use_d1:
                await backup_chat_history_to_d1(client, cid, hours, start_from=start_from)
        return

    # `chats` is "full_table": back up all chats listed in the `chatinfo` table.
    default_interval = 12 * 3600  # 12 hours
    claimed: set[str] = set()  # throttle markers set during this run

    if use_turso:
        resp = await turso_exec([{"type": "execute", "stmt": {"sql": "SELECT * FROM 'chatinfo';"}}], silent=True, retry=2, **TURSO_KWARGS)
        for row in turso_parse_resp(resp):
            if not _claim_backup_slot(row["cid"], claimed, default_interval):
                continue
            # Prefer the stored handle; fall back to the numeric chat id.
            handle = row["chandle"] or int(row["cid"])
            logger.info(f"Backup chat history to Turso: {handle}")
            duration = _backup_hours_for(row["cid"], hours)
            await backup_chat_history_to_turso(client, handle, duration, start_from=start_from)

    if use_d1:
        resp = await query_d1("SELECT * FROM 'chatinfo';", db_name=HISTORY.D1_DATABASE, silent=True)
        for row in glom(resp, "result.0.results", default=[]):
            if not _claim_backup_slot(row["cid"], claimed, default_interval):
                continue
            # Prefer the stored handle; fall back to the numeric chat id.
            handle = row["chandle"] or int(row["cid"])
            logger.info(f"Backup chat history to D1: {handle}")
            duration = _backup_hours_for(row["cid"], hours)
            await backup_chat_history_to_d1(client, handle, duration, start_from=start_from)