main
  1#!/usr/bin/env python
  2# -*- coding: utf-8 -*-
  3import contextlib
  4import re
  5from pathlib import Path
  6
  7from glom import Coalesce, glom
  8from loguru import logger
  9from pyrogram.client import Client
 10from pyrogram.types import Message
 11
 12from config import PREFIX
 13from history.query import get_user
 14from messages.sender import send2tg
 15from messages.utils import equal_prefix, set_reaction, startswith_prefix
 16from quotly.api import generate_from_api
 17from quotly.pillow import generate_from_pillow
 18from quotly.utils import download_avatar
 19from utils import to_int
 20
# Usage text for the quote command (user-facing, written in Chinese).
# It is sent back by `quote_message` when the command is issued with no
# arguments and without replying to any message. It documents the `@PIL`
# and `@API` suffixes; `PREFIX.QUOTLY` is interpolated once, at import time.
HELP = f"""↪️**引用语录**
使用说明: `{PREFIX.QUOTLY}` 命令回复消息

添加以下后缀切换风格:
1. `{PREFIX.QUOTLY} @PIL`: 使用PIL本地实现的风格
2. `{PREFIX.QUOTLY} @API`: 调用 @QuotLyBot 官方API
两者区别:
1. `@PIL` 速度更快, 仅支持文本引用
2. `@API` 速度稍慢, 支持所有格式的消息

如果不指定 `@PIL` 或 `@API`, 则根据引用类型自动选择
"""
 33
 34
 35async def quote_message(client: Client, message: Message, **kwargs):
 36    """Quote message and send as sticker.
 37
 38    Simulation of the original @QuotLyBot.
 39
 40    Docs: https://github.com/LyoSU/quote-api
 41    """
 42    if not startswith_prefix(message.text, prefix=PREFIX.QUOTLY):
 43        return
 44    quote_msg = message.reply_to_message or message.external_reply
 45    if equal_prefix(message.text, prefix=PREFIX.QUOTLY) and not quote_msg:
 46        await send2tg(client, message, texts=HELP, **kwargs)
 47        return
 48
 49    text = message.text.removeprefix(PREFIX.QUOTLY).strip()
 50    await set_reaction(client, message, reaction="👌")
 51    # custom chat_id and message_id
 52    quote_msg = quote_msg or Message(id=0)
 53    if message.external_reply or ("cid=" in text and "mid=" in text):
 54        cid = glom(quote_msg, "chat.id", default=0)
 55        mid = glom(quote_msg, Coalesce("id", "message_id"), default=0)
 56        if matched := re.search(r"cid=(-?\w+)", text, re.IGNORECASE):
 57            cid = to_int(matched.group(1))
 58            text = re.sub(r"cid=(-?\w+)", "", text).strip()
 59        if matched := re.search(r"mid=(\d+)", text, re.IGNORECASE):
 60            mid = to_int(matched.group(1))
 61            text = re.sub(r"mid=(\d+)", "", text).strip()
 62        msg = Message(id=0)
 63        with contextlib.suppress(Exception):
 64            msg: Message = await client.get_messages(cid, mid)  # type: ignore
 65        if msg.id != 0 and not msg.empty:
 66            quote_msg = msg
 67        else:
 68            await send2tg(client, message, texts=f"❌获取此消息失败, 可能是消息已被删除或者无访问权限.\nChatID={cid}, MessageID={mid}", **kwargs)
 69            return
 70    # /quote uid=1234 quote_content_text
 71    if matched := re.search(r"uid=([a-zA-Z0-9_]+)", text, re.IGNORECASE):
 72        uid = to_int(matched.group(1))
 73        text = re.sub(r"uid=([a-zA-Z0-9_]+)", "", text).strip()
 74        # get chat id
 75        cid = glom(quote_msg, "chat.id", default=0) or message.chat.id
 76        if matched := re.search(r"cid=(-?\w+)", text, re.IGNORECASE):
 77            cid = to_int(matched.group(1))
 78            text = re.sub(r"cid=(-?\w+)", "", text).strip()
 79        user = await get_user(client, uid, cid)
 80        if user.id == 0:
 81            await send2tg(client, message, texts=f"❌获取用户信息失败, 请重新指定cid和uid\nChatID={cid}, UID={uid}", **kwargs)
 82            return
 83        quote_msg.from_user = user
 84
 85    # user info
 86    user_info = {}
 87    if uid := glom(quote_msg, Coalesce("from_user.id", "chat.id"), default=0):
 88        user_info |= {"uid": uid}
 89    if first_name := glom(quote_msg, Coalesce("from_user.first_name", "chat.title"), default=""):
 90        user_info |= {"first_name": first_name}
 91    if last_name := glom(quote_msg, "from_user.last_name", default=""):
 92        user_info |= {"last_name": last_name}
 93    if username := glom(quote_msg, Coalesce("from_user.username", "chat.username"), default=""):
 94        user_info |= {"username": username}
 95    if emoji_status := glom(quote_msg, "from_user.emoji_status.custom_emoji_id", default=""):
 96        user_info |= {"emoji_status": str(emoji_status)}
 97
 98    # user avatar (DO NOT use file_id directly, because some user hide it for public access)
 99    avatar_id = "AgACAgIAAxUAAWh1vGNwV9ry4BlFLlyCmkVZewcwAAKqpzEbY1mCPTQvRMdEdMXOAAgBAAMCAANiAAceBA"  # QuotLyBot avatar
