main
1#!/usr/bin/env python
2# -*- coding: utf-8 -*-
3
4import asyncio
5from pathlib import Path
6
7from loguru import logger
8from pyrogram.client import Client
9from pyrogram.errors import FloodWait
10from pyrogram.parser.markdown import BLOCKQUOTE_EXPANDABLE_DELIM
11from pyrogram.types import Message, ReplyParameters
12
13from config import CAPTION_LENGTH, TID
14from messages.preprocess import preprocess_media, warp_media_group
15from messages.progress import modify_progress, telegram_uploading
16from messages.utils import delete_message, get_reply_to, smart_split, summay_media
17from utils import to_int
18
19
async def send2tg(
    client: Client,
    message: Message,
    target_chat: int | str = "",
    reply_msg_id: int = 0,
    *,
    texts: str = "",
    media: list[dict] | None = None,
    comments: list[str] | None = None,  # append after texts
    send_from_user: str | None = None,
    cooldown: float = 0,
    caption_above: bool = False,
    **kwargs,
) -> list[Message | None]:
    """Send an arbitrary amount of text and media to Telegram.

    Telegram message limits this function works around:
        - 4096 characters for a pure text message
        - 10 media items in a single message (media group)
        - 1024 characters for a caption (4096 for premium users)

    Dispatching, based on the number of media items after preprocessing:
        - 0 media: everything goes through ``send_texts``.
        - 1 media: everything goes through ``send_single_media``.
        - 2..10 media: one media group carrying the caption.
        - more than 10: chunks of 10; all chunks but the last are sent without
          caption, the last chunk is sent through a recursive ``send2tg`` call
          that carries the caption.
    Text that does not fit in the caption is sent afterwards as plain text.

    Args:
        client: The Pyrogram client.
        message: The trigger message object.
        target_chat: The chat ID to send to. When falsy, falls back to
            ``message.chat.id``.
        reply_msg_id: Forwarded to ``get_reply_to`` together with ``message.id``.
            If set to an integer > 0, the result is sent as a reply to that message_id.
            If set to 0, reply to the trigger message itself.
            If set to -1, do not send as a reply message.
        texts: The texts to send.
        media: The media files to send, e.g.
            ``[{"photo": "path/to/photo.jpg"}, {"video": "path/to/video.mp4"}]``.
        comments: Strings concatenated (no separator) after ``texts``.
        send_from_user: Prefix put in front of the stripped texts.
        cooldown: Seconds to sleep after each media chunk; also forwarded to the
            text / single-media senders.
        caption_above: Show the caption above the media.
        **kwargs: Forwarded to ``modify_progress``, ``send_single_media`` and the
            recursive call. Keys read in this module:
            progress: When truthy and there is media, an upload summary is
                pushed through ``modify_progress``.
            detail_progress: Read by ``send_single_media`` for its upload callback.
            NOTE(review): older docs called the first key ``show_progress``;
            the code here reads ``progress`` - confirm against ``modify_progress``.

    Returns:
        The sent messages. Entries may be ``None`` when a single-media send failed.
    """
    # Explicit argument first, then a kwargs override, then the trigger chat.
    chat_id = to_int(target_chat or kwargs.get("target_chat") or message.chat.id)
    reply_parameters = get_reply_to(message.id, reply_msg_id)
    media = await preprocess_media(media or [])

    texts = texts or ""
    if comments:  # append comments to texts
        texts += "".join(comments)
    if send_from_user:  # prefix send_from_user
        texts = f"{send_from_user}{texts.strip()}"

    total = len(media)
    if kwargs.get("progress") and total > 0:
        await modify_progress(text=f"⏫正在上传:\n{summay_media(media)}", force_update=True, **kwargs)
    logger.trace(f"Sending {len(media)} media with {len(texts)} texts")

    # The two simple cases are fully delegated (these paths return before the
    # cleanup loop at the bottom of this function).
    if total == 0:
        return await send_texts(client, chat_id, reply_parameters, texts=texts, cooldown=cooldown)
    if total == 1:
        return await send_single_media(
            client,
            chat_id,
            reply_parameters,
            media=media[0],
            texts=texts,
            cooldown=cooldown,
            caption_above=caption_above,
            **kwargs,
        )

    sent_messages: list[Message | None] = []  # return sent messages
    caption = (await smart_split(texts, CAPTION_LENGTH))[0]
    remaining_texts = texts.removeprefix(caption)

    if total <= 10:
        group = await warp_media_group(media, caption=caption, caption_above=caption_above)
        sent_messages.extend(await send_media_group(client, chat_id, group, reply_parameters))
    else:
        # Send pure media first and attach the caption to the last chunk.
        batches = [media[start : start + 10] for start in range(0, total, 10)]
        last_position = len(batches) - 1
        for position, batch in enumerate(batches):
            if position == last_position:
                # Last chunk holds <= 10 items: recurse so it carries the caption.
                sent_messages.extend(
                    await send2tg(
                        client,
                        message,
                        chat_id,
                        reply_msg_id=-1,
                        texts=caption,
                        media=batch,
                        caption_above=caption_above,
                        cooldown=cooldown,
                        **kwargs,
                    )
                )
            else:
                group = await warp_media_group(batch)
                # Only the very first chunk is sent as a reply.
                batch_reply = reply_parameters if position == 0 else ReplyParameters()
                sent_messages.extend(await send_media_group(client, chat_id, group, batch_reply))
            await asyncio.sleep(cooldown)

    if remaining_texts:
        sent_messages.extend(await send_texts(client, chat_id, ReplyParameters(), texts=remaining_texts, cooldown=cooldown))

    # clean up: remove local files referenced by the media items
    for item in media:
        for key in ("path", "media", "thumb", "audio", "photo", "video"):
            value = item.get(key)
            if not value:
                continue
            file_path = Path(value)
            if file_path.is_file():
                logger.trace(f"Deleting: {value}")
                file_path.unlink(missing_ok=True)
    return sent_messages
