main
1#!/usr/bin/env python
2# -*- coding: utf-8 -*-
3import contextlib
4import json
5import os
6import re
7import warnings
8from datetime import timedelta
9from urllib.parse import unquote_plus
10
11from aioboto3 import Session
12from botocore.exceptions import ClientError
13from loguru import logger
14
15from config import DB
16from utils import bare_url, nowdt, stringfy
17
# Hot fix for using boto3 against Cloudflare R2, see:
# https://developers.cloudflare.com/r2/examples/aws/boto3/
# Both checksum settings are forced to "when_required" at import time.
os.environ.update(
    {
        "AWS_REQUEST_CHECKSUM_CALCULATION": "when_required",
        "AWS_RESPONSE_CHECKSUM_VALIDATION": "when_required",
    }
)

# Ignore every DeprecationWarning from here on.
warnings.filterwarnings("ignore", category=DeprecationWarning)
23
24
async def list_cf_r2(
    prefix: str = "",
    continuation_token: str | None = None,
    bucket_name: str = DB.CF_R2_BUCKET_NAME,
    account_id: str = DB.CF_ACCOUNT_ID,
    aws_access_key_id: str = DB.CF_R2_ACCESS_KEY_ID,
    aws_secret_access_key: str = DB.CF_R2_SECRET_ACCESS_KEY,
    *,
    enabled: bool = DB.CF_R2_ENABLED,
) -> dict:
    """List objects in a Cloudflare R2 bucket, following pagination.

    Args:
        prefix: Only list keys starting with this prefix; empty means no filter.
        continuation_token: Token to start the listing from, if any.
        bucket_name: Bucket to list.
        account_id: Cloudflare account id used to build the R2 endpoint URL.
        aws_access_key_id: R2 access key id.
        aws_secret_access_key: R2 secret access key.
        enabled: When falsy, no request is made.

    Returns:
        The ``list_objects_v2`` response. If the first page is truncated, the
        last page's response is returned with ``Contents`` replaced by the
        entries of all pages and ``KeyCount`` set to their total. ``{}`` is
        returned when R2 is disabled, a setting is missing, or a request fails.
    """
    if not all([enabled, bucket_name, account_id, aws_access_key_id, aws_secret_access_key]):
        return {}

    payload: dict = {"Bucket": bucket_name, "MaxKeys": 1000}
    if continuation_token:
        payload["ContinuationToken"] = continuation_token
    if prefix:
        payload["Prefix"] = prefix

    results: dict = {}
    try:
        async with Session().client(
            service_name="s3",
            endpoint_url=f"https://{account_id}.r2.cloudflarestorage.com",
            aws_access_key_id=aws_access_key_id,
            aws_secret_access_key=aws_secret_access_key,
            region_name="auto",
        ) as s3:
            try:
                page = await s3.list_objects_v2(**payload)
                if page.get("IsTruncated"):
                    # Walk the remaining pages and merge their entries.
                    contents = list(page.get("Contents", []))
                    while token := page.get("NextContinuationToken"):
                        payload["ContinuationToken"] = token
                        page = await s3.list_objects_v2(**payload)
                        contents.extend(page.get("Contents", []))
                    page["Contents"] = contents
                    # Keep the count in sync with the merged entries instead of
                    # leaving the last page's own count in place.
                    page["KeyCount"] = len(contents)
                results = page
            except Exception as e:
                logger.warning(f"List CF-R2 failed for {prefix=}: {e}")
                return {}
    except Exception as e:
        # Client setup/teardown errors used to be swallowed silently; keep the
        # best-effort contract but leave a trace in the log.
        logger.warning(f"List CF-R2 client error for {prefix=}: {e}")
    return results
67
68
async def get_cf_r2(
    key: str,
    bucket_name: str = DB.CF_R2_BUCKET_NAME,
    account_id: str = DB.CF_ACCOUNT_ID,
    aws_access_key_id: str = DB.CF_R2_ACCESS_KEY_ID,
    aws_secret_access_key: str = DB.CF_R2_SECRET_ACCESS_KEY,
    *,
    enabled: bool = DB.CF_R2_ENABLED,
    rformat: str = "json",
    silent: bool = False,
) -> dict:
    """Fetch a single object from Cloudflare R2.

    Args:
        key: Object key; it is URL-unquoted and passed through ``bare_url``
            before use.
        bucket_name: Bucket to read from.
        account_id: Cloudflare account id used to build the R2 endpoint URL.
        aws_access_key_id: R2 access key id.
        aws_secret_access_key: R2 secret access key.
        enabled: When falsy, no request is made.
        rformat: ``"json"`` parses the body with ``json.loads``; any other
            value returns the body exactly as read.
        silent: Suppress the success log line.

    Returns:
        The parsed JSON value or the raw body, depending on ``rformat``.
        ``{}`` when R2 is disabled, a setting is missing, the response has no
        body, or the request/parsing fails. A ``NoSuchKey`` error is not logged.
    """
    settings = (enabled, bucket_name, account_id, aws_access_key_id, aws_secret_access_key)
    if not all(settings):
        return {}

    key = bare_url(unquote_plus(key))  # remove http(s):// prefix
    client = Session().client(
        service_name="s3",
        endpoint_url=f"https://{account_id}.r2.cloudflarestorage.com",
        aws_access_key_id=aws_access_key_id,
        aws_secret_access_key=aws_secret_access_key,
        region_name="auto",
    )
    async with client as s3:
        try:
            response = await s3.get_object(Bucket=bucket_name, Key=key)
            body = response.get("Body")
            if body:
                content = await body.read()
                if rformat == "json":
                    content = json.loads(content)
                if not silent:
                    logger.success(f"GET CF-R2 for {key}: {content}")
                return content
        except ClientError as err:
            # A missing key is an expected outcome, so only other codes are logged.
            error_code = err.response["Error"]["Code"]
            if error_code != "NoSuchKey":
                logger.warning(f"GET CF-R2 failed for {key}: {err}")
        except Exception as err:
            logger.warning(f"GET CF-R2 failed for {key}: {err}")
    return {}
