main
  1#!/usr/bin/env python
  2# -*- coding: utf-8 -*-
  3import contextlib
  4import json
  5import os
  6import re
  7import warnings
  8from datetime import timedelta
  9from urllib.parse import unquote_plus
 10
 11from aioboto3 import Session
 12from botocore.exceptions import ClientError
 13from loguru import logger
 14
 15from config import DB
 16from utils import bare_url, nowdt, stringfy
 17
 18# hot fix: https://developers.cloudflare.com/r2/examples/aws/boto3/
 19os.environ["AWS_REQUEST_CHECKSUM_CALCULATION"] = "when_required"
 20os.environ["AWS_RESPONSE_CHECKSUM_VALIDATION"] = "when_required"
 21
 22warnings.filterwarnings("ignore", category=DeprecationWarning)
 23
 24
 25async def list_cf_r2(
 26    prefix: str = "",
 27    continuation_token: str | None = None,
 28    bucket_name: str = DB.CF_R2_BUCKET_NAME,
 29    account_id: str = DB.CF_ACCOUNT_ID,
 30    aws_access_key_id: str = DB.CF_R2_ACCESS_KEY_ID,
 31    aws_secret_access_key: str = DB.CF_R2_SECRET_ACCESS_KEY,
 32    *,
 33    enabled: bool = DB.CF_R2_ENABLED,
 34) -> dict:
 35    """Get from Cloudflare R2."""
 36    if not all([enabled, bucket_name, account_id, aws_access_key_id, aws_secret_access_key]):
 37        return {}
 38    results = {}
 39    with contextlib.suppress(Exception):
 40        async with Session().client(
 41            service_name="s3",
 42            endpoint_url=f"https://{account_id}.r2.cloudflarestorage.com",
 43            aws_access_key_id=aws_access_key_id,
 44            aws_secret_access_key=aws_secret_access_key,
 45            region_name="auto",
 46        ) as s3:
 47            payload = {"Bucket": bucket_name, "MaxKeys": 1000}
 48            if continuation_token:
 49                payload["ContinuationToken"] = continuation_token
 50            if prefix:
 51                payload["Prefix"] = prefix
 52            contents = []
 53            try:
 54                results = await s3.list_objects_v2(**payload)
 55                if not results.get("IsTruncated"):
 56                    return results
 57                contents.extend(results.get("Contents", []))
 58                while results.get("NextContinuationToken"):
 59                    payload["ContinuationToken"] = results["NextContinuationToken"]
 60                    results = await s3.list_objects_v2(**payload)
 61                    contents.extend(results.get("Contents", []))
 62                results["Contents"] = contents
 63            except Exception as e:
 64                logger.warning(f"List CF-R2 failed for {prefix=}: {e}")
 65                return {}
 66    return results
 67
 68
 69async def get_cf_r2(
 70    key: str,
 71    bucket_name: str = DB.CF_R2_BUCKET_NAME,
 72    account_id: str = DB.CF_ACCOUNT_ID,
 73    aws_access_key_id: str = DB.CF_R2_ACCESS_KEY_ID,
 74    aws_secret_access_key: str = DB.CF_R2_SECRET_ACCESS_KEY,
 75    *,
 76    enabled: bool = DB.CF_R2_ENABLED,
 77    rformat: str = "json",
 78    silent: bool = False,
 79) -> dict:
 80    """Get from Cloudflare R2."""
 81    if not all([enabled, bucket_name, account_id, aws_access_key_id, aws_secret_access_key]):
 82        return {}
 83    key = bare_url(unquote_plus(key))  # remove http(s):// prefix
 84    async with Session().client(
 85        service_name="s3",
 86        endpoint_url=f"https://{account_id}.r2.cloudflarestorage.com",
 87        aws_access_key_id=aws_access_key_id,
 88        aws_secret_access_key=aws_secret_access_key,
 89        region_name="auto",
 90    ) as s3:
 91        try:
 92            obj = await s3.get_object(Bucket=bucket_name, Key=key)
 93            if obj.get("Body"):
 94                data = await obj["Body"].read()
 95                if rformat == "json":
 96                    data = json.loads(data)
 97                if not silent:
 98                    logger.success(f"GET CF-R2 for {key}: {data}")
 99                return data
