main
  1#!/usr/bin/env python
  2# -*- coding: utf-8 -*-
  3import base64
  4import string
  5from pathlib import Path
  6from typing import Literal
  7
  8import anyio
  9from glom import Path as GlomPath
 10from glom import glom
 11from httpx import AsyncClient
 12from loguru import logger
 13
 14from config import DB, PROXY, TOKEN, TZ, cache
 15from networking import download_file, hx_req
 16from utils import nowdt
 17
 18
 19async def list_assets(
 20    tag_name: str = "",
 21    release_name: str = "",
 22    gh_user: str = DB.GH_USER,
 23    gh_repo: str = DB.GH_REPO,
 24    gh_token: str = DB.GH_TOKEN,
 25) -> dict:
 26    """List GitHub assets of a release.
 27
 28    `tag_name` and `release_name` must be specified at least one.
 29
 30    Returns:
 31        {
 32            "asset_name": {
 33                "name": "asset_name",
 34                "url": "asset_url",
 35                "id": "asset_id"
 36                "updated_at": "2025-08-29T06:27:52Z",
 37                }
 38        }
 39    """
 40    if not gh_user:
 41        gh_user = await get_username(gh_token)
 42    release_id = await get_release_id(tag_name, release_name, gh_user, gh_repo, gh_token)
 43    if not all([release_id, gh_user, gh_repo, gh_token]):
 44        return {}
 45    headers = {"Accept": "application/vnd.github+json", "Authorization": f"Bearer {gh_token}", "X-GitHub-Api-Version": "2026-03-10"}
 46    resp = await hx_req(f"https://api.github.com/repos/{gh_user}/{gh_repo}/releases/{release_id}/assets", headers=headers, proxy=PROXY.GITHUB, check_keys=["0.browser_download_url"])
 47    if isinstance(resp, list):
 48        return {x["name"]: {"url": x["browser_download_url"], "id": x["id"], "updated_at": x["updated_at"], "name": x["name"]} for x in resp}
 49    if resp.get("hx_error"):
 50        logger.error(resp["hx_error"])
 51    return {}
 52
 53
 54@cache.memoize(ttl=3)
 55async def list_releases(
 56    gh_user: str = DB.GH_USER,
 57    gh_repo: str = DB.GH_REPO,
 58    gh_token: str = DB.GH_TOKEN,
 59) -> dict:
 60    """List GitHub releases."""
 61    if not gh_user:
 62        gh_user = await get_username(gh_token)
 63    if not all([gh_user, gh_repo, gh_token]):
 64        return {}
 65    headers = {"Accept": "application/vnd.github+json", "Authorization": f"Bearer {gh_token}", "X-GitHub-Api-Version": "2026-03-10"}
 66    releases = []
 67    per_page = 100  # maximum is 100
 68    page = 1
 69    resp = await hx_req(
 70        f"https://api.github.com/repos/{gh_user}/{gh_repo}/releases",
 71        params={"per_page": per_page, "page": page},
 72        headers=headers,
 73        proxy=PROXY.GITHUB,
 74        check_keys=["0.id"],
 75    )
 76    if isinstance(resp, list):
 77        releases.extend(resp)
 78        while len(resp) == per_page:
 79            page += 1
 80            resp = await hx_req(
 81                f"https://api.github.com/repos/{gh_user}/{gh_repo}/releases",
 82                params={"per_page": per_page, "page": page},
 83                headers=headers,
 84                proxy=PROXY.GITHUB,
 85                check_keys=["0.id"],
 86            )
 87            if not isinstance(resp, list):
 88                break
 89            releases.extend(resp)
 90    logger.success(f"Found {len(releases)} releases of {gh_user}/{gh_repo}")
 91    return {x["name"]: x for x in releases}
 92
 93
 94async def get_release_id(
 95    tag_name: str = "",
 96    release_name: str = "",
 97    gh_user: str = DB.GH_USER,
 98    gh_repo: str = DB.GH_REPO,
 99    gh_token: str = DB.GH_TOKEN,
100) -> int:
101    """Get release id by release name or tag."""
102    if not gh_user:
103        gh_user = await get_username(gh_token)
104    releases = await list_releases(gh_user, gh_repo, gh_token)
105    release_id = 0
106    if release_name:
107        release_id = glom(releases, GlomPath(release_name, "id"), default=0)
108    if release_id == 0 and tag_name:
109        for release in releases.values():
110            if release["tag_name"] == tag_name:
111                release_id = release["id"]
112                break
113    if release_id == 0:
114        logger.warning(f"Release is not found. Release: {release_name}, Tag: {tag_name}")
115    return release_id
116
117
async def gh_download_asset(
    asset_name: str,
    tag_name: str = "",
    release_name: str = "",
    gh_user: str = DB.GH_USER,
    gh_repo: str = DB.GH_REPO,
    gh_token: str = DB.GH_TOKEN,
) -> str:
    """Download an asset from a GitHub release.

    Args:
        asset_name: name of the asset inside the release
        tag_name: git tag of the release
        release_name: display name of the release
        gh_user: repository owner; resolved from `gh_token` when empty
        gh_repo: repository name
        gh_token: GitHub API token

    Returns:
        the value returned by `download_file` for the asset, or "" when a
        required argument is missing or the asset does not exist
    """
    if not gh_user:
        gh_user = await get_username(gh_token)
    if not all([asset_name, gh_user, gh_repo, gh_token]):
        return ""
    assets = await list_assets(tag_name, release_name, gh_user, gh_repo, gh_token)
    asset_url = glom(assets, GlomPath(asset_name, "url"), default="")
    if not asset_url:
        # Nothing to download: do not hand an empty URL to the downloader.
        logger.error(f"Get github asset failed. Asset Name: {asset_name}, Release: {release_name}, Tag: {tag_name}")
        return ""
    path = await download_file(asset_url, proxy=PROXY.GITHUB, stream=True)
    if not path or not Path(path).is_file():
        logger.error(f"Get github asset failed. Asset Name: {asset_name}, Release: {release_name}, Tag: {tag_name}")
    return path