117
118
async def send_texts(
    client: Client,
    target_chat: int | str,
    reply_parameters: ReplyParameters,
    *,
    texts: str = "",
    cooldown: float = 0,
) -> list[Message | None]:
    """Send plain text, split into as many messages as needed.

    The stripped ``texts`` is split with ``smart_split`` and each resulting
    chunk is sent as its own message. Only the chunk at index 0 is sent with
    ``reply_parameters``; every later chunk uses an empty ``ReplyParameters()``.

    A chunk is skipped when:
        - it is empty,
        - every non-empty line starts with ``BLOCKQUOTE_EXPANDABLE_DELIM``
          (a comments-only chunk), or
        - it contains a ``💬`` blockquote but not the "💬**点此展开评论区**:" header.

    Sending is best-effort: a ``FloodWait`` is waited out and the chunk is retried
    once; any other failure (including a failed retry) is logged and the chunk
    is dropped, so the function never raises because of a single chunk.

    Args:
        client: The Pyrogram client.
        target_chat: The chat ID to send to.
        reply_parameters: Reply settings used for the first chunk only.
        texts: The texts to send.
        cooldown: Seconds to sleep after each attempted chunk.

    Returns:
        The messages that were sent successfully, in order.
    """
    sent_messages: list[Message | None] = []
    logger.trace(f"Sending {len(texts)} texts only")
    for idx, msg in enumerate(await smart_split(texts.strip())):
        if not msg:
            continue
        # we do not send comments-only texts
        if all(s.startswith(BLOCKQUOTE_EXPANDABLE_DELIM) for s in msg.split("\n") if s):
            continue
        if f"{BLOCKQUOTE_EXPANDABLE_DELIM}💬" in msg and "💬**点此展开评论区**:" not in msg:
            continue

        if idx != 0:
            reply_parameters = ReplyParameters()

        # Send the current chunk (`msg`), not the whole `texts`: sending the full
        # text on every iteration duplicates content and defeats the splitting.
        for attempt in range(2):
            try:
                sent_messages.append(await client.send_message(target_chat, msg, reply_parameters=reply_parameters))
                break
            except FloodWait as e:
                logger.warning(e)
                if attempt == 0:  # wait as instructed, then retry exactly once
                    await asyncio.sleep(e.value)  # type: ignore
            except Exception as e:
                logger.warning(f"send_texts: {e}")
                break
        await asyncio.sleep(cooldown)
    return sent_messages
