main
  1#!/usr/bin/env python
  2# -*- coding: utf-8 -*-
  3import base64
  4import string
  5from pathlib import Path
  6from typing import Literal
  7
  8import anyio
  9from glom import Path as GlomPath
 10from glom import glom
 11from httpx import AsyncClient
 12from loguru import logger
 13
 14from config import DB, PROXY, cache
 15from networking import download_file, hx_req
 16
 17
 18async def list_assets(
 19    tag_name: str = "",
 20    release_name: str = "",
 21    gh_user: str = DB.GH_USER,
 22    gh_repo: str = DB.GH_REPO,
 23    gh_token: str = DB.GH_TOKEN,
 24) -> dict:
 25    """List GitHub assets of a release.
 26
 27    `tag_name` and `release_name` must be specified at least one.
 28
 29    Returns:
 30        {
 31            "asset_name": {
 32                "name": "asset_name",
 33                "url": "asset_url",
 34                "id": "asset_id"
 35                "updated_at": "2025-08-29T06:27:52Z",
 36                }
 37        }
 38    """
 39    if not gh_user:
 40        gh_user = await get_username(gh_token)
 41    release_id = await get_release_id(tag_name, release_name, gh_user, gh_repo, gh_token)
 42    if not all([release_id, gh_user, gh_repo, gh_token]):
 43        return {}
 44    headers = {"Accept": "application/vnd.github+json", "Authorization": f"Bearer {gh_token}", "X-GitHub-Api-Version": "2022-11-28"}
 45    resp = await hx_req(f"https://api.github.com/repos/{gh_user}/{gh_repo}/releases/{release_id}/assets", headers=headers, proxy=PROXY.GITHUB, check_keys=["0.browser_download_url"])
 46    if isinstance(resp, list):
 47        return {x["name"]: {"url": x["browser_download_url"], "id": x["id"], "updated_at": x["updated_at"], "name": x["name"]} for x in resp}  # type: ignore
 48    if resp.get("hx_error"):
 49        logger.error(resp["hx_error"])
 50    return {}
 51
 52
 53@cache.memoize(ttl=3)
 54async def list_releases(
 55    gh_user: str = DB.GH_USER,
 56    gh_repo: str = DB.GH_REPO,
 57    gh_token: str = DB.GH_TOKEN,
 58) -> dict:
 59    """List GitHub releases."""
 60    if not gh_user:
 61        gh_user = await get_username(gh_token)
 62    if not all([gh_user, gh_repo, gh_token]):
 63        return {}
 64    headers = {"Accept": "application/vnd.github+json", "Authorization": f"Bearer {gh_token}", "X-GitHub-Api-Version": "2022-11-28"}
 65    releases = []
 66    per_page = 100  # maximum is 100
 67    page = 1
 68    resp = await hx_req(
 69        f"https://api.github.com/repos/{gh_user}/{gh_repo}/releases",
 70        params={"per_page": per_page, "page": page},
 71        headers=headers,
 72        proxy=PROXY.GITHUB,
 73        check_keys=["0.id"],
 74    )
 75    if isinstance(resp, list):
 76        releases.extend(resp)
 77        while len(resp) == per_page:
 78            page += 1
 79            resp = await hx_req(
 80                f"https://api.github.com/repos/{gh_user}/{gh_repo}/releases",
 81                params={"per_page": per_page, "page": page},
 82                headers=headers,
 83                proxy=PROXY.GITHUB,
 84                check_keys=["0.id"],
 85            )
 86            if not isinstance(resp, list):
 87                break
 88            releases.extend(resp)
 89    logger.success(f"Found {len(releases)} releases of {gh_user}/{gh_repo}")
 90    return {x["name"]: x for x in releases}
 91
 92
 93async def get_release_id(
 94    tag_name: str = "",
 95    release_name: str = "",
 96    gh_user: str = DB.GH_USER,
 97    gh_repo: str = DB.GH_REPO,
 98    gh_token: str = DB.GH_TOKEN,
 99) -> int:
