main
1#!/usr/bin/env python
2# -*- coding: utf-8 -*-
3import contextlib
4import importlib.metadata
5import os
6import platform
7import shutil
8import subprocess
9from pathlib import Path
10
11from ffmpeg.asyncio import FFmpeg
12from glom import glom
13from loguru import logger
14from pyrogram.client import Client
15from pyrogram.types import Message
16
17from config import DOWNLOAD_DIR, PREFIX, TID, TOKEN
18from messages.progress import modify_progress
19from messages.sender import send2tg
20from messages.utils import blockquote, startswith_prefix
21from utils import rand_string, strings_list
22
23
24async def get_bot_version(client: Client, message: Message, **kwargs):
25 """Get detail version of this bot."""
26 if not startswith_prefix(message.content, prefix=PREFIX.VERSION):
27 return
28 commit_sha = os.getenv("COMMIT_SHA", "")
29 commit_sha = f"{commit_sha[:7]}" if commit_sha else ""
30 commit_date = os.getenv("COMMIT_DATE", "")
31 python_version = platform.python_version()
32 ffmpeg_version = await get_ffmpeg_version()
33 python_packages = {x.metadata["Name"]: x.version for x in importlib.metadata.distributions()}
34
35 header = f"**Bot**: {commit_sha} ({commit_date})\n"
36 if ffmpeg_version:
37 header += f"**FFmpeg**: {ffmpeg_version}\n"
38 header += f"**Python**: {python_version}\n**Packages**:\n"
39 pkgs = ""
40 for name, version in sorted(python_packages.items(), key=lambda x: x[0].lower()):
41 pkgs += f"{name}=={version}\n"
42 await send2tg(client, message, texts=f"{header}{blockquote(pkgs)}", **kwargs)
43
44
45async def get_ffmpeg_version() -> str:
46 ffmpeg = FFmpeg().option("version")
47 with contextlib.suppress(Exception):
48 res = await ffmpeg.execute()
49 if isinstance(res, bytes):
50 lines = res.decode("utf-8").splitlines()
51 return lines[0].split(" ")[2]
52 return ""
53
54
55async def update_bot(message: Message):
56 """Update this bot."""
57 if message.text != "/update":
58 return
59 if glom(message, "chat.type.name", default="") not in ["PRIVATE", "BOT"]:
60 return
61 handle = glom(message, "from_user.username", default="")
62 uid = glom(message, "from_user.id", default="")
63 admins = [s.lower().removeprefix("@") for s in strings_list(TID.ADMIN)]
64 if str(handle) not in admins and str(uid) not in admins:
65 return
66 if not (Path("/.dockerenv").exists() or Path("/run/.containerenv").exists()):
67 logger.info("Not in container, skip update.")
68 return
69 status = await message.reply_text(text="⌛️开始更新")
70
71 async def run(cmd: list[str]) -> bool:
72 """Run a command."""
73 env = os.environ.copy()
74 env["GH_TOKEN"] = TOKEN.GITHUB
75 env["GH_CONFIG_DIR"] = DOWNLOAD_DIR # must be writable
76 try:
77 result = subprocess.run( # noqa: ASYNC221, S603
78 cmd,
79 env=env,
80 check=True, # 若命令执行失败(返回码非0),抛出 CalledProcessError
81 capture_output=True,
82 text=True, # 输出以字符串形式返回
83 )
84 logger.info(f"{result.stdout}\n{result.stderr}".strip())
85 except subprocess.CalledProcessError as e:
86 logger.error(f"❌升级失败,错误码:{e.returncode}, 错误信息:{e.stderr}")
87 await modify_progress(status, text=f"❌升级失败,错误码:{e.returncode}, 错误信息:{e.stderr}", force_update=True)
88 return False
89 except Exception as e:
90 logger.error(f"❌升级失败\n{e}")
91 await modify_progress(status, text=f"❌升级失败\n{e}", force_update=True)
92 return False
93 return True
94
95 Path(DOWNLOAD_DIR).mkdir(parents=True, exist_ok=True)
96 root = Path(DOWNLOAD_DIR).joinpath(rand_string(10)).as_posix()
97
98 git_clone_cmd = ["gh", "repo", "clone", "https://github.com/zydou/bennybot", root]
99 if not await run(git_clone_cmd):
100 return
101
102 uv_sync_cmd = [
103 "uv",
104 "sync",
105 "--frozen",
106 "--no-dev",
107 "--no-python-downloads",
108 "--no-install-project",
109 "--no-cache",
110 "--no-editable",
111 "--project",
112 root,
113 "--directory",
114 root,
115 "--python",
116 "/venv/bin/python",
117 ]
118 if not await run(uv_sync_cmd):
119 return
120 sync_dir(Path(root).joinpath("src"), "/app")
121 await modify_progress(status, text="✅升级成功\n重启Bot,请稍后...", force_update=True)
122 if Path(root).is_dir():
123 shutil.rmtree(root)
124 os._exit(0) # restarting is managed by s6-overlay
125
126
127def sync_dir(src: str | Path, dst: str | Path):
128 """类似 rsync --delete 的同步操作.
129
130 1. 将 src 中的所有文件移动到 dst(保留目录结构)
131 2. 删除 dst 中存在但 src 中不存在的文件和目录
132
133 Args:
134 src: 源目录(字符串或 Path 对象)
135 dst: 目标目录(字符串或 Path 对象)
136 """
137 src_root = Path(src)
138 dst_root = Path(dst)
139 files = set() # 存储应保留的所有文件路径
140 # --------------------------
141 # 第一步:移动 src 的文件到 dst(保留层级结构)
142 # --------------------------
143 for src_file in src_root.rglob("*"):
144 if src_file.is_file():
145 # 计算相对路径,例如 src/a/b.txt -> a/b.txt
146 rel_path = src_file.relative_to(src_root)
147 # 拼接目标路径:dst / a/b.txt
148 dst_file = dst_root / rel_path
149
150 # 确保目标文件的父目录存在(parents=True 表示递归创建)
151 dst_file.parent.mkdir(parents=True, exist_ok=True)
152
153 # 移动文件(同文件系统下为原子重命名,自动覆盖)
154 files.add(dst_file.as_posix())
155 shutil.move(src_file, dst_file)
156
157 # --------------------------
158 # 第二步:删除 dst 中多余的文件和目录
159 # --------------------------
160 for dst_file in dst_root.rglob("*"):
161 fname = dst_file.as_posix()
162 if dst_file.is_file() and fname not in files and not fname.startswith(DOWNLOAD_DIR):
163 dst_file.unlink()
164 del_empty_dir(dst_root)
165
166
167def del_empty_dir(root: Path):
168 """Delete empty directory."""
169 # 递归遍历:先处理所有子目录(深度优先)
170 # 只有先把子目录删干净,才能判断父目录是否为空
171 for child in root.iterdir():
172 if child.is_dir():
173 del_empty_dir(child)
174
175 # 检查并删除:此时子目录已处理完,检查当前目录是否为空
176 # 使用 next(..., None) 检查是否有内容,比 list(root.iterdir()) 更高效
177 if next(root.iterdir(), None) is None:
178 try:
179 root.rmdir() # rmdir 只能删除空目录,非常安全
180 except PermissionError:
181 logger.error(f"[跳过] 权限不足: {root}")
182 except Exception as e:
183 logger.error(f"[错误] 无法删除 {root}: {e}")