main
1#!/usr/bin/env python
2# -*- coding: utf-8 -*-
3import contextlib
4import os
5
6from loguru import logger
7from pyrogram.client import Client
8from pyrogram.types import Message
9
10from config import ENABLE, TID, cache
11from messages.modify import message_modify
12from utils import i_am_bot, slim_cid, strings_list, to_int, true
13
14
async def check_permission(client: Client, message: Message) -> dict:
    """Build the permission dict for an incoming message.

    The message is first passed through ``message_modify``. The chat type name
    is upper-cased and stripped of a leading ``SUPER`` so that supergroups are
    handled as groups. The category-level result is then overlaid with the
    per-service flags. Service messages always come back with ``disabled``
    set to ``True``.
    """
    message = message_modify(message)

    chat_type = ""
    if message.chat and message.chat.type:
        chat_type = message.chat.type.name
    chat_type = chat_type.upper().removeprefix("SUPER")  # SUPERGROUP -> GROUP

    # Category-level decision first, then overlay the per-service flags.
    result = await check_category(client, message, chat_type)
    result.update(check_service(cid=message.chat.id, ctype=chat_type))

    # Service messages are never processed.
    if message.service:
        result["disabled"] = True
    return result
31
32
def _env_cids(name: str) -> list:
    """Return the chat ids listed in environment variable *name*, each passed through ``slim_cid``."""
    return [slim_cid(x) for x in strings_list(os.getenv(name))]


async def _in_whitelisted_chat(client: Client, cid) -> bool:
    """Return True as soon as *cid* is found in any chat listed in ``TID_WHITELIST_USERS_IN_CHATS``."""
    for raw in strings_list(os.getenv("TID_WHITELIST_USERS_IN_CHATS")):
        # Best-effort per entry: a malformed id or a failed lookup for one chat
        # (get_chat_member may raise, e.g. when the user is not in that chat)
        # must not stop the remaining chats from being checked.
        with contextlib.suppress(Exception):
            entry = raw.strip()
            if not entry:
                continue
            chat_id = to_int(f"-100{slim_cid(int(entry))}")
            if await client.get_chat_member(chat_id, cid):
                return True
    return False


async def check_category(client: Client, message: Message, ctype: str) -> dict:
    """Decide whether the chat of *message* may use the bot at all.

    Args:
        client: Client used to mark messages as read and to look up chat members.
        message: Incoming message; its chat id selects the per-chat settings.
        ctype: Normalised chat type ("GROUP", "CHANNEL", "BOT", "PRIVATE" or "").

    Returns:
        ``{"disabled": bool}``. Rules are applied in order, later ones win:
        category switches from ``ENABLE``, ``TID_SKIPS`` / ``TID_SKIP_<cid>``,
        ``TID_ONLY_<ctype>S``, whitelist mode for private chats, and finally
        the ``ENABLE_<cid>`` override.
    """
    # ruff: noqa: SIM114
    permission = {"disabled": False}
    cid = slim_cid(message.chat.id)
    if ctype == "GROUP" and not ENABLE.GROUPS:
        permission["disabled"] = True
    elif ctype == "CHANNEL" and not ENABLE.CHANNELS:
        permission["disabled"] = True
    elif ctype == "BOT" and not ENABLE.BOTS:
        permission["disabled"] = True
    elif ctype == "PRIVATE" and not ENABLE.USERS:
        permission["disabled"] = True
    is_bot = await i_am_bot(client)

    # Mark as read for these cid (only when not running as a bot)
    #   TID_MUTES=111111,234567
    #   TID_MUTE_111111=true
    if not is_bot and (cid in _env_cids("TID_MUTES") or true(os.getenv(f"TID_MUTE_{cid}"))):
        await message.read()

    # Skip process these chats
    #   TID_SKIPS=111111,234567
    #   TID_SKIP_111111=true
    if cid in _env_cids("TID_SKIPS") or true(os.getenv(f"TID_SKIP_{cid}")):
        permission["disabled"] = True

    # Only process these chats
    #   TID_ONLY_GROUPS=111111,234567,-100234567
    #   TID_ONLY_USERS=111111,234567,-100234567
    only_key = f"TID_ONLY_{ctype}S"
    if os.getenv(only_key) and cid not in _env_cids(only_key):
        permission["disabled"] = True

    # Whitelist mode for users, if a user is not in the whitelist, skip process
    #   TID_WHITELIST_MODE=true  # enable whitelist mode for users
    #   TID_WHITELIST_USERS=111111,234567  # these are allowed chats
    #   TID_WHITELIST_USERS_IN_CHATS=111111,234567,-100234567  # also allow users in these chats
    if ctype == "PRIVATE" and true(os.getenv("TID_WHITELIST_MODE")):
        allowed = cid in _env_cids("TID_WHITELIST_USERS")
        if not allowed:
            # Only hit the API when the static whitelist did not already allow the user.
            allowed = await _in_whitelisted_chat(client, cid)
        # Whitelist mode replaces whatever the earlier rules decided for this private chat.
        permission["disabled"] = not allowed
        if not allowed:
            await message.reply_text(f"⚠️Please contact {TID.ADMIN} to use this bot\n⚠️请联系 {TID.ADMIN} 获得使用权限")

    # finally, set for specific cid: ENABLE_1234567=true
    if true(os.getenv(f"ENABLE_{cid}")):
        permission["disabled"] = False
    return permission