137
138
async def gh_upload_asset(
    path: str | Path,
    asset_name: str = "",
    tag_name: str = "",
    release_name: str = "",
    gh_user: str = DB.GH_USER,
    gh_repo: str = DB.GH_REPO,
    gh_token: str = DB.GH_TOKEN,
    *,
    overwrite: bool = False,
) -> str:
    """Upload asset to GitHub release.

    The release is created when it does not exist yet. Asset names containing
    characters outside `a-z A-Z 0-9 - _ . ~` get their stem replaced by its
    URL-safe base64 encoding (padding stripped); the suffix is kept.

    Args:
        path: local file to upload; a leading `~` is expanded
        asset_name: name of the asset; defaults to the file name
        tag_name: git tag of the release
        release_name: display name of the release
        gh_user: repository owner; resolved from `gh_token` when empty
        gh_repo: repository name
        gh_token: GitHub API token
        overwrite: delete an existing asset of the same name before uploading

    Returns:
        url of the uploaded asset (or of the already existing one when
        `overwrite` is False); "" on failure
    """
    if not path:
        logger.error(f"File not found: {path}")
        return ""
    # Normalise once so the existence check, stat() and open() all see the same
    # file (previously only the check expanded "~").
    file_path = Path(path).expanduser()
    if not file_path.is_file():
        logger.error(f"File not found: {path}")
        return ""
    file_size = file_path.stat().st_size
    if file_size >= 2 * 1024**3:  # maximum is 2 GB
        logger.error(f"File {file_path.name} is too large: {file_size} bytes")
        return ""

    if not gh_user:
        gh_user = await get_username(gh_token)
    release_id = await get_release_id(tag_name, release_name, gh_user, gh_repo, gh_token)
    if release_id == 0:
        logger.info(f"Release not exists. Release: {release_name}, Tag: {tag_name}. Create a new release.")
        release_id = await gh_create_release(tag_name, release_name, gh_user, gh_repo, gh_token)
        if release_id == 0:
            logger.error(f"Create release failed. Release: {release_name}, Tag: {tag_name}")
            return ""
    if not asset_name:
        asset_name = file_path.name
    # encode the asset_name if needed. allowed characters: a-z, A-Z, 0-9, -, _, ., ~
    allowed_chars = set(string.ascii_letters + string.digits + "-_.~")
    if any(c not in allowed_chars for c in asset_name):
        stem = Path(asset_name).stem
        suffix = Path(asset_name).suffix
        asset_name = base64.urlsafe_b64encode(stem.encode()).decode().rstrip("=") + suffix
    # check if asset exists
    assets = await list_assets(tag_name, release_name, gh_user, gh_repo, gh_token)
    if asset_name in assets:
        if not overwrite:
            logger.info(f"Asset already exists. Asset: {asset_name}, Release: {release_name}, Tag: {tag_name}")
            return assets[asset_name]["url"]
        logger.warning(f"Delete existing asset. Asset: {asset_name}, Release: {release_name}, Tag: {tag_name}")
        await gh_del_asset(asset_name, tag_name, release_name, gh_user, gh_repo, gh_token)

    headers = {
        "Accept": "application/vnd.github+json",
        "Authorization": f"Bearer {gh_token}",
        "X-GitHub-Api-Version": "2026-03-10",
        "Content-Type": "application/octet-stream",
        # The body is streamed, so the length is declared explicitly.
        "Content-Length": str(file_size),
    }
    upload_url = f"https://uploads.github.com/repos/{gh_user}/{gh_repo}/releases/{release_id}/assets"
    # use stream uploading
    try:
        logger.info(f"Uploading {asset_name} to GitHub Release: {release_name}, Tag: {tag_name}")
        async with AsyncClient(headers=headers, proxy=PROXY.GITHUB, follow_redirects=True, timeout=None) as client:  # noqa: S113
            async with await anyio.open_file(file_path, "rb") as f:
                # `params` lets httpx percent-encode the name (the suffix is not sanitised above).
                response = await client.post(upload_url, params={"name": asset_name}, content=f)
                response.raise_for_status()
            logger.success(f"Uploaded {asset_name} to GitHub Release: {release_name}, Tag: {tag_name}")
            return response.json()["browser_download_url"]
    except Exception as e:
        # Best-effort boundary: any upload problem is logged and reported as "".
        error = f"Failed to upload to GitHub: {e}"
        logger.error(error)
    return ""
