main
1#!/usr/bin/env python
2# -*- coding: utf-8 -*-
3import contextlib
4import re
5
6from loguru import logger
7from pyrogram.client import Client
8from pyrogram.enums import ParseMode
9from pyrogram.parser.markdown import BLOCKQUOTE_DELIM, BLOCKQUOTE_EXPANDABLE_DELIM
10from pyrogram.parser.parser import Parser
11from pyrogram.types import Message, ReactionTypeEmoji, ReplyParameters
12
13from config import TEXT_LENGTH
14from utils import myself, readable_size, strings_list, to_int
15
16
17def startswith_prefix(text: str, prefix: str | list[str], ignore_prefix: str | list[str] | None = None) -> bool:
18 """Check if the message text starts with the given command prefixes.
19
20 support prefix:
21 "/cmd"
22 "/cmd1, /cmd2"
23 ["/cmd1", "/cmd2"]
24 ["/cmd1, /cmd2"]
25
26 Args:
27 text (str): The message text.
28 prefix (str | list[str]): Command prefixes that are effective.
29 ignore_prefix (str | list[str], optional): Ignore these command prefixes.
30 """
31
32 def norm_cmd(cmd: str) -> str:
33 return cmd.strip().lower().replace("!", "!").replace("?", "?")
34
35 if not text:
36 return False
37 ignore_prefix = ignore_prefix or []
38 if isinstance(ignore_prefix, str):
39 ignore_prefix = [ignore_prefix]
40 if isinstance(prefix, str):
41 prefix = [prefix]
42
43 for ignore_str in ignore_prefix:
44 for pfx in strings_list(ignore_str):
45 if pfx and norm_cmd(text).startswith(norm_cmd(pfx)):
46 return False
47
48 for prefix_str in prefix:
49 for pfx in strings_list(prefix_str):
50 if pfx and norm_cmd(text).startswith(norm_cmd(pfx)):
51 return True
52 return False
53
54
55def equal_prefix(text: str, prefix: str | list[str], ignore_prefix: str | list[str] | None = None) -> bool:
56 """Check if the message text equal with the given command prefixes.
57
58 support prefix:
59 "/cmd"
60 "/cmd1, /cmd2"
61 ["/cmd1", "/cmd2"]
62 ["/cmd1, /cmd2"]
63
64 Args:
65 text (str): The message text.
66 prefix (str | list[str]): Command prefixes that are effective.
67 ignore_prefix (str | list[str], optional): Ignore these command prefixes.
68 """
69
70 def norm_cmd(cmd: str) -> str:
71 return cmd.strip().lower().replace("!", "!").replace("?", "?")
72
73 if not text or not prefix:
74 return False
75 ignore_prefix = ignore_prefix or []
76 if isinstance(ignore_prefix, str):
77 ignore_prefix = [ignore_prefix]
78 if isinstance(prefix, str):
79 prefix = [prefix]
80
81 for ignore_str in ignore_prefix:
82 for pfx in strings_list(ignore_str):
83 if pfx and norm_cmd(text) == norm_cmd(pfx):
84 return False
85
86 for prefix_str in prefix:
87 for pfx in strings_list(prefix_str):
88 if pfx and norm_cmd(text) == norm_cmd(pfx):
89 return True
90 return False
91
92
93def remove_prefix(s: str, prefix: str, separator: str = ",") -> str:
94 """Remove the prefix from the string."""
95 if not prefix:
96 return s
97 final = s
98 for pfx in strings_list(prefix, separator=separator):
99 if s.lower().startswith(pfx.lower()):
100 final = s[len(pfx) :].lstrip()
101 return final
102
103
104def get_reply_to(msg_id: int, reply_msg_id: int | str) -> ReplyParameters:
105 if str(reply_msg_id) == "0":
106 reply_to = msg_id
107 elif str(reply_msg_id).lower() in ["-1", "none", "null", "false"]:
108 reply_to = None
109 else:
110 reply_to = to_int(reply_msg_id)
111 return ReplyParameters(message_id=reply_to) # type: ignore
112
113
114def summay_media(media: list[dict]) -> str:
115 def filesize(path: str) -> str:
116 if not isinstance(path, str):
117 return ""
118 return f": {readable_size(path=path)}"
119
120 msg = ""
121 for idx, info in enumerate(media):
122 if value := info.get("photo"):
123 msg += f"\n🏞P{idx + 1}{filesize(value)}"
124 elif (value := info.get("video")) or (value := info.get("livephoto")):
125 msg += f"\n🎬P{idx + 1}{filesize(value)}"
126 elif value := info.get("audio"):
127 msg += f"\n🎧P{idx + 1}{filesize(value)}"
128 elif value := info.get("document"):
129 msg += f"\n💾P{idx + 1}{filesize(value)}"
130 return msg.strip()
131
132
133def sender_markdown_to_html(sender: str) -> str:
134 """Convert markdown to html.
135
136 👤[@username](tg://user?id=123456789)// ->
137 👤<a href="tg://user?id=123456789">@username</a>//
138 """
139 if not sender:
140 return ""
141 return re.sub(r"^👤\[@(.*?)\]\(tg://user\?id=(\d+)\)", r'👤<a href="tg://user?id=\2">@\1</a>', sender)
142
143
144async def count_without_entities(strings: str, mode: ParseMode = ParseMode.DEFAULT) -> int:
145 if not strings:
146 return 0
147 parser = Parser(client=None)
148 parsed = await parser.parse(strings, mode=mode)
149 return len(parsed["message"])
150
151
152async def smart_split(text: str, chars_per_string: int = TEXT_LENGTH, mode: ParseMode = ParseMode.DEFAULT) -> list[str]:
153 """Splits one string into multiple strings, with a maximum amount of `chars_per_string` characters per string."""
154
155 def next_sentence(strings: str) -> str:
156 # ruff: noqa: RUF001
157 sentence_enders = r"[。!? \.\)\n\t)]"
158 if matched := re.search(sentence_enders, strings):
159 return strings[: matched.end()]
160 return strings
161
162 chars_per_string = chars_per_string - len(BLOCKQUOTE_EXPANDABLE_DELIM) * text.count(BLOCKQUOTE_EXPANDABLE_DELIM)
163 parts = []
164 while True:
165 if await count_without_entities(text, mode) < chars_per_string:
166 parts.append(text)
167 break
168
169 next_str = next_sentence(text)
170 part = next_str if len(next_str) <= chars_per_string else next_str[:chars_per_string]
171 left = text[len(part) :]
172 while await count_without_entities(f"{part}{next_sentence(left)}", mode) < chars_per_string:
173 part += next_sentence(left)
174 left = text[len(part) :]
175 if not left.strip():
176 break
177 parts.append(part)
178 text = left
179 return parts
180
181
182def blockquote(s: str) -> str:
183 """Block quote texts."""
184 s = s.replace(BLOCKQUOTE_EXPANDABLE_DELIM, "")
185 return BLOCKQUOTE_EXPANDABLE_DELIM + s.replace("\n", f"\n{BLOCKQUOTE_EXPANDABLE_DELIM}")
186
187
188def quote(s: str) -> str:
189 """Quote texts."""
190 s = s.removeprefix(BLOCKQUOTE_DELIM)
191 return BLOCKQUOTE_DELIM + s.replace("\n", f"\n{BLOCKQUOTE_DELIM}")
192
193
194async def sent_from_me(client: Client, message: Message) -> bool:
195 """Check if this clinet is a bot or not."""
196 try:
197 me = await myself(client)
198 uid = message.from_user.id if message.from_user else 1
199 except Exception as e:
200 logger.error(e)
201 return False
202 return me.id == uid
203
204
205async def set_reaction(client: Client, message: Message, reaction: str | list[str] = ""):
206 with contextlib.suppress(Exception):
207 if reaction:
208 emojis = [ReactionTypeEmoji(emoji=reaction)] if isinstance(reaction, str) else [ReactionTypeEmoji(emoji=x) for x in reaction]
209 await client.set_reaction(message.chat.id, message.id, reaction=emojis) # type: ignore
210 else:
211 await client.set_reaction(message.chat.id, message.id)
212
213
214async def delete_message(message: Message | None):
215 if not isinstance(message, Message):
216 return
217 with contextlib.suppress(Exception):
218 await message.delete()
219
220
221def remove_img_tag(markdown: str) -> tuple[str, list[str]]:
222 """Removes all image tags from a markdown string."""
223 image_pattern = r"!\[.*?\]\((.*?)\)" # Matches both with and without alt text
224 clean = re.sub(image_pattern, "", markdown)
225 urls = re.findall(image_pattern, markdown)
226 return clean, urls