Commit ce5f48e
Changed files (1)
src
history
src/history/sync.py
@@ -1,5 +1,6 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
+import os
from typing import Literal
from glom import glom
@@ -33,23 +34,33 @@ async def backup_chat_history(
if not HISTORY.ENABLE:
return
- if cache.get("backup_chat_history"):
- return
- cache.set("backup_chat_history", 1, ttl=12 * 3600) # backup every 12 hours
+ default_interval = 12 * 3600 # 12 hours
# if `chats` is set to "full_table", backup all chats in `chatinfo` table
if chats == "full_table":
if "TURSO" in HISTORY.ENGINE.upper():
resp = await turso_exec([{"type": "execute", "stmt": {"sql": "SELECT * FROM 'chatinfo';"}}], silent=True, retry=2, **TURSO_KWARGS)
tables = turso_parse_resp(resp)
- for cid in [x["chandle"] or int(x["cid"]) for x in tables]:
- logger.info(f"Backup chat history to Turso: {cid}")
- await backup_chat_history_to_turso(client, cid, hours, start_from=start_from)
+ for x in tables:
+ interval = os.getenv(f"HISTORY_BACKUP_CHATS_INTERVAL_{x['cid']}", default_interval)
+ if cache.get(f"backup_chat_history_{x['cid']}"):
+ continue
+ cache.set(f"backup_chat_history_{x['cid']}", 1, ttl=int(interval))
+ handle = x["chandle"] or int(x["cid"])
+ logger.info(f"Backup chat history to Turso: {handle}")
+ duration = float(os.getenv(f"HISTORY_BACKUP_CHATS_DURATION_{x['cid']}", hours * 3600)) / 3600
+ await backup_chat_history_to_turso(client, handle, duration, start_from=start_from)
if "D1" in HISTORY.ENGINE.upper():
resp = await query_d1("SELECT * FROM 'chatinfo';", db_name=HISTORY.D1_DATABASE, silent=True)
tables = glom(resp, "result.0.results", default=[])
- for cid in [x["chandle"] or int(x["cid"]) for x in tables]:
- logger.info(f"Backup chat history to D1: {cid}")
- await backup_chat_history_to_d1(client, cid, hours, start_from=start_from)
+ for x in tables:
+ interval = os.getenv(f"HISTORY_BACKUP_CHATS_INTERVAL_{x['cid']}", default_interval)
+ if cache.get(f"backup_chat_history_{x['cid']}"):
+ continue
+ cache.set(f"backup_chat_history_{x['cid']}", 1, ttl=int(interval))
+ handle = x["chandle"] or int(x["cid"])
+ logger.info(f"Backup chat history to D1: {handle}")
+ duration = float(os.getenv(f"HISTORY_BACKUP_CHATS_DURATION_{x['cid']}", hours * 3600)) / 3600
+ await backup_chat_history_to_d1(client, handle, duration, start_from=start_from)
else:
chat_ids = [x.strip() for x in chats.split(",") if x.strip()]
for cid in chat_ids: