Commit a0c231e

benny-dou <60535774+benny-dou@users.noreply.github.com>
2026-01-18 05:41:47
feat(r2): add automatic cleanup for expired files in Cloudflare R2
1 parent 2bfd5fd
Changed files (3)
src/database/r2.py
@@ -1,6 +1,8 @@
 #!/usr/bin/env python
 # -*- coding: utf-8 -*-
+import contextlib
 import os
+import re
 import warnings
 from datetime import timedelta
 from urllib.parse import unquote_plus
@@ -34,33 +36,35 @@ async def list_cf_r2(
     """Get from Cloudflare R2."""
     if not all([enabled, bucket_name, account_id, aws_access_key_id, aws_secret_access_key]):
         return {}
-    async with Session().client(
-        service_name="s3",
-        endpoint_url=f"https://{account_id}.r2.cloudflarestorage.com",
-        aws_access_key_id=aws_access_key_id,
-        aws_secret_access_key=aws_secret_access_key,
-        region_name="auto",
-    ) as s3:  # type: ignore
-        payload = {"Bucket": bucket_name, "MaxKeys": 1000}
-        if continuation_token:
-            payload["ContinuationToken"] = continuation_token
-        if prefix:
-            payload["Prefix"] = prefix
-        contents = []
-        try:
-            results = await s3.list_objects_v2(**payload)
-            if not results.get("IsTruncated"):
-                return results
-            contents.extend(results.get("Contents", []))
-            while results.get("NextContinuationToken"):
-                payload["ContinuationToken"] = results["NextContinuationToken"]
+    results = {}
+    with contextlib.suppress(Exception):
+        async with Session().client(
+            service_name="s3",
+            endpoint_url=f"https://{account_id}.r2.cloudflarestorage.com",
+            aws_access_key_id=aws_access_key_id,
+            aws_secret_access_key=aws_secret_access_key,
+            region_name="auto",
+        ) as s3:  # type: ignore
+            payload = {"Bucket": bucket_name, "MaxKeys": 1000}
+            if continuation_token:
+                payload["ContinuationToken"] = continuation_token
+            if prefix:
+                payload["Prefix"] = prefix
+            contents = []
+            try:
                 results = await s3.list_objects_v2(**payload)
+                if not results.get("IsTruncated"):
+                    return results
                 contents.extend(results.get("Contents", []))
-            results["Contents"] = contents
-        except Exception as e:
-            logger.warning(f"List CF-R2 failed for {prefix=}: {e}")
-            return {}
-        return results
+                while results.get("NextContinuationToken"):
+                    payload["ContinuationToken"] = results["NextContinuationToken"]
+                    results = await s3.list_objects_v2(**payload)
+                    contents.extend(results.get("Contents", []))
+                results["Contents"] = contents
+            except Exception as e:
+                logger.warning(f"List CF-R2 failed for {prefix=}: {e}")
+                return {}
+    return results
 
 
 async def get_cf_r2(
@@ -217,3 +221,37 @@ async def head_cf_r2(
             if not silent:
                 logger.warning(f"`{key}` is not exist in CF-R2")
     return {}
+
+
+async def clean_r2_expired():
+    """Clean expired files in CF-R2.
+
+    TTL/{num}h  # expire in {num} hours
+    TTL/{num}d  # expire in {num} days
+    TTL/{num}w  # expire in {num} weeks
+    TTL/{num}M  # expire in {num} months
+    """
+    now = nowdt("UTC")
+    r2 = await list_cf_r2(prefix="TTL/")
+    items = r2.get("Contents", [])
+    for item in items:
+        key = item.get("Key", "")
+        dt = item.get("LastModified", now)
+        if not dt:
+            continue
+        if matched := re.match(r"TTL/(\d+)([hdwM])", key):
+            num, unit = matched.groups()
+            num = int(num)
+            if unit == "h":
+                expires = dt + timedelta(hours=num)
+            elif unit == "d":
+                expires = dt + timedelta(days=num)
+            elif unit == "w":
+                expires = dt + timedelta(weeks=num)
+            elif unit == "M":
+                expires = dt + timedelta(days=num * 30)
+            else:
+                expires = dt + timedelta(days=365)  # default to 1 year
+            if expires < now:
+                logger.debug(f"Delete expired R2 key: {key}")
+                await del_cf_r2(key)
src/main.py
@@ -24,6 +24,7 @@ from bridge.ocr import forward_ocr_results
 from bridge.social import forward_social_media_results
 from config import DAILY_MESSAGES, DEVICE_NAME, ENABLE, PROXY, TOKEN, TZ, cache
 from danmu.sync import sync_livechats
+from database.r2 import clean_r2_expired
 from history.sync import backup_chat_history, sync_chat_history
 from messages.main import process_message
 from messages.parser import parse_msg
@@ -120,6 +121,7 @@ async def cron_minutely(client: Client):
 async def cron_hourly(client: Client):
     await daily_summary(client)
     await clean_gemini_files()
+    await clean_r2_expired()
     await sync_livechats()
     if ENABLE.CACHE_PRICE_SYMBOLS:
         await match_symbol_category()  # to cache all supported symbols
src/publish.py
@@ -4,7 +4,6 @@ import contextlib
 import io
 import tempfile
 from pathlib import Path
-from typing import Literal
 from urllib.parse import quote_plus
 
 import anyio
@@ -25,7 +24,7 @@ async def publish_telegraph(
     html: str = "",
     author: str | None = None,
     url: str | None = None,
-    ttl: Literal["forever", "1d", "7d", "30d"] = "forever",
+    ttl: str = "forever",  # 12h, 7d, 1M, ...
 ) -> str:
     """Publish to Telegraph."""
 
@@ -70,7 +69,7 @@ async def publish_cf_r2(
     html: str = "",
     author: str | None = None,
     url: str | None = None,
-    ttl: Literal["forever", "1d", "7d", "30d"] = "forever",
+    ttl: str = "forever",
 ) -> str:
     """Publish to CF R2."""
     if not (texts or html):