Commit a31b3a4
Changed files (3)
src
src/database/github.py
@@ -0,0 +1,345 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+import base64
+import string
+from pathlib import Path
+from typing import Literal
+
+import anyio
+from glom import Path as GlomPath
+from glom import glom
+from httpx import AsyncClient
+from loguru import logger
+
+from config import DB, PROXY, cache
+from networking import download_file, hx_req
+
+
+async def list_assets(
+ tag_name: str = "",
+ release_name: str = "",
+ gh_user: str = DB.GH_USER,
+ gh_repo: str = DB.GH_REPO,
+ gh_token: str = DB.GH_TOKEN,
+) -> dict:
+ """List GitHub assets of a release.
+
+ `tag_name` and `release_name` must be specified at least one.
+
+ Returns:
+ {
+ "asset_name": {
+ "name": "asset_name",
+ "url": "asset_url",
+ "id": "asset_id"
+ "updated_at": "2025-08-29T06:27:52Z",
+ }
+ }
+ """
+ if not gh_user:
+ gh_user = await get_username(gh_token)
+ release_id = await get_release_id(tag_name, release_name, gh_user, gh_repo, gh_token)
+ if not all([release_id, gh_user, gh_repo, gh_token]):
+ return {}
+ headers = {"Accept": "application/vnd.github+json", "Authorization": f"Bearer {gh_token}", "X-GitHub-Api-Version": "2022-11-28"}
+ resp = await hx_req(f"https://api.github.com/repos/{gh_user}/{gh_repo}/releases/{release_id}/assets", headers=headers, proxy=PROXY.GITHUB, check_keys=["0.browser_download_url"])
+ if isinstance(resp, list):
+ return {x["name"]: {"url": x["browser_download_url"], "id": x["id"], "updated_at": x["updated_at"], "name": x["name"]} for x in resp} # type: ignore
+ if resp.get("hx_error"):
+ logger.error(resp["hx_error"])
+ return {}
+
+
+@cache.memoize(ttl=3)
+async def list_releases(
+ gh_user: str = DB.GH_USER,
+ gh_repo: str = DB.GH_REPO,
+ gh_token: str = DB.GH_TOKEN,
+) -> dict:
+ """List GitHub releases."""
+ if not gh_user:
+ gh_user = await get_username(gh_token)
+ if not all([gh_user, gh_repo, gh_token]):
+ return {}
+ headers = {"Accept": "application/vnd.github+json", "Authorization": f"Bearer {gh_token}", "X-GitHub-Api-Version": "2022-11-28"}
+ releases = []
+ per_page = 100 # maximum is 100
+ page = 1
+ resp = await hx_req(
+ f"https://api.github.com/repos/{gh_user}/{gh_repo}/releases",
+ params={"per_page": per_page, "page": page},
+ headers=headers,
+ proxy=PROXY.GITHUB,
+ check_keys=["0.id"],
+ )
+ if isinstance(resp, list):
+ releases.extend(resp)
+ while len(resp) == per_page:
+ page += 1
+ resp = await hx_req(
+ f"https://api.github.com/repos/{gh_user}/{gh_repo}/releases",
+ params={"per_page": per_page, "page": page},
+ headers=headers,
+ proxy=PROXY.GITHUB,
+ check_keys=["0.id"],
+ )
+ if not isinstance(resp, list):
+ break
+ releases.extend(resp)
+ logger.success(f"Found {len(releases)} releases of {gh_user}/{gh_repo}")
+ return {x["name"]: x for x in releases}
+
+
+async def get_release_id(
+ tag_name: str = "",
+ release_name: str = "",
+ gh_user: str = DB.GH_USER,
+ gh_repo: str = DB.GH_REPO,
+ gh_token: str = DB.GH_TOKEN,
+) -> int:
+ """Get release id by release name or tag."""
+ if not gh_user:
+ gh_user = await get_username(gh_token)
+ releases = await list_releases(gh_user, gh_repo, gh_token)
+ release_id = 0
+ if release_name:
+ release_id = glom(releases, GlomPath(release_name, "id"), default=0)
+ if release_id == 0 and tag_name:
+ for release in releases.values():
+ if release["tag_name"] == tag_name:
+ release_id = release["id"]
+ break
+ if release_id == 0:
+ logger.warning(f"Release is not found. Release: {release_name}, Tag: {tag_name}")
+ return release_id
+
+
+async def gh_download_asset(
+ asset_name: str,
+ tag_name: str = "",
+ release_name: str = "",
+ gh_user: str = DB.GH_USER,
+ gh_repo: str = DB.GH_REPO,
+ gh_token: str = DB.GH_TOKEN,
+) -> str:
+ """Get asset from GitHub release."""
+ if not gh_user:
+ gh_user = await get_username(gh_token)
+ if not all([asset_name, gh_user, gh_repo, gh_token]):
+ return ""
+ assets = await list_assets(tag_name, release_name, gh_user, gh_repo, gh_token)
+ asset_url = glom(assets, GlomPath(asset_name, "url"), default="")
+ path = await download_file(asset_url, proxy=PROXY.GITHUB, stream=True)
+ if not Path(path).is_file():
+ logger.error(f"Get github asset failed. Asset Name: {asset_name}, Release: {release_name}, Tag: {tag_name}")
+ return path
+
+
+async def gh_upload_asset(
+ path: str | Path,
+ asset_name: str = "",
+ tag_name: str = "",
+ release_name: str = "",
+ gh_user: str = DB.GH_USER,
+ gh_repo: str = DB.GH_REPO,
+ gh_token: str = DB.GH_TOKEN,
+ *,
+ overwrite: bool = False,
+) -> str:
+ """Upload asset to GitHub release.
+
+ Returns:
+ url of the uploaded asset
+ """
+ if not path or not Path(path).expanduser().resolve().is_file():
+ logger.error(f"File not found: {path}")
+ return ""
+ if Path(path).stat().st_size >= 2 * 1024**3: # maximum is 2 GB
+ logger.error(f"File {Path(path).name} is too large: {Path(path).stat().st_size} bytes")
+ return ""
+
+ if not gh_user:
+ gh_user = await get_username(gh_token)
+ release_id = await get_release_id(tag_name, release_name, gh_user, gh_repo, gh_token)
+ if release_id == 0:
+ logger.info(f"Release not exists. Release: {release_name}, Tag: {tag_name}. Create a new release.")
+ release_id = await gh_create_release(tag_name, release_name, gh_user, gh_repo, gh_token)
+ if release_id == 0:
+ logger.error(f"Create release failed. Release: {release_name}, Tag: {tag_name}")
+ return ""
+ if not asset_name:
+ asset_name = Path(path).name
+ # encode the asset_name if needed. allowed characters: a-z, A-Z, 0-9, -, _, ., ~
+ if any(c not in string.ascii_letters + string.digits + "-_.~" for c in asset_name):
+ stem = Path(asset_name).stem
+ suffix = Path(asset_name).suffix
+ asset_name = base64.urlsafe_b64encode(stem.encode()).decode().rstrip("=") + suffix
+ # check if asset exists
+ assets = await list_assets(tag_name, release_name, gh_user, gh_repo, gh_token)
+ if asset_name in assets:
+ if not overwrite:
+ logger.info(f"Asset already exists. Asset: {asset_name}, Release: {release_name}, Tag: {tag_name}")
+ return assets[asset_name]["url"]
+ logger.warning(f"Delete existing asset. Asset: {asset_name}, Release: {release_name}, Tag: {tag_name}")
+ await gh_del_asset(asset_name, tag_name, release_name, gh_user, gh_repo, gh_token)
+
+ file_size = Path(path).stat().st_size
+ headers = {
+ "Accept": "application/vnd.github+json",
+ "Authorization": f"Bearer {gh_token}",
+ "X-GitHub-Api-Version": "2022-11-28",
+ "Content-Type": "application/octet-stream",
+ "Content-Length": str(file_size),
+ }
+ # use stream uploading
+ try:
+ logger.info(f"Uploading {asset_name} to GitHub Release: {release_name}, Tag: {tag_name}")
+ async with AsyncClient(headers=headers, proxy=PROXY.GITHUB, follow_redirects=True, timeout=None) as client: # noqa: S113
+ async with await anyio.open_file(path, "rb") as f:
+ response = await client.post(url=f"https://uploads.github.com/repos/{gh_user}/{gh_repo}/releases/{release_id}/assets?name={asset_name}", headers=headers, content=f)
+ response.raise_for_status()
+ logger.success(f"Uploaded {asset_name} to GitHub Release: {release_name}, Tag: {tag_name}")
+ return response.json()["browser_download_url"]
+ except Exception as e:
+ error = f"Failed to upload to GitHub: {e}"
+ logger.error(error)
+ return ""
+
+
+async def gh_del_asset(
+ asset_name: str = "",
+ tag_name: str = "",
+ release_name: str = "",
+ gh_user: str = DB.GH_USER,
+ gh_repo: str = DB.GH_REPO,
+ gh_token: str = DB.GH_TOKEN,
+) -> bool:
+ """Delete GitHub asset."""
+ if not gh_user:
+ gh_user = await get_username(gh_token)
+
+ assets = await list_assets(tag_name, release_name, gh_user, gh_repo, gh_token)
+ asset_id = glom(assets, GlomPath(asset_name, "id"), default=0)
+ if asset_id == 0:
+ logger.warning(f"Asset not found. Asset Name: {asset_name}, Release: {release_name}, Tag: {tag_name}")
+ return False
+ if not all([asset_name, gh_user, gh_repo, gh_token]):
+ return False
+ headers = {"Accept": "application/vnd.github+json", "Authorization": f"Bearer {gh_token}", "X-GitHub-Api-Version": "2022-11-28"}
+ await hx_req(f"https://api.github.com/repos/{gh_user}/{gh_repo}/releases/assets/{asset_id}", "DELETE", headers=headers, proxy=PROXY.GITHUB, rformat="content")
+ logger.success(f"Deleted asset {asset_name} from GitHub Release: {release_name}, Tag: {tag_name}")
+ return True
+
+
+@cache.memoize(ttl=0)
+async def get_username(gh_token: str):
+ """Get github username from API token."""
+ if not gh_token:
+ return ""
+ headers = {"Accept": "application/vnd.github+json", "Authorization": f"Bearer {gh_token}", "X-GitHub-Api-Version": "2022-11-28"}
+ resp = await hx_req("https://api.github.com/user", headers=headers, proxy=PROXY.GITHUB, check_keys=["login"], check_kv={"type": "User"})
+ token_user = resp.get("login", "")
+ if DB.GH_USER and token_user != DB.GH_USER:
+ logger.warning(f"GitHub username mismatch: {token_user} != {DB.GH_USER}")
+ return token_user
+
+
+async def gh_create_release(
+ tag_name: str,
+ release_name: str = "",
+ gh_user: str = DB.GH_USER,
+ gh_repo: str = DB.GH_REPO,
+ gh_token: str = DB.GH_TOKEN,
+ *,
+ body: str = "",
+ draft: bool = False,
+ prerelease: bool = False,
+ make_latest: Literal["true", "false", "legacy"] = "true",
+) -> int:
+ """Create a new release on GitHub."""
+ if not gh_user:
+ gh_user = await get_username(gh_token)
+ if not all([tag_name, gh_user, gh_repo, gh_token]):
+ return 0
+ # check if release already exists
+ release_id = await get_release_id(tag_name, release_name, gh_user, gh_repo, gh_token)
+ if release_id:
+ return release_id
+ headers = {"Accept": "application/vnd.github+json", "Authorization": f"Bearer {gh_token}", "X-GitHub-Api-Version": "2022-11-28"}
+ payload = {"tag_name": tag_name, "name": release_name or tag_name, "body": body, "draft": draft, "prerelease": prerelease, "make_latest": make_latest}
+ resp = await hx_req(f"https://api.github.com/repos/{gh_user}/{gh_repo}/releases", "POST", headers=headers, proxy=PROXY.GITHUB, json_data=payload, check_kv={"tag_name": tag_name})
+ if resp.get("hx_error"):
+ logger.error(resp["hx_error"])
+ return 0
+ logger.success(f"Created release {tag_name} on {gh_user}/{gh_repo}")
+ return resp.get("id", 0)
+
+
+async def gh_del_release(
+ tag_name: str = "",
+ release_name: str = "",
+ gh_user: str = DB.GH_USER,
+ gh_repo: str = DB.GH_REPO,
+ gh_token: str = DB.GH_TOKEN,
+ *,
+ del_tag: bool = True,
+) -> bool:
+ """Delete a GitHub release."""
+ if not gh_user:
+ gh_user = await get_username(gh_token)
+ if not all([gh_user, gh_repo, gh_token]):
+ return False
+ release_id = await get_release_id(tag_name, release_name, gh_user, gh_repo, gh_token)
+ if release_id == 0:
+ logger.warning(f"ReleaseID not found on {gh_user}/{gh_repo}. Release: {release_name}, Tag: {tag_name}")
+ return False
+ headers = {"Accept": "application/vnd.github+json", "Authorization": f"Bearer {gh_token}", "X-GitHub-Api-Version": "2022-11-28"}
+ releases = await list_releases(gh_user, gh_repo, gh_token)
+ release = next((x for x in releases.values() if x["id"] == release_id), {})
+
+ # del release
+ await hx_req(f"https://api.github.com/repos/{gh_user}/{gh_repo}/releases/{release_id}", "DELETE", headers=headers, proxy=PROXY.GITHUB, rformat="content")
+ logger.success(f"Deleted release {tag_name} {release_name} on {gh_user}/{gh_repo}")
+
+ # del tag
+ if del_tag:
+ tag_name = release["tag_name"]
+ await hx_req(f"https://api.github.com/repos/{gh_user}/{gh_repo}/git/refs/tags/{tag_name}", "DELETE", headers=headers, proxy=PROXY.GITHUB, rformat="content")
+ logger.success(f"Deleted tag {tag_name} on {gh_user}/{gh_repo}")
+ return True
+
+
+async def gh_clean_assets(
+ tag_name: str = "",
+ release_name: str = "",
+ gh_user: str = DB.GH_USER,
+ gh_repo: str = DB.GH_REPO,
+ gh_token: str = DB.GH_TOKEN,
+ *,
+ keep_latest: int = 0,
+ only_suffix: list[str] | None = None,
+) -> int:
+ """Delete assets from GitHub.
+
+ Args:
+ keep_latest: keep latest N assets
+ only_suffix: only delete assets with these formats. (e.g. [".mkv", ".mp4"])
+
+ Returns:
+ number of deleted assets
+ """
+ if not gh_user:
+ gh_user = await get_username(gh_token)
+ if not all([gh_user, gh_repo, gh_token]):
+ return 0
+ assets = await list_assets(tag_name, release_name, gh_user, gh_repo, gh_token)
+ files = list(assets.values())
+ if only_suffix is not None:
+ files = [x for name, x in assets.items() if name.endswith(tuple(only_suffix))]
+ files = sorted(files, key=lambda x: x["updated_at"], reverse=True)
+ for f in files[keep_latest:]:
+ logger.info(f"Deleting asset {f['name']} from {gh_user}/{gh_repo}, Release: {release_name}, Tag: {tag_name}")
+ await gh_del_asset(f["name"], tag_name, release_name, gh_user, gh_repo, gh_token)
+ num_del = len(files) - keep_latest
+ logger.success(f"Deleted {num_del} assets from {gh_user}/{gh_repo}, Release: {release_name}, Tag: {tag_name}")
+ return num_del
src/config.py
@@ -234,6 +234,9 @@ class DB:
TURSO_USERNAME = os.getenv("TURSO_USERNAME", "") # https://turso.tech
TURSO_API_TOKEN = os.getenv("TURSO_API_TOKEN", "")
TURSO_GROUP_TOKEN = os.getenv("TURSO_GROUP_TOKEN", "")
+ GH_USER = os.getenv("DB_GH_USER", "")
+ GH_REPO = os.getenv("DB_GH_REPO", "bennybot")
+ GH_TOKEN = os.getenv("DB_GH_TOKEN", "")
class HISTORY:
src/networking.py
@@ -91,7 +91,7 @@ async def hx_req(
else:
client = AsyncClient(http2=True, proxy=proxy, transport=transport, follow_redirects=follow_redirects, timeout=timeout, event_hooks={"request": [log_req], "response": [log_resp]})
- if method not in ["GET", "POST", "PUT"]:
+ if method not in ["GET", "POST", "PUT", "DELETE"]:
error = f"Invalid method: {method}"
logger.error(error)
return {"hx_error": error}
@@ -101,8 +101,10 @@ async def hx_req(
response = await client.get(url, cookies=cookies, headers=headers, params=params)
elif method == "POST":
response = await client.post(url, cookies=cookies, headers=headers, data=data, json=json_data, files=files, content=content_data, params=params)
- else:
+ elif method == "PUT":
response = await client.put(url, cookies=cookies, headers=headers, data=data, json=json_data, content=content_data, files=files, params=params)
+ else:
+ response = await client.delete(url, cookies=cookies, headers=headers, params=params)
response.raise_for_status()
if rformat == "content":
return {"content": response.content}