main
1#!/usr/bin/env python
2# -*- coding: utf-8 -*-
3import contextlib
4import re
5from pathlib import Path
6
7from glom import Coalesce, glom
8from loguru import logger
9from pyrogram.client import Client
10from pyrogram.types import Message
11
12from config import PREFIX
13from history.query import get_user
14from messages.sender import send2tg
15from messages.utils import equal_prefix, set_reaction, startswith_prefix
16from quotly.api import generate_from_api
17from quotly.pillow import generate_from_pillow
18from quotly.utils import download_avatar
19from utils import to_int
20
# Usage/help text for the quote command (user-facing, written in Chinese).
# quote_message() sends it when `equal_prefix` matches the incoming text and
# there is no replied-to message to quote. PREFIX.QUOTLY is interpolated once,
# at import time.
HELP = f"""↪️**引用语录**
使用说明: `{PREFIX.QUOTLY}` 命令回复消息

添加以下后缀切换风格:
1. `{PREFIX.QUOTLY} @PIL`: 使用PIL本地实现的风格
2. `{PREFIX.QUOTLY} @API`: 调用 @QuotLyBot 官方API
两者区别:
1. `@PIL` 速度更快, 仅支持文本引用
2. `@API` 速度稍慢, 支持所有格式的消息

如果不指定 `@PIL` 或 `@API`, 则根据引用类型自动选择
"""
33
34
# Inline command options. Compiled once so that the lookup and the removal of
# an option always use the same (case-insensitive) pattern.
_CID_RE = re.compile(r"cid=(-?\w+)", re.IGNORECASE)
_MID_RE = re.compile(r"mid=(\d+)", re.IGNORECASE)
_UID_RE = re.compile(r"uid=([a-zA-Z0-9_]+)", re.IGNORECASE)

# Entity attributes (besides "type") copied into the serialized entity dict.
_ENTITY_KEYS = ("offset", "length", "url", "user", "language", "custom_emoji_id")

# User id whose avatar is always fetched through get_chat_photos ("hook CYF").
_HOOKED_UID = 1639998668


def _pop_option(pattern: "re.Pattern[str]", text: str) -> "tuple[str, str]":
    """Extract an inline ``key=value`` option from *text*.

    Returns ``(value, remaining_text)`` where *value* is the first captured
    group of *pattern* and *remaining_text* is *text* with every match of
    *pattern* removed and stripped. When nothing matches, returns
    ``("", text)`` with *text* untouched.
    """
    matched = pattern.search(text)
    if matched is None:
        return "", text
    return matched.group(1), pattern.sub("", text).strip()


def _parse_entities(quote_msg) -> list:
    """Serialize ``quote_msg.entities`` into a list of plain dicts."""
    entities = []
    for entity in quote_msg.entities or []:
        parsed = {"type": entity.type.name.lower()}
        for key in _ENTITY_KEYS:
            value = glom(entity, key, default=None)
            if value is not None:
                parsed[key] = value
        entities.append(parsed)
    return entities


