main
1#!/usr/bin/env python
2# -*- coding: utf-8 -*-
3from urllib.parse import quote_plus, unquote_plus
4
5from httpx import AsyncClient, AsyncHTTPTransport
6from loguru import logger
7
8from config import DB, cache
9from networking import hx_req
10
11
12async def get_cf_kv(
13 key: str,
14 account_id: str = DB.CF_ACCOUNT_ID,
15 api_token: str = DB.CF_API_TOKEN,
16 namespace_id: str = DB.CF_KV_NAMESPACE_ID,
17 *,
18 enabled: bool = DB.CF_KV_ENABLED,
19 silent: bool = False,
20 cache_ttl: int = 0,
21) -> dict:
22 """Get from Cloudflare KV."""
23 if not all([enabled, account_id, namespace_id, api_token]):
24 return {}
25 key = quote_plus(unquote_plus(key))
26 if kv := cache.get(f"CFKV-{key}"):
27 return kv
28 api = f"https://api.cloudflare.com/client/v4/accounts/{account_id}/storage/kv/namespaces/{namespace_id}/values/{key}"
29 headers = {"authorization": f"Bearer {api_token}", "content-type": "application/json"}
30 async with AsyncClient(http2=True, follow_redirects=True, transport=AsyncHTTPTransport(retries=3, http2=True)) as hx:
31 try:
32 resp = await hx.get(api, headers=headers, timeout=30)
33 if resp.status_code == 404 and not silent:
34 logger.trace(f"404 Not Found for CF-KV key={key}")
35 return {}
36 resp.raise_for_status()
37 if data := resp.json():
38 if not silent:
39 logger.success(f"GET CF-KV for {key}: {data}")
40 if cache_ttl:
41 cache.set(f"CFKV-{key}", data, ttl=cache_ttl)
42 return data
43 except Exception as e:
44 logger.warning(f"GET CF-KV failed for {key}: {e}")
45 return {}
46
47
48async def set_cf_kv(
49 key: str,
50 data: dict,
51 ttl: int | None = None,
52 account_id: str = DB.CF_ACCOUNT_ID,
53 api_token: str = DB.CF_API_TOKEN,
54 namespace_id: str = DB.CF_KV_NAMESPACE_ID,
55 *,
56 enabled: bool = DB.CF_KV_ENABLED,
57 silent: bool = False,
58) -> bool:
59 """Set to Cloudflare KV."""
60 if not all([enabled, account_id, namespace_id, api_token]):
61 return False
62 key = quote_plus(unquote_plus(key))
63 api = f"https://api.cloudflare.com/client/v4/accounts/{account_id}/storage/kv/namespaces/{namespace_id}/values/{key}"
64 if ttl is not None:
65 api = f"{api}?expiration_ttl={ttl}"
66 headers = {"authorization": f"Bearer {api_token}", "content-type": "*/*"}
67 resp = await hx_req(api, method="PUT", headers=headers, json_data=data, timeout=30, silent=True, check_kv={"success": True})
68 if error := resp.get("hx_error"):
69 logger.warning(f"SET CF-KV failed for key={key}: {error}")
70 return False
71 if not silent:
72 logger.success(f"Successfully SET CF-KV for key={key}: {data}")
73 return True
74
75
76async def del_cf_kv(
77 key: str,
78 account_id: str = DB.CF_ACCOUNT_ID,
79 api_token: str = DB.CF_API_TOKEN,
80 namespace_id: str = DB.CF_KV_NAMESPACE_ID,
81 *,
82 enabled: bool = DB.CF_KV_ENABLED,
83 silent: bool = False,
84):
85 """Delete from Cloudflare KV."""
86 key = quote_plus(unquote_plus(key))
87 if not all([enabled, account_id, namespace_id, api_token]):
88 return
89 api = f"https://api.cloudflare.com/client/v4/accounts/{account_id}/storage/kv/namespaces/{namespace_id}/values/{key}"
90 headers = {"authorization": f"Bearer {api_token}", "content-type": "application/json"}
91 async with AsyncClient(http2=True, follow_redirects=True, transport=AsyncHTTPTransport(retries=3, http2=True)) as hx:
92 try:
93 resp = await hx.delete(api, headers=headers, timeout=30)
94 resp.raise_for_status()
95 except Exception as e:
96 logger.warning(f"DEL CF-KV failed for key={key}: {e}")
97 return
98 if resp.json().get("success") and not silent:
99 logger.success(f"DEL CF-KV for key={key}")
100 return