main
  1#!/usr/bin/env python
  2# -*- coding: utf-8 -*-
  3from urllib.parse import quote_plus, unquote_plus
  4
  5from httpx import AsyncClient, AsyncHTTPTransport
  6from loguru import logger
  7
  8from config import DB, cache
  9from networking import hx_req
 10
 11
 12async def get_cf_kv(
 13    key: str,
 14    account_id: str = DB.CF_ACCOUNT_ID,
 15    api_token: str = DB.CF_API_TOKEN,
 16    namespace_id: str = DB.CF_KV_NAMESPACE_ID,
 17    *,
 18    enabled: bool = DB.CF_KV_ENABLED,
 19    silent: bool = False,
 20    cache_ttl: int = 0,
 21) -> dict:
 22    """Get from Cloudflare KV."""
 23    if not all([enabled, account_id, namespace_id, api_token]):
 24        return {}
 25    key = quote_plus(unquote_plus(key))
 26    if kv := cache.get(f"CFKV-{key}"):
 27        return kv
 28    api = f"https://api.cloudflare.com/client/v4/accounts/{account_id}/storage/kv/namespaces/{namespace_id}/values/{key}"
 29    headers = {"authorization": f"Bearer {api_token}", "content-type": "application/json"}
 30    async with AsyncClient(http2=True, follow_redirects=True, transport=AsyncHTTPTransport(retries=3, http2=True)) as hx:
 31        try:
 32            resp = await hx.get(api, headers=headers, timeout=30)
 33            if resp.status_code == 404 and not silent:
 34                logger.trace(f"404 Not Found for CF-KV key={key}")
 35                return {}
 36            resp.raise_for_status()
 37            if data := resp.json():
 38                if not silent:
 39                    logger.success(f"GET CF-KV for {key}: {data}")
 40                if cache_ttl:
 41                    cache.set(f"CFKV-{key}", data, ttl=cache_ttl)
 42                return data
 43        except Exception as e:
 44            logger.warning(f"GET CF-KV failed for {key}: {e}")
 45    return {}
 46
 47
 48async def set_cf_kv(
 49    key: str,
 50    data: dict,
 51    ttl: int | None = None,
 52    account_id: str = DB.CF_ACCOUNT_ID,
 53    api_token: str = DB.CF_API_TOKEN,
 54    namespace_id: str = DB.CF_KV_NAMESPACE_ID,
 55    *,
 56    enabled: bool = DB.CF_KV_ENABLED,
 57    silent: bool = False,
 58) -> bool:
 59    """Set to Cloudflare KV."""
 60    if not all([enabled, account_id, namespace_id, api_token]):
 61        return False
 62    key = quote_plus(unquote_plus(key))
 63    api = f"https://api.cloudflare.com/client/v4/accounts/{account_id}/storage/kv/namespaces/{namespace_id}/values/{key}"
 64    if ttl is not None:
 65        api = f"{api}?expiration_ttl={ttl}"
 66    headers = {"authorization": f"Bearer {api_token}", "content-type": "*/*"}
 67    resp = await hx_req(api, method="PUT", headers=headers, json_data=data, timeout=30, silent=True, check_kv={"success": True})
 68    if error := resp.get("hx_error"):
 69        logger.warning(f"SET CF-KV failed for key={key}: {error}")
 70        return False
 71    if not silent:
 72        logger.success(f"Successfully SET CF-KV for key={key}: {data}")
 73    return True
 74
 75
 76async def del_cf_kv(
 77    key: str,
 78    account_id: str = DB.CF_ACCOUNT_ID,
 79    api_token: str = DB.CF_API_TOKEN,
 80    namespace_id: str = DB.CF_KV_NAMESPACE_ID,
 81    *,
 82    enabled: bool = DB.CF_KV_ENABLED,
 83    silent: bool = False,
 84):
 85    """Delete from Cloudflare KV."""
 86    key = quote_plus(unquote_plus(key))
 87    if not all([enabled, account_id, namespace_id, api_token]):
 88        return
 89    api = f"https://api.cloudflare.com/client/v4/accounts/{account_id}/storage/kv/namespaces/{namespace_id}/values/{key}"
 90    headers = {"authorization": f"Bearer {api_token}", "content-type": "application/json"}
 91    async with AsyncClient(http2=True, follow_redirects=True, transport=AsyncHTTPTransport(retries=3, http2=True)) as hx:
 92        try:
 93            resp = await hx.delete(api, headers=headers, timeout=30)
 94            resp.raise_for_status()
 95        except Exception as e:
 96            logger.warning(f"DEL CF-KV failed for key={key}: {e}")
 97            return
 98        if resp.json().get("success") and not silent:
 99            logger.success(f"DEL CF-KV for key={key}")
100    return