208
209
async def gh_del_asset(
    asset_name: str = "",
    tag_name: str = "",
    release_name: str = "",
    gh_user: str = DB.GH_USER,
    gh_repo: str = DB.GH_REPO,
    gh_token: str = DB.GH_TOKEN,
) -> bool:
    """Delete GitHub asset.

    Args:
        asset_name: name of the asset inside the release
        tag_name: git tag of the release
        release_name: display name of the release
        gh_user: repository owner; resolved from `gh_token` when empty
        gh_repo: repository name
        gh_token: GitHub API token

    Returns:
        True when the delete request was sent without a reported error,
        False when arguments are missing, the asset is not found, or the
        request reports an error
    """
    if not gh_user:
        gh_user = await get_username(gh_token)
    # Validate before listing assets so incomplete arguments cost no lookup.
    if not all([asset_name, gh_user, gh_repo, gh_token]):
        return False

    assets = await list_assets(tag_name, release_name, gh_user, gh_repo, gh_token)
    asset_id = glom(assets, GlomPath(asset_name, "id"), default=0)
    if asset_id == 0:
        logger.warning(f"Asset not found. Asset Name: {asset_name}, Release: {release_name}, Tag: {tag_name}")
        return False
    headers = {"Accept": "application/vnd.github+json", "Authorization": f"Bearer {gh_token}", "X-GitHub-Api-Version": "2026-03-10"}
    resp = await hx_req(f"https://api.github.com/repos/{gh_user}/{gh_repo}/releases/assets/{asset_id}", "DELETE", headers=headers, proxy=PROXY.GITHUB, rformat="content")
    # NOTE(review): assumes hx_req reports failures as a dict carrying "hx_error"
    # (as the other callers in this file rely on) even with rformat="content" — confirm.
    if isinstance(resp, dict) and resp.get("hx_error"):
        logger.error(resp["hx_error"])
        return False
    logger.success(f"Deleted asset {asset_name} from GitHub Release: {release_name}, Tag: {tag_name}")
    return True
233
234
@cache.memoize(ttl=0)
async def get_username(gh_token: str):
    """Look up the login name that owns a GitHub API token.

    A warning is logged when `DB.GH_USER` is configured and differs from the
    login reported by the API.

    Returns:
        the login name, or "" when no token is given or the response has no login
    """
    if not gh_token:
        return ""
    resp = await hx_req(
        "https://api.github.com/user",
        headers={"Accept": "application/vnd.github+json", "Authorization": f"Bearer {gh_token}", "X-GitHub-Api-Version": "2026-03-10"},
        proxy=PROXY.GITHUB,
        check_keys=["login"],
        check_kv={"type": "User"},
    )
    login = resp.get("login", "")
    mismatch = bool(DB.GH_USER) and login != DB.GH_USER
    if mismatch:
        logger.warning(f"GitHub username mismatch: {login} != {DB.GH_USER}")
    return login
