Commit 12f3bb1

benny-dou <60535774+benny-dou@users.noreply.github.com>
2025-06-06 13:36:37
feat(database): support `Turso` provider
1 parent 1493f25
Changed files (2)
src
src/database/turso.py
@@ -0,0 +1,90 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+from glom import flatten, glom
+from loguru import logger
+
+from config import DB, PROXY, cache
+from networking import hx_req
+
+
+@cache.memoize(ttl=0)
+async def turso_db_url(
+    name: str = "bennybot",
+    group: str = "default",
+    username: str = DB.TURSO_USERNAME,
+    api_token: str = DB.TURSO_API_TOKEN,
+    *,
+    enabled: bool = DB.TURSO_ENABLED,
+    silent: bool = False,
+) -> str:
+    """Get Turso database url."""
+    if not all([enabled, username, api_token]):
+        return ""
+    api = f"https://api.turso.tech/v1/organizations/{username}/databases"
+    headers = {"authorization": f"Bearer {api_token}", "content-type": "application/json"}
+    payload = {"name": name, "group": group}
+    # check if database exists
+    resp = await hx_req(api, method="GET", headers=headers, proxy=PROXY.TURSO, check_keys=["databases"], silent=silent)
+    if name in glom(resp, "databases.*.Name", default=[]):
+        return "https://" + next(x["hostname"] for x in resp["databases"] if x["Name"] == name) + "/v2/pipeline"
+
+    # not exists, create it
+    resp = await hx_req(api, method="POST", post_json=payload, headers=headers, check_kv={"database.Name": name}, check_keys=["database.Hostname"], proxy=PROXY.TURSO, silent=silent)
+    return "https://" + resp["database"]["Hostname"] + "/v2/pipeline"
+
+
+@cache.memoize(ttl=0)
+async def turso_create_table(table_name: str | float, columns: str, db_name: str = "bennybot", *, silent: bool = False) -> None:
+    """Create a turso table."""
+    tables = await turso_list_tables(db_name, silent=silent)
+    if table_name in tables:
+        return
+    resp = await turso_exec(
+        [
+            {
+                "type": "execute",
+                "stmt": {"sql": f'CREATE TABLE IF NOT EXISTS "{table_name}" ({columns});'},
+            }
+        ],
+        db_name=db_name,
+        silent=silent,
+    )
+    if glom(resp, "results.0.type", default="") == "ok":
+        logger.success(f'Create Table "{table_name}" in Turso database "{db_name}"')
+
+
+async def turso_list_tables(db_name: str = "bennybot", *, silent: bool = False) -> list[str]:
+    """List turso tables in a database."""
+    resp = await turso_exec(
+        [
+            {
+                "type": "execute",
+                "stmt": {"sql": "SELECT name FROM sqlite_master WHERE type='table';"},
+            }
+        ],
+        db_name=db_name,
+        silent=silent,
+    )
+    return flatten(glom(resp, "results.0.response.result.rows.*.*.value", default=[]))
+
+
+async def turso_exec(
+    statements: list[dict],
+    db_name: str = "bennybot",
+    group_token: str = DB.TURSO_GROUP_TOKEN,
+    *,
+    retry: int = 0,
+    silent: bool = False,
+) -> dict:
+    """Exec turso statements."""
+    if not all([statements, group_token]):
+        return {}
+    db_url = await turso_db_url(db_name, silent=silent)
+    if not db_url:
+        return {}
+    headers = {"authorization": f"Bearer {group_token}", "content-type": "application/json"}
+    if statements[-1] != {"type": "close"}:
+        statements.append({"type": "close"})
+    if not silent:
+        logger.trace(f"Turso Exec: {statements}")
+    return await hx_req(db_url, "POST", post_json={"requests": statements}, headers=headers, check_keys=["results"], proxy=PROXY.TURSO, max_retry=retry, silent=silent, timeout=600)
src/config.py
@@ -105,6 +105,7 @@ class DANMU:
     PROXY = os.getenv("DANMU_PROXY", None)  # socks5://127.0.0.1:7890
     NUM_PER_QUERY = int(os.getenv("DANMU_NUM_PER_QUERY", "100"))  # Number of items per query
     D1_DATABASE = os.getenv("DANMU_D1_DATABASE", "bennybot-danmu")
+    TURSO_DATABASE = os.getenv("DANMU_TURSO_DATABASE", "bennybot-danmu")
     R2_PREFIX = os.getenv("DANMU_R2_PREFIX", "Streaming/")
     R2_SYNC_ENABLE = os.getenv("DANMU_R2_SYNC_ENABLE", "1").lower() in ["1", "y", "yes", "t", "true", "on"]
     R2_SYNC_DANMU_YEARS = os.getenv("R2_SYNC_DANMU_YEARS", "")  # comma separated years to sync to R2. e.g. "2025,2024,2023"
@@ -143,6 +144,7 @@ class PROXY:  # format: socks5://127.0.0.1:7890
     XHS = os.getenv("XHS_PROXY", None)  # Banned VPS IP, need residential proxy
     GPT = os.getenv("GPT_PROXY", None)
     D1 = os.getenv("D1_PROXY", None)
+    TURSO = os.getenv("TURSO_PROXY", None)
     WECHAT = os.getenv("WECHAT_PROXY", None)
     DOUYIN = os.getenv("DOUYIN_PROXY", None)
     TIKTOK = os.getenv("TIKTOK_PROXY", None)
@@ -192,6 +194,10 @@ class DB:
     ALIST_BASR_PATH = os.getenv("ALIST_BASR_PATH", "")
     PASTBIN_SERVER = os.getenv("PASTBIN_SERVER", "https://shz.al")
     PASTBIN_MAX_BYTES = int(os.getenv("PASTBIN_MAX_BYTES", "10485760"))  # 10 MB
+    TURSO_ENABLED = os.getenv("TURSO_ENABLED", "1").lower() in ["1", "y", "yes", "t", "true", "on"]
+    TURSO_USERNAME = os.getenv("TURSO_USERNAME", "")  # https://turso.tech
+    TURSO_API_TOKEN = os.getenv("TURSO_API_TOKEN", "")
+    TURSO_GROUP_TOKEN = os.getenv("TURSO_GROUP_TOKEN", "")
 
 
 class HISTORY: