Commit da95f78
Changed files (6)
src/llm/response.py
@@ -8,7 +8,7 @@ from glom import Coalesce, glom
from loguru import logger
from openai import AsyncOpenAI
-from config import ENABLE, GPT
+from config import GPT
from llm.models import openrouter_hook
from llm.prompts import add_search_results_to_prompts
from llm.tools import add_tools, get_online_search_result
@@ -141,8 +141,7 @@ async def parse_response(config: dict, response: dict) -> dict[str, str]:
# do not use `!=` to compare. (deepseek/deepseek-r1:free != deepseek/deepseek-r1, gpt-4o != gpt-4o-2024-07-18)
used_model = beautify_model_name(used_model)
logger.warning(f"Fallback model {primary_model} -> {used_model}")
- if ENABLE.GPT_WARN_FALLBACK:
- response["model"] = used_model
+ response["model"] = used_model
except Exception as e:
logger.error(f"Parse GPT response failed: {e}")
raise
src/llm/tools.py
@@ -6,7 +6,7 @@ from glom import glom
from loguru import logger
from openai import AsyncOpenAI, DefaultAsyncHttpxClient
-from config import ENABLE, GPT, PROXY, TOKEN, TZ
+from config import GPT, PROXY, TOKEN, TZ
from llm.prompts import modify_prompts
from llm.tool_scheme import ONLINE_SEARCH
from networking import hx_req
@@ -89,7 +89,7 @@ def add_tools(params: dict) -> dict:
}
"""
tools = []
- if ENABLE.GPT_ONLINE_SEARCH:
+ if (GPT.PRIMARY_SEARCH_ENGINE == "google" and TOKEN.GOOGLE_SEARCH_API_KEY and TOKEN.GOOGLE_SEARCH_CX) or (GPT.PRIMARY_SEARCH_ENGINE == "glm" and GPT.GLM_API_KEY and GPT.GLM_BASE_URL):
tools = [ONLINE_SEARCH]
system_prompt = f"你是一个具备网络访问能力的智能助手. 在需要时可以访问互联网进行相关搜索获取信息以确保用户得到最新、准确的帮助。当前日期是 {nowdt(TZ):%Y-%m-%d}"
params["messages"] = modify_prompts(params["messages"], system_prompt, method="overwrite")
src/config.py
@@ -27,7 +27,7 @@ DAILY_MESSAGES = os.getenv("DAILY_MESSAGES", "{}") # Useful for daily checkin f
YTDLP_RE_ENCODING_MAX_FILE_BYTES = int(os.getenv("YTDLP_RE_ENCODING_MAX_FILE_BYTES", "0"))
-class ENABLE:
+class ENABLE: # see fine-grained permission in `src/permission.py`
AI_SUMMARY = os.getenv("ENABLE_AI_SUMMARY", "1").lower() in ["1", "y", "yes", "t", "true", "on"]
ASR = os.getenv("ENABLE_ASR", "1").lower() in ["1", "y", "yes", "t", "true", "on"]
AUDIO = os.getenv("ENABLE_AUDIO", "1").lower() in ["1", "y", "yes", "t", "true", "on"]
@@ -35,8 +35,6 @@ class ENABLE:
CRONTAB = os.getenv("ENABLE_CRONTAB", "1").lower() in ["1", "y", "yes", "t", "true", "on"]
DOUYIN = os.getenv("ENABLE_DOUYIN", "1").lower() in ["1", "y", "yes", "t", "true", "on"]
GPT = os.getenv("ENABLE_GPT", "1").lower() in ["1", "y", "yes", "t", "true", "on"]
- GPT_ONLINE_SEARCH = os.getenv("ENABLE_GPT_ONLINE_SEARCH", "1").lower() in ["1", "y", "yes", "t", "true", "on"]
- GPT_WARN_FALLBACK = os.getenv("ENABLE_GPT_WARN_FALLBACK", "1").lower() in ["1", "y", "yes", "t", "true", "on"]
INSTAGRAM = os.getenv("ENABLE_INSTAGRAM", "1").lower() in ["1", "y", "yes", "t", "true", "on"]
OCR = os.getenv("ENABLE_OCR", "1").lower() in ["1", "y", "yes", "t", "true", "on"]
PRICE = os.getenv("ENABLE_PRICE", "1").lower() in ["1", "y", "yes", "t", "true", "on"]
@@ -204,6 +202,7 @@ class GPT: # see `llm/README.md`
class TID: # see more TID usecase in `src/permission.py`
+ ADMIN = os.getenv("TID_ADMIN", "")
# comma separated chat ids of 67373
GROUP67373 = os.getenv("TID_GROUP67373", "")
# back up ytdlp audio if the user does not request it
src/handler.py
@@ -22,6 +22,7 @@ from others.download_external import download_url_in_message
from others.extract_audio import extract_audio_file
from others.raw_img_file import convert_raw_img_file
from others.subtitle import get_subtitle
+from permission import check_service
from preview.douyin import preview_douyin
from preview.instagram import preview_instagram
from preview.twitter import preview_twitter
@@ -143,7 +144,7 @@ async def handle_social_media(
cmd_prefix = PREFIX.MAIN
else:
cmd_prefix.extend(PREFIX.MAIN)
- ignore_prefix = ignore_prefix or []
+ ignore_prefix = ignore_prefix or ["/dl4dw"]
ignore_prefix.extend(["/ai", "/asr", "/audio", "/combine", "/subtitle", "/wget", "/ocr", "/price", "/summary"]) # these commands are handled in `handle_utilities`
info = parse_msg(message)
this_texts = info["text"] # texts of the trigger message
@@ -158,7 +159,7 @@ async def handle_social_media(
if equal_prefix(this_texts, prefix=[*cmd_prefix, "/retry"]):
# without reply, send docs if message only contains prefix command
if not message.reply_to_message:
- help_msg = get_social_media_help(cmd_prefix)
+ help_msg = get_social_media_help(info["cid"], info["ctype"], cmd_prefix)
await send2tg(client, message, texts=help_msg, **kwargs)
return
# with reply, treat the reply_msg as the trigger to preview social media link
@@ -181,20 +182,20 @@ async def handle_social_media(
kwargs |= matched
if startswith_prefix(this_texts, prefix=["/retry"], ignore_prefix=ignore_prefix):
await del_db(matched["db_key"])
- if douyin and matched["platform"] == "douyin" and ENABLE.DOUYIN:
+ if douyin and matched["platform"] == "douyin":
await preview_douyin(client, message, **kwargs)
- if tiktok and matched["platform"] == "tiktok" and ENABLE.TIKTOK:
+ if tiktok and matched["platform"] == "tiktok":
await preview_douyin(client, message, **kwargs)
- if instagram and matched["platform"] == "instagram" and ENABLE.INSTAGRAM:
+ if instagram and matched["platform"] == "instagram":
await preview_instagram(client, message, **kwargs)
- if twitter and matched["platform"] in ["x", "twitter", "fxtwitter", "fixupx"] and ENABLE.TWITTER:
+ if twitter and matched["platform"] in ["x", "twitter", "fxtwitter", "fixupx"]:
await preview_twitter(client, message, **kwargs)
- if weibo and matched["platform"] == "weibo" and ENABLE.WEIBO:
+ if weibo and matched["platform"] == "weibo":
await preview_weibo(client, message, **kwargs)
- if xhs and matched["platform"] == "xiaohongshu" and ENABLE.XHS:
+ if xhs and matched["platform"] == "xiaohongshu":
await preview_xhs(client, message, **kwargs)
try:
- if ytdlp and ENABLE.YTDLP and any(matched["platform"] == x for x in ["bilibili", "youtube", "ytdlp"]):
+ if ytdlp and any(matched["platform"] == x for x in ["bilibili", "youtube", "ytdlp"]):
await preview_ytdlp(client, message, **kwargs)
except ProxyError:
logger.error(f"🚫{matched['platform']}代理错误")
@@ -247,47 +248,49 @@ def params_from_msg_text(texts: str | None = None) -> dict:
return params
-def get_social_media_help(prefixes: list[str] | None = None):
+def get_social_media_help(chat_id: int | str, ctype: str, prefixes: list[str] | None = None):
"""Get the help message for social media preview."""
prefixes = prefixes or []
+ permission = check_service(cid=chat_id, ctype=ctype)
msg = "🔗**链接解析**"
if prefixes:
msg += f" 前缀: {', '.join(prefixes)}"
msg += "\n🔄使用 `/retry` 回复消息强制重试"
- if ENABLE.TWITTER:
+ if permission["twitter"]:
msg += "\n🕊推特"
- if ENABLE.WEIBO:
+ if permission["weibo"]:
msg += "\n🧣微博"
- if ENABLE.XHS:
+ if permission["xhs"]:
msg += "\n🍠小红书"
- if ENABLE.DOUYIN:
+ if permission["douyin"]:
msg += "\n🎶抖音"
- if ENABLE.TIKTOK:
+ if permission["tiktok"]:
msg += "\n🎶TikTok"
- if ENABLE.INSTAGRAM:
+ if permission["instagram"]:
msg += "\n🏞Instagram"
- if ENABLE.YTDLP:
+ if permission["ytdlp"]:
msg += "\n🔴油管"
msg += "\n🅱️哔哩哔哩"
msg += "\n🆕和所有yt-dlp支持的链接\n"
- if ENABLE.ASR:
+ if permission["ai"]:
+ msg += f"\n🤖**GPT对话**: `{PREFIX.GPT} /gpt /gemini /ds` + 提示词"
+ if permission["asr"]:
msg += f"\n🗣**语音转文字**: `{PREFIX.ASR}` 回复语音消息"
- if ENABLE.AUDIO:
+ if permission["audio"]:
msg += f"\n🎧**视频转音频**: `{PREFIX.AUDIO}` 回复视频消息"
- if ENABLE.GPT:
- msg += f"\n🤖**GPT对话**: `{PREFIX.GPT} /gpt /gemini /ds` + 提示词"
- if ENABLE.SUBTITLE:
- msg += f"\n📃**提取字幕**: `{PREFIX.SUBTITLE}` + 油管链接 (或回复油管链接)"
- if ENABLE.WGET:
- msg += f"\n⏬**下载文件**: `{PREFIX.WGET}` + URL"
- if ENABLE.OCR:
+ if permission["combine"]:
+ msg += f"\n💬**合并历史**: `{PREFIX.COMBINATION} #N` 合并最近N条对话历史"
+ if permission["ocr"]:
msg += f"\n🔤**图片转文字**: `{PREFIX.OCR}` 回复图片消息"
- if ENABLE.PRICE:
+ if permission["price"]:
msg += f"\n💵**查询价格**: `{PREFIX.PRICE}` + Symbol"
- if ENABLE.COMBINATION:
- msg += f"\n💬**合并历史**: `{PREFIX.COMBINATION} #N` 合并最近N条对话历史"
- if ENABLE.AI_SUMMARY:
- msg += f"\n🤖**总结历史**: `{PREFIX.AI_SUMMARY} #N` 总结最近N条对话历史"
+ if permission["subtitle"]:
+ msg += f"\n📃**提取字幕**: `{PREFIX.SUBTITLE}` + 油管链接 (或回复油管链接)"
+ if permission["summary"] and permission["ai"]: # summary is dependent on ai
+ msg += f"\n🤖**总结历史**: `{PREFIX.AI_SUMMARY} #N` AI总结最近N条对话历史"
+ if permission["wget"]:
+ msg += f"\n⏬**下载文件**: `{PREFIX.WGET}` + URL"
+
msg += "\n\n单独发送每个命令前缀本身可查看该命令详细使用说明"
return msg
src/main.py
@@ -21,7 +21,7 @@ from pyrogram.types import LinkPreviewOptions, Message
from bridge.chartimg import forward_chartimg_results
from bridge.ocr import forward_ocr_results
from bridge.social import forward_social_media_results
-from config import DAILY_MESSAGES, DEVICE_NAME, ENABLE, PROXY, TID, TOKEN, TZ, cache
+from config import DAILY_MESSAGES, DEVICE_NAME, ENABLE, PROXY, TOKEN, TZ, cache
from handler import handle_social_media, handle_utilities
from messages.parser import parse_msg
from permission import check_permission
@@ -49,45 +49,47 @@ async def main():
@app.on_message(filters.group)
async def groups(client: Client, message: Message):
- if not await check_permission(client, message, "GROUP"):
+ permission = await check_permission(client, message)
+ if permission["disabled"]:
return
parse_msg(message)
- if TID.GROUP67373 and message.chat.id in [int(x.strip()) for x in TID.GROUP67373.split(",")]:
- await handle_utilities(client, message, detail_progress=False)
- await handle_social_media(client, message, ignore_prefix=["/dl4dw"], prepend_sender_user=True)
- else:
- await handle_utilities(client, message, detail_progress=True)
- await handle_social_media(client, message, detail_progress=True)
+ await handle_utilities(client, message, **permission)
+ await handle_social_media(client, message, **permission)
@app.on_message(filters.channel)
async def channels(client: Client, message: Message):
- if not await check_permission(client, message, "CHANNEL"):
+ permission = await check_permission(client, message)
+ if permission["disabled"]:
return
parse_msg(message)
- await handle_utilities(client, message, detail_progress=True)
- await handle_social_media(client, message, detail_progress=True)
+ await handle_utilities(client, message, **permission)
+ await handle_social_media(client, message, **permission)
@app.on_message(filters.bot)
async def bots(client: Client, message: Message):
- if not await check_permission(client, message, "BOT"):
+ permission = await check_permission(client, message)
+ if permission["disabled"]:
return
parse_msg(message, verbose=True)
await forward_social_media_results(client, message)
await forward_ocr_results(client, message)
await forward_chartimg_results(client, message)
- await handle_utilities(client, message, detail_progress=True)
- await handle_social_media(client, message, detail_progress=True)
+ await handle_utilities(client, message, **permission)
+ await handle_social_media(client, message, **permission)
# filters.private = {user chats + bot chats}
# so the private handler should be placed after the bot handler
@app.on_message(filters.private)
async def private(client: Client, message: Message):
ctype = message.chat.type.name if message.chat and message.chat.type else ""
- if not await check_permission(client, message, "PRIVATE") or ctype != "PRIVATE":
+ if ctype != "PRIVATE":
+ return
+ permission = await check_permission(client, message)
+ if permission["disabled"]:
return
parse_msg(message, verbose=True)
- await handle_utilities(client, message, raw_img=True, detail_progress=True)
- await handle_social_media(client, message, need_prefix=False, detail_progress=True)
+ await handle_utilities(client, message, **permission)
+ await handle_social_media(client, message, **permission)
if ENABLE.CRONTAB:
scheduler = AsyncIOScheduler()
src/permission.py
@@ -3,141 +3,184 @@
import contextlib
import os
+from loguru import logger
from pyrogram.client import Client
from pyrogram.types import Message
-from config import ENABLE, cache
+from config import ENABLE, TID, cache
from utils import to_int, true
# ruff: noqa: SIM103
-async def check_permission(client: Client, message: Message, category: str) -> bool:
+async def check_permission(client: Client, message: Message) -> dict:
"""Check if the user has permission to use the bot."""
- if cached := cache.get(f"permission-{category}-{message.chat.id}"):
- return cached
- category = category.upper()
- if category == "GROUP":
- permission = await check_group(message)
- elif category == "CHANNEL":
- permission = await check_channel(message)
- elif category == "BOT":
- permission = await check_bot(message)
- elif category == "PRIVATE":
- permission = await check_user(client, message)
- else:
- permission = False
- cache.set(f"permission-{category}-{message.chat.id}", permission)
- return permission
+ ctype = message.chat.type.name if message.chat and message.chat.type else ""
+ ctype = ctype.upper().removeprefix("SUPER") # SUPERGROUP -> GROUP
+
+ # check permission per category
+ permission = await check_category(client, message, ctype)
+ # check permission per service
+ permission |= check_service(cid=message.chat.id, ctype=ctype)
-async def check_user(client: Client, message: Message) -> bool:
- if not ENABLE.USERS:
- return False
+ return permission
- cid = message.chat.id
+async def check_category(client: Client, message: Message, ctype: str) -> dict:
+ # ruff: noqa: SIM114
+ permission = {"disabled": False}
+ cid = slim_cid(message.chat.id)
+ if ctype == "GROUP" and not ENABLE.GROUPS:
+ permission["disabled"] = True
+ elif ctype == "CHANNEL" and not ENABLE.CHANNELS:
+ permission["disabled"] = True
+ elif ctype == "BOT" and not ENABLE.BOTS:
+ permission["disabled"] = True
+ elif ctype == "PRIVATE" and not ENABLE.USERS:
+ permission["disabled"] = True
+
+ """Mark as read for these cid
+ TID_MUTES=111111,234567
+ TID_MUTE_111111=true
"""
- mark as read for these user chats
- TID_MUTE_USERS=111111,234567
- TID_MUTE_USER_111111=true
- """
- if cid in [to_int(x.strip()) for x in os.getenv("TID_MUTE_USERS", "").split(",")] or true(os.getenv(f"TID_MUTE_USER_{cid}")):
+ if cid in [slim_cid(x) for x in os.getenv("TID_MUTES", "").split(",")] or true(os.getenv(f"TID_MUTE_{cid}")):
await message.read()
+ """Skip process these chats
+ TID_SKIPS=111111,234567
+ TID_SKIP_111111=true
"""
- do not process these chats
- TID_SKIP_USERS=111111,234567
- TID_SKIP_USER_111111=true
- """
- if cid in [to_int(x.strip()) for x in os.getenv("TID_SKIP_USERS", "").split(",")] or true(os.getenv(f"TID_SKIP_USER_{cid}")):
- return False
+ if cid in [slim_cid(x) for x in os.getenv("TID_SKIPS", "").split(",")] or true(os.getenv(f"TID_SKIP_{cid}")):
+ permission["disabled"] = True
+ """Only process these chats
+ TID_ONLY_GROUPS=111111,234567,-100234567
+ TID_ONLY_USERS=111111,234567,-100234567
"""
- whitelist mode, only allow these users
- TID_USERS_WHITELIST_MODE=true # enable whitelist mode for users
- TID_ALLOW_USERS=111111,234567 # these are allowed users
- TID_ALLOW_USER_111111=true # also allow this user
- TID_ALLOW_USER_IN_CHATS=111111,234567,-100234567 # also allow users in these chats
+ if os.getenv(f"TID_ONLY_{ctype}S") and cid not in [slim_cid(x) for x in os.getenv(f"TID_ONLY_{ctype}S", "").split(",")]:
+ permission["disabled"] = True
+
+ """Whitelist mode for users, if a user is not in the whitelist, skip process
+ TID_WHITELIST_MODE=true # enable whitelist mode for users
+ TID_WHITELIST_USERS=111111,234567 # these are allowed chats
+ TID_WHITELIST_USERS_IN_CHATS=111111,234567,-100234567 # also allow users in these chats
"""
- if true(os.getenv("TID_USERS_WHITELIST_MODE")):
- if cid in [to_int(x.strip()) for x in os.getenv("TID_ALLOW_USERS", "").split(",")]:
- return True
- if true(os.getenv(f"TID_ALLOW_USER_{cid}")):
- return True
+ if ctype == "PRIVATE" and true(os.getenv("TID_WHITELIST_MODE")):
+ permission["disabled"] = True
+ if cid in [slim_cid(x) for x in os.getenv("TID_WHITELIST_USERS", "").split(",")]:
+ permission["disabled"] = False
# check if user is a member of these chats
with contextlib.suppress(Exception):
- for chat_id in [int(x.strip()) for x in os.getenv("TID_ALLOW_USER_IN_CHATS", "").split(",") if x.strip()]:
- if not str(chat_id).startswith("-100"):
- chat_id = to_int(f"-100{chat_id}") # noqa: PLW2901
+ for chat_id in [int(x.strip()) for x in os.getenv("TID_WHITELIST_USERS_IN_CHATS", "").split(",") if x.strip()]:
+ chat_id = to_int(f"-100{slim_cid(chat_id)}") # noqa: PLW2901
if await client.get_chat_member(chat_id, cid):
- return True
- return False
- return True
-
-
-async def check_group(message: Message) -> bool:
- if not ENABLE.GROUPS:
- return False
- cid = to_int(f"{message.chat.id}".removeprefix("-100")) # strip `-100` prefix
+ permission["disabled"] = False
+ if permission["disabled"]:
+ await message.reply_text(f"⚠️Please contact {TID.ADMIN} to use this bot\n⚠️请联系 {TID.ADMIN} 获得使用权限")
+ # finally, set for specific cid: ENABLE_1234567=true
+ if true(os.getenv(f"ENABLE_{cid}")):
+ permission["disabled"] = False
+ return permission
- """
- mark as read for these group chats
- TID_MUTE_GROUPS=111111,234567,-100234567
- TID_MUTE_GROUP_111111=true # no `-100` prefix
- """
- if cid in [to_int(x.strip().removeprefix("-100")) for x in os.getenv("TID_MUTE_GROUPS", "").split(",")] or true(os.getenv(f"TID_MUTE_GROUP_{cid}")):
- await message.read()
- """
- do not process these chats
- TID_SKIP_GROUPS=111111,234567,-100234567
- TID_SKIP_GROUP_111111=true # no `-100` prefix
- """
- if cid in [to_int(x.strip().removeprefix("-100")) for x in os.getenv("TID_SKIP_GROUPS", "").split(",")] or true(os.getenv(f"TID_SKIP_GROUP_{cid}")):
- return False
+@cache.memoize(ttl=0)
+def check_service(cid: int | str, ctype: str) -> dict:
+ if not cid or not ctype:
+ return {}
+ cid = str(cid).removeprefix("-100")
+
+ permission = {
+ # default to False
+ "raw_img": False, # only enable in private chat
+ "ai": False,
+ "prepend_sender_user": False,
+ # default to True
+ "need_prefix": True,
+ "asr": True,
+ "audio": True,
+ "combine": True,
+ "subtitle": True,
+ "wget": True,
+ "ocr": True,
+ "price": True,
+ "summary": True,
+ "show_progress": True,
+ "detail_progress": True,
+ "douyin": True,
+ "tiktok": True,
+ "instagram": True,
+ "twitter": True,
+ "weibo": True,
+ "xhs": True,
+ "ytdlp": True,
+ }
+
+ if ctype == "PRIVATE":
+ permission["ai"] = True
+ permission["raw_img"] = True
+ permission["need_prefix"] = False
+
+ if TID.GROUP67373 and cid in [str(x.strip()).removeprefix("-100") for x in TID.GROUP67373.split(",")]:
+ permission["ai"] = True
+ permission["prepend_sender_user"] = True
+ permission["detail_progress"] = False
+
+ # global service permission
+ if not ENABLE.TWITTER:
+ permission["twitter"] = False
+ if not ENABLE.WEIBO:
+ permission["weibo"] = False
+ if not ENABLE.XHS:
+ permission["xhs"] = False
+ if not ENABLE.DOUYIN:
+ permission["douyin"] = False
+ if not ENABLE.TIKTOK:
+ permission["tiktok"] = False
+ if not ENABLE.INSTAGRAM:
+ permission["instagram"] = False
+ if not ENABLE.YTDLP:
+ permission["ytdlp"] = False
+ if not ENABLE.GPT:
+ permission["ai"] = False
+ if not ENABLE.ASR:
+ permission["asr"] = False
+ if not ENABLE.AUDIO:
+ permission["audio"] = False
+ if not ENABLE.COMBINATION:
+ permission["combine"] = False
+ if not ENABLE.SUBTITLE:
+ permission["subtitle"] = False
+ if not ENABLE.WGET:
+ permission["wget"] = False
+ if not ENABLE.OCR:
+ permission["ocr"] = False
+ if not ENABLE.PRICE:
+ permission["price"] = False
+ if not ENABLE.AI_SUMMARY:
+ permission["summary"] = False
+ if not ENABLE.RAW_IMG_CONVERT:
+ permission["raw_img"] = False
"""
- only process these chats
- TID_ONLY_GROUPS=111111,234567,-100234567
+ Set specific service
+ SET_111111_AI=1
+ SET_111111_DOUYIN=0
+ SET_111111_DOUYIN_PROVIDER=tikhub
"""
- if os.getenv("TID_ONLY_GROUPS") and cid not in [to_int(x.strip().removeprefix("-100")) for x in os.getenv("TID_ONLY_GROUPS", "").split(",")]:
- return False
- return True
-
-
-async def check_channel(message: Message) -> bool:
- if not ENABLE.CHANNELS:
- return False
- cid = to_int(f"{message.chat.id}".removeprefix("-100")) # strip `-100` prefix
-
- # mark as read
- if cid in [to_int(x.strip().removeprefix("-100")) for x in os.getenv("TID_MUTE_CHANNELS", "").split(",")] or true(os.getenv(f"TID_MUTE_CHANNEL_{cid}")):
- await message.read()
- # do not process
- if cid in [to_int(x.strip().removeprefix("-100")) for x in os.getenv("TID_SKIP_CHANNELS", "").split(",")] or true(os.getenv(f"TID_SKIP_CHANNEL_{cid}")):
- return False
-
- # only process
- if os.getenv("TID_ONLY_CHANNELS") and cid not in [to_int(x.strip().removeprefix("-100")) for x in os.getenv("TID_ONLY_CHANNELS", "").split(",")]:
- return False
- return True
-
-
-async def check_bot(message: Message) -> bool:
- if not ENABLE.BOTS:
- return False
- cid = message.chat.id
-
- # mark as read
- if cid in [to_int(x.strip()) for x in os.getenv("TID_MUTE_BOTS", "").split(",")] or true(os.getenv(f"TID_MUTE_BOT_{cid}")):
- await message.read()
+ def to_bool(v: str) -> bool | str:
+ if str(v).lower() in {"1", "true", "t", "yes", "y", "on", "0", "n", "no", "f", "false", "off"}:
+ return true(v)
+ return v
+
+ envs = [x for x in os.environ if x.startswith((f"SET_{cid}_", f"set_{cid}_"))]
+ for key in envs:
+ value = os.environ[key]
+ option = key.removeprefix(f"SET_{cid}_").removeprefix(f"set_{cid}_").lower()
+ permission[option] = to_bool(value) # type: ignore
+ logger.warning(f"Set `{option}` for chat={cid} to {to_bool(value)}")
+ return permission
- # do not process
- if cid in [to_int(x.strip()) for x in os.getenv("TID_SKIP_BOTS", "").split(",")] or true(os.getenv(f"TID_SKIP_BOT_{cid}")):
- return False
- # only process
- if os.getenv("TID_ONLY_BOTS") and cid not in [to_int(x.strip()) for x in os.getenv("TID_ONLY_BOTS", "").split(",")]:
- return False
- return True
+def slim_cid(cid: int | str) -> str:
+ return str(cid).strip().removeprefix("-100")