246
247
async def gh_create_release(
    tag_name: str,
    release_name: str = "",
    gh_user: str = DB.GH_USER,
    gh_repo: str = DB.GH_REPO,
    gh_token: str = DB.GH_TOKEN,
    *,
    body: str = "",
    draft: bool = False,
    prerelease: bool = False,
    make_latest: Literal["true", "false", "legacy"] = "true",
) -> int:
    """Create a release on GitHub unless a matching one already exists.

    The release is named `release_name`, or `tag_name` when no name is given.

    Returns:
        id of the existing or newly created release; 0 when a required
        argument is missing or the request reports an error
    """
    gh_user = gh_user or await get_username(gh_token)
    if not (tag_name and gh_user and gh_repo and gh_token):
        return 0

    # Reuse a release that is already there instead of creating a duplicate.
    existing_id = await get_release_id(tag_name, release_name, gh_user, gh_repo, gh_token)
    if existing_id:
        return existing_id

    request_headers = {"Accept": "application/vnd.github+json", "Authorization": f"Bearer {gh_token}", "X-GitHub-Api-Version": "2026-03-10"}
    release_spec = {
        "tag_name": tag_name,
        "name": release_name or tag_name,
        "body": body,
        "draft": draft,
        "prerelease": prerelease,
        "make_latest": make_latest,
    }
    created = await hx_req(
        f"https://api.github.com/repos/{gh_user}/{gh_repo}/releases",
        "POST",
        headers=request_headers,
        proxy=PROXY.GITHUB,
        json_data=release_spec,
        check_kv={"tag_name": tag_name},
    )
    if created.get("hx_error"):
        logger.error(created["hx_error"])
        return 0
    logger.success(f"Created release {tag_name} on {gh_user}/{gh_repo}")
    return created.get("id", 0)
277
278
async def gh_del_release(
    tag_name: str = "",
    release_name: str = "",
    gh_user: str = DB.GH_USER,
    gh_repo: str = DB.GH_REPO,
    gh_token: str = DB.GH_TOKEN,
    *,
    del_tag: bool = True,
) -> bool:
    """Delete a GitHub release.

    Args:
        tag_name: git tag of the release
        release_name: display name of the release
        gh_user: repository owner; resolved from `gh_token` when empty
        gh_repo: repository name
        gh_token: GitHub API token
        del_tag: also delete the git tag the release points to

    Returns:
        True when the release delete request was sent without a reported
        error, False when arguments are missing, the release is not found,
        or the request reports an error
    """
    if not gh_user:
        gh_user = await get_username(gh_token)
    if not all([gh_user, gh_repo, gh_token]):
        return False
    release_id = await get_release_id(tag_name, release_name, gh_user, gh_repo, gh_token)
    if release_id == 0:
        logger.warning(f"ReleaseID not found on {gh_user}/{gh_repo}. Release: {release_name}, Tag: {tag_name}")
        return False
    headers = {"Accept": "application/vnd.github+json", "Authorization": f"Bearer {gh_token}", "X-GitHub-Api-Version": "2026-03-10"}
    # Resolve the tag BEFORE deleting the release; it cannot be looked up afterwards.
    releases = await list_releases(gh_user, gh_repo, gh_token)
    release = next((x for x in releases.values() if x["id"] == release_id), {})
    # The second listing may not contain the release (e.g. it failed); fall back
    # to the caller-supplied tag instead of raising KeyError.
    resolved_tag = release.get("tag_name") or tag_name

    # del release
    resp = await hx_req(f"https://api.github.com/repos/{gh_user}/{gh_repo}/releases/{release_id}", "DELETE", headers=headers, proxy=PROXY.GITHUB, rformat="content")
    # NOTE(review): assumes hx_req reports failures as a dict carrying "hx_error"
    # (as the other callers in this file rely on) even with rformat="content" — confirm.
    if isinstance(resp, dict) and resp.get("hx_error"):
        logger.error(resp["hx_error"])
        return False  # keep the tag when the release itself could not be deleted
    logger.success(f"Deleted release {tag_name} {release_name} on {gh_user}/{gh_repo}")

    # del tag
    if del_tag:
        if not resolved_tag:
            logger.warning(f"Tag of release {release_name} is unknown on {gh_user}/{gh_repo}. Skip deleting tag.")
            return True
        await hx_req(f"https://api.github.com/repos/{gh_user}/{gh_repo}/git/refs/tags/{resolved_tag}", "DELETE", headers=headers, proxy=PROXY.GITHUB, rformat="content")
        logger.success(f"Deleted tag {resolved_tag} on {gh_user}/{gh_repo}")
    return True