100    """Get release id by release name or tag."""
101    if not gh_user:
102        gh_user = await get_username(gh_token)
103    releases = await list_releases(gh_user, gh_repo, gh_token)
104    release_id = 0
105    if release_name:
106        release_id = glom(releases, GlomPath(release_name, "id"), default=0)
107    if release_id == 0 and tag_name:
108        for release in releases.values():
109            if release["tag_name"] == tag_name:
110                release_id = release["id"]
111                break
112    if release_id == 0:
113        logger.warning(f"Release is not found. Release: {release_name}, Tag: {tag_name}")
114    return release_id
115
116
async def gh_download_asset(
    asset_name: str,
    tag_name: str = "",
    release_name: str = "",
    gh_user: str = DB.GH_USER,
    gh_repo: str = DB.GH_REPO,
    gh_token: str = DB.GH_TOKEN,
) -> str:
    """Download an asset from a GitHub release.

    `asset_name` must match the asset's name as listed in the release. When
    `gh_user` is empty it is looked up from the token.

    Returns:
        The path returned by `download_file`, or "" when a required argument
        is missing or the asset is not listed in the release.
    """
    if not gh_user:
        gh_user = await get_username(gh_token)
    if not all([asset_name, gh_user, gh_repo, gh_token]):
        return ""
    assets = await list_assets(tag_name, release_name, gh_user, gh_repo, gh_token)
    asset_url = glom(assets, GlomPath(asset_name, "url"), default="")
    if not asset_url:
        # Nothing to download: do not hand an empty URL to download_file.
        logger.error(f"Asset not found. Asset Name: {asset_name}, Release: {release_name}, Tag: {tag_name}")
        return ""
    path = await download_file(asset_url, proxy=PROXY.GITHUB, stream=True)
    if not Path(path).is_file():
        logger.error(f"Get github asset failed. Asset Name: {asset_name}, Release: {release_name}, Tag: {tag_name}")
    return path
136
137
async def _read_chunks(fh, chunk_size: int = 1024 * 1024):
    """Yield successive `chunk_size`-byte chunks from an open async binary file."""
    while chunk := await fh.read(chunk_size):
        yield chunk


async def gh_upload_asset(
    path: str | Path,
    asset_name: str = "",
    tag_name: str = "",
    release_name: str = "",
    gh_user: str = DB.GH_USER,
    gh_repo: str = DB.GH_REPO,
    gh_token: str = DB.GH_TOKEN,
    *,
    overwrite: bool = False,
) -> str:
    """Upload a local file as an asset of a GitHub release.

    The release is resolved by name/tag and created when it does not exist.
    `asset_name` defaults to the file name; if it contains characters other
    than a-z, A-Z, 0-9, "-", "_", "." and "~", its stem is replaced by the
    unpadded URL-safe base64 encoding of the stem (the suffix is kept).

    Args:
        path: local file to upload; "~" is expanded.
        asset_name: name of the asset in the release.
        overwrite: when an asset with the same name exists, delete and
            re-upload it instead of returning the existing asset's URL.

    Returns:
        URL of the uploaded (or already existing) asset, or "" on failure.
    """
    if not path:
        logger.error(f"File not found: {path}")
        return ""
    # Normalise once and use the same path for the existence check, stat and
    # upload, so that e.g. "~/file" does not pass the check and then fail.
    file_path = Path(path).expanduser().resolve()
    if not file_path.is_file():
        logger.error(f"File not found: {path}")
        return ""
    file_size = file_path.stat().st_size
    if file_size >= 2 * 1024**3:  # maximum is 2 GB
        logger.error(f"File {Path(path).name} is too large: {file_size} bytes")
        return ""

    if not gh_user:
        gh_user = await get_username(gh_token)
    release_id = await get_release_id(tag_name, release_name, gh_user, gh_repo, gh_token)
    if release_id == 0:
        logger.info(f"Release not exists. Release: {release_name}, Tag: {tag_name}. Create a new release.")
        release_id = await gh_create_release(tag_name, release_name, gh_user, gh_repo, gh_token)
        if release_id == 0:
            logger.error(f"Create release failed. Release: {release_name}, Tag: {tag_name}")
            return ""
    if not asset_name:
        asset_name = Path(path).name
    # encode the asset_name if needed. allowed characters: a-z, A-Z, 0-9, -, _, ., ~
    allowed_chars = set(string.ascii_letters + string.digits + "-_.~")
    if any(c not in allowed_chars for c in asset_name):
        stem = Path(asset_name).stem
        suffix = Path(asset_name).suffix
        asset_name = base64.urlsafe_b64encode(stem.encode()).decode().rstrip("=") + suffix
    # check if asset exists
    assets = await list_assets(tag_name, release_name, gh_user, gh_repo, gh_token)
    if asset_name in assets:
        if not overwrite:
            logger.info(f"Asset already exists. Asset: {asset_name}, Release: {release_name}, Tag: {tag_name}")
            return assets[asset_name]["url"]
        logger.warning(f"Delete existing asset. Asset: {asset_name}, Release: {release_name}, Tag: {tag_name}")
        await gh_del_asset(asset_name, tag_name, release_name, gh_user, gh_repo, gh_token)

    headers = {
        "Accept": "application/vnd.github+json",
        "Authorization": f"Bearer {gh_token}",
        "X-GitHub-Api-Version": "2022-11-28",
        "Content-Type": "application/octet-stream",
        # Sent explicitly because the body is streamed.
        "Content-Length": str(file_size),
    }
    upload_url = f"https://uploads.github.com/repos/{gh_user}/{gh_repo}/releases/{release_id}/assets"
    # use stream uploading
    try:
        logger.info(f"Uploading {asset_name} to GitHub Release: {release_name}, Tag: {tag_name}")
        async with AsyncClient(headers=headers, proxy=PROXY.GITHUB, follow_redirects=True, timeout=None) as client:  # noqa: S113
            async with await anyio.open_file(file_path, "rb") as f:
                # Stream fixed-size chunks: iterating the async file directly
                # yields one "line" at a time, which is a poor fit for binary
                # data. `params` lets httpx URL-encode the asset name.
                response = await client.post(url=upload_url, params={"name": asset_name}, headers=headers, content=_read_chunks(f))
                response.raise_for_status()
            logger.success(f"Uploaded {asset_name} to GitHub Release: {release_name}, Tag: {tag_name}")
            return response.json()["browser_download_url"]
    except Exception as e:
        # Best-effort API: any failure is logged and reported as "".
        error = f"Failed to upload to GitHub: {e}"
        logger.error(error)
    return ""
207
208
async def gh_del_asset(
    asset_name: str = "",
    tag_name: str = "",
    release_name: str = "",
    gh_user: str = DB.GH_USER,
    gh_repo: str = DB.GH_REPO,
    gh_token: str = DB.GH_TOKEN,
) -> bool:
    """Remove a single asset from a GitHub release.

    The asset id is looked up by name in the release's asset listing. The
    response of the DELETE request is not inspected.

    Returns:
        True once the DELETE request has been issued; False when the asset is
        not listed in the release or a required argument is empty.
    """
    gh_user = gh_user or await get_username(gh_token)

    listing = await list_assets(tag_name, release_name, gh_user, gh_repo, gh_token)
    target_id = glom(listing, GlomPath(asset_name, "id"), default=0)
    if target_id == 0:
        logger.warning(f"Asset not found. Asset Name: {asset_name}, Release: {release_name}, Tag: {tag_name}")
        return False
    if not (asset_name and gh_user and gh_repo and gh_token):
        return False

    endpoint = f"https://api.github.com/repos/{gh_user}/{gh_repo}/releases/assets/{target_id}"
    request_headers = {
        "Accept": "application/vnd.github+json",
        "Authorization": f"Bearer {gh_token}",
        "X-GitHub-Api-Version": "2022-11-28",
    }
    await hx_req(endpoint, "DELETE", headers=request_headers, proxy=PROXY.GITHUB, rformat="content")
    logger.success(f"Deleted asset {asset_name} from GitHub Release: {release_name}, Tag: {tag_name}")
    return True
232
233
@cache.memoize(ttl=0)
async def get_username(gh_token: str):
    """Look up the login name of the account that owns a GitHub API token.

    Returns "" when no token is given. A warning is logged when `DB.GH_USER`
    is configured and differs from the login reported by the API.
    """
    if not gh_token:
        return ""
    request_headers = {
        "Accept": "application/vnd.github+json",
        "Authorization": f"Bearer {gh_token}",
        "X-GitHub-Api-Version": "2022-11-28",
    }
    profile = await hx_req(
        "https://api.github.com/user",
        headers=request_headers,
        proxy=PROXY.GITHUB,
        check_keys=["login"],
        check_kv={"type": "User"},
    )
    login = profile.get("login", "")
    if DB.GH_USER and login != DB.GH_USER:
        logger.warning(f"GitHub username mismatch: {login} != {DB.GH_USER}")
    return login
245
246
async def gh_create_release(
    tag_name: str,
    release_name: str = "",
    gh_user: str = DB.GH_USER,
    gh_repo: str = DB.GH_REPO,
    gh_token: str = DB.GH_TOKEN,
    *,
    body: str = "",
    draft: bool = False,
    prerelease: bool = False,
    make_latest: Literal["true", "false", "legacy"] = "true",
) -> int:
    """Create a GitHub release, or return the id of a matching existing one.

    The release is named `release_name`, falling back to `tag_name` when no
    name is given.

    Returns:
        The release id, or 0 when a required argument is missing or the
        request reports an error.
    """
    gh_user = gh_user or await get_username(gh_token)
    if not (tag_name and gh_user and gh_repo and gh_token):
        return 0

    # Reuse an existing release instead of creating a duplicate.
    existing_id = await get_release_id(tag_name, release_name, gh_user, gh_repo, gh_token)
    if existing_id:
        return existing_id

    request_headers = {
        "Accept": "application/vnd.github+json",
        "Authorization": f"Bearer {gh_token}",
        "X-GitHub-Api-Version": "2022-11-28",
    }
    payload = {
        "tag_name": tag_name,
        "name": release_name or tag_name,
        "body": body,
        "draft": draft,
        "prerelease": prerelease,
        "make_latest": make_latest,
    }
    created = await hx_req(
        f"https://api.github.com/repos/{gh_user}/{gh_repo}/releases",
        "POST",
        headers=request_headers,
        proxy=PROXY.GITHUB,
        json_data=payload,
        check_kv={"tag_name": tag_name},
    )
    failure = created.get("hx_error")
    if failure:
        logger.error(failure)
        return 0
    logger.success(f"Created release {tag_name} on {gh_user}/{gh_repo}")
    return created.get("id", 0)
