main
 1#!/usr/bin/env python
 2# -*- coding: utf-8 -*-
 3import os
 4from typing import Literal
 5
 6from glom import glom
 7from loguru import logger
 8from pyrogram.client import Client
 9from pyrogram.types import Message
10
11from config import HISTORY, cache
12from database.turso import turso_exec, turso_parse_resp
13from history.d1 import backup_chat_history_to_d1, query_d1, sync_history_to_d1
14from history.turso import backup_chat_history_to_turso, sync_history_to_turso
15from history.utils import TURSO_KWARGS
16from utils import strings_list
17
18
async def sync_chat_history(client: Client, message: Message | list[Message]) -> None:
    """Forward a message (or list of messages) to every enabled history engine.

    Nothing happens when ``HISTORY.ENABLE`` is falsy. Otherwise each engine
    whose tag appears in ``HISTORY.ENGINE`` (compared upper-cased) receives
    the message: Turso first, then D1. Both run when both tags are present.
    """
    if HISTORY.ENABLE:
        # (engine tag, sync coroutine) pairs, in dispatch order.
        targets = (
            ("TURSO", sync_history_to_turso),
            ("D1", sync_history_to_d1),
        )
        for tag, sync_to_engine in targets:
            if tag in HISTORY.ENGINE.upper():
                await sync_to_engine(client, message)
26
27
async def _backup_chatinfo_rows(
    client: Client,
    rows,
    engine_label: str,
    backup_func,
    hours: float,
    start_from: Literal["latest", "oldest"],
    claimed_earlier: set[str],
) -> set[str]:
    """Back up every chat listed in ``rows`` with ``backup_func``, honouring the per-chat throttle.

    Args:
        client: Client forwarded to ``backup_func``.
        rows: Iterable of ``chatinfo`` rows; each row is indexed by ``"cid"`` and ``"chandle"``.
        engine_label: Engine name used in the log line (``"Turso"`` or ``"D1"``).
        backup_func: Coroutine function called as
            ``backup_func(client, handle, duration, start_from=start_from)``.
        hours: Default backup window; overridable per chat via the
            ``HISTORY_BACKUP_CHATS_DURATION_<cid>`` environment variable.
        start_from: Forwarded unchanged to ``backup_func``.
        claimed_earlier: Chat ids (as strings) whose throttle slot was already
            claimed by a previous engine pass in the same run. These chats are
            backed up without consulting the cache again.

    Returns:
        The chat ids (as strings) that this pass backed up.
    """
    default_interval = 12 * 3600  # 12 hours
    processed: set[str] = set()
    for row in rows:
        cid = row["cid"]
        cid_key = str(cid)
        if cid_key in processed:
            # Same chat listed twice in this table: one backup per pass is enough.
            continue
        if cid_key not in claimed_earlier:
            cache_key = f"backup_chat_history_{cid}"
            if cache.get(cache_key):
                # Throttled by a previous run; try again once the cache entry expires.
                continue
            interval = os.getenv(f"HISTORY_BACKUP_CHATS_INTERVAL_{cid}", default_interval)
            cache.set(cache_key, 1, ttl=int(interval))
        processed.add(cid_key)
        # Prefer the stored handle; fall back to the numeric chat id.
        handle = row["chandle"] or int(cid)
        logger.info(f"Backup chat history to {engine_label}: {handle}")
        # The override is divided by 3600 before being passed on as the window,
        # so it is presumably given in seconds; the default is `hours` itself.
        duration = float(os.getenv(f"HISTORY_BACKUP_CHATS_DURATION_{cid}", hours * 3600)) / 3600
        await backup_func(client, handle, duration, start_from=start_from)
    return processed


async def backup_chat_history(
    client: Client,
    chats: str = HISTORY.PERIODICALLY_BACKUP_CHATS,
    hours: float = HISTORY.BACKUP_CHATS_HOURS,
    *,
    start_from: Literal["latest", "oldest"] = "latest",
) -> None:
    """Back up chat history to every engine named in ``HISTORY.ENGINE``.

    Does nothing when ``HISTORY.ENABLE`` is falsy.

    Args:
        client: Client forwarded to the engine-specific backup functions.
        chats: Either the literal ``"full_table"`` (back up every chat found in
            each engine's ``chatinfo`` table) or a value that ``strings_list``
            splits into individual chat identifiers.
        hours: Backup window forwarded to the engine-specific backup functions.
        start_from: Forwarded unchanged to the engine-specific backup functions.

    In ``"full_table"`` mode each chat is throttled through the cache key
    ``backup_chat_history_<cid>``, whose TTL comes from
    ``HISTORY_BACKUP_CHATS_INTERVAL_<cid>`` (default ``12 * 3600``). The
    throttle slot is claimed once per chat per run, so a chat backed up to
    Turso is still backed up to D1 when both engines are enabled. Previously
    the Turso pass set the shared cache key and the D1 pass then skipped the
    chat.
    """
    if not HISTORY.ENABLE:
        return

    engine = HISTORY.ENGINE.upper()
    use_turso = "TURSO" in engine
    use_d1 = "D1" in engine

    # if `chats` is set to "full_table", backup all chats in `chatinfo` table
    if chats == "full_table":
        claimed: set[str] = set()
        if use_turso:
            resp = await turso_exec([{"type": "execute", "stmt": {"sql": "SELECT * FROM 'chatinfo';"}}], silent=True, retry=2, **TURSO_KWARGS)
            rows = turso_parse_resp(resp)
            claimed |= await _backup_chatinfo_rows(client, rows, "Turso", backup_chat_history_to_turso, hours, start_from, claimed)
        if use_d1:
            resp = await query_d1("SELECT * FROM 'chatinfo';", db_name=HISTORY.D1_DATABASE, silent=True)
            rows = glom(resp, "result.0.results", default=[])
            await _backup_chatinfo_rows(client, rows, "D1", backup_chat_history_to_d1, hours, start_from, claimed)
        return

    # Explicit chat list: no throttling, every listed chat goes to every enabled engine.
    for cid in (x.strip() for x in strings_list(chats)):
        logger.info(f"Backup chat history: {cid}")
        if use_turso:
            await backup_chat_history_to_turso(client, cid, hours, start_from=start_from)
        if use_d1:
            await backup_chat_history_to_d1(client, cid, hours, start_from=start_from)