main
  1#!/usr/bin/env python
  2# -*- coding: utf-8 -*-
  3import contextlib
  4import importlib.metadata
  5import os
  6import platform
  7import shutil
  8import subprocess
  9from pathlib import Path
 10
 11from ffmpeg.asyncio import FFmpeg
 12from glom import glom
 13from loguru import logger
 14from pyrogram.client import Client
 15from pyrogram.types import Message
 16
 17from config import DOWNLOAD_DIR, PREFIX, TID, TOKEN
 18from messages.progress import modify_progress
 19from messages.sender import send2tg
 20from messages.utils import blockquote, startswith_prefix
 21from utils import rand_string, strings_list
 22
 23
 24async def get_bot_version(client: Client, message: Message, **kwargs):
 25    """Get detail version of this bot."""
 26    if not startswith_prefix(message.content, prefix=PREFIX.VERSION):
 27        return
 28    commit_sha = os.getenv("COMMIT_SHA", "")
 29    commit_sha = f"{commit_sha[:7]}" if commit_sha else ""
 30    commit_date = os.getenv("COMMIT_DATE", "")
 31    python_version = platform.python_version()
 32    ffmpeg_version = await get_ffmpeg_version()
 33    python_packages = {x.metadata["Name"]: x.version for x in importlib.metadata.distributions()}
 34
 35    header = f"**Bot**: {commit_sha} ({commit_date})\n"
 36    if ffmpeg_version:
 37        header += f"**FFmpeg**: {ffmpeg_version}\n"
 38    header += f"**Python**: {python_version}\n**Packages**:\n"
 39    pkgs = ""
 40    for name, version in sorted(python_packages.items(), key=lambda x: x[0].lower()):
 41        pkgs += f"{name}=={version}\n"
 42    await send2tg(client, message, texts=f"{header}{blockquote(pkgs)}", **kwargs)
 43
 44
 45async def get_ffmpeg_version() -> str:
 46    ffmpeg = FFmpeg().option("version")
 47    with contextlib.suppress(Exception):
 48        res = await ffmpeg.execute()
 49        if isinstance(res, bytes):
 50            lines = res.decode("utf-8").splitlines()
 51            return lines[0].split(" ")[2]
 52    return ""
 53
 54
 55async def update_bot(message: Message):
 56    """Update this bot."""
 57    if message.text != "/update":
 58        return
 59    if glom(message, "chat.type.name", default="") not in ["PRIVATE", "BOT"]:
 60        return
 61    handle = glom(message, "from_user.username", default="")
 62    uid = glom(message, "from_user.id", default="")
 63    admins = [s.lower().removeprefix("@") for s in strings_list(TID.ADMIN)]
 64    if str(handle) not in admins and str(uid) not in admins:
 65        return
 66    if not (Path("/.dockerenv").exists() or Path("/run/.containerenv").exists()):
 67        logger.info("Not in container, skip update.")
 68        return
 69    status = await message.reply_text(text="⌛️开始更新")
 70
 71    async def run(cmd: list[str]) -> bool:
 72        """Run a command."""
 73        env = os.environ.copy()
 74        env["GH_TOKEN"] = TOKEN.GITHUB
 75        env["GH_CONFIG_DIR"] = DOWNLOAD_DIR  # must be writable
 76        try:
 77            result = subprocess.run(  # noqa: ASYNC221, S603
 78                cmd,
 79                env=env,
 80                check=True,  # 若命令执行失败(返回码非0),抛出 CalledProcessError
 81                capture_output=True,
 82                text=True,  # 输出以字符串形式返回
 83            )
 84            logger.info(f"{result.stdout}\n{result.stderr}".strip())
 85        except subprocess.CalledProcessError as e:
 86            logger.error(f"❌升级失败,错误码:{e.returncode}, 错误信息:{e.stderr}")
 87            await modify_progress(status, text=f"❌升级失败,错误码:{e.returncode}, 错误信息:{e.stderr}", force_update=True)
 88            return False
 89        except Exception as e:
 90            logger.error(f"❌升级失败\n{e}")
 91            await modify_progress(status, text=f"❌升级失败\n{e}", force_update=True)
 92            return False
 93        return True
 94
 95    Path(DOWNLOAD_DIR).mkdir(parents=True, exist_ok=True)
 96    root = Path(DOWNLOAD_DIR).joinpath(rand_string(10)).as_posix()
 97
 98    git_clone_cmd = ["gh", "repo", "clone", "https://github.com/zydou/bennybot", root]
 99    if not await run(git_clone_cmd):
100        return
101
102    uv_sync_cmd = [
103        "uv",
104        "sync",
105        "--frozen",
106        "--no-dev",
107        "--no-python-downloads",
108        "--no-install-project",
109        "--no-cache",
110        "--no-editable",
111        "--project",
112        root,
113        "--directory",
114        root,
115        "--python",
116        "/venv/bin/python",
117    ]
118    if not await run(uv_sync_cmd):
119        return
120    sync_dir(Path(root).joinpath("src"), "/app")
121    await modify_progress(status, text="✅升级成功\n重启Bot,请稍后...", force_update=True)
122    if Path(root).is_dir():
123        shutil.rmtree(root)
124    os._exit(0)  # restarting is managed by s6-overlay
125
126
def sync_dir(src: str | Path, dst: str | Path):
    """Mirror ``src`` into ``dst``, similar to ``rsync --delete``.

    1. Every file under ``src`` is moved to the same relative path under
       ``dst``; missing parent directories are created.
    2. Files under ``dst`` that were not just moved in are deleted, except
       those whose POSIX path starts with ``DOWNLOAD_DIR``.
    3. Directories left empty under ``dst`` are removed via ``del_empty_dir``.

    Args:
        src: Source directory (string or ``Path``).
        dst: Destination directory (string or ``Path``).
    """
    source_dir, target_dir = Path(src), Path(dst)
    kept: set[str] = set()  # POSIX paths of the files that must survive pruning

    # Step 1: move the files from src into dst, preserving the layout.
    for entry in source_dir.rglob("*"):
        if not entry.is_file():
            continue
        # e.g. src/a/b.txt -> dst/a/b.txt
        target = target_dir / entry.relative_to(source_dir)
        target.parent.mkdir(parents=True, exist_ok=True)
        kept.add(target.as_posix())
        shutil.move(entry, target)

    # Step 2: delete the files in dst that did not come from src.
    for candidate in target_dir.rglob("*"):
        posix_name = candidate.as_posix()
        if not candidate.is_file():
            continue
        if posix_name in kept or posix_name.startswith(DOWNLOAD_DIR):
            continue
        candidate.unlink()
    del_empty_dir(target_dir)
165
166
def del_empty_dir(root: Path):
    """Recursively remove empty directories below ``root``, and ``root`` itself if empty.

    Sub-directories are handled first, because a parent can only be judged
    empty once its children have been pruned. Failures to remove a directory
    are logged and not raised.

    Args:
        root: Directory to prune.
    """
    # Depth-first: prune every sub-directory before looking at this one.
    for entry in root.iterdir():
        if entry.is_dir():
            del_empty_dir(entry)

    # Stop at the first remaining entry instead of listing the whole directory.
    if any(True for _ in root.iterdir()):
        return

    try:
        root.rmdir()  # rmdir only ever removes an empty directory
    except PermissionError:
        logger.error(f"[跳过] 权限不足: {root}")
    except Exception as e:
        logger.error(f"[错误] 无法删除 {root}: {e}")
183            logger.error(f"[错误] 无法删除 {root}: {e}")