Commit ad2629f

benny-dou <60535774+benny-dou@users.noreply.github.com>
2025-06-09 13:49:09
feat(prefix): support multiple commands for a single funciton
1 parent 7230722
src/llm/gpt.py
@@ -24,8 +24,7 @@ HELP = f"""🤖**GPT对话**
 以 `{PREFIX.GPT}` 回复消息可将其加入上下文
 暂不支持视频, 可先用`{PREFIX.ASR}`命令转为文字后再调用`{PREFIX.GPT}`
 
-⚙️模型配置:
-`{PREFIX.GPT}`: 默认使用 **{GPT.DEFAULT_PROVIDER.lower()}** 模型
+⚙️模型配置: 默认使用 **{GPT.DEFAULT_PROVIDER.lower()}** 模型
 
 🔄使用以下命令强制切换模型:
 `/gpt`: **{GPT.OPENAI_MODEL_NAME}** {image_emoji(GPT.OPENAI_ACCEPT_IMAGE)}
@@ -82,7 +81,7 @@ async def gpt_response(client: Client, message: Message, *, gpt_stream: bool = T
     # ruff: noqa: RET502, RET503
     info = parse_msg(message)
     # send docs if message == "/ai", without reply
-    if info["mtype"] == "text" and equal_prefix(info["text"], prefix=[PREFIX.GPT, "/gpt", "/gemini", "/ds", "/qwen", "/grok", "/doubao"]) and not message.reply_to_message:
+    if info["mtype"] == "text" and equal_prefix(info["text"], prefix=[PREFIX.GPT]) and not message.reply_to_message:
         await send2tg(client, message, texts=HELP, **kwargs)
         return {}
     if info["mtype"] == "text" and equal_prefix(info["text"], prefix=[PREFIX.GENIMG]) and not message.reply_to_message:
src/llm/models.py
@@ -58,9 +58,7 @@ def get_model_id(text: str, reply_text: str, context_type: str) -> tuple[str, st
         "gemini": GPT.GEMINI_MODEL,
     }
     # parse from command prefix. If use /ds command, force use DeepSeek model.
-    if startswith_prefix(text, prefix=[PREFIX.GPT]):
-        model_id = providers.get(GPT.DEFAULT_PROVIDER.lower(), GPT.OPENAI_MODEL)
-    elif startswith_prefix(text, prefix=["/gpt"]):
+    if startswith_prefix(text, prefix=["/gpt"]):
         model_id = GPT.OPENAI_MODEL
     elif startswith_prefix(text, prefix=["/ds"]):
         model_id = GPT.DEEPSEEK_MODEL
@@ -74,6 +72,8 @@ def get_model_id(text: str, reply_text: str, context_type: str) -> tuple[str, st
         model_id = GEMINI.IMG_MODEL
     elif startswith_prefix(text, prefix=["/gemini"]):
         model_id = GPT.GEMINI_MODEL
+    else:
+        model_id = providers.get(GPT.DEFAULT_PROVIDER.lower(), GPT.OPENAI_MODEL)
 
     # fallback to omni model if needed
     omni_providers = {
src/messages/utils.py
@@ -14,34 +14,80 @@ from config import TEXT_LENGTH
 from utils import myself, readable_size, to_int
 
 
-def startswith_prefix(text: str | None = None, prefix: list[str] | None = None, ignore_prefix: list[str] | None = None) -> bool:
+def startswith_prefix(text: str, prefix: str | list[str], ignore_prefix: str | list[str] | None = None) -> bool:
     """Check if the message text starts with the given command prefixes.
 
+    support prefix:
+      "/cmd"
+      "/cmd1, /cmd2"
+      ["/cmd1", "/cmd2"]
+      ["/cmd1, /cmd2"]
+
     Args:
         text (str): The message text.