100    if int(uid) == 1639998668:  # hook CYF
101        async for photo in client.get_chat_photos(1639998668, limit=1):  # type: ignore
102            # this method will ignore my custom avatar
103            avatar_id = glom(photo, "sizes.-1.file_id", default="")
104    elif big_file_id := glom(quote_msg, "from_user.photo.big_file_id", default=""):
105        avatar_id = big_file_id
106    else:  # avatar is not available (Channel hides sender's avatar)
107        async for photo in client.get_chat_photos(message.chat.id, limit=1):  # type: ignore
108            avatar_id = glom(photo, "sizes.-1.file_id", default="")
109            break
110    avatar_path: str = await download_avatar(client, avatar_id)
111    user_info |= {"photo": avatar_path}
112
113    # set prefer style
114    prefer_style = ""
115    if text.upper().startswith("@PIL"):
116        prefer_style = "PIL"
117        text = text[4:].strip()
118    elif text.upper().startswith("@API"):
119        prefer_style = "API"
120        text = text[4:].strip()
121
122    # message info
123    message_info = {"from": user_info, "text": text or quote_msg.content, "avatar": True}
124
125    # parse entities
126    entities = []
127    if quote_msg.entities:
128        for entity in quote_msg.entities:
129            parsed = {}
130            for key in ["type", "offset", "length", "url", "user", "language", "custom_emoji_id"]:
131                if key == "type":
132                    parsed["type"] = entity.type.name.lower()
133                elif glom(entity, key, default=None) is not None:
134                    parsed[key] = glom(entity, key, default=None)
135            entities.append(parsed)
136    if entities:
137        message_info |= {"entities": entities}
138    if user_info.get("emoji_status"):
139        prefer_style = prefer_style or "API"
140
141    # parse media
142    media_path = "/not-exist-path"
143    if media := glom(quote_msg, Coalesce("photo.thumbs.0", "video.thumbs.0"), default=None):
144        media_path: str = await client.download_media(media.file_id)  # type: ignore
145        message_info |= {"media": media_path}
146        prefer_style = prefer_style or "API"
147
148    styles = ["API", "PIL"] if prefer_style == "API" else ["PIL", "API"]
149    for style in styles:
150        if style == "PIL":
151            logger.info("Generate sticker from PIL")
152            webp = await generate_from_pillow(message_info)
153        else:
154            logger.info("Generate sticker from API")
155            webp = await generate_from_api(message_info)
156        if Path(webp).is_file():
157            await client.send_sticker(message.chat.id, webp)
158            Path(webp).unlink(missing_ok=True)
159            Path(media_path).unlink(missing_ok=True)
160            await set_reaction(client, message, reaction="")
161            return