88
89
@cache.memoize(ttl=0)
def global_permissions() -> dict:
    """Set permissions for all chats from ``GLOBAL_*`` environment variables.

    GLOBAL_YTDLP_SEND_AUDIO=0  # disable ytdlp_send_audio
    GLOBAL_TWITTER_PROVIDER=vxtwitter-fxtwitter  # set twitter provider to `vxtwitter-fxtwitter`

    The prefix is matched case-insensitively. The option name is the rest of
    the variable name, lower-cased.

    Returns:
        Mapping of option name to ``to_bool(value)``: a bool for boolean-looking
        values, the raw string otherwise.
    """
    prefix = "GLOBAL_"
    permission = {}
    for key, value in os.environ.items():
        if not key.upper().startswith(prefix):
            continue
        # Slice by length rather than removeprefix(): the match above ignores
        # case, so a variable such as `global_ai` must lose its prefix as well.
        option = key[len(prefix):].lower()
        parsed = to_bool(value)
        permission[option] = parsed
        logger.warning(f"Set `{option}` to {parsed}")
    logger.success(f"Global permission: {permission}")
    return permission
106
107
# (ENABLE attribute, permission key): a falsy ENABLE flag force-disables the matching option.
_ENABLE_FLAG_OPTIONS = (
    ("TWITTER", "twitter"),
    ("WEIBO", "weibo"),
    ("XHS", "xhs"),
    ("GITHUB", "github"),
    ("MUSIC163", "music163"),
    ("SPOTIFY", "spotify"),
    ("DOUYIN", "douyin"),
    ("TIKTOK", "tiktok"),
    ("INSTAGRAM", "instagram"),
    ("WECHAT", "wechat"),
    ("V2EX", "v2ex"),
    ("REDDIT", "reddit"),
    ("YTDLP", "ytdlp"),
    ("YTDLP_BILIBILI", "ytdlp_bilibili"),
    ("YTDLP_YOUTUBE", "ytdlp_youtube"),
    ("ARXIV", "arxiv"),
    ("AI", "ai"),
    ("ASR", "asr"),
    ("AUDIO", "audio_extract"),
    ("SUBTITLE", "subtitle"),
    ("SEARCH_YOUTUBE", "ytb"),
    ("SEARCH_GOOGLE", "google_search"),
    ("WGET", "wget"),
    ("OCR", "ocr"),
    ("PRICE", "price"),
    ("RAW_IMG_CONVERT", "convert_img"),
    ("QUERY_DANMU", "danmu"),
    ("HISTORY", "history"),
    ("FAVORITE", "favorite"),
    ("TTS", "tts"),
    ("CONVERT_CHINESE", "convert_chinese"),
    ("QUOTLY", "quotly"),
    ("TMDB", "tmdb"),
    ("FFMPEG", "ffmpeg"),
    ("WATERMARK", "watermark"),
    ("VERSION", "version"),
)


@cache.memoize(ttl=0)
def check_service(cid: int | str, ctype: str) -> dict:
    """Return the per-service permission flags for one chat.

    Layers are applied in this order, later ones win:
      1. built-in defaults,
      2. ``GLOBAL_*`` environment overrides (``global_permissions()``),
      3. ``need_prefix`` is switched off for PRIVATE chats,
      4. every falsy ``ENABLE.<FLAG>`` forces its option to ``False``,
      5. ``SET_<cid>_<OPTION>`` environment overrides for this chat.

    Args:
        cid: Chat id; normalised with ``slim_cid`` before use.
        ctype: Normalised chat type.

    Returns:
        Mapping of option name to bool (or raw string for non-boolean
        overrides). Empty dict when *cid* or *ctype* is falsy.
    """
    if not cid or not ctype:
        return {}
    cid = slim_cid(cid)

    permission = {
        # default to False
        "prepend_sender_user": False,
        # default to True
        "need_prefix": True,
        "ai": True,
        "asr": True,
        "audio_extract": True,
        "danmu": True,
        "subtitle": True,
        "wget": True,
        "ocr": True,
        "price": True,
        "convert_img": True,
        "tts": True,
        "ytb": True,
        "arxiv": True,
        "google_search": True,
        "show_progress": True,
        "detail_progress": True,
        "douyin": True,
        "tiktok": True,
        "instagram": True,
        "twitter": True,
        "weibo": True,
        "xhs": True,
        "v2ex": True,
        "music163": True,
        "spotify": True,
        "github": True,
        "wechat": True,
        "reddit": True,
        "ytdlp": True,
        "ytdlp_bilibili": True,
        "ytdlp_youtube": True,
        "history": True,
        "favorite": True,
        "convert_chinese": True,
        "quotly": True,
        "tmdb": True,
        "ffmpeg": True,
        "watermark": True,
        # `version` can be disabled through ENABLE.VERSION below, so it gets an
        # explicit default like every other switchable option.
        "version": True,
    } | global_permissions()

    if ctype == "PRIVATE":
        permission["need_prefix"] = False

    # global service permission: ENABLE flags can only turn options off
    for flag, option in _ENABLE_FLAG_OPTIONS:
        if not getattr(ENABLE, flag):
            permission[option] = False

    # Set for specific chat
    #   SET_111111_AI=1
    #   SET_111111_DOUYIN=0
    #   SET_111111_DOUYIN_PROVIDER=tikhub
    prefix = f"SET_{cid}_"
    for key, value in os.environ.items():
        if not key.upper().startswith(prefix):
            continue
        # Slice by length rather than removeprefix(): the match above ignores
        # case, so `set_111111_ai` must lose its prefix too.
        option = key[len(prefix):].lower()
        parsed = to_bool(value)
        permission[option] = parsed
        logger.warning(f"Set `{option}` for chat={cid} to {parsed}")
    logger.success(f"Permission for chat={cid}: {permission}")
    return permission
249
250
251def to_bool(v: str) -> bool | str:
252 if str(v).lower() in {"1", "true", "t", "yes", "y", "on", "0", "n", "no", "f", "false", "off"}:
253 return true(v)
254 return v