Commit 114a92c

benny-dou <60535774+benny-dou@users.noreply.github.com>
2025-06-23 17:58:31
refactor(crontab): refactor crontab jobs to run at different intervals
1 parent d3c9225
Changed files (6)
src/danmu/sync.py
@@ -8,7 +8,7 @@ from zoneinfo import ZoneInfo
 from glom import flatten, glom
 from loguru import logger
 
-from config import DANMU, DB, TZ, cache, cutter
+from config import DANMU, DB, TZ, cutter
 from danmu.utils import merge_json, simplify_json
 from database.r2 import get_cf_r2, list_cf_r2, set_cf_r2
 from database.turso import insert_statement, turso_create_table, turso_exec
@@ -35,7 +35,6 @@ TURSO_KWARGS: dict = {
 }
 
 
-@cache.memoize(ttl=3600)
 async def sync_livechats() -> None:
     if not DANMU.SYNC_ENABLE:
         return
src/llm/summary.py
@@ -295,8 +295,6 @@ async def daily_summary(client: Client):
     if now.hour not in durations:
         return
     duration = durations[now.hour]
-    if now.minute != 0:
-        return
     mapping = {}  # summarize chat id -> send to chat id
     try:
         mapping = json.loads(TID.DAILY_SUMMARY)
src/llm/utils.py
@@ -14,7 +14,7 @@ from loguru import logger
 from markitdown import MarkItDown
 from pyrogram.parser.markdown import BLOCKQUOTE_EXPANDABLE_DELIM, BLOCKQUOTE_EXPANDABLE_END_DELIM
 
-from config import DOWNLOAD_DIR, GEMINI, GPT, PREFIX, cache
+from config import DOWNLOAD_DIR, GEMINI, GPT, PREFIX
 from utils import nowdt, number_to_emoji, read_text, remove_consecutive_newlines, remove_dash, remove_pound, strings_list, zhcn
 
 BOT_TIPS = "(回复以继续)"  # noqa: RUF001
@@ -292,12 +292,10 @@ def sample_key(keys: str | list[str]) -> str:
     return random.choice(keys)
 
 
-@cache.memoize(ttl=1800)
 async def clean_gemini_files():
     """Clean Gemini files.
 
     Gemini allows only 20 GB of data.
-    Clean every half an hour.
     """
     if GEMINI.CLEAN_FILES_AFTER_SECONDS >= 48 * 3600:
         return
src/others/podcast.py
@@ -36,7 +36,6 @@ HEADERS = {
 AUDIO_EXT = [".aac", ".amr", ".flac", ".m4a", ".mp3", ".oga", ".ogg", ".opus", ".wav", ".wma"]
 
 
-@cache.memoize(ttl=PODCAST.UPDATE_INTERVAL)
 async def summary_pods(client: Client):
     """Summary podcast RSS feeds."""
     pods = await get_all_pods()
src/config.py
@@ -269,7 +269,6 @@ class PODCAST:
     ASR_ENGINE = os.getenv("PODCAST_ASR_ENGINE", "gemini")  # default ASR engine
     ASR_FORCE_GEMINI_TITLES = os.getenv("PODCAST_ASR_FORCE_GEMINI_TITLES", "")  # comma separated titles for force Gemini ASR. (Bypass censorship)
     ASR_FORCE_GEMINI_DOMAINS = os.getenv("PODCAST_ASR_FORCE_GEMINI_DOMAINS", "anchor.fm,feeds.acast.com")  # comma separated domains for force Gemini ASR. (Bypass censorship)
-    UPDATE_INTERVAL = int(os.getenv("PODCAST_UPDATE_INTERVAL", "3600"))  # in seconds
     IGNORE_OLD_THAN_SECONDS = int(os.getenv("PODCAST_IGNORE_OLD_THAN_SECONDS", "14400"))  # in seconds
     KEEP_LATEST_ENTRIES = int(os.getenv("PODCAST_KEEP_LATEST_ENTRIES", "99999999"))  # keep latest entries
 
src/main.py
@@ -2,7 +2,6 @@
 # -*- coding: utf-8 -*-
 import argparse
 import asyncio
-import contextlib
 import json
 import logging
 import os
@@ -31,7 +30,7 @@ from messages.parser import parse_msg
 from others.podcast import summary_pods
 from permission import check_permission
 from price.entrypoint import match_symbol_category
-from utils import cleanup_old_files, nowdt, to_int
+from utils import cleanup_old_files, to_int
 
 
 async def main():
@@ -100,7 +99,10 @@ async def main():
 
     if ENABLE.CRONTAB:
         scheduler = AsyncIOScheduler()
-        scheduler.add_job(scheduling, "interval", args=[app], seconds=60)  # run crontab jobs every 60 seconds
+        scheduler.add_job(cron_secondly, "interval", args=[app], seconds=1)
+        scheduler.add_job(cron_minutely, "cron", args=[app], second=0)
+        scheduler.add_job(cron_hourly, "cron", args=[app], minute=0)
+        scheduler.add_job(cron_daily, "cron", args=[app], hour=8, minute=30, timezone=TZ)
         logging.getLogger("apscheduler.scheduler").setLevel(logging.ERROR)
         scheduler.start()
 
@@ -109,30 +111,36 @@ async def main():
     await app.stop()
 
 
-async def scheduling(client: Client):
+async def cron_secondly(client: Client):
+    pass
+
+
+async def cron_minutely(client: Client):
     cache.evict()  # delete expired cache
     cleanup_old_files()
+    await backup_chat_history(client)
+
+
+async def cron_hourly(client: Client):
+    await daily_summary(client)
+    await clean_gemini_files()
+    await sync_livechats()
     if ENABLE.CACHE_PRICE_SYMBOLS:
         await match_symbol_category()  # to cache all supported symbols
+    await summary_pods(client)
 
-    # custom crontab jobs
-    await daily_summary(client)
 
-    now = nowdt(TZ)
-    if now.hour == 8 and now.minute == 0:  # daliy messages
-        daliy = {}
-        try:
-            daliy = json.loads(DAILY_MESSAGES)
-        except (JSONDecodeError, TypeError):
-            logger.warning(f"Invalid DAILY_MESSAGES: {DAILY_MESSAGES}")
-        with contextlib.suppress(Exception):
-            for chat_id, msg in daliy.items():
-                logger.info(f"Sending daily message to {chat_id}: {msg}")
-                await client.send_message(to_int(chat_id), msg)
-    await summary_pods(client)
-    await sync_livechats()
-    await clean_gemini_files()
-    await backup_chat_history(client)
+async def cron_daily(client: Client):
+    # send daliy messages
+    try:
+        daliy = json.loads(DAILY_MESSAGES)
+        for chat_id, msg in daliy.items():
+            logger.info(f"Sending daily message to {chat_id}: {msg}")
+            await client.send_message(to_int(chat_id), msg)
+    except (JSONDecodeError, TypeError):
+        logger.warning(f"Invalid DAILY_MESSAGES: {DAILY_MESSAGES}")
+    except Exception as e:
+        logger.warning(f"Error sending daily message: {e}")
 
 
 if __name__ == "__main__":