276
277
async def gh_del_release(
    tag_name: str = "",
    release_name: str = "",
    gh_user: str = DB.GH_USER,
    gh_repo: str = DB.GH_REPO,
    gh_token: str = DB.GH_TOKEN,
    *,
    del_tag: bool = True,
) -> bool:
    """Delete a GitHub release and, optionally, the git tag it was built from.

    Args:
        del_tag: also delete the tag of the release. The tag is taken from the
            release record; if that record cannot be found, the `tag_name`
            argument is used, and tag deletion is skipped when neither is
            known.

    Returns:
        True once the release DELETE request has been issued; False when a
        required argument is missing or the release cannot be resolved. The
        responses of the DELETE requests are not inspected.
    """
    if not gh_user:
        gh_user = await get_username(gh_token)
    if not all([gh_user, gh_repo, gh_token]):
        return False
    release_id = await get_release_id(tag_name, release_name, gh_user, gh_repo, gh_token)
    if release_id == 0:
        logger.warning(f"ReleaseID not found on {gh_user}/{gh_repo}. Release: {release_name}, Tag: {tag_name}")
        return False
    headers = {"Accept": "application/vnd.github+json", "Authorization": f"Bearer {gh_token}", "X-GitHub-Api-Version": "2022-11-28"}
    # Resolve the release's tag before the release is deleted. The lookup may
    # come back empty, so never index the record directly.
    releases = await list_releases(gh_user, gh_repo, gh_token)
    release = next((x for x in releases.values() if x["id"] == release_id), {})
    release_tag = release.get("tag_name") or tag_name

    # del release
    await hx_req(f"https://api.github.com/repos/{gh_user}/{gh_repo}/releases/{release_id}", "DELETE", headers=headers, proxy=PROXY.GITHUB, rformat="content")
    logger.success(f"Deleted release {tag_name} {release_name} on {gh_user}/{gh_repo}")

    # del tag
    if del_tag:
        if not release_tag:
            logger.warning(f"Tag of release is unknown, skip deleting tag on {gh_user}/{gh_repo}. Release: {release_name}")
            return True
        await hx_req(f"https://api.github.com/repos/{gh_user}/{gh_repo}/git/refs/tags/{release_tag}", "DELETE", headers=headers, proxy=PROXY.GITHUB, rformat="content")
        logger.success(f"Deleted tag {release_tag} on {gh_user}/{gh_repo}")
    return True
310
311
async def gh_clean_assets(
    tag_name: str = "",
    release_name: str = "",
    gh_user: str = DB.GH_USER,
    gh_repo: str = DB.GH_REPO,
    gh_token: str = DB.GH_TOKEN,
    *,
    keep_latest: int = 0,
    only_suffix: list[str] | None = None,
) -> int:
    """Delete assets from a GitHub release.

    Candidate assets are ordered by `updated_at`, newest first; the first
    `keep_latest` of them are kept and the rest are deleted.

    Args:
        keep_latest: keep latest N assets
        only_suffix: only delete assets with these formats. (e.g. [".mkv", ".mp4"])

    Returns:
        number of deleted assets (deletions for which `gh_del_asset`
        returned True)
    """
    if not gh_user:
        gh_user = await get_username(gh_token)
    if not all([gh_user, gh_repo, gh_token]):
        return 0
    assets = await list_assets(tag_name, release_name, gh_user, gh_repo, gh_token)
    if only_suffix is None:
        files = list(assets.values())
    else:
        suffixes = tuple(only_suffix)  # str.endswith accepts a tuple of options
        files = [x for name, x in assets.items() if name.endswith(suffixes)]
    files = sorted(files, key=lambda x: x["updated_at"], reverse=True)
    # Count what was really deleted: `len(files) - keep_latest` goes negative
    # when fewer than `keep_latest` assets exist and ignores failed deletions.
    num_del = 0
    for f in files[keep_latest:]:
        logger.info(f"Deleting asset {f['name']} from {gh_user}/{gh_repo}, Release: {release_name}, Tag: {tag_name}")
        if await gh_del_asset(f["name"], tag_name, release_name, gh_user, gh_repo, gh_token):
            num_del += 1
    logger.success(f"Deleted {num_del} assets from {gh_user}/{gh_repo}, Release: {release_name}, Tag: {tag_name}")
    return num_del
345    return num_del