100        except ClientError as e:
101            if e.response["Error"]["Code"] != "NoSuchKey":
102                logger.warning(f"GET CF-R2 failed for {key}: {e}")
103        except Exception as e:
104            logger.warning(f"GET CF-R2 failed for {key}: {e}")
105    return {}
106
107
async def set_cf_r2(
    key: str,
    data: dict | list | str | bytes | None = None,
    metadata: dict | None = None,
    ttl: int | None = None,
    bucket_name: str = DB.CF_R2_BUCKET_NAME,
    account_id: str = DB.CF_ACCOUNT_ID,
    aws_access_key_id: str = DB.CF_R2_ACCESS_KEY_ID,
    aws_secret_access_key: str = DB.CF_R2_SECRET_ACCESS_KEY,
    *,
    mime_type: str = "application/json",
    enabled: bool = DB.CF_R2_ENABLED,
    silent: bool = False,
) -> bool:
    """Upload an object (and/or its metadata) to Cloudflare R2.

    Args:
        key: Object key; it is URL-unquoted and passed through ``bare_url``
            before the upload.
        data: Payload. ``dict``/``list`` are JSON-encoded as UTF-8, ``str`` is
            UTF-8 encoded, bytes-like values are sent as-is.
        metadata: Object metadata, converted with ``stringfy`` before sending.
        ttl: When not ``None``, ``Expires`` is set to ``nowdt()`` plus this
            many seconds.
        bucket_name: Target bucket.
        account_id: Cloudflare account id used to build the endpoint URL.
        aws_access_key_id: R2 access key id.
        aws_secret_access_key: R2 secret access key.
        mime_type: Value sent as ``ContentType``.
        enabled: When falsy, nothing is uploaded.
        silent: Suppress the success log line.

    Returns:
        ``True`` when ``put_object`` succeeded. ``False`` when there is nothing
        to send, when disabled or any setting is missing, when ``data`` has an
        unsupported type, or when the request failed.

    NOTE(review): when only ``metadata`` is given the request is sent without
    a ``Body``. Also, ``Expires`` is normally an HTTP caching header; confirm
    whether R2 treats it as an object lifetime.
    """
    if not data and not metadata:
        return False
    if not all([enabled, bucket_name, account_id, aws_access_key_id, aws_secret_access_key]):
        return False
    key = bare_url(unquote_plus(key))  # remove http(s):// prefix
    payload: dict = {
        "CacheControl": "no-cache",
        "Bucket": bucket_name,
        "Key": key,
        "ContentType": mime_type,
    }
    if data:
        if isinstance(data, dict | list):
            upload = json.dumps(data, ensure_ascii=False).encode("utf-8")
        elif isinstance(data, str):
            upload = data.encode("utf-8")
        elif isinstance(data, bytes | bytearray | memoryview):
            upload = bytes(data)
        else:
            # This case used to fall through with `upload` unbound and crash
            # with UnboundLocalError; report it as a failed upload instead.
            logger.warning(f"SET CF-R2 failed for {key}: unsupported data type {type(data).__name__}")
            return False
        payload["Body"] = upload

    metadata = metadata or {}
    if metadata:
        payload["Metadata"] = stringfy(metadata)
    if ttl is not None:
        payload["Expires"] = nowdt() + timedelta(seconds=ttl)
    async with Session().client(
        service_name="s3",
        endpoint_url=f"https://{account_id}.r2.cloudflarestorage.com",
        aws_access_key_id=aws_access_key_id,
        aws_secret_access_key=aws_secret_access_key,
        region_name="auto",
    ) as s3:
        try:
            await s3.put_object(**payload)
            if not silent:
                logger.success(f"Successfully SET CF-R2 for {key}: {data=}, {metadata=}")
        except Exception as e:
            logger.warning(f"SET CF-R2 failed for {key}: {e}")
            return False
    return True
163
164
async def del_cf_r2(
    key: str,
    bucket_name: str = DB.CF_R2_BUCKET_NAME,
    account_id: str = DB.CF_ACCOUNT_ID,
    aws_access_key_id: str = DB.CF_R2_ACCESS_KEY_ID,
    aws_secret_access_key: str = DB.CF_R2_SECRET_ACCESS_KEY,
    *,
    enabled: bool = DB.CF_R2_ENABLED,
    silent: bool = False,
):
    """Remove one object from Cloudflare R2.

    The key is URL-unquoted and passed through ``bare_url`` before the
    request. Does nothing when disabled or when any setting is missing.
    Failures are logged as warnings and never raised; ``silent`` only
    suppresses the success log line. Always returns ``None``.
    """
    required = (enabled, bucket_name, account_id, aws_access_key_id, aws_secret_access_key)
    if not all(required):
        return

    # remove http(s):// prefix
    decoded = unquote_plus(key)
    key = bare_url(decoded)

    client_options = {
        "service_name": "s3",
        "endpoint_url": f"https://{account_id}.r2.cloudflarestorage.com",
        "aws_access_key_id": aws_access_key_id,
        "aws_secret_access_key": aws_secret_access_key,
        "region_name": "auto",
    }
    async with Session().client(**client_options) as client:
        try:
            await client.delete_object(Bucket=bucket_name, Key=key)
            if not silent:
                logger.success(f"DEL CF-R2 for key={key}")
        except Exception as err:
            logger.warning(f"DEL CF-R2 failed for key={key}: {err}")
