main
  1#!/usr/bin/env python
  2# -*- coding: utf-8 -*-
  3import contextlib
  4import re
  5
  6from loguru import logger
  7from pyrogram.client import Client
  8from pyrogram.enums import ParseMode
  9from pyrogram.parser.markdown import BLOCKQUOTE_DELIM, BLOCKQUOTE_EXPANDABLE_DELIM
 10from pyrogram.parser.parser import Parser
 11from pyrogram.types import Message, ReactionTypeEmoji, ReplyParameters
 12
 13from config import TEXT_LENGTH
 14from utils import myself, readable_size, strings_list, to_int
 15
 16
 17def startswith_prefix(text: str, prefix: str | list[str], ignore_prefix: str | list[str] | None = None) -> bool:
 18    """Check if the message text starts with the given command prefixes.
 19
 20    support prefix:
 21      "/cmd"
 22      "/cmd1, /cmd2"
 23      ["/cmd1", "/cmd2"]
 24      ["/cmd1, /cmd2"]
 25
 26    Args:
 27        text (str): The message text.
 28        prefix (str | list[str]): Command prefixes that are effective.
 29        ignore_prefix (str | list[str], optional): Ignore these command prefixes.
 30    """
 31
 32    def norm_cmd(cmd: str) -> str:
 33        return cmd.strip().lower().replace("", "!").replace("", "?")
 34
 35    if not text:
 36        return False
 37    ignore_prefix = ignore_prefix or []
 38    if isinstance(ignore_prefix, str):
 39        ignore_prefix = [ignore_prefix]
 40    if isinstance(prefix, str):
 41        prefix = [prefix]
 42
 43    for ignore_str in ignore_prefix:
 44        for pfx in strings_list(ignore_str):
 45            if pfx and norm_cmd(text).startswith(norm_cmd(pfx)):
 46                return False
 47
 48    for prefix_str in prefix:
 49        for pfx in strings_list(prefix_str):
 50            if pfx and norm_cmd(text).startswith(norm_cmd(pfx)):
 51                return True
 52    return False
 53
 54
 55def equal_prefix(text: str, prefix: str | list[str], ignore_prefix: str | list[str] | None = None) -> bool:
 56    """Check if the message text equal with the given command prefixes.
 57
 58    support prefix:
 59      "/cmd"
 60      "/cmd1, /cmd2"
 61      ["/cmd1", "/cmd2"]
 62      ["/cmd1, /cmd2"]
 63
 64    Args:
 65        text (str): The message text.
 66        prefix (str | list[str]): Command prefixes that are effective.
 67        ignore_prefix (str | list[str], optional): Ignore these command prefixes.
 68    """
 69
 70    def norm_cmd(cmd: str) -> str:
 71        return cmd.strip().lower().replace("", "!").replace("", "?")
 72
 73    if not text or not prefix:
 74        return False
 75    ignore_prefix = ignore_prefix or []
 76    if isinstance(ignore_prefix, str):
 77        ignore_prefix = [ignore_prefix]
 78    if isinstance(prefix, str):
 79        prefix = [prefix]
 80
 81    for ignore_str in ignore_prefix:
 82        for pfx in strings_list(ignore_str):
 83            if pfx and norm_cmd(text) == norm_cmd(pfx):
 84                return False
 85
 86    for prefix_str in prefix:
 87        for pfx in strings_list(prefix_str):
 88            if pfx and norm_cmd(text) == norm_cmd(pfx):
 89                return True
 90    return False
 91
 92
 93def remove_prefix(s: str, prefix: str, separator: str = ",") -> str:
 94    """Remove the prefix from the string."""
 95    if not prefix:
 96        return s
 97    final = s
 98    for pfx in strings_list(prefix, separator=separator):
 99        if s.lower().startswith(pfx.lower()):