106
107
async def set_cf_r2(
    key: str,
    data: dict | list | str | bytes | None = None,
    metadata: dict | None = None,
    ttl: int | None = None,
    bucket_name: str = DB.CF_R2_BUCKET_NAME,
    account_id: str = DB.CF_ACCOUNT_ID,
    aws_access_key_id: str = DB.CF_R2_ACCESS_KEY_ID,
    aws_secret_access_key: str = DB.CF_R2_SECRET_ACCESS_KEY,
    *,
    mime_type: str = "application/json",
    enabled: bool = DB.CF_R2_ENABLED,
    silent: bool = False,
) -> bool:
    """Upload an object (and/or its metadata) to Cloudflare R2.

    Args:
        key: Object key; it is URL-unquoted and passed through ``bare_url``
            before use.
        data: Payload. dict/list values are JSON-encoded as UTF-8, str is
            UTF-8 encoded, bytes/bytearray are sent as-is. When empty, no
            ``Body`` is sent.
        metadata: Optional object metadata, converted with ``stringfy``
            (presumably to string values; see that helper) before sending.
        ttl: If given, ``Expires`` is set to now plus this many seconds.
        bucket_name: Bucket to write to.
        account_id: Cloudflare account id used to build the R2 endpoint URL.
        aws_access_key_id: R2 access key id.
        aws_secret_access_key: R2 secret access key.
        mime_type: Value sent as ``ContentType``.
        enabled: When falsy, no request is made.
        silent: Suppress the success log line.

    Returns:
        True when ``put_object`` succeeded. False when there is nothing to
        upload, R2 is disabled, a setting is missing, ``data`` has an
        unsupported type, or the request fails.
    """
    if not data and not metadata:
        return False
    if not all([enabled, bucket_name, account_id, aws_access_key_id, aws_secret_access_key]):
        return False
    key = bare_url(unquote_plus(key))  # remove http(s):// prefix
    payload: dict = {
        "CacheControl": "no-cache",
        "Bucket": bucket_name,
        "Key": key,
        "ContentType": mime_type,
    }
    if data:
        if isinstance(data, dict | list):
            body = json.dumps(data, ensure_ascii=False).encode("utf-8")
        elif isinstance(data, str):
            body = data.encode("utf-8")
        elif isinstance(data, bytes | bytearray):
            body = bytes(data)
        else:
            # Previously an unsupported type fell through with no body bound
            # and crashed with UnboundLocalError; report it and bail out.
            logger.warning(f"SET CF-R2 skipped for {key}: unsupported data type {type(data).__name__}")
            return False
        payload["Body"] = body

    metadata = metadata or {}
    if metadata:
        payload["Metadata"] = stringfy(metadata)
    if ttl is not None:
        payload["Expires"] = nowdt() + timedelta(seconds=ttl)

    async with Session().client(
        service_name="s3",
        endpoint_url=f"https://{account_id}.r2.cloudflarestorage.com",
        aws_access_key_id=aws_access_key_id,
        aws_secret_access_key=aws_secret_access_key,
        region_name="auto",
    ) as s3:
        try:
            await s3.put_object(**payload)
            if not silent:
                logger.success(f"Successfully SET CF-R2 for {key}: {data=}, {metadata=}")
        except Exception as e:
            logger.warning(f"SET CF-R2 failed for {key}: {e}")
            return False
    return True