192
193
async def head_cf_r2(
    key: str,
    bucket_name: str = DB.CF_R2_BUCKET_NAME,
    account_id: str = DB.CF_ACCOUNT_ID,
    aws_access_key_id: str = DB.CF_R2_ACCESS_KEY_ID,
    aws_secret_access_key: str = DB.CF_R2_SECRET_ACCESS_KEY,
    *,
    enabled: bool = DB.CF_R2_ENABLED,
    silent: bool = False,
) -> dict:
    """Fetch an object's ``head_object`` response from Cloudflare R2.

    Args:
        key: Object key; it is URL-unquoted and passed through ``bare_url``
            before the lookup, the same way get/set/del treat their keys.
        bucket_name: Target bucket.
        account_id: Cloudflare account id used to build the endpoint URL.
        aws_access_key_id: R2 access key id.
        aws_secret_access_key: R2 secret access key.
        enabled: When falsy, nothing is requested.
        silent: Suppress all warning logs.

    Returns:
        The ``head_object`` response, or ``{}`` when disabled, when any
        setting is missing, when the object does not exist, or when the
        request fails.
    """
    if not all([enabled, bucket_name, account_id, aws_access_key_id, aws_secret_access_key]):
        return {}
    # Normalise like get/set/del so a key written through set_cf_r2 is found.
    key = bare_url(unquote_plus(key))  # remove http(s):// prefix
    async with Session().client(
        service_name="s3",
        endpoint_url=f"https://{account_id}.r2.cloudflarestorage.com",
        aws_access_key_id=aws_access_key_id,
        aws_secret_access_key=aws_secret_access_key,
        region_name="auto",
    ) as s3:
        try:
            return await s3.head_object(Bucket=bucket_name, Key=key)
        except ClientError as e:
            code = e.response.get("Error", {}).get("Code")
            if not silent:
                if code in {"404", "NoSuchKey", "NotFound"}:
                    logger.warning(f"`{key}` is not exist in CF-R2")
                else:
                    # Do not report auth/permission errors as a missing key.
                    logger.warning(f"HEAD CF-R2 failed for {key}: {e}")
        except Exception as e:
            if not silent:
                logger.warning(f"HEAD CF-R2 failed for {key}: {e}")
    return {}
220
221
async def clean_r2_expired():
    """Delete expired objects stored under the ``TTL/`` prefix in CF-R2.

    The lifetime is encoded at the start of the key:

    TTL/{num}h  # expire in {num} hours
    TTL/{num}d  # expire in {num} days
    TTL/{num}w  # expire in {num} weeks
    TTL/{num}M  # expire in {num} months (a month counts as 30 days)

    An object is deleted when its ``LastModified`` plus the encoded lifetime
    is earlier than the current time. Keys that do not match the pattern are
    left untouched.

    NOTE(review): assumes ``nowdt("UTC")`` and ``LastModified`` are comparable
    (both timezone-aware) - confirm against ``utils.nowdt``.
    """
    # Length of one unit of each TTL suffix.
    unit_spans = {
        "h": timedelta(hours=1),
        "d": timedelta(days=1),
        "w": timedelta(weeks=1),
        "M": timedelta(days=30),
    }
    # The pattern only admits the units above, so no fallback is needed.
    ttl_pattern = re.compile(r"TTL/(\d+)([hdwM])")

    now = nowdt("UTC")
    r2 = await list_cf_r2(prefix="TTL/")
    for item in r2.get("Contents", []):
        key = item.get("Key", "")
        dt = item.get("LastModified", now)
        if not dt:
            continue
        matched = ttl_pattern.match(key)
        if not matched:
            continue
        num, unit = matched.groups()
        expires = dt + int(num) * unit_spans[unit]
        if expires < now:
            logger.debug(f"Delete expired R2 key: {key}")
            await del_cf_r2(key)