100            final = s[len(pfx) :].lstrip()
101    return final
102
103
104def get_reply_to(msg_id: int, reply_msg_id: int | str) -> ReplyParameters:
105    if str(reply_msg_id) == "0":
106        reply_to = msg_id
107    elif str(reply_msg_id).lower() in ["-1", "none", "null", "false"]:
108        reply_to = None
109    else:
110        reply_to = to_int(reply_msg_id)
111    return ReplyParameters(message_id=reply_to)  # type: ignore
112
113
114def summay_media(media: list[dict]) -> str:
115    def filesize(path: str) -> str:
116        if not isinstance(path, str):
117            return ""
118        return f": {readable_size(path=path)}"
119
120    msg = ""
121    for idx, info in enumerate(media):
122        if value := info.get("photo"):
123            msg += f"\n🏞P{idx + 1}{filesize(value)}"
124        elif (value := info.get("video")) or (value := info.get("livephoto")):
125            msg += f"\n🎬P{idx + 1}{filesize(value)}"
126        elif value := info.get("audio"):
127            msg += f"\n🎧P{idx + 1}{filesize(value)}"
128        elif value := info.get("document"):
129            msg += f"\n💾P{idx + 1}{filesize(value)}"
130    return msg.strip()
131
132
133def sender_markdown_to_html(sender: str) -> str:
134    """Convert markdown to html.
135
136    👤[@username](tg://user?id=123456789)// ->
137    👤<a href="tg://user?id=123456789">@username</a>//
138    """
139    if not sender:
140        return ""
141    return re.sub(r"^👤\[@(.*?)\]\(tg://user\?id=(\d+)\)", r'👤<a href="tg://user?id=\2">@\1</a>', sender)
142
143
144async def count_without_entities(strings: str, mode: ParseMode = ParseMode.DEFAULT) -> int:
145    if not strings:
146        return 0
147    parser = Parser(client=None)
148    parsed = await parser.parse(strings, mode=mode)
149    return len(parsed["message"])
150
151
152async def smart_split(text: str, chars_per_string: int = TEXT_LENGTH, mode: ParseMode = ParseMode.DEFAULT) -> list[str]:
153    """Splits one string into multiple strings, with a maximum amount of `chars_per_string` characters per string."""
154
155    def next_sentence(strings: str) -> str:
156        # ruff: noqa: RUF001
157        sentence_enders = r"[。!? \.\)\n\t)]"
158        if matched := re.search(sentence_enders, strings):
159            return strings[: matched.end()]
160        return strings
161
162    chars_per_string = chars_per_string - len(BLOCKQUOTE_EXPANDABLE_DELIM) * text.count(BLOCKQUOTE_EXPANDABLE_DELIM)
163    parts = []
164    while True:
165        if await count_without_entities(text, mode) < chars_per_string:
166            parts.append(text)
167            break
168
169        next_str = next_sentence(text)
170        part = next_str if len(next_str) <= chars_per_string else next_str[:chars_per_string]
171        left = text[len(part) :]
172        while await count_without_entities(f"{part}{next_sentence(left)}", mode) < chars_per_string:
173            part += next_sentence(left)
174            left = text[len(part) :]
175            if not left.strip():
176                break
177        parts.append(part)
178        text = left
179    return parts
180
181
182def blockquote(s: str) -> str:
183    """Block quote texts."""
184    s = s.replace(BLOCKQUOTE_EXPANDABLE_DELIM, "")
185    return BLOCKQUOTE_EXPANDABLE_DELIM + s.replace("\n", f"\n{BLOCKQUOTE_EXPANDABLE_DELIM}")
186
187
188def quote(s: str) -> str:
189    """Quote texts."""
190    s = s.removeprefix(BLOCKQUOTE_DELIM)
191    return BLOCKQUOTE_DELIM + s.replace("\n", f"\n{BLOCKQUOTE_DELIM}")
192
193
194async def sent_from_me(client: Client, message: Message) -> bool:
195    """Check if this clinet is a bot or not."""
196    try:
197        me = await myself(client)
198        uid = message.from_user.id if message.from_user else 1
199    except Exception as e:
200        logger.error(e)
201        return False
202    return me.id == uid
203
204
205async def set_reaction(client: Client, message: Message, reaction: str | list[str] = ""):
206    with contextlib.suppress(Exception):
207        if reaction:
208            emojis = [ReactionTypeEmoji(emoji=reaction)] if isinstance(reaction, str) else [ReactionTypeEmoji(emoji=x) for x in reaction]
209            await client.set_reaction(message.chat.id, message.id, reaction=emojis)  # type: ignore
210        else:
211            await client.set_reaction(message.chat.id, message.id)
212
213
214async def delete_message(message: Message | None):
215    if not isinstance(message, Message):
216        return
217    with contextlib.suppress(Exception):
218        await message.delete()
219
220
221def remove_img_tag(markdown: str) -> tuple[str, list[str]]:
222    """Removes all image tags from a markdown string."""
223    image_pattern = r"!\[.*?\]\((.*?)\)"  # Matches both with and without alt text
224    clean = re.sub(image_pattern, "", markdown)
225    urls = re.findall(image_pattern, markdown)
226    return clean, urls