150
151
async def send_single_media(
    client: Client,
    target_chat: int | str,
    reply_parameters: ReplyParameters,
    *,
    media: dict,
    texts: str = "",
    cooldown: float = 0,
    caption_above: bool = False,
    **kwargs,
) -> list[Message | None]:
    """Send one media item with a caption, followed by any overflow text.

    The first chunk of ``texts`` (split at ``CAPTION_LENGTH``) becomes the caption;
    whatever remains is sent afterwards through ``send_texts``. The media kind is
    chosen by the first truthy key among ``photo``, ``video``, ``audio`` and
    ``document``; the whole ``media`` dict is forwarded as keyword arguments to the
    matching Pyrogram call.

    On ``FloodWait`` the function sleeps for the requested time and retries itself
    with the same arguments. Any other error is logged and ``None`` is recorded in
    place of the message. Local files referenced by the media dict are deleted
    before returning.

    Args:
        client: The Pyrogram client.
        target_chat: The chat ID to send to.
        reply_parameters: Reply settings for the media message.
        media: A single media description, e.g. ``{"photo": "path/to/photo.jpg"}``.
        texts: Caption text; overflow is sent as separate text messages.
        cooldown: Forwarded to ``send_texts`` for the overflow messages.
        caption_above: Show the caption above the media (photo and video only).
        **kwargs: Keys read here:
            progress: First argument passed to the ``telegram_uploading`` callback.
                Defaults to False.
            detail_progress: Third argument passed to the callback. Defaults to True.

    Returns:
        The media message (or ``None`` if sending failed or no known media key was
        present) followed by the overflow text messages.
    """
    sent_messages: list[Message | None] = []
    logger.trace(f"Sending single media with {len(texts)} texts")
    caption = (await smart_split(texts, CAPTION_LENGTH))[0]
    remaining_texts = texts.removeprefix(caption)
    show_progress = kwargs.get("progress", False)
    detail_progress = kwargs.get("detail_progress", True)
    message = None
    try:
        if media.get("photo"):
            message = await client.send_photo(
                chat_id=target_chat,
                **media,
                caption=caption,
                show_caption_above_media=caption_above,
                reply_parameters=reply_parameters,
            )
        elif video := media.get("video"):
            message = await client.send_video(
                chat_id=target_chat,
                reply_parameters=reply_parameters,
                caption=caption,
                show_caption_above_media=caption_above,
                progress=telegram_uploading,
                progress_args=(show_progress, video, detail_progress),
                **media,
            )
        elif audio := media.get("audio"):
            message = await client.send_audio(
                chat_id=target_chat,
                reply_parameters=reply_parameters,
                caption=caption,
                progress=telegram_uploading,
                progress_args=(show_progress, audio, detail_progress),
                **media,
            )
        elif document := media.get("document"):
            message = await client.send_document(
                chat_id=target_chat,
                reply_parameters=reply_parameters,
                caption=caption,
                progress=telegram_uploading,
                progress_args=(show_progress, document, detail_progress),
                **media,
            )
    except FloodWait as e:
        logger.warning(e)
        await asyncio.sleep(e.value)  # type: ignore
        # Retry with the *same* presentation options; caption_above must be
        # forwarded explicitly or the retry silently falls back to caption-below.
        return await send_single_media(
            client,
            target_chat,
            reply_parameters,
            media=media,
            texts=texts,
            cooldown=cooldown,
            caption_above=caption_above,
            **kwargs,
        )
    except Exception as e:
        logger.warning(f"send_single_media: {e}")
    sent_messages.append(message)
    if remaining_texts:
        sent_messages.extend(await send_texts(client, target_chat, ReplyParameters(), texts=remaining_texts, cooldown=cooldown))

    # Remove local files that were uploaded.
    # NOTE(review): "document" is not in this list, so document files are kept - confirm this is intended.
    for key in ["path", "thumb", "audio", "photo", "video"]:
        if media.get(key) and Path(media[key]).is_file():
            logger.trace(f"Deleting: {media[key]}")
            Path(media[key]).unlink(missing_ok=True)
    return sent_messages
216
217
async def send_media_group(
    client: Client,
    target_chat: int | str,
    media_group: list,
    reply_parameters: ReplyParameters,
) -> list[Message]:
    """Send a media group to Telegram, retrying on failure.

    Each operation is attempted at most three times. After a FloodWait the
    function sleeps for the requested time before the next attempt; other errors
    are logged and retried immediately. If every attempt fails, an empty list is
    returned.

    When a FloodWait interrupts a media group we cannot know how many items were
    already delivered, so a retry may cause repeated media files. To avoid that,
    a relay strategy exists: send the group to a temporary chat (``TID.TEMP``)
    first, copy it to the target chat, then delete the temporary messages.
    That relay is currently disabled in code (noted as no longer working), so
    the group is always sent directly to ``target_chat``.

    Args:
        client: The Pyrogram client.
        target_chat: The chat ID to send to.
        media_group: The prepared media group items.
        reply_parameters: Reply settings for the group.

    Returns:
        The sent messages, or an empty list when all attempts failed.
    """
    max_attempts = 3

    async def send(chat_id: int | str, media_group: list, reply_parameters: ReplyParameters | None = None) -> list[Message]:
        # Upload the group to `chat_id`, giving up after `max_attempts` tries.
        for _ in range(max_attempts):
            try:
                return await client.send_media_group(chat_id, media=media_group, reply_parameters=reply_parameters)  # type: ignore
            except FloodWait as e:
                logger.warning(e)
                await asyncio.sleep(e.value)  # type: ignore
            except Exception as e:
                logger.error(f"Failed to send_media_group: {e}")
        return []

    async def copy_media_group(message_id: int) -> list[Message]:
        # Copy an already-uploaded group from the temporary chat to the target chat.
        for _ in range(max_attempts):
            try:
                return await client.copy_media_group(
                    to_int(target_chat),
                    from_chat_id=to_int(TID.TEMP),
                    message_id=message_id,
                    reply_parameters=reply_parameters,
                )
            except FloodWait as e:
                logger.warning(e)
                await asyncio.sleep(e.value)  # type: ignore
            except Exception as e:
                logger.error(f"Failed to copy_media_group: {e}")
        return []

    # According to our observation, FloodWait usually occurs when len(media_group) > 6.
    # The relay used to be enabled for `len(media_group) > 6`, but this is no longer
    # working, so it stays switched off.
    relay_via_temp_chat = False
    if relay_via_temp_chat:
        copied = []
        temp_msgs = await send(to_int(TID.TEMP), media_group)
        if len(temp_msgs) > 0 and isinstance(temp_msgs[0], Message) and temp_msgs[0].media_group_id:
            copied = await copy_media_group(message_id=temp_msgs[0].id)
        for temp_msg in temp_msgs:
            await delete_message(temp_msg)
        return [m for m in copied if isinstance(m, Message)]

    # send directly
    return await send(to_int(target_chat), media_group, reply_parameters)