311
312
async def gh_clean_assets(
    tag_name: str = "",
    release_name: str = "",
    gh_user: str = DB.GH_USER,
    gh_repo: str = DB.GH_REPO,
    gh_token: str = DB.GH_TOKEN,
    *,
    keep_latest: int = 0,
    only_suffix: list[str] | None = None,
) -> int:
    """Delete assets from GitHub.

    Assets are ordered by `updated_at` (newest first); everything after the
    first `keep_latest` entries is deleted.

    Args:
        tag_name: git tag of the release
        release_name: display name of the release
        gh_user: repository owner; resolved from `gh_token` when empty
        gh_repo: repository name
        gh_token: GitHub API token
        keep_latest: keep latest N assets; negative values are treated as 0
        only_suffix: only delete assets with these formats. (e.g. [".mkv", ".mp4"])

    Returns:
        number of deleted assets (never negative)
    """
    if not gh_user:
        gh_user = await get_username(gh_token)
    if not all([gh_user, gh_repo, gh_token]):
        return 0
    assets = await list_assets(tag_name, release_name, gh_user, gh_repo, gh_token)
    files = list(assets.values())
    if only_suffix is not None:
        suffixes = tuple(only_suffix)
        files = [x for name, x in assets.items() if name.endswith(suffixes)]
    files = sorted(files, key=lambda x: x["updated_at"], reverse=True)
    # A negative slice start would select from the wrong end of the list.
    keep = max(keep_latest, 0)
    num_del = 0
    for f in files[keep:]:
        logger.info(f"Deleting asset {f['name']} from {gh_user}/{gh_repo}, Release: {release_name}, Tag: {tag_name}")
        # Count only deletions that gh_del_asset reports as done.
        if await gh_del_asset(f["name"], tag_name, release_name, gh_user, gh_repo, gh_token):
            num_del += 1
    logger.success(f"Deleted {num_del} assets from {gh_user}/{gh_repo}, Release: {release_name}, Tag: {tag_name}")
    return num_del
347
348
async def list_gists(gh_token: str = TOKEN.GITHUB) -> list[dict]:
    """Collect the gists visible to the token's user, following pagination.

    Returns:
        list of gist objects as returned by the API. Empty when no token is
        given or the first request does not return a list; when a later page
        fails, the gists gathered so far are returned.
    """
    if not gh_token:
        return []
    headers = {"Accept": "application/vnd.github+json", "Authorization": f"Bearer {gh_token}", "X-GitHub-Api-Version": "2026-03-10"}
    params = {"page": 1, "per_page": 100}
    collected = []
    while True:
        batch = await hx_req("https://api.github.com/gists", headers=headers, params=params, proxy=PROXY.GITHUB)
        if not isinstance(batch, list):
            return collected
        collected.extend(batch)
        if len(batch) != params["per_page"]:
            return collected  # short page: nothing more to fetch
        params["page"] += 1
367
368
async def create_gist(content: str, filename: str, gh_token: str = TOKEN.GITHUB) -> str:
    """Create a gist on GitHub.

    One secret gist per day (described as `bennybot-YYYY-MM-DD`) is used: the
    file is added to today's gist when it exists, otherwise a new gist is
    created.

    Args:
        content: text content of the file
        filename: name of the file inside the gist
        gh_token: GitHub API token

    Returns:
        url of the gist with an anchor for the file, or "" on failure
    """
    if not all([content, filename, gh_token]):
        return ""
    headers = {"Accept": "application/vnd.github+json", "Authorization": f"Bearer {gh_token}", "X-GitHub-Api-Version": "2026-03-10"}
    today = nowdt(TZ).strftime("%Y-%m-%d")
    description = f"bennybot-{today}"
    gists = await list_gists(gh_token)
    gist_id = next((x["id"] for x in gists if x["description"] == description), None)
    files = {filename: {"content": content}}
    if not gist_id:
        # Create new gist
        resp = await hx_req(
            "https://api.github.com/gists",
            "POST",
            json_data={"public": False, "description": description, "files": files},
            check_keys=["id"],
            headers=headers,
            proxy=PROXY.GITHUB,
        )
    else:
        # Update existing gist. Only the new/changed file is sent: files omitted
        # from a PATCH stay unchanged, and echoing back content fetched via GET
        # could write truncated content for large files.
        resp = await hx_req(
            f"https://api.github.com/gists/{gist_id}",
            "PATCH",
            json_data={"description": description, "files": files},
            check_kv={"id": gist_id},
            headers=headers,
            proxy=PROXY.GITHUB,
        )
    if "id" not in resp:
        return ""
    # Prefer the URL reported by the API; gists are served from gist.github.com.
    gist_url = resp.get("html_url") or f"https://gist.github.com/{resp['id']}"
    return f"{gist_url}#file-{filename.replace('.', '-')}"
412    return ""