Commit 345a2a8
src/database/README.md
@@ -11,4 +11,5 @@ from database.d1 import create_d1_database, create_d1_table, list_d1_tables, que
from database.alist import list_alist, download_alist, upload_alist, delete_alist
from database.pastbin import upload_pastbin, delete_pastbin
from database.uguu import upload_uguu
+from database.turso import turso_db_url, turso_create_table, turso_list_tables, turso_exec
```
src/database/turso.py
@@ -9,11 +9,11 @@ from networking import hx_req
@cache.memoize(ttl=0)
async def turso_db_url(
- name: str = "bennybot",
+ db_name: str = "bennybot",
group: str = "default",
+ *,
username: str = DB.TURSO_USERNAME,
api_token: str = DB.TURSO_API_TOKEN,
- *,
enabled: bool = DB.TURSO_ENABLED,
silent: bool = False,
) -> str:
@@ -22,38 +22,80 @@ async def turso_db_url(
return ""
api = f"https://api.turso.tech/v1/organizations/{username}/databases"
headers = {"authorization": f"Bearer {api_token}", "content-type": "application/json"}
- payload = {"name": name, "group": group}
+ payload = {"name": db_name, "group": group}
# check if database exists
resp = await hx_req(api, method="GET", headers=headers, proxy=PROXY.TURSO, check_keys=["databases"], silent=silent)
- if name in glom(resp, "databases.*.Name", default=[]):
- return "https://" + next(x["hostname"] for x in resp["databases"] if x["Name"] == name) + "/v2/pipeline"
+ if db_name in glom(resp, "databases.*.Name", default=[]):
+ return "https://" + next(x["hostname"] for x in resp["databases"] if x["Name"] == db_name) + "/v2/pipeline"
# not exists, create it
- resp = await hx_req(api, method="POST", json_data=payload, headers=headers, check_kv={"database.Name": name}, check_keys=["database.Hostname"], proxy=PROXY.TURSO, silent=silent)
+ resp = await hx_req(api, method="POST", json_data=payload, headers=headers, check_kv={"database.Name": db_name}, check_keys=["database.Hostname"], proxy=PROXY.TURSO, silent=silent)
return "https://" + resp["database"]["Hostname"] + "/v2/pipeline"
@cache.memoize(ttl=0)
-async def turso_create_table(table_name: str | float, columns: str, db_name: str = "bennybot", *, silent: bool = False) -> None:
+async def turso_create_table(
+ table_name: str | float,
+ columns: str,
+ db_name: str = "bennybot",
+ idx_cols: list[str] | None = None,
+ *,
+ username: str = DB.TURSO_USERNAME,
+ api_token: str = DB.TURSO_API_TOKEN,
+ group_token: str = DB.TURSO_GROUP_TOKEN,
+ silent: bool = False,
+) -> None:
"""Create a turso table."""
- tables = await turso_list_tables(db_name, silent=silent)
- if table_name in tables:
+ tables = await turso_list_tables(db_name, username=username, api_token=api_token, group_token=group_token, silent=silent)
+ if table_name not in tables:
+ resp = await turso_exec(
+ [{"type": "execute", "stmt": {"sql": f'CREATE TABLE IF NOT EXISTS "{table_name}" ({columns});'}}],
+ db_name=db_name,
+ username=username,
+ api_token=api_token,
+ group_token=group_token,
+ silent=silent,
+ )
+ if glom(resp, "results.0.type", default="") == "ok":
+ logger.success(f'Create Table "{table_name}" in Turso database "{db_name}"')
+
+ # create indexs if idx_cols is not None
+ if idx_cols is None:
return
resp = await turso_exec(
- [
- {
- "type": "execute",
- "stmt": {"sql": f'CREATE TABLE IF NOT EXISTS "{table_name}" ({columns});'},
- }
- ],
+ [{"type": "execute", "stmt": {"sql": "SELECT name FROM sqlite_master WHERE type='index';"}}],
db_name=db_name,
+ username=username,
+ api_token=api_token,
+ group_token=group_token,
silent=silent,
)
- if glom(resp, "results.0.type", default="") == "ok":
- logger.success(f'Create Table "{table_name}" in Turso database "{db_name}"')
+ indexs = flatten(glom(resp, "results.0.response.result.rows.*.*.value", default=[]))
+ for idx_name in idx_cols:
+ if idx_name not in columns:
+ logger.warning(f"Index {idx_name} not in columns {columns}")
+ continue
+ if f"idx_{table_name}_{idx_name}" not in indexs:
+ resp = await turso_exec(
+ [{"type": "execute", "stmt": {"sql": f'CREATE INDEX IF NOT EXISTS "idx_{table_name}_{idx_name}" ON "{table_name}"({idx_name})'}}],
+ db_name=db_name,
+ username=username,
+ api_token=api_token,
+ group_token=group_token,
+ silent=silent,
+ )
+ if glom(resp, "results.0.type", default="") == "ok":
+ logger.success(f'Create Index "idx_{table_name}_{idx_name}" of table "{table_name}" in Turso database "{db_name}"')
-async def turso_list_tables(db_name: str = "bennybot", *, silent: bool = False) -> list[str]:
+async def turso_list_tables(
+ db_name: str = "bennybot",
+ *,
+ username: str = DB.TURSO_USERNAME,
+ api_token: str = DB.TURSO_API_TOKEN,
+ group_token: str = DB.TURSO_GROUP_TOKEN,
+ silent: bool = False,
+) -> list[str]:
"""List turso tables in a database."""
resp = await turso_exec(
[
@@ -63,6 +105,9 @@ async def turso_list_tables(db_name: str = "bennybot", *, silent: bool = False)
}
],
db_name=db_name,
+ username=username,
+ api_token=api_token,
+ group_token=group_token,
silent=silent,
)
return flatten(glom(resp, "results.0.response.result.rows.*.*.value", default=[]))
@@ -70,16 +115,18 @@ async def turso_list_tables(db_name: str = "bennybot", *, silent: bool = False)
async def turso_exec(
statements: list[dict],
+ *,
db_name: str = "bennybot",
+ username: str = DB.TURSO_USERNAME,
+ api_token: str = DB.TURSO_API_TOKEN,
group_token: str = DB.TURSO_GROUP_TOKEN,
- *,
retry: int = 0,
silent: bool = False,
) -> dict:
"""Exec turso statements."""
- if not all([statements, group_token]):
+ if not all([statements, db_name, username, api_token, group_token]):
return {}
- db_url = await turso_db_url(db_name, silent=silent)
+ db_url = await turso_db_url(db_name, username=username, api_token=api_token, silent=silent)
if not db_url:
return {}
headers = {"authorization": f"Bearer {group_token}", "content-type": "application/json"}
@@ -87,4 +134,29 @@ async def turso_exec(
statements.append({"type": "close"})
if not silent:
logger.trace(f"Turso Exec: {statements}")
- return await hx_req(db_url, "POST", json_data={"requests": statements}, headers=headers, check_keys=["results"], proxy=PROXY.TURSO, max_retry=retry, silent=silent, timeout=600)
+
+ resp = await hx_req(db_url, "POST", json_data={"requests": statements}, headers=headers, check_keys=["results"], proxy=PROXY.TURSO, max_retry=int(retry), silent=silent, timeout=600)
+ if not silent:
+ rows = glom(resp, "results.0.response.result.rows", default=[])
+ log = f"Found {len(rows)} records in Turso."
+ log += f" Rows read: {glom(resp, 'results.0.response.result.rows_read', default=0)}"
+ log += f", write: {glom(resp, 'results.0.response.result.rows_written', default=0)}"
+ logger.success(log)
+ return resp
+
+
+def insert_statement(table_name: str, records: dict, update_on_conflict: str = "") -> dict:
+ """Create a turso insert statement."""
+ SQL_TYPES = {"str": "text", "int": "integer", "float": "float", "nonetype": "null"}
+ keys = ", ".join(records)
+ values = ", ".join(["?" for _ in range(len(records))])
+ args = [{"type": SQL_TYPES[type(x).__name__.lower()], "value": str(x) if isinstance(x, (int, float)) else x} for x in records.values()]
+
+ sql = f"INSERT INTO '{table_name}' ({keys}) VALUES ({values});" # noqa: S608
+ if update_on_conflict:
+ updates = ", ".join([f"{k} = EXCLUDED.{k}" for k in records if k != update_on_conflict])
+ sql = f"INSERT INTO '{table_name}' ({keys}) VALUES ({values}) ON CONFLICT ({update_on_conflict}) DO UPDATE SET {updates};" # noqa: S608
+ stmt = {"sql": sql}
+ if args:
+ stmt |= {"args": args}
+ return {"type": "execute", "stmt": stmt}