163
164
async def del_cf_r2(
    key: str,
    bucket_name: str = DB.CF_R2_BUCKET_NAME,
    account_id: str = DB.CF_ACCOUNT_ID,
    aws_access_key_id: str = DB.CF_R2_ACCESS_KEY_ID,
    aws_secret_access_key: str = DB.CF_R2_SECRET_ACCESS_KEY,
    *,
    enabled: bool = DB.CF_R2_ENABLED,
    silent: bool = False,
):
    """Delete one object from Cloudflare R2.

    Args:
        key: Object key; it is URL-unquoted and passed through ``bare_url``
            before use.
        bucket_name: Bucket to delete from.
        account_id: Cloudflare account id used to build the R2 endpoint URL.
        aws_access_key_id: R2 access key id.
        aws_secret_access_key: R2 secret access key.
        enabled: When falsy, no request is made.
        silent: Suppress the success log line.

    Returns:
        None. A failing ``delete_object`` call is logged as a warning and not
        re-raised.
    """
    settings = (enabled, bucket_name, account_id, aws_access_key_id, aws_secret_access_key)
    if not all(settings):
        return

    key = bare_url(unquote_plus(key))  # remove http(s):// prefix
    client = Session().client(
        service_name="s3",
        endpoint_url=f"https://{account_id}.r2.cloudflarestorage.com",
        aws_access_key_id=aws_access_key_id,
        aws_secret_access_key=aws_secret_access_key,
        region_name="auto",
    )
    async with client as s3:
        try:
            await s3.delete_object(Bucket=bucket_name, Key=key)
        except Exception as err:
            logger.warning(f"DEL CF-R2 failed for key={key}: {err}")
        else:
            if not silent:
                logger.success(f"DEL CF-R2 for key={key}")
192
193
async def head_cf_r2(
    key: str,
    bucket_name: str = DB.CF_R2_BUCKET_NAME,
    account_id: str = DB.CF_ACCOUNT_ID,
    aws_access_key_id: str = DB.CF_R2_ACCESS_KEY_ID,
    aws_secret_access_key: str = DB.CF_R2_SECRET_ACCESS_KEY,
    *,
    enabled: bool = DB.CF_R2_ENABLED,
    silent: bool = False,
) -> dict:
    """Fetch the ``head_object`` response for one Cloudflare R2 object.

    Args:
        key: Object key; it is URL-unquoted and passed through ``bare_url``
            before use, the same way the get/set/delete helpers treat keys.
        bucket_name: Bucket to query.
        account_id: Cloudflare account id used to build the R2 endpoint URL.
        aws_access_key_id: R2 access key id.
        aws_secret_access_key: R2 secret access key.
        enabled: When falsy, no request is made.
        silent: Suppress the warning logged when the request fails.

    Returns:
        The ``head_object`` response, or ``{}`` when R2 is disabled, a setting
        is missing, the object is not found, or the request fails.
    """
    if not all([enabled, bucket_name, account_id, aws_access_key_id, aws_secret_access_key]):
        return {}
    # Normalize like get/set/del so the same input addresses the same object.
    key = bare_url(unquote_plus(key))  # remove http(s):// prefix
    async with Session().client(
        service_name="s3",
        endpoint_url=f"https://{account_id}.r2.cloudflarestorage.com",
        aws_access_key_id=aws_access_key_id,
        aws_secret_access_key=aws_secret_access_key,
        region_name="auto",
    ) as s3:
        try:
            return await s3.head_object(Bucket=bucket_name, Key=key)
        except ClientError as e:
            if not silent:
                code = e.response.get("Error", {}).get("Code")
                if code in ("404", "NoSuchKey", "NotFound"):
                    logger.warning(f"`{key}` is not exist in CF-R2")
                else:
                    # Do not report e.g. auth or network errors as "missing".
                    logger.warning(f"HEAD CF-R2 failed for {key}: {e}")
        except Exception as e:
            if not silent:
                logger.warning(f"HEAD CF-R2 failed for {key}: {e}")
    return {}
220
221
# Matches the TTL marker at the start of a key: "TTL/<num><unit>".
_R2_TTL_KEY_RE = re.compile(r"TTL/(\d+)([hdwM])")

# Duration of one TTL unit; a month is approximated as 30 days.
_R2_TTL_UNITS = {
    "h": timedelta(hours=1),
    "d": timedelta(days=1),
    "w": timedelta(weeks=1),
    "M": timedelta(days=30),
}


async def clean_r2_expired():
    """Clean expired files in CF-R2.

    Keys under the ``TTL/`` prefix encode their lifetime:

        TTL/{num}h  # expire in {num} hours
        TTL/{num}d  # expire in {num} days
        TTL/{num}w  # expire in {num} weeks
        TTL/{num}M  # expire in {num} months (counted as 30 days each)

    The lifetime is counted from the object's ``LastModified`` value (the
    current time is used when the entry has none). Keys that do not carry a
    TTL marker are left untouched. Expired keys are deleted one by one with
    ``del_cf_r2``.
    """
    now = nowdt("UTC")
    listing = await list_cf_r2(prefix="TTL/")
    for item in listing.get("Contents", []):
        key = item.get("Key", "")
        modified = item.get("LastModified", now)
        if not modified:
            continue
        matched = _R2_TTL_KEY_RE.match(key)
        if not matched:
            continue
        num, unit = matched.groups()
        try:
            expires = modified + int(num) * _R2_TTL_UNITS[unit]
        except OverflowError:
            # The TTL is too large to represent, so the key cannot have
            # expired; skip it instead of aborting the whole sweep.
            continue
        if expires < now:
            logger.debug(f"Delete expired R2 key: {key}")
            await del_cf_r2(key)