async def quote_message(client: Client, message: Message, **kwargs):
    """Quote message and send as sticker.

    Simulation of the original @QuotLyBot.

    Docs: https://github.com/LyoSU/quote-api

    Command text (after the ``PREFIX.QUOTLY`` prefix) may contain, in any
    letter case:

    * ``cid=<chat> mid=<message>``: quote that message instead of the
      replied-to one.
    * ``uid=<user>`` (optionally with ``cid=<chat>``): attribute the quote to
      that user.
    * a leading ``@PIL`` / ``@API``: force the rendering backend.

    Whatever text remains replaces the quoted message's own content.

    Args:
        client: Pyrogram client used for all Telegram calls.
        message: Incoming command message.
        **kwargs: Forwarded unchanged to ``send2tg`` for help/error replies.

    Returns:
        None. The sticker (or a help/error message) is sent as a side effect.
    """
    if not startswith_prefix(message.text, prefix=PREFIX.QUOTLY):
        return
    quote_msg = message.reply_to_message or message.external_reply
    if equal_prefix(message.text, prefix=PREFIX.QUOTLY) and not quote_msg:
        await send2tg(client, message, texts=HELP, **kwargs)
        return

    text = message.text.removeprefix(PREFIX.QUOTLY).strip()
    await set_reaction(client, message, reaction="👌")

    # custom chat_id and message_id
    quote_msg = quote_msg or Message(id=0)
    lowered = text.lower()  # options are matched case-insensitively below
    if message.external_reply or ("cid=" in lowered and "mid=" in lowered):
        cid = glom(quote_msg, "chat.id", default=0)
        mid = glom(quote_msg, Coalesce("id", "message_id"), default=0)
        raw_cid, text = _pop_option(_CID_RE, text)
        if raw_cid:
            cid = to_int(raw_cid)
        raw_mid, text = _pop_option(_MID_RE, text)
        if raw_mid:
            mid = to_int(raw_mid)
        msg = Message(id=0)
        try:
            msg = await client.get_messages(cid, mid)  # type: ignore
        except Exception as exc:
            # Best effort: any failure is reported to the user just below.
            logger.warning(f"Failed to fetch message ChatID={cid}, MessageID={mid}: {exc}")
        if msg is not None and msg.id != 0 and not msg.empty:
            quote_msg = msg
        else:
            await send2tg(client, message, texts=f"❌获取此消息失败, 可能是消息已被删除或者无访问权限.\nChatID={cid}, MessageID={mid}", **kwargs)
            return

    # /quote uid=1234 quote_content_text
    raw_uid, text = _pop_option(_UID_RE, text)
    if raw_uid:
        uid = to_int(raw_uid)
        # get chat id
        cid = glom(quote_msg, "chat.id", default=0) or message.chat.id
        raw_cid, text = _pop_option(_CID_RE, text)
        if raw_cid:
            cid = to_int(raw_cid)
        user = await get_user(client, uid, cid)
        if user.id == 0:
            await send2tg(client, message, texts=f"❌获取用户信息失败, 请重新指定cid和uid\nChatID={cid}, UID={uid}", **kwargs)
            return
        quote_msg.from_user = user

    # user info (only non-empty fields are included)
    user_info = {}
    if uid := glom(quote_msg, Coalesce("from_user.id", "chat.id"), default=0):
        user_info |= {"uid": uid}
    if first_name := glom(quote_msg, Coalesce("from_user.first_name", "chat.title"), default=""):
        user_info |= {"first_name": first_name}
    if last_name := glom(quote_msg, "from_user.last_name", default=""):
        user_info |= {"last_name": last_name}
    if username := glom(quote_msg, Coalesce("from_user.username", "chat.username"), default=""):
        user_info |= {"username": username}
    if emoji_status := glom(quote_msg, "from_user.emoji_status.custom_emoji_id", default=""):
        user_info |= {"emoji_status": str(emoji_status)}

    # user avatar (DO NOT use file_id directly, because some user hide it for public access)
    avatar_id = "AgACAgIAAxUAAWh1vGNwV9ry4BlFLlyCmkVZewcwAAKqpzEbY1mCPTQvRMdEdMXOAAgBAAMCAANiAAceBA"  # QuotLyBot avatar
    if int(uid) == _HOOKED_UID:  # hook CYF
        async for photo in client.get_chat_photos(_HOOKED_UID, limit=1):  # type: ignore
            # this method will ignore my custom avatar
            avatar_id = glom(photo, "sizes.-1.file_id", default="")
            break
    elif big_file_id := glom(quote_msg, "from_user.photo.big_file_id", default=""):
        avatar_id = big_file_id
    else:  # avatar is not available (Channel hides sender's avatar)
        async for photo in client.get_chat_photos(message.chat.id, limit=1):  # type: ignore
            avatar_id = glom(photo, "sizes.-1.file_id", default="")
            break
    avatar_path: str = await download_avatar(client, avatar_id)
    user_info |= {"photo": avatar_path}

    # set prefer style
    prefer_style = ""
    if text.upper().startswith("@PIL"):
        prefer_style = "PIL"
        text = text[4:].strip()
    elif text.upper().startswith("@API"):
        prefer_style = "API"
        text = text[4:].strip()

    # message info
    message_info = {"from": user_info, "text": text or quote_msg.content, "avatar": True}

    # parse entities
    entities = _parse_entities(quote_msg)
    if entities:
        message_info |= {"entities": entities}
    if user_info.get("emoji_status"):
        prefer_style = prefer_style or "API"

    # parse media
    media_path = ""
    if media := glom(quote_msg, Coalesce("photo.thumbs.0", "video.thumbs.0"), default=None):
        downloaded = await client.download_media(media.file_id)  # type: ignore
        if downloaded:  # download_media may return None when the download fails
            media_path = str(downloaded)
            message_info |= {"media": media_path}
        prefer_style = prefer_style or "API"

    # Try the preferred backend first and fall back to the other one.
    styles = ["API", "PIL"] if prefer_style == "API" else ["PIL", "API"]
    try:
        for style in styles:
            if style == "PIL":
                logger.info("Generate sticker from PIL")
                webp = await generate_from_pillow(message_info)
            else:
                logger.info("Generate sticker from API")
                webp = await generate_from_api(message_info)
            if not webp or not Path(webp).is_file():
                continue
            try:
                await client.send_sticker(message.chat.id, webp)
            finally:
                # Remove the generated sticker even if sending raised.
                Path(webp).unlink(missing_ok=True)
            await set_reaction(client, message, reaction="")
            return
        logger.warning("Failed to generate sticker with any style")
    finally:
        # Remove the downloaded thumbnail on every exit path.
        if media_path:
            Path(media_path).unlink(missing_ok=True)
161 return