-        prefix (list[str], optional): Command prefixes that are effective.
-        ignore_prefix (list[str], optional): Ignore these command prefixes.
+        prefix (str | list[str]): Command prefixes that are effective.
+        ignore_prefix (str | list[str], optional): Ignore these command prefixes.
     """
+
+    def norm_cmd(cmd: str) -> str:
+        return cmd.strip().lower().replace("!", "!").replace("?", "?")
+
     if not text:
         return False
-    if ignore_prefix and any(text.strip().lower().startswith(prefix) for prefix in ignore_prefix):
-        return False
-    return bool(prefix and any(text.strip().lower().startswith(prefix) for prefix in prefix))
+    ignore_prefix = ignore_prefix or []
+    if isinstance(ignore_prefix, str):
+        ignore_prefix = [ignore_prefix]
+    if isinstance(prefix, str):
+        prefix = [prefix]
+
+    for ignore_str in ignore_prefix:
+        for pfx in [x.strip() for x in ignore_str.split(",") if x.strip()]:
+            if pfx and norm_cmd(text).startswith(norm_cmd(pfx)):
+                return False
+
+    for prefix_str in prefix:
+        for pfx in [x.strip() for x in prefix_str.split(",") if x.strip()]:
+            if pfx and norm_cmd(text).startswith(norm_cmd(pfx)):
+                return True
+    return False
 
 
-def equal_prefix(text: str | None = None, prefix: list[str] | None = None, ignore_prefix: list[str] | None = None) -> bool:
+def equal_prefix(text: str, prefix: str | list[str], ignore_prefix: str | list[str] | None = None) -> bool:
     """Check if the message text equal with the given command prefixes.
 
+    support prefix:
+      "/cmd"
+      "/cmd1, /cmd2"
+      ["/cmd1", "/cmd2"]
+      ["/cmd1, /cmd2"]
+
     Args:
         text (str): The message text.
-        prefix (list[str], optional): Extra command prefixes that are effective.
-        ignore_prefix (list[str], optional): Ignore these command prefixes.
+        prefix (str | list[str]): Command prefixes that are effective.
+        ignore_prefix (str | list[str], optional): Ignore these command prefixes.
     """
-    if not text:
-        return False
-    if ignore_prefix and text.strip().lower() in ignore_prefix:
+
+    def norm_cmd(cmd: str) -> str:
+        return cmd.strip().lower().replace("!", "!").replace("?", "?")
+
+    if not text or not prefix:
         return False
-    return bool(prefix and text.strip().lower() in prefix)
+    ignore_prefix = ignore_prefix or []
+    if isinstance(ignore_prefix, str):
+        ignore_prefix = [ignore_prefix]
+    if isinstance(prefix, str):
+        prefix = [prefix]
+
+    for ignore_str in ignore_prefix:
+        for pfx in [x.strip() for x in ignore_str.split(",") if x.strip()]:
+            if pfx and norm_cmd(text) == norm_cmd(pfx):
+                return False
+
+    for prefix_str in prefix:
+        for pfx in [x.strip() for x in prefix_str.split(",") if x.strip()]:
+            if pfx and norm_cmd(text) == norm_cmd(pfx):
+                return True
+    return False
 
 
 def get_reply_to(msg_id: int, reply_msg_id: int | str) -> ReplyParameters:
src/others/download_external.py
@@ -25,11 +25,10 @@ HELP = f"""
 """
 
 
-async def download_url_in_message(client: Client, message: Message, extra_prefix: list[str] | None = None, **kwargs):
+async def download_url_in_message(client: Client, message: Message, extra_prefix: str = "", **kwargs):
     """Download the url from the message."""
     info = parse_msg(message)
-    extra_prefix = extra_prefix or []
-    if not startswith_prefix(info["text"], prefix=[PREFIX.WGET, *extra_prefix]):
+    if not startswith_prefix(info["text"], prefix=[PREFIX.WGET, extra_prefix]):
         return
     # send docs if message == "/wget", without reply
     if equal_prefix(message.text, prefix=[PREFIX.WGET]) and not message.reply_to_message:
src/config.py
@@ -3,7 +3,6 @@
 import asyncio
 import os
 from pathlib import Path
-from typing import ClassVar
 
 from cacheout import Cache
 from cutword import Cutter
@@ -67,21 +66,21 @@ class ENABLE:  # see fine-grained permission in `src/permission.py`
 
 
 class PREFIX:
-    MAIN: ClassVar[list[str]] = [x.strip().lower() for x in os.getenv("PREFIX_MAIN", "/benny,/dl,!dl,!dl,!下载,!下载").split(",")]
+    MAIN = os.getenv("PREFIX_MAIN", "/benny, /dl, !dl")
     AI_SUMMARY = os.getenv("PREFIX_AI_SUMMARY", "/summary").lower()
     ASR = os.getenv("PREFIX_ASR", "/asr").lower()
     AUDIO = os.getenv("PREFIX_AUDIO", "/audio").lower()
     CONVERT = os.getenv("PREFIX_CONVERT", "/convert").lower()  # convert image file to photo
-    GPT = os.getenv("PREFIX_GPT", "/ai").lower()
-    SUBTITLE = os.getenv("PREFIX_SUBTITLE", "/subtitle").lower()
-    WGET = os.getenv("PREFIX_WGET", "/wget").lower()
+    GPT = os.getenv("PREFIX_GPT", "/ai,/gpt,/gemini,/ds,/qwen,/doubao,/grok").lower()
+    SUBTITLE = os.getenv("PREFIX_SUBTITLE", "/subtitle,/sub").lower()
+    WGET = os.getenv("PREFIX_WGET", "/wget,/curl").lower()
     OCR = os.getenv("PREFIX_OCR", "/ocr").lower()
     PRICE = os.getenv("PREFIX_PRICE", "/price").lower()  # unify crypto, stock
     CRYPTO = os.getenv("PREFIX_CRYPTO", "/crypto").lower()  # crypto only
     STOCK = os.getenv("PREFIX_STOCK", "/stock").lower()  # stock only
     COMBINATION = os.getenv("PREFIX_COMBINATION", "/combine").lower()
     VOICE = os.getenv("PREFIX_VOICE", "/voice").lower()
-    SEARCH_YOUTUBE = os.getenv("PREFIX_SEARCH_YOUTUBE", "/ytb").lower()
+    SEARCH_YOUTUBE = os.getenv("PREFIX_SEARCH_YOUTUBE", "/youtube,/ytb").lower()
     SEARCH_GOOGLE = os.getenv("PREFIX_SEARCH_GOOGLE", "/google").lower()
     GENIMG = os.getenv("PREFIX_GENIMG", "/gen").lower()
     DANMU = os.getenv("PREFIX_DANMU", "/danmu").lower()
src/handler.py
@@ -130,8 +130,7 @@ async def handle_social_media(
     reply_msg_id: int = 0,
     *,
     need_prefix: bool = True,
-    cmd_prefix: list[str] | None = None,
-    ignore_prefix: list[str] | None = None,
+    cmd_prefix: str | None = None,
     prepend_sender_user: bool = False,
     douyin: bool = True,
     tiktok: bool = True,
@@ -155,8 +154,7 @@ async def handle_social_media(
                                              If set to 0, reply to the trigger message itself.
                                              If set to -1, do not send as a reply message.
         need_prefix (bool, optional): Need to start with PREFIX to call this funciton. Defaults to True.
-        cmd_prefix (list[str], optional): Extra prefix to call this function. Defaults to None.
-        ignore_prefix (list[str], optional): Ignore prefix to call this function. Defaults to None.
+        cmd_prefix (str, optional): prefix to call this function.
         prepend_sender_user (bool, optional): Prepend the sender's username to the message. Defaults to False.
         show_progress (bool, optional): Show a progress message on Telegram. Defaults to True.
         detail_progress (bool, optional): Show detailed progress (Only if show_proress is set to True). Defaults to False.
@@ -168,50 +166,40 @@ async def handle_social_media(
         kwargs["reply_msg_id"] = -1
     if cmd_prefix is None:
         cmd_prefix = PREFIX.MAIN
-    else:
-        cmd_prefix.extend(PREFIX.MAIN)
-    ignore_prefix = ignore_prefix or ["/dl4dw"]
     # these commands are handled in `handle_utilities`
-    ignore_prefix.extend(
-        [
-            "/doubao",
-            "/ds",
-            "/gemini",
-            "/gpt",
-            "/grok",
-            "/qwen",
-            PREFIX.ASR,
-            PREFIX.AI_SUMMARY,
-            PREFIX.AUDIO,
-            PREFIX.COMBINATION,
-            PREFIX.CONVERT,
-            PREFIX.CRYPTO,
-            PREFIX.DANMU,
-            PREFIX.GENIMG,
-            PREFIX.GPT,
-            PREFIX.OCR,
-            PREFIX.PRICE,
-            PREFIX.SEARCH_GOOGLE,
-            PREFIX.SEARCH_YOUTUBE,
-            PREFIX.STOCK,
-            PREFIX.SUBTITLE,
-            PREFIX.VOICE,
-            PREFIX.WGET,
-            PREFIX.FAYAN,
-        ]
-    )
+    ignore_prefix = [
+        PREFIX.ASR,
+        PREFIX.AI_SUMMARY,
+        PREFIX.AUDIO,
+        PREFIX.COMBINATION,
+        PREFIX.CONVERT,
+        PREFIX.CRYPTO,
+        PREFIX.DANMU,
+        PREFIX.GENIMG,
+        PREFIX.GPT,
+        PREFIX.OCR,
+        PREFIX.PRICE,
+        PREFIX.SEARCH_GOOGLE,
+        PREFIX.SEARCH_YOUTUBE,
+        PREFIX.STOCK,
+        PREFIX.SUBTITLE,
+        PREFIX.VOICE,
+        PREFIX.WGET,
+        PREFIX.FAYAN,
+    ]
+
     info = parse_msg(message)
     this_msg = message
     this_texts = info["text"]  # texts of the trigger message
     if startswith_prefix(this_texts, prefix=ignore_prefix):
         return None
-    if need_prefix and not startswith_prefix(this_texts, prefix=[*cmd_prefix, "/retry"]):
+    if need_prefix and not startswith_prefix(this_texts, prefix=[cmd_prefix, "/help", "/retry"]):
         return None
     kwargs |= params_from_msg_text(this_texts)  # merge the parameters from the message text
     if true(kwargs.get("target_chat")):
         kwargs["target_chat"] = to_int(kwargs["target_chat"])
     # message only contains prefix command
-    if equal_prefix(this_texts, prefix=[*cmd_prefix, "/retry"]):
+    if equal_prefix(this_texts, prefix=[cmd_prefix, "/help", "/retry"]):
         # without reply, send docs if message only contains prefix command
         if not message.reply_to_message:
             help_msg = get_social_media_help(info["cid"], info["ctype"], cmd_prefix)
@@ -221,7 +209,7 @@ async def handle_social_media(
         info = parse_msg(message, silent=True)  # parse again
 
     warn_msg = None
-    if not need_prefix and startswith_prefix(this_texts, prefix=cmd_prefix, ignore_prefix=ignore_prefix):
+    if not need_prefix and startswith_prefix(this_texts, prefix=cmd_prefix):
         warn_msg = await client.send_message(info["cid"], text="⚠️本会话中可直接发送链接, 无需添加命令前缀\n⚠️No need to add command prefix in this chat.")
 
     # add send_from_user.
@@ -233,7 +221,7 @@ async def handle_social_media(
         if matched["platform"]:
             logger.success(f"Matched: {matched}")
         kwargs |= matched
-        if startswith_prefix(this_texts, prefix=["/retry"], ignore_prefix=ignore_prefix):
+        if startswith_prefix(this_texts, prefix="/retry"):
             await del_db(matched["db_key"])
         if douyin and matched["platform"] == "douyin":
             return await preview_douyin(client, message, **kwargs)
@@ -312,14 +300,10 @@ def params_from_msg_text(texts: str | None = None) -> dict:
     return params
 
 
-def get_social_media_help(chat_id: int | str, ctype: str, prefixes: list[str] | None = None):
+def get_social_media_help(chat_id: int | str, ctype: str, prefix: str):
     """Get the help message for social media preview."""
-    prefixes = prefixes or []
     permission = check_service(cid=chat_id, ctype=ctype)
-    msg = "🔗**链接解析**"
-    if prefixes:
-        msg += f": {', '.join(prefixes)}"
-        msg += "\n🔄使用 `/retry` 回复消息强制重试"
+    msg = f"🔗**链接解析**: {prefix}\n🔄使用 `/retry` 回复消息强制重试"
     if permission["twitter"]:
         msg += "\n🕊推特"
     if permission["weibo"]:
@@ -341,7 +325,7 @@ def get_social_media_help(chat_id: int | str, ctype: str, prefixes: list[str] |
         msg += "\n🅱️哔哩哔哩"
         msg += "\n🆕和所有yt-dlp支持的链接\n"
     if permission["ai"]:
-        msg += f"\n🤖**AI对话**: `{PREFIX.GPT} /gpt /gemini /ds /qwen /doubao /grok`"
+        msg += f"\n🤖**AI对话**: `{PREFIX.GPT}`"
         msg += f"\n🌠**AI生图**: `{PREFIX.GENIMG}` + 提示词"
         msg += f"\n📖**AI总结**: 发送 `{PREFIX.AI_SUMMARY}` 查看详细教程"
     if permission["asr"]: