main
  1#!/usr/bin/env python
  2# -*- coding: utf-8 -*-
  3import contextlib
  4import os
  5
  6from loguru import logger
  7from pyrogram.client import Client
  8from pyrogram.types import Message
  9
 10from config import ENABLE, TID, cache
 11from messages.modify import message_modify
 12from utils import i_am_bot, slim_cid, strings_list, to_int, true
 13
 14
 15async def check_permission(client: Client, message: Message) -> dict:
 16    """Check if the user has permission to use the bot."""
 17    message = message_modify(message)
 18    ctype = message.chat.type.name if message.chat and message.chat.type else ""
 19    ctype = ctype.upper().removeprefix("SUPER")  # SUPERGROUP -> GROUP
 20
 21    # check permission per category
 22    permission = await check_category(client, message, ctype)
 23
 24    # check permission per service
 25    permission |= check_service(cid=message.chat.id, ctype=ctype)
 26
 27    # skip for service message
 28    if message.service:
 29        permission["disabled"] = True
 30    return permission
 31
 32
 33async def check_category(client: Client, message: Message, ctype: str) -> dict:
 34    # ruff: noqa: SIM114
 35    permission = {"disabled": False}
 36    cid = slim_cid(message.chat.id)
 37    if ctype == "GROUP" and not ENABLE.GROUPS:
 38        permission["disabled"] = True
 39    elif ctype == "CHANNEL" and not ENABLE.CHANNELS:
 40        permission["disabled"] = True
 41    elif ctype == "BOT" and not ENABLE.BOTS:
 42        permission["disabled"] = True
 43    elif ctype == "PRIVATE" and not ENABLE.USERS:
 44        permission["disabled"] = True
 45    is_bot = await i_am_bot(client)
 46    """Mark as read for these cid
 47    TID_MUTES=111111,234567
 48    TID_MUTE_111111=true
 49    """
 50    if not is_bot and (cid in [slim_cid(x) for x in strings_list(os.getenv("TID_MUTES"))] or true(os.getenv(f"TID_MUTE_{cid}"))):
 51        await message.read()
 52
 53    """Skip process these chats
 54    TID_SKIPS=111111,234567
 55    TID_SKIP_111111=true
 56    """
 57    if cid in [slim_cid(x) for x in strings_list(os.getenv("TID_SKIPS"))] or true(os.getenv(f"TID_SKIP_{cid}")):
 58        permission["disabled"] = True
 59
 60    """Only process these chats
 61    TID_ONLY_GROUPS=111111,234567,-100234567
 62    TID_ONLY_USERS=111111,234567,-100234567
 63    """
 64    if os.getenv(f"TID_ONLY_{ctype}S") and cid not in [slim_cid(x) for x in strings_list(os.getenv(f"TID_ONLY_{ctype}S"))]:
 65        permission["disabled"] = True
 66
 67    """Whitelist mode for users, if a user is not in the whitelist, skip process
 68    TID_WHITELIST_MODE=true  # enable whitelist mode for users
 69    TID_WHITELIST_USERS=111111,234567  # these are allowed chats
 70    TID_WHITELIST_USERS_IN_CHATS=111111,234567,-100234567  # also allow users in these chats
 71    """
 72    if ctype == "PRIVATE" and true(os.getenv("TID_WHITELIST_MODE")):
 73        permission["disabled"] = True
 74        if cid in [slim_cid(x) for x in strings_list(os.getenv("TID_WHITELIST_USERS"))]:
 75            permission["disabled"] = False
 76        # check if user is a member of these chats
 77        with contextlib.suppress(Exception):
 78            for chat_id in [int(x.strip()) for x in strings_list(os.getenv("TID_WHITELIST_USERS_IN_CHATS")) if x.strip()]:
 79                chat_id = to_int(f"-100{slim_cid(chat_id)}")  # noqa: PLW2901
 80                if await client.get_chat_member(chat_id, cid):
 81                    permission["disabled"] = False
 82        if permission["disabled"]:
 83            await message.reply_text(f"⚠️Please contact {TID.ADMIN} to use this bot\n⚠️请联系 {TID.ADMIN} 获得使用权限")
 84    # finally, set for specific cid:  ENABLE_1234567=true
 85    if true(os.getenv(f"ENABLE_{cid}")):
 86        permission["disabled"] = False
 87    return permission
 88
 89
 90@cache.memoize(ttl=0)
 91def global_permissions() -> dict:
 92    """Set permissions for all chats.
 93
 94    GLOBAL_YTDLP_SEND_AUDIO=0  # disable ytdlp_send_audio
 95    GLOBAL_TWITTER_PROVIDER=vxtwitter-fxtwitter  # set twitter provider to `vxtwitter-fxtwitter`
 96    """
 97    envs = [x for x in os.environ if x.upper().startswith("GLOBAL_")]
 98    permission = {}
 99    for key in envs:
100        value = os.environ[key]
101        option = key.removeprefix("GLOBAL_").lower()
102        permission[option] = to_bool(value)
103        logger.warning(f"Set `{option}` to {to_bool(value)}")
104    logger.success(f"Global permission: {permission}")
105    return permission
106
107
@cache.memoize(ttl=0)
def check_service(cid: int | str, ctype: str) -> dict:
    """Build the per-service permission table for one chat.

    Layers are applied in this order, later ones winning:
    built-in defaults, ``global_permissions()``, the ``need_prefix`` rule for
    private chats, the ``ENABLE`` service switches, and finally the per-chat
    ``SET_<cid>_<OPTION>`` environment variables.

    Args:
        cid: Chat id; it is normalised with ``slim_cid`` before use.
        ctype: Chat type, e.g. ``PRIVATE`` or ``GROUP``.

    Returns:
        Mapping of option name to a bool (or to the raw string for values that
        ``to_bool`` does not convert). Empty when ``cid`` or ``ctype`` is falsy.
    """
    # NOTE(review): the result is memoized by the decorator, so environment changes
    # made after the first call for a chat are presumably not picked up — confirm.
    if not cid or not ctype:
        return {}
    cid = slim_cid(cid)

    permission = {
        # default to False
        "prepend_sender_user": False,
        # default to True
        "need_prefix": True,
        "ai": True,
        "asr": True,
        "audio_extract": True,
        "danmu": True,
        "subtitle": True,
        "wget": True,
        "ocr": True,
        "price": True,
        "convert_img": True,
        "tts": True,
        "ytb": True,
        "arxiv": True,
        "google_search": True,
        "show_progress": True,
        "detail_progress": True,
        "douyin": True,
        "tiktok": True,
        "instagram": True,
        "twitter": True,
        "weibo": True,
        "xhs": True,
        "v2ex": True,
        "music163": True,
        "spotify": True,
        "github": True,
        "wechat": True,
        "reddit": True,
        "ytdlp": True,
        "ytdlp_bilibili": True,
        "ytdlp_youtube": True,
        "history": True,
        "favorite": True,
        "convert_chinese": True,
        "quotly": True,
        "tmdb": True,
        "ffmpeg": True,
        "watermark": True,
    } | global_permissions()

    if ctype == "PRIVATE":
        permission["need_prefix"] = False

    # global service permission: (ENABLE attribute, permission key)
    # NOTE(review): "version" has no default entry above, so the key only exists
    # when ENABLE.VERSION is falsy — confirm callers expect that.
    service_switches = (
        ("TWITTER", "twitter"),
        ("WEIBO", "weibo"),
        ("XHS", "xhs"),
        ("GITHUB", "github"),
        ("MUSIC163", "music163"),
        ("SPOTIFY", "spotify"),
        ("DOUYIN", "douyin"),
        ("TIKTOK", "tiktok"),
        ("INSTAGRAM", "instagram"),
        ("WECHAT", "wechat"),
        ("V2EX", "v2ex"),
        ("REDDIT", "reddit"),
        ("YTDLP", "ytdlp"),
        ("YTDLP_BILIBILI", "ytdlp_bilibili"),
        ("YTDLP_YOUTUBE", "ytdlp_youtube"),
        ("ARXIV", "arxiv"),
        ("AI", "ai"),
        ("ASR", "asr"),
        ("AUDIO", "audio_extract"),
        ("SUBTITLE", "subtitle"),
        ("SEARCH_YOUTUBE", "ytb"),
        ("SEARCH_GOOGLE", "google_search"),
        ("WGET", "wget"),
        ("OCR", "ocr"),
        ("PRICE", "price"),
        ("RAW_IMG_CONVERT", "convert_img"),
        ("QUERY_DANMU", "danmu"),
        ("HISTORY", "history"),
        ("FAVORITE", "favorite"),
        ("TTS", "tts"),
        ("CONVERT_CHINESE", "convert_chinese"),
        ("QUOTLY", "quotly"),
        ("TMDB", "tmdb"),
        ("FFMPEG", "ffmpeg"),
        ("WATERMARK", "watermark"),
        ("VERSION", "version"),
    )
    for enable_attr, option in service_switches:
        if not getattr(ENABLE, enable_attr):
            permission[option] = False

    # Set for specific chat
    #   SET_111111_AI=1
    #   SET_111111_DOUYIN=0
    #   SET_111111_DOUYIN_PROVIDER=tikhub
    prefix = f"SET_{cid}_"
    for key in [name for name in os.environ if name.upper().startswith(prefix)]:
        # Slice by length rather than removeprefix(): the match above ignores case,
        # so a lower-case variable name must lose its prefix too.
        option = key[len(prefix):].lower()
        setting = to_bool(os.environ[key])
        permission[option] = setting
        logger.warning(f"Set `{option}` for chat={cid} to {setting}")
    logger.success(f"Permission for chat={cid}: {permission}")
    return permission
249
250
251def to_bool(v: str) -> bool | str:
252    if str(v).lower() in {"1", "true", "t", "yes", "y", "on", "0", "n", "no", "f", "false", "off"}:
253        return true(v)
254    return v