Commit 75158b7
Changed files (23)
src/asr/voice_recognition.py
@@ -9,7 +9,9 @@ from pyrogram.types import Message
from asr.tecent_asr import Credential, FlashRecognitionRequest, FlashRecognizer
from config import ASR_MAX_DURATION, ENABLE, PREFIX, TOKEN, cache
-from message_utils import equal_prefix, modify_progress, send2tg, startswith_prefix
+from messages.progress import modify_progress
+from messages.sender import send2tg
+from messages.utils import equal_prefix, startswith_prefix
from multimedia import convert_to_audio, parse_media_info
# ruff: noqa: RUF001
src/bridge/ocr.py
@@ -8,7 +8,8 @@ from pyrogram.client import Client
from pyrogram.types import Message, ReplyParameters
from config import ENABLE, PREFIX, cache
-from message_utils import equal_prefix, send2tg, startswith_prefix
+from messages.sender import send2tg
+from messages.utils import equal_prefix, startswith_prefix
from utils import i_am_bot
OCR_BOT = "GLBetabot"
src/messages/database.py
@@ -0,0 +1,176 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+
+import contextlib
+import json
+import re
+
+from loguru import logger
+from pyrogram.client import Client
+from pyrogram.types import Message, ReplyParameters
+
+from config import DB
+from database import del_db, get_db, set_db
+from messages.progress import modify_progress
+from messages.utils import sender_markdown_to_html
+
+
+async def save_messages(messages: list[Message | None], key: str, metadata: dict | None = None) -> bool:
+ """Save the messages to DB.
+
+ data format:
+ {
+ "data": [
+ {
+ "cid": 111, # chat id
+ "type": "text", # photo, video, media_group, etc.
+ "mid": 222, # message id
+ "text": "html format",
+ },
+ ...
+ ]
+ }
+ """
+ if not metadata:
+ metadata = {}
+ if not messages:
+ logger.error(f"Skip save messages to {DB.ENGINE} due to empty message list")
+ return False
+ valid_messages = [x for x in messages if isinstance(x, Message)]
+ if len(valid_messages) != len(messages):
+ logger.warning(f"Skip save messages to {DB.ENGINE} due to invalid message type")
+ return False
+ time_str = valid_messages[0].date.isoformat()
+ metadata["time"] = time_str
+ data = []
+ media_group_ids = set() # save once
+ for msg in valid_messages:
+ text = ""
+ if msg.text:
+ text = msg.text
+ if msg.caption:
+ text = msg.caption
+ if hasattr(text, "html"): # DO NOT use markdown, because this format has some bugs
+ text = text.html # type: ignore
+ # Caution: this format should be consistent with `handle_social_media` function in `handler.py`
+ # text = re.sub(r"^π€\[@.*?\]\(tg://user\?id=\d+\)//", "", text) # remove markdown send_from_user
+ text = re.sub(r"^π€\<a.*?tg://user\?id=\d+.*?@.*?</a>//", "", text) # remove markdown send_from_user
+ msg_extra = {"text": text} if text else {}
+ if msg.media_group_id:
+ if msg.media_group_id not in media_group_ids:
+ logger.trace(f"Saving media group message {msg.id}")
+ media_group_ids.add(msg.media_group_id)
+ data.append({"type": "media_group", "cid": msg.chat.id, "mid": msg.id} | msg_extra)
+ continue
+ if msg.video:
+ logger.trace(f"Saving video message {msg.id}")
+ data.append({"type": "video", "cid": msg.chat.id, "mid": msg.id} | msg_extra)
+ continue
+ if msg.photo:
+ logger.trace(f"Saving photo message {msg.id}")
+ data.append({"type": "photo", "cid": msg.chat.id, "mid": msg.id} | msg_extra)
+ continue
+ if msg.audio:
+ logger.trace(f"Saving audio message {msg.id}")
+ data.append({"type": "audio", "cid": msg.chat.id, "mid": msg.id} | msg_extra)
+ continue
+ if msg.text:
+ logger.trace(f"Saving text message {msg.id}")
+ data.append({"type": "text", "cid": msg.chat.id, "mid": msg.id} | msg_extra)
+ continue
+ logger.warning(f"Skip save message {msg.id} to {DB.ENGINE} due to unknown type: {msg}")
+ if data:
+ return await set_db(key, metadata=metadata, data={"data": data})
+ return False
+
+
+async def copy_messages_from_db(client: Client, message: Message, key: str, kv: dict | None = None, **kwargs) -> bool:
+ """Copy messages from database.
+
+ data format:
+ {
+ "data": [
+ {
+ "cid": 111, # chat id
+ "type": "text", # photo, video, media_group, etc.
+ "mid": 222, # message id
+ "text": "html format",
+ },
+ ...
+ ]
+ }
+ OR:
+ { "is_parted": True }
+
+ """
+ if kwargs:
+ logger.debug(f"kwargs: {kwargs}")
+ target_chat = kwargs["target_chat"] if kwargs.get("target_chat") else message.chat.id
+ reply_parameters = ReplyParameters(message_id=kwargs.get("reply_msg_id", message.id))
+
+ if kv is None:
+ kv = await get_db(key)
+ if kv.get("is_parted"):
+ logger.warning(f"Parted messages found for key={key}")
+ return await copy_parted_msgs_from_db(client, message, key, **kwargs)
+
+ if not kv.get("data"):
+ logger.error(f"Wrong {DB.ENGINE} data for key={key}: {kv}")
+ return False
+ data: list[dict] = kv.get("data", [])
+ if isinstance(data, str):
+ data = json.loads(data)
+ logger.debug(f"Sending {len(data)} messages from {DB.ENGINE}: {data}")
+ await modify_progress(text=f"πΎε¨{DB.ENGINE}δΈζ₯ε°ηΌε, ζ£ε¨θ½¬ε{len(data)}ζ‘ζΆζ―...", **kwargs)
+ results = []
+ try:
+ for idx, item in enumerate(sorted(data, key=lambda x: x["mid"])):
+ with contextlib.suppress(ValueError):
+ cid = int(item["cid"])
+ if idx != 0:
+ reply_parameters = ReplyParameters() # only send as reply of the first message
+ logger.debug(f"Copying {item['type']} message: ({cid}, {item['mid']}) -> target_chat={target_chat}")
+ text = item.get("text") # str or None
+ if text and kwargs.get("send_from_user"):
+ text = f"{sender_markdown_to_html(kwargs['send_from_user'])}{text}"
+ if item["type"] == "text":
+ if text:
+ results.append(await client.send_message(chat_id=target_chat, text=text, reply_parameters=reply_parameters))
+ else:
+ db_msg: Message = await client.get_messages(chat_id=cid, message_ids=int(item["mid"]), replies=0) # type: ignore
+ results.append(await client.send_message(chat_id=target_chat, text=db_msg.text, reply_parameters=reply_parameters))
+ elif item["type"] in ["photo", "audio", "video"]:
+ results.append(await client.copy_message(chat_id=target_chat, caption=text, from_chat_id=cid, message_id=int(item["mid"]), reply_parameters=reply_parameters)) # type: ignore
+ elif item["type"] == "media_group":
+ results.extend(await client.copy_media_group(chat_id=target_chat, captions=text, from_chat_id=cid, message_id=int(item["mid"]), reply_parameters=reply_parameters)) # type: ignore
+ else:
+ logger.warning(f"Unknown message type: {item}")
+ except Exception as e:
+ logger.error(f"Failed to copy messages for key={key} from {DB.ENGINE}: {e}")
+ await del_db(key)
+ return False
+ if all(isinstance(x, Message) for x in results):
+ logger.success(f"Successfully copied {len(results)} messages for key={key} from {DB.ENGINE}")
+ await modify_progress(del_status=True, **kwargs)
+ return True
+ await del_db(key)
+ return False
+
+
+async def copy_parted_msgs_from_db(client: Client, message: Message, key: str, suffix: str = "-P", **kwargs) -> bool:
+ """Copy parted messages from database.
+
+ For some large video files, they can be sent in parts.
+ The primary key is the video link, and the parted key is suffixed with "P".
+ """
+ for idx in range(1, 100):
+ logger.trace(f"Checking parted message for key={key}{suffix}{idx}")
+ kv = await get_db(f"{key}{suffix}{idx}")
+ if not kv:
+ logger.debug(f"No more parted messages found for key={key}{suffix}{idx}")
+ return True
+ if not await copy_messages_from_db(client, message, key=f"{key}{suffix}{idx}", kv=kv, **kwargs):
+ logger.error(f"Failed to copy parted messages for key={key}{suffix}{idx}")
+ return False
+ logger.warning(f"Too many parted messages found for key={key}")
+ return False
src/messages/parser.py
@@ -0,0 +1,95 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+
+
+from loguru import logger
+from pyrogram.enums import MessageEntityType
+from pyrogram.types import Message
+
+from config import cache
+
+
+def parse_msg(message: Message, *, verbose: bool = False) -> dict:
+ if cached := cache.get(f"parse_msg-{message.chat.id}-{message.id}"):
+ return cached
+ if verbose:
+ logger.trace(f"{message!r}")
+ chat_type = message.chat.type.name if message.chat and message.chat.type else ""
+ chat_title = message.chat.title if message.chat and message.chat.title else ""
+ uid = message.from_user.id if message.from_user else 0
+ cid = message.chat.id if message.chat else 0
+ mid = message.id if message.id else 0
+ is_bot = bool(message.from_user and message.from_user.is_bot)
+ text = message.text if message.text else ""
+ first_name = message.from_user.first_name if message.from_user and message.from_user.first_name else ""
+ last_name = message.from_user.last_name if message.from_user and message.from_user.last_name else ""
+ handle = message.from_user.username if message.from_user and message.from_user.username else ""
+ full_name = f"{first_name} {last_name}".strip() if message.from_user else ""
+ video_name = message.video.file_name if message.video else ""
+ photo_id = message.photo.file_unique_id if message.photo else ""
+ caption = message.caption if message.caption else ""
+ gif = message.animation.file_name if message.animation else ""
+ sticker = message.sticker.set_name if message.sticker else ""
+ file_name = message.document.file_name if message.document else ""
+ message_url = f"https://t.me/c/{str(cid).removeprefix('-100')}/{mid}"
+
+ # Parse URL from message entities
+ entity_urls = []
+ if message.entities:
+ entity_urls.extend(entity.url for entity in message.entities if entity.type == MessageEntityType.TEXT_LINK)
+ if message.caption_entities:
+ entity_urls.extend(entity.url for entity in message.caption_entities if entity.type == MessageEntityType.TEXT_LINK)
+
+ # log the summary to console
+ chat_type_emoji = {
+ "BOT": "π€",
+ "GROUP": "π₯",
+ "SUPERGROUP": "π₯",
+ "CHANNEL": "π‘",
+ "PRIVATE": "π΄",
+ }.get(chat_type, "")
+ summary = ""
+ if chat_title:
+ summary += f"{chat_type_emoji}{chat_title}[{mid}]"
+ if first_name:
+ summary += f"π€{full_name}[{uid}]" if is_bot else f"π€{full_name}[{uid}]"
+ if video_name:
+ summary += f" π¬{video_name}"
+ if photo_id:
+ summary += f" π{photo_id}"
+ if sticker:
+ summary += f" π¨{sticker}"
+ if gif:
+ summary += f" β¨{gif}"
+ if file_name:
+ summary += f" π{file_name}"
+ if text:
+ summary += f" π{text}"
+ if caption:
+ summary += f" π{caption}"
+ logger.info(f"{summary!r}")
+
+ info = { # ensure the type of each field
+ "chat_type": str(chat_type),
+ "chat_title": str(chat_title),
+ "uid": int(uid),
+ "cid": int(cid),
+ "mid": int(mid),
+ "is_bot": bool(is_bot),
+ "text": str(text),
+ "first_name": str(first_name),
+ "last_name": str(last_name),
+ "full_name": str(full_name),
+ "handle": str(handle),
+ "video_name": str(video_name),
+ "file_name": str(file_name),
+ "photo_id": str(photo_id),
+ "caption": str(caption),
+ "gif": str(gif),
+ "sticker": str(sticker),
+ "summary": str(summary),
+ "message_url": str(message_url),
+ "entity_urls": entity_urls,
+ }
+ cache.set(f"parse_msg-{message.chat.id}-{message.id}", info, ttl=120) # cache the same msg for 2 minutes
+ return info
src/messages/progress.py
@@ -0,0 +1,61 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+
+from pathlib import Path
+
+from loguru import logger
+from pyrogram.errors import Flood
+from pyrogram.types import Message
+
+from config import TEXT_LENGTH, cache
+
+
+async def modify_progress(message: Message | None = None, text: str = "", *, detail_progress: bool = False, del_status: bool = False, force_update: bool = False, **kwargs):
+ """Modify the progress message.
+
+ Args:
+ message (Message): The progress message object.
+ text (str): The new text to update.
+ detail_progress(bool): Whether to show the detail progress.
+ del_status (bool): Whether the progress is done.
+ force_update (bool): Force update the message.
+ """
+ if message is None:
+ message = kwargs.get("progress")
+ if not isinstance(message, Message):
+ return
+ try:
+ if del_status:
+ logger.info("Deleting progress message")
+ await message.delete()
+ return
+ if not text:
+ return
+ if cache.get("modify_progress"): # DO NOT update too frequently
+ detail_progress = False
+ if force_update:
+ detail_progress = True
+ if not detail_progress:
+ return
+ logger.trace(f"Progress: {text!r}")
+ await message.edit_text(text[:TEXT_LENGTH])
+ cache.set("modify_progress", "1", ttl=2)
+ except Flood as e:
+ logger.warning(f"modify_progress: {e}")
+
+
+async def telegram_uploading(current: int, total: int, *args):
+ """Show video uploading progress."""
+ msg = f"δΈδΌ δΈ: {current / 1024 / 1024:.1f} / {total / 1024 / 1024:.1f} MB ({current / total:.2%})"
+ if len(args) != 3:
+ return
+ message = args[0]
+ path = args[1]
+ detail_progress = args[2]
+ if not Path(path).is_file():
+ logger.error(f"File not found: {path}")
+ return
+ _type = "θ§ι’" if Path(path).suffix in [".mp4", ".mkv", ".mov", ".webm", ".avi", ".flv", ".wmv", ".m4v"] else "ι³ι’"
+ emoji = "π¬" if _type == "θ§ι’" else "π§"
+ msg = f"β«{_type}{msg}\n{emoji}{Path(path).name}"
+ await modify_progress(message=message, text=msg, detail_progress=detail_progress)
src/messages/sender.py
@@ -0,0 +1,338 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+
+import asyncio
+from pathlib import Path
+
+from loguru import logger
+from pyrogram.client import Client
+from pyrogram.types import InputMediaPhoto, InputMediaVideo, Message, ReplyParameters
+
+from config import CAPTION_LENGTH
+from messages.progress import modify_progress, telegram_uploading
+from messages.utils import get_reply_to, summay_media
+from multimedia import fix_video_rotation, generate_cover, is_valid_video, parse_media_info, split_large_video, split_long_img, validate_img
+from utils import smart_split, to_int
+
+
+def warp_media_group(media: list[dict], caption: str = "") -> list:
+ """Warp media files into a list of InputMediaPhoto or InputMediaVideo objects.
+
+ item in media:
+ {
+ "photo": "path/to/photo.jpg",
+ }
+ or
+ {
+ "video": "path/to/video.mp4",
+ "width": int,
+ "height": int,
+ "duration": int,
+ "thumb": "path/to/thumbnail.jpg" | None,
+ }
+ """
+ group = []
+ if len(media) < 2:
+ logger.error(f"Media group requires at least 2 items, number of media: {len(media)}")
+ return []
+ if len(caption) > CAPTION_LENGTH:
+ logger.warning(f"Caption too long, length: {len(caption)}, caption: {caption}")
+ caption = caption[:CAPTION_LENGTH]
+ if len(media) > 10:
+ logger.warning(f"Too many media files, number of media: {len(media)}")
+ media = media[:10]
+ # add caption to the first item
+ if media[0].get("photo"):
+ group.append(InputMediaPhoto(media[0]["photo"], caption=caption))
+ elif media[0].get("video"):
+ media[0]["media"] = media[0].pop("video")
+ group.append(InputMediaVideo(caption=caption, **media[0]))
+
+ # DO NOT add captions for remaining media
+ for x in media[1:]:
+ if x.get("photo"):
+ group.append(InputMediaPhoto(x["photo"]))
+ elif x.get("video"):
+ x["media"] = x.pop("video")
+ group.append(InputMediaVideo(**x))
+ return group
+
+
+def preprocess_media(media: list[dict]) -> list[dict]:
+ """Filter out invalid media files.
+
+ - photo must be at most 10 MB in size.
+ - photo's width and height must not exceed 10000 in total.
+ - photo's width and height ratio must be at most 20.
+ - filesize < 2GB for video
+
+ Args:
+ media (list[dict]): The list of media info.
+ format: {"photo": "path/to/photo.jpg"}
+ OR { "video": "path/to/video.mp4",
+ "thumb"(optional): "path/to/thumbnail.jpg" (if thumb is passed, use it. Otherwise, generate a new one)
+ }
+
+ Returns:
+ list[dict]: The filtered media info.
+ {"photo": "path/to/photo.jpg"}
+ or
+ {
+ "video": "path/to/video.mp4",
+ "width": int,
+ "height": int,
+ "duration": int,
+ "thumb": "path/to/thumbnail.jpg" | None,
+ }
+ """
+ num_before = len(media)
+ logger.trace(f"{num_before} media info before preprocess: {media}")
+ results = []
+
+ # Step-1: Photos
+ step1_res = []
+ for data in media:
+ if photo_path := data.get("photo"):
+ valid_photos = [validate_img(photo) for photo in split_long_img(photo_path) if validate_img(photo)]
+ step1_res.extend({"photo": valid_photo} for valid_photo in valid_photos)
+ continue
+ step1_res.append(data) # other type
+
+ # Step-2: Videos
+ for data in step1_res:
+ thumb = data.get("thumb") # thumb is provided
+ if video_path := data.get("video"):
+ video_path = fix_video_rotation(video_path)
+ if not is_valid_video(video_path):
+ logger.warning(f"Video is invalid: {video_path}")
+ continue
+
+ # split large video files ( < 2GB)
+ valid_videos = [x for x in split_large_video(video_path) if is_valid_video(x)]
+
+ # generate thumbnails for each video if thumb is not provided
+ thumbs = [valid_thumb for _ in valid_videos] if (valid_thumb := validate_img(thumb)) else [generate_cover(x) for x in valid_videos]
+ for vpath, tpath in zip(valid_videos, thumbs, strict=True):
+ video_info = parse_media_info(vpath)
+ thumb = valid_thumb if (valid_thumb := validate_img(tpath)) else None
+ results.append({"video": vpath.as_posix(), "width": video_info["width"], "height": video_info["height"], "duration": video_info["duration"], "thumb": thumb})
+ continue
+ results.append(data) # other type
+
+ logger.debug(f"Filtered out {num_before - len(results)} invalid media files")
+ logger.trace(f"{len(results)} media info after preprocess: {results}")
+ return results
+
+
+async def send2tg(
+ client: Client,
+ message: Message,
+ target_chat: int | str = "",
+ reply_msg_id: int = 0,
+ *,
+ texts: str = "",
+ media: list[dict] | None = None,
+ comments: list[str] | None = None, # append after texts
+ send_from_user: str | None = None,
+ cooldown: float = 0,
+ **kwargs,
+) -> list[Message | None]:
+ """Send unlimited number of texts and media to Telegram.
+
+ Telegram Message Limitation:
+ - 4096 characters for pure texts
+ - 10 media in a single message
+ - 1024 characters for caption (4096 for premium user)
+
+ Args:
+ client (Client): The Pyrogram client.
+ message (Message): The trigger message object.
+ target_chat (int | str, optional): The chat ID to send the message.
+ reply_msg_id (int, optional): If set to integer > 0, the result is sent as a reply message to this message_id.
+ If set to 0, reply to the trigger message itself.
+ If set to -1, do not send as a reply message.
+ texts (str, optional): The texts to send.
+ media (list[dict], optional): The media files to send.
+ comments (list[str], optional): The comments to append after texts.
+ send_from_user (str, optional): The user name to prefix the texts.
+ cooldown (float, optional): The interval between each media message. Defaults to 0.
+ kwargs: Other keyword arguments. In this function, we use:
+ show_progress (bool, optional): Show a progress message on Telegram. Defaults to True.
+ detail_progress (bool, optional): Show detailed progress (Only if show_proress is set to True). Defaults to False.
+
+ media item format:
+ [
+ {
+ "photo": "path/to/photo.jpg",
+ },
+ {
+ "video": "path/to/video.mp4",
+ }
+ ]
+ TODO: Support to send audio and document
+ """
+ if kwargs:
+ logger.debug(f"kwargs: {kwargs}")
+
+ if not target_chat:
+ target_chat = kwargs["target_chat"] if kwargs.get("target_chat") else message.chat.id
+ target_chat = to_int(target_chat)
+ reply_parameters = get_reply_to(message.id, reply_msg_id)
+ sent_messages: list[Message | None] = [] # save sent messages results
+
+ if media is None:
+ media = []
+ media = preprocess_media(media)
+ if comments is None:
+ comments = []
+
+ # no text, but has comments. treat comments as texts
+ texts = texts if texts else "".join(comments).strip()
+
+ if send_from_user: # prefix send_from_user
+ texts = f"{send_from_user}{texts.strip()}"
+
+ if kwargs.get("progress") and len(media) > 0:
+ await modify_progress(text=f"β«ζ£ε¨δΈδΌ :\n{summay_media(media)}", force_update=True, **kwargs)
+
+ # only media
+ if media and not texts:
+ logger.trace(f"Sending {len(media)} media without any texts")
+ if len(media) == 1:
+ if media[0].get("photo"):
+ sent_messages.append(await client.send_photo(chat_id=target_chat, photo=media[0]["photo"], reply_parameters=reply_parameters))
+ elif media[0].get("video"):
+ sent_messages.append(
+ await client.send_video(
+ chat_id=target_chat,
+ reply_parameters=reply_parameters,
+ progress=telegram_uploading,
+ progress_args=(kwargs.get("progress", False), media[0]["video"], kwargs.get("detail_progress", True)),
+ **media[0],
+ )
+ )
+ elif 1 < len(media) <= 10:
+ group = warp_media_group(media)
+ sent_messages.extend(await client.send_media_group(target_chat, media=group, reply_parameters=reply_parameters))
+ else:
+ media_chunks = [media[i : i + 10] for i in range(0, len(media), 10)]
+ for idx, chunk in enumerate(media_chunks):
+ if idx == 0:
+ sent_messages.extend(await send2tg(client, message, target_chat, reply_msg_id, media=chunk, **kwargs))
+ else:
+ sent_messages.extend(await send2tg(client, message, target_chat, reply_msg_id=-1, media=chunk, **kwargs)) # disbale reply
+ await asyncio.sleep(cooldown) # cool down
+
+ # append comments to texts
+ # For len(texts) < 1024 , ensure the combined texts and comments remains below 1024 characters to avoid sending a subsequent message containing only the comments.
+ # For long texts, keep all comments
+ if len(texts) < CAPTION_LENGTH:
+ for comment in comments:
+ if len(f"{texts}{comment}") < CAPTION_LENGTH:
+ texts += comment
+ else:
+ texts = texts + "".join(comments)
+
+ videos = [x for x in media if x.get("video")]
+ photos = [x for x in media if x.get("photo")]
+ logger.trace(f"{len(texts)} texts, {len(comments)} comments, {len(videos)} videos, {len(photos)} photos: {texts!r}")
+
+ # only texts
+ if texts and not media:
+ logger.trace(f"Sending {len(texts)} texts without any media")
+ for idx, msg in enumerate(smart_split(texts)):
+ if idx == 0:
+ sent_messages.append(await client.send_message(target_chat, msg, reply_parameters=reply_parameters))
+ else:
+ sent_messages.append(await client.send_message(target_chat, msg, reply_parameters=ReplyParameters()))
+ return sent_messages
+ # both texts and media
+ if texts and media:
+ logger.trace(f"Sending {len(media)} media + {len(texts)} texts")
+ # short text, single media
+ if len(texts) < CAPTION_LENGTH and len(media) == 1:
+ if media[0].get("photo"):
+ sent_messages.append(await client.send_photo(target_chat, photo=media[0]["photo"], caption=texts, reply_parameters=reply_parameters))
+ elif media[0].get("video"):
+ sent_messages.append(
+ await client.send_video(
+ chat_id=target_chat,
+ caption=texts,
+ reply_parameters=reply_parameters,
+ progress=telegram_uploading,
+ progress_args=(kwargs.get("progress", False), media[0]["video"], kwargs.get("detail_progress", True)),
+ **media[0],
+ )
+ )
+ # long text, single media
+ elif len(texts) >= CAPTION_LENGTH and len(media) == 1:
+ caption_text = smart_split(texts, CAPTION_LENGTH)[0]
+ if media[0].get("photo"):
+ sent_messages.append(await client.send_photo(target_chat, photo=media[0]["photo"], caption=caption_text, reply_parameters=reply_parameters))
+ elif media[0].get("video"):
+ sent_messages.append(
+ await client.send_video(
+ chat_id=target_chat,
+ caption=caption_text,
+ reply_parameters=reply_parameters,
+ progress=telegram_uploading,
+ progress_args=(kwargs.get("progress", False), media[0]["video"], kwargs.get("detail_progress", True)),
+ **media[0],
+ )
+ )
+ remaining_texts = texts.removeprefix(caption_text)
+ sent_messages.extend(await send2tg(client, message, target_chat, reply_msg_id=-1, texts=remaining_texts, **kwargs))
+
+ # short text, multiple media [1, 10]
+ elif len(texts) < CAPTION_LENGTH and 1 < len(media) <= 10:
+ group = warp_media_group(media, caption=texts)
+ sent_messages.extend(await client.send_media_group(target_chat, media=group, reply_parameters=reply_parameters))
+
+ # short text, multiple media (10, inf)
+ elif len(texts) < CAPTION_LENGTH and len(media) > 10:
+ media_chunks = [media[i : i + 10] for i in range(0, len(media), 10)]
+ num_chunk = len(media_chunks)
+ # send pure media first, and append captions at the last chunk
+ for idx, batch in enumerate(media_chunks):
+ if idx == 0: # first chunk
+ group = warp_media_group(batch)
+ sent_messages.extend(await client.send_media_group(target_chat, media=group, reply_parameters=reply_parameters))
+ elif idx != num_chunk - 1: # disbale reply if not the last chunk
+ group = warp_media_group(batch)
+ sent_messages.extend(await client.send_media_group(target_chat, media=group, reply_parameters=ReplyParameters()))
+ else: # last chunk (media <= 10, texts < 1024)
+ sent_messages.extend(await send2tg(client, message, target_chat, reply_msg_id=-1, texts=texts, media=batch, **kwargs))
+ await asyncio.sleep(cooldown)
+
+ # long text, multiple media [1, 10]
+ elif len(texts) >= CAPTION_LENGTH and 1 < len(media) <= 10:
+ caption_text = smart_split(texts, CAPTION_LENGTH)[0]
+ remaining_texts = texts.removeprefix(caption_text)
+ group = warp_media_group(media, caption=caption_text)
+ sent_messages.extend(await client.send_media_group(target_chat, media=group, reply_parameters=reply_parameters))
+ sent_messages.extend(await send2tg(client, message, target_chat, reply_msg_id=-1, texts=remaining_texts, **kwargs))
+
+ # long text, multiple media (10, inf)
+ else:
+ media_chunks = [media[i : i + 10] for i in range(0, len(media), 10)]
+ num_chunk = len(media_chunks)
+ # send pure media first, and append captions at the last chunk
+ for idx, batch in enumerate(media_chunks):
+ if idx == 0: # first chunk
+ group = warp_media_group(batch)
+ sent_messages.extend(await client.send_media_group(target_chat, media=group, reply_parameters=reply_parameters))
+ elif idx != num_chunk - 1: # disbale reply if not the last chunk
+ group = warp_media_group(batch)
+ sent_messages.extend(await client.send_media_group(target_chat, media=group, reply_parameters=ReplyParameters()))
+ else: # last chunk (media <= 10, texts >= 1024)
+ sent_messages.extend(await send2tg(client, message, target_chat, reply_msg_id=-1, texts=texts, media=batch, **kwargs))
+ await asyncio.sleep(cooldown)
+
+ # clean up
+ logger.trace("Cleaning up media files")
+ for x in media:
+ for key in ["path", "thumb", "audio", "photo", "video"]:
+ if x.get(key) and Path(x[key]).is_file():
+ logger.trace(f"Deleting: {x[key]}")
+ Path(x[key]).unlink(missing_ok=True)
+ return sent_messages
src/messages/utils.py
@@ -0,0 +1,79 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+
+import re
+
+from pyrogram.types import ReplyParameters
+
+from config import cache
+from utils import readable_size, to_int
+
+
+@cache.memoize(ttl=60)
+def startswith_prefix(text: str | None = None, prefix: list[str] | None = None, ignore_prefix: list[str] | None = None) -> bool:
+ """Check if the message text starts with the given command prefixes.
+
+ Args:
+ text (str): The message text.
+ prefix (list[str], optional): Command prefixes that are effective.
+ ignore_prefix (list[str], optional): Ignore these command prefixes.
+ """
+ if not text:
+ return False
+ if ignore_prefix and any(text.strip().lower().startswith(prefix) for prefix in ignore_prefix):
+ return False
+ return bool(prefix and any(text.strip().lower().startswith(prefix) for prefix in prefix))
+
+
+@cache.memoize(ttl=60)
+def equal_prefix(text: str | None = None, prefix: list[str] | None = None, ignore_prefix: list[str] | None = None) -> bool:
+ """Check if the message text equal with the given command prefixes.
+
+ Args:
+ text (str): The message text.
+ prefix (list[str], optional): Extra command prefixes that are effective.
+ ignore_prefix (list[str], optional): Ignore these command prefixes.
+ """
+ if not text:
+ return False
+ if ignore_prefix and text.strip().lower() in ignore_prefix:
+ return False
+ return bool(prefix and text.strip().lower() in prefix)
+
+
+def get_reply_to(msg_id: int, reply_msg_id: int | str) -> ReplyParameters:
+ if str(reply_msg_id) == "0":
+ reply_to = msg_id
+ elif str(reply_msg_id).lower() in ["-1", "none", "null", "false"]:
+ reply_to = None
+ else:
+ reply_to = to_int(reply_msg_id)
+ return ReplyParameters(message_id=reply_to) # type: ignore
+
+
+def summay_media(media: list[dict]) -> str:
+ def filesize(path: str) -> str:
+ if not isinstance(path, str):
+ return ""
+ return f": {readable_size(path=path)}"
+
+ msg = ""
+ for idx, info in enumerate(media):
+ if value := info.get("photo"):
+ msg += f"\nπP{idx + 1}{filesize(value)}"
+ elif (value := info.get("video")) or (value := info.get("livephoto")):
+ msg += f"\nπ¬P{idx + 1}{filesize(value)}"
+ elif value := info.get("audio"):
+ msg += f"\nπ§P{idx + 1}{filesize(value)}"
+ return msg.strip()
+
+
+def sender_markdown_to_html(sender: str) -> str:
+ """Convert markdown to html.
+
+ π€[@username](tg://user?id=123456789)// ->
+ π€<a href="tg://user?id=123456789">@username</a>//
+ """
+ if not sender:
+ return ""
+ return re.sub(r"^π€\[@(.*?)\]\(tg://user\?id=(\d+)\)", r'π€<a href="tg://user?id=\2">@\1</a>', sender)
src/others/download_external.py
@@ -11,7 +11,9 @@ from pyrogram.client import Client
from pyrogram.types import Message
from config import ENABLE, MAX_FILE_BYTES, PREFIX
-from message_utils import equal_prefix, get_reply_to, modify_progress, send2tg, startswith_prefix
+from messages.progress import modify_progress
+from messages.sender import send2tg
+from messages.utils import equal_prefix, get_reply_to, startswith_prefix
from multimedia import is_valid_video, validate_img
from networking import download_file
from utils import https_url, readable_size, to_int
src/others/extract_audio.py
@@ -7,7 +7,10 @@ from pyrogram.client import Client
from pyrogram.types import Message
from config import ENABLE, PREFIX, cache
-from message_utils import equal_prefix, get_reply_to, modify_progress, parse_msg, send2tg, startswith_prefix
+from messages.parser import parse_msg
+from messages.progress import modify_progress
+from messages.sender import send2tg
+from messages.utils import equal_prefix, get_reply_to, startswith_prefix
from multimedia import convert_to_audio, parse_media_info
from utils import to_int
src/others/gpt.py
@@ -14,7 +14,9 @@ from pyrogram.client import Client
from pyrogram.types import Message
from config import DOWNLOAD_DIR, ENABLE, GPT, PREFIX, PROXY, cache
-from message_utils import equal_prefix, modify_progress, send2tg, startswith_prefix
+from messages.progress import modify_progress
+from messages.sender import send2tg
+from messages.utils import equal_prefix, startswith_prefix
from multimedia import convert_to_audio
from networking import hx_req
src/others/raw_img_file.py
@@ -7,7 +7,8 @@ from pyrogram.client import Client
from pyrogram.types import Message
from config import ENABLE
-from message_utils import parse_msg, send2tg
+from messages.parser import parse_msg
+from messages.sender import send2tg
from multimedia import split_long_img
src/others/subtitle.py
@@ -14,7 +14,9 @@ from youtube_transcript_api import YouTubeTranscriptApi
from config import API, ENABLE, PREFIX, PROXY, TOKEN
from database import cache
-from message_utils import equal_prefix, modify_progress, send2tg, startswith_prefix
+from messages.progress import modify_progress
+from messages.sender import send2tg
+from messages.utils import equal_prefix, startswith_prefix
from networking import hx_req, match_social_media_link
from utils import to_int
src/preview/douyin.py
@@ -11,7 +11,10 @@ from pyrogram.types import Message
from bridge.social import send_to_social_media_bridge
from config import API, DB, PROVIDER, TOKEN, TZ, cache
from database import get_db
-from message_utils import copy_messages_from_db, modify_progress, save_messages, send2tg, summay_media
+from messages.database import copy_messages_from_db, save_messages
+from messages.progress import modify_progress
+from messages.sender import send2tg
+from messages.utils import summay_media
from networking import download_first_success_urls, download_media, hx_req
from others.emoji import emojify
from utils import true
src/preview/instagram.py
@@ -12,7 +12,10 @@ from pyrogram.types import Message
from bridge.social import send_to_social_media_bridge
from config import API, DB, DOWNLOAD_DIR, PROVIDER, TOKEN, TZ, UA, cache
from database import get_db
-from message_utils import copy_messages_from_db, modify_progress, save_messages, send2tg, summay_media
+from messages.database import copy_messages_from_db, save_messages
+from messages.progress import modify_progress
+from messages.sender import send2tg
+from messages.utils import summay_media
from multimedia import is_valid_video, validate_img
from networking import download_file, download_media, hx_req
from utils import true
src/preview/twitter.py
@@ -12,7 +12,10 @@ from pyrogram.types import Message
from bridge.social import send_to_social_media_bridge
from config import API, DB, PROVIDER, TOKEN, TZ, UA, cache
from database import get_db
-from message_utils import copy_messages_from_db, modify_progress, save_messages, send2tg, summay_media
+from messages.database import copy_messages_from_db, save_messages
+from messages.progress import modify_progress
+from messages.sender import send2tg
+from messages.utils import summay_media
from networking import download_file, download_media, flatten_rediercts, hx_req
from utils import remove_none_values, split_parts, true
src/preview/weibo.py
@@ -16,7 +16,10 @@ from bridge.social import send_to_social_media_bridge
from config import API, DB, DOWNLOAD_DIR, PROVIDER, TOKEN, TZ, cache
from cookies import get_weibo_cookies
from database import get_db
-from message_utils import copy_messages_from_db, modify_progress, save_messages, send2tg, summay_media
+from messages.database import copy_messages_from_db, save_messages
+from messages.progress import modify_progress
+from messages.sender import send2tg
+from messages.utils import summay_media
from networking import download_file, download_first_success_urls, download_media, hx_req
from others.emoji import emojify
from utils import https_url, rand_string, soup_to_text, split_parts, true, ts_to_dt
src/preview/xiaohongshu.py
@@ -12,7 +12,10 @@ from pyrogram.types import Message
from bridge.social import send_to_social_media_bridge
from config import DB, PROXY, TZ, UA, cache
from database import get_db
-from message_utils import copy_messages_from_db, modify_progress, save_messages, send2tg, summay_media
+from messages.database import copy_messages_from_db, save_messages
+from messages.progress import modify_progress
+from messages.sender import send2tg
+from messages.utils import summay_media
from networking import download_file, download_first_success_urls, download_media, hx_req
from others.emoji import emojify
src/preview/ytdlp.py
@@ -17,7 +17,10 @@ from yt_dlp.utils import DownloadError, ExtractorError, YoutubeDLError
from config import API, CAPTION_LENGTH, DB, DOWNLOAD_DIR, MAX_FILE_BYTES, PROVIDER, PROXY, TID, TOKEN, cache
from database import get_db
-from message_utils import copy_messages_from_db, get_reply_to, modify_progress, preprocess_media, save_messages, send2tg, telegram_uploading
+from messages.database import copy_messages_from_db, save_messages
+from messages.progress import modify_progress, telegram_uploading
+from messages.sender import preprocess_media, send2tg
+from messages.utils import get_reply_to
from multimedia import generate_cover
from networking import hx_req
from others.emoji import emojify
src/handler.py
@@ -11,7 +11,9 @@ from asr.voice_recognition import voice_to_text
from bridge.ocr import send_to_ocr_bridge
from config import ENABLE, PREFIX, PROXY, cache
from database import del_db
-from message_utils import equal_prefix, parse_msg, send2tg, startswith_prefix
+from messages.parser import parse_msg
+from messages.sender import send2tg
+from messages.utils import equal_prefix, startswith_prefix
from networking import flatten_rediercts, match_social_media_link
from others.download_external import download_url_in_message
from others.extract_audio import extract_audio_file
@@ -152,7 +154,7 @@ async def handle_social_media(
# add send_from_user.
if prepend_sender_user:
- # Caution: this format should be consistent with `save_messages` function in `message_utils.py`
+ # Caution: this format should be consistent with `save_messages` function in `message.database.py`
kwargs["send_from_user"] = f"π€[@{info['full_name']}](tg://user?id={info['uid']})//"
try:
texts = message.text or message.caption or ""
src/main.py
@@ -22,7 +22,7 @@ from bridge.ocr import forward_ocr_results
from bridge.social import forward_social_media_results
from config import DAILY_MESSAGES, DEVICE_NAME, ENABLE, PROXY, TID, TOKEN, TZ, cache
from handler import handle_social_media, handle_utilities
-from message_utils import parse_msg
+from messages.parser import parse_msg
from utils import cleanup_old_files, nowdt, to_int
# ruff: noqa: RUF001
src/message_utils.py
@@ -1,716 +0,0 @@
-#!/usr/bin/env python
-# -*- coding: utf-8 -*-
-
-import asyncio
-import contextlib
-import json
-import re
-from pathlib import Path
-
-from loguru import logger
-from pyrogram.client import Client
-from pyrogram.enums import MessageEntityType
-from pyrogram.errors import Flood
-from pyrogram.types import InputMediaPhoto, InputMediaVideo, Message, ReplyParameters
-
-from config import CAPTION_LENGTH, DB, TEXT_LENGTH, cache
-from database import del_db, get_db, set_db
-from multimedia import fix_video_rotation, generate_cover, is_valid_video, parse_media_info, split_large_video, split_long_img, validate_img
-from utils import readable_size, smart_split, to_int
-
-
-# ruff: noqa: RUF001
-def parse_msg(message: Message, *, verbose: bool = False) -> dict:
- if cached := cache.get(f"parse_msg-{message.chat.id}-{message.id}"):
- return cached
- if verbose:
- logger.trace(f"{message!r}")
- chat_type = message.chat.type.name if message.chat and message.chat.type else ""
- chat_title = message.chat.title if message.chat and message.chat.title else ""
- uid = message.from_user.id if message.from_user else 0
- cid = message.chat.id if message.chat else 0
- mid = message.id if message.id else 0
- is_bot = bool(message.from_user and message.from_user.is_bot)
- text = message.text if message.text else ""
- first_name = message.from_user.first_name if message.from_user and message.from_user.first_name else ""
- last_name = message.from_user.last_name if message.from_user and message.from_user.last_name else ""
- handle = message.from_user.username if message.from_user and message.from_user.username else ""
- full_name = f"{first_name} {last_name}".strip() if message.from_user else ""
- video_name = message.video.file_name if message.video else ""
- photo_id = message.photo.file_unique_id if message.photo else ""
- caption = message.caption if message.caption else ""
- gif = message.animation.file_name if message.animation else ""
- sticker = message.sticker.set_name if message.sticker else ""
- file_name = message.document.file_name if message.document else ""
- message_url = f"https://t.me/c/{str(cid).removeprefix('-100')}/{mid}"
-
- # Parse URL from message entities
- entity_urls = []
- if message.entities:
- entity_urls.extend(entity.url for entity in message.entities if entity.type == MessageEntityType.TEXT_LINK)
- if message.caption_entities:
- entity_urls.extend(entity.url for entity in message.caption_entities if entity.type == MessageEntityType.TEXT_LINK)
-
- # log the summary to console
- chat_type_emoji = {
- "BOT": "π€",
- "GROUP": "π₯",
- "SUPERGROUP": "π₯",
- "CHANNEL": "π‘",
- "PRIVATE": "π΄",
- }.get(chat_type, "")
- summary = ""
- if chat_title:
- summary += f"{chat_type_emoji}{chat_title}[{mid}]"
- if first_name:
- summary += f"π€{full_name}[{uid}]" if is_bot else f"π€{full_name}[{uid}]"
- if video_name:
- summary += f" π¬{video_name}"
- if photo_id:
- summary += f" π{photo_id}"
- if sticker:
- summary += f" π¨{sticker}"
- if gif:
- summary += f" β¨{gif}"
- if file_name:
- summary += f" π{file_name}"
- if text:
- summary += f" π{text}"
- if caption:
- summary += f" π{caption}"
- logger.info(f"{summary!r}")
-
- info = { # ensure the type of each field
- "chat_type": str(chat_type),
- "chat_title": str(chat_title),
- "uid": int(uid),
- "cid": int(cid),
- "mid": int(mid),
- "is_bot": bool(is_bot),
- "text": str(text),
- "first_name": str(first_name),
- "last_name": str(last_name),
- "full_name": str(full_name),
- "handle": str(handle),
- "video_name": str(video_name),
- "file_name": str(file_name),
- "photo_id": str(photo_id),
- "caption": str(caption),
- "gif": str(gif),
- "sticker": str(sticker),
- "summary": str(summary),
- "message_url": str(message_url),
- "entity_urls": entity_urls,
- }
- cache.set(f"parse_msg-{message.chat.id}-{message.id}", info, ttl=120) # cache the same msg for 2 minutes
- return info
-
-
-@cache.memoize(ttl=60)
-def startswith_prefix(text: str | None = None, prefix: list[str] | None = None, ignore_prefix: list[str] | None = None) -> bool:
- """Check if the message text starts with the given command prefixes.
-
- Args:
- text (str): The message text.
- prefix (list[str], optional): Command prefixes that are effective.
- ignore_prefix (list[str], optional): Ignore these command prefixes.
- """
- if not text:
- return False
- if ignore_prefix and any(text.strip().lower().startswith(prefix) for prefix in ignore_prefix):
- return False
- return bool(prefix and any(text.strip().lower().startswith(prefix) for prefix in prefix))
-
-
-@cache.memoize(ttl=60)
-def equal_prefix(text: str | None = None, prefix: list[str] | None = None, ignore_prefix: list[str] | None = None) -> bool:
- """Check if the message text equal with the given command prefixes.
-
- Args:
- text (str): The message text.
- prefix (list[str], optional): Extra command prefixes that are effective.
- ignore_prefix (list[str], optional): Ignore these command prefixes.
- """
- if not text:
- return False
- if ignore_prefix and text.strip().lower() in ignore_prefix:
- return False
- return bool(prefix and text.strip().lower() in prefix)
-
-
-def get_reply_to(msg_id: int, reply_msg_id: int | str) -> ReplyParameters:
- if str(reply_msg_id) == "0":
- reply_to = msg_id
- elif str(reply_msg_id).lower() in ["-1", "none", "null", "false"]:
- reply_to = None
- else:
- reply_to = to_int(reply_msg_id)
- return ReplyParameters(message_id=reply_to) # type: ignore
-
-
-def warp_media_group(media: list[dict], caption: str = "") -> list:
- """Warp media files into a list of InputMediaPhoto or InputMediaVideo objects.
-
- item in media:
- {
- "photo": "path/to/photo.jpg",
- }
- or
- {
- "video": "path/to/video.mp4",
- "width": int,
- "height": int,
- "duration": int,
- "thumb": "path/to/thumbnail.jpg" | None,
- }
- """
- group = []
- if len(media) < 2:
- logger.error(f"Media group requires at least 2 items, number of media: {len(media)}")
- return []
- if len(caption) > CAPTION_LENGTH:
- logger.warning(f"Caption too long, length: {len(caption)}, caption: {caption}")
- caption = caption[:CAPTION_LENGTH]
- if len(media) > 10:
- logger.warning(f"Too many media files, number of media: {len(media)}")
- media = media[:10]
- # add caption to the first item
- if media[0].get("photo"):
- group.append(InputMediaPhoto(media[0]["photo"], caption=caption))
- elif media[0].get("video"):
- media[0]["media"] = media[0].pop("video")
- group.append(InputMediaVideo(caption=caption, **media[0]))
-
- # DO NOT add captions for remaining media
- for x in media[1:]:
- if x.get("photo"):
- group.append(InputMediaPhoto(x["photo"]))
- elif x.get("video"):
- x["media"] = x.pop("video")
- group.append(InputMediaVideo(**x))
- return group
-
-
-def preprocess_media(media: list[dict]) -> list[dict]:
- """Filter out invalid media files.
-
- - photo must be at most 10 MB in size.
- - photo's width and height must not exceed 10000 in total.
- - photo's width and height ratio must be at most 20.
- - filesize < 2GB for video
-
- Args:
- media (list[dict]): The list of media info.
- format: {"photo": "path/to/photo.jpg"}
- OR { "video": "path/to/video.mp4",
- "thumb"(optional): "path/to/thumbnail.jpg" (if thumb is passed, use it. Otherwise, generate a new one)
- }
-
- Returns:
- list[dict]: The filtered media info.
- {"photo": "path/to/photo.jpg"}
- or
- {
- "video": "path/to/video.mp4",
- "width": int,
- "height": int,
- "duration": int,
- "thumb": "path/to/thumbnail.jpg" | None,
- }
- """
- num_before = len(media)
- logger.trace(f"{num_before} media info before preprocess: {media}")
- results = []
-
- # Step-1: Photos
- step1_res = []
- for data in media:
- if photo_path := data.get("photo"):
- valid_photos = [validate_img(photo) for photo in split_long_img(photo_path) if validate_img(photo)]
- step1_res.extend({"photo": valid_photo} for valid_photo in valid_photos)
- continue
- step1_res.append(data) # other type
-
- # Step-2: Videos
- for data in step1_res:
- thumb = data.get("thumb") # thumb is provided
- if video_path := data.get("video"):
- video_path = fix_video_rotation(video_path)
- if not is_valid_video(video_path):
- logger.warning(f"Video is invalid: {video_path}")
- continue
-
- # split large video files ( < 2GB)
- valid_videos = [x for x in split_large_video(video_path) if is_valid_video(x)]
-
- # generate thumbnails for each video if thumb is not provided
- thumbs = [valid_thumb for _ in valid_videos] if (valid_thumb := validate_img(thumb)) else [generate_cover(x) for x in valid_videos]
- for vpath, tpath in zip(valid_videos, thumbs, strict=True):
- video_info = parse_media_info(vpath)
- thumb = valid_thumb if (valid_thumb := validate_img(tpath)) else None
- results.append({"video": vpath.as_posix(), "width": video_info["width"], "height": video_info["height"], "duration": video_info["duration"], "thumb": thumb})
- continue
- results.append(data) # other type
-
- logger.debug(f"Filtered out {num_before - len(results)} invalid media files")
- logger.trace(f"{len(results)} media info after preprocess: {results}")
- return results
-
-
-async def send2tg(
- client: Client,
- message: Message,
- target_chat: int | str = "",
- reply_msg_id: int = 0,
- *,
- texts: str = "",
- media: list[dict] | None = None,
- comments: list[str] | None = None, # append after texts
- send_from_user: str | None = None,
- cooldown: float = 0,
- **kwargs,
-) -> list[Message | None]:
- """Send unlimited number of texts and media to Telegram.
-
- Telegram Message Limitation:
- - 4096 characters for pure texts
- - 10 media in a single message
- - 1024 characters for caption (4096 for premium user)
-
- Args:
- client (Client): The Pyrogram client.
- message (Message): The trigger message object.
- target_chat (int | str, optional): The chat ID to send the message.
- reply_msg_id (int, optional): If set to integer > 0, the result is sent as a reply message to this message_id.
- If set to 0, reply to the trigger message itself.
- If set to -1, do not send as a reply message.
- texts (str, optional): The texts to send.
- media (list[dict], optional): The media files to send.
- comments (list[str], optional): The comments to append after texts.
- send_from_user (str, optional): The user name to prefix the texts.
- cooldown (float, optional): The interval between each media message. Defaults to 0.
- kwargs: Other keyword arguments. In this function, we use:
- show_progress (bool, optional): Show a progress message on Telegram. Defaults to True.
- detail_progress (bool, optional): Show detailed progress (Only if show_proress is set to True). Defaults to False.
-
- media item format:
- [
- {
- "photo": "path/to/photo.jpg",
- },
- {
- "video": "path/to/video.mp4",
- }
- ]
- TODO: Support to send audio and document
- """
- if kwargs:
- logger.debug(f"kwargs: {kwargs}")
-
- if not target_chat:
- target_chat = kwargs["target_chat"] if kwargs.get("target_chat") else message.chat.id
- target_chat = to_int(target_chat)
- reply_parameters = get_reply_to(message.id, reply_msg_id)
- sent_messages: list[Message | None] = [] # save sent messages results
-
- if media is None:
- media = []
- media = preprocess_media(media)
- if comments is None:
- comments = []
-
- # no text, but has comments. treat comments as texts
- texts = texts if texts else "".join(comments).strip()
-
- if send_from_user: # prefix send_from_user
- texts = f"{send_from_user}{texts.strip()}"
-
- if kwargs.get("progress") and len(media) > 0:
- await modify_progress(text=f"β«ζ£ε¨δΈδΌ :\n{summay_media(media)}", force_update=True, **kwargs)
-
- # only media
- if media and not texts:
- logger.trace(f"Sending {len(media)} media without any texts")
- if len(media) == 1:
- if media[0].get("photo"):
- sent_messages.append(await client.send_photo(chat_id=target_chat, photo=media[0]["photo"], reply_parameters=reply_parameters))
- elif media[0].get("video"):
- sent_messages.append(
- await client.send_video(
- chat_id=target_chat,
- reply_parameters=reply_parameters,
- progress=telegram_uploading,
- progress_args=(kwargs.get("progress", False), media[0]["video"], kwargs.get("detail_progress", True)),
- **media[0],
- )
- )
- elif 1 < len(media) <= 10:
- group = warp_media_group(media)
- sent_messages.extend(await client.send_media_group(target_chat, media=group, reply_parameters=reply_parameters))
- else:
- media_chunks = [media[i : i + 10] for i in range(0, len(media), 10)]
- for idx, chunk in enumerate(media_chunks):
- if idx == 0:
- sent_messages.extend(await send2tg(client, message, target_chat, reply_msg_id, media=chunk, **kwargs))
- else:
- sent_messages.extend(await send2tg(client, message, target_chat, reply_msg_id=-1, media=chunk, **kwargs)) # disbale reply
- await asyncio.sleep(cooldown) # cool down
-
- # append comments to texts
- # For len(texts) < 1024 , ensure the combined texts and comments remains below 1024 characters to avoid sending a subsequent message containing only the comments.
- # For long texts, keep all comments
- if len(texts) < CAPTION_LENGTH:
- for comment in comments:
- if len(f"{texts}{comment}") < CAPTION_LENGTH:
- texts += comment
- else:
- texts = texts + "".join(comments)
-
- videos = [x for x in media if x.get("video")]
- photos = [x for x in media if x.get("photo")]
- logger.trace(f"{len(texts)} texts, {len(comments)} comments, {len(videos)} videos, {len(photos)} photos: {texts!r}")
-
- # only texts
- if texts and not media:
- logger.trace(f"Sending {len(texts)} texts without any media")
- for idx, msg in enumerate(smart_split(texts)):
- if idx == 0:
- sent_messages.append(await client.send_message(target_chat, msg, reply_parameters=reply_parameters))
- else:
- sent_messages.append(await client.send_message(target_chat, msg, reply_parameters=ReplyParameters()))
- return sent_messages
- # both texts and media
- if texts and media:
- logger.trace(f"Sending {len(media)} media + {len(texts)} texts")
- # short text, single media
- if len(texts) < CAPTION_LENGTH and len(media) == 1:
- if media[0].get("photo"):
- sent_messages.append(await client.send_photo(target_chat, photo=media[0]["photo"], caption=texts, reply_parameters=reply_parameters))
- elif media[0].get("video"):
- sent_messages.append(
- await client.send_video(
- chat_id=target_chat,
- caption=texts,
- reply_parameters=reply_parameters,
- progress=telegram_uploading,
- progress_args=(kwargs.get("progress", False), media[0]["video"], kwargs.get("detail_progress", True)),
- **media[0],
- )
- )
- # long text, single media
- elif len(texts) >= CAPTION_LENGTH and len(media) == 1:
- caption_text = smart_split(texts, CAPTION_LENGTH)[0]
- if media[0].get("photo"):
- sent_messages.append(await client.send_photo(target_chat, photo=media[0]["photo"], caption=caption_text, reply_parameters=reply_parameters))
- elif media[0].get("video"):
- sent_messages.append(
- await client.send_video(
- chat_id=target_chat,
- caption=caption_text,
- reply_parameters=reply_parameters,
- progress=telegram_uploading,
- progress_args=(kwargs.get("progress", False), media[0]["video"], kwargs.get("detail_progress", True)),
- **media[0],
- )
- )
- remaining_texts = texts.removeprefix(caption_text)
- sent_messages.extend(await send2tg(client, message, target_chat, reply_msg_id=-1, texts=remaining_texts, **kwargs))
-
- # short text, multiple media [1, 10]
- elif len(texts) < CAPTION_LENGTH and 1 < len(media) <= 10:
- group = warp_media_group(media, caption=texts)
- sent_messages.extend(await client.send_media_group(target_chat, media=group, reply_parameters=reply_parameters))
-
- # short text, multiple media (10, inf)
- elif len(texts) < CAPTION_LENGTH and len(media) > 10:
- media_chunks = [media[i : i + 10] for i in range(0, len(media), 10)]
- num_chunk = len(media_chunks)
- # send pure media first, and append captions at the last chunk
- for idx, batch in enumerate(media_chunks):
- if idx == 0: # first chunk
- group = warp_media_group(batch)
- sent_messages.extend(await client.send_media_group(target_chat, media=group, reply_parameters=reply_parameters))
- elif idx != num_chunk - 1: # disbale reply if not the last chunk
- group = warp_media_group(batch)
- sent_messages.extend(await client.send_media_group(target_chat, media=group, reply_parameters=ReplyParameters()))
- else: # last chunk (media <= 10, texts < 1024)
- sent_messages.extend(await send2tg(client, message, target_chat, reply_msg_id=-1, texts=texts, media=batch, **kwargs))
- await asyncio.sleep(cooldown)
-
- # long text, multiple media [1, 10]
- elif len(texts) >= CAPTION_LENGTH and 1 < len(media) <= 10:
- caption_text = smart_split(texts, CAPTION_LENGTH)[0]
- remaining_texts = texts.removeprefix(caption_text)
- group = warp_media_group(media, caption=caption_text)
- sent_messages.extend(await client.send_media_group(target_chat, media=group, reply_parameters=reply_parameters))
- sent_messages.extend(await send2tg(client, message, target_chat, reply_msg_id=-1, texts=remaining_texts, **kwargs))
-
- # long text, multiple media (10, inf)
- else:
- media_chunks = [media[i : i + 10] for i in range(0, len(media), 10)]
- num_chunk = len(media_chunks)
- # send pure media first, and append captions at the last chunk
- for idx, batch in enumerate(media_chunks):
- if idx == 0: # first chunk
- group = warp_media_group(batch)
- sent_messages.extend(await client.send_media_group(target_chat, media=group, reply_parameters=reply_parameters))
- elif idx != num_chunk - 1: # disbale reply if not the last chunk
- group = warp_media_group(batch)
- sent_messages.extend(await client.send_media_group(target_chat, media=group, reply_parameters=ReplyParameters()))
- else: # last chunk (media <= 10, texts >= 1024)
- sent_messages.extend(await send2tg(client, message, target_chat, reply_msg_id=-1, texts=texts, media=batch, **kwargs))
- await asyncio.sleep(cooldown)
-
- # clean up
- logger.trace("Cleaning up media files")
- for x in media:
- for key in ["path", "thumb", "audio", "photo", "video"]:
- if x.get(key) and Path(x[key]).is_file():
- logger.trace(f"Deleting: {x[key]}")
- Path(x[key]).unlink(missing_ok=True)
- return sent_messages
-
-
-def summay_media(media: list[dict]) -> str:
- def filesize(path: str) -> str:
- if not isinstance(path, str):
- return ""
- return f": {readable_size(path=path)}"
-
- msg = ""
- for idx, info in enumerate(media):
- if value := info.get("photo"):
- msg += f"\nπP{idx + 1}{filesize(value)}"
- elif (value := info.get("video")) or (value := info.get("livephoto")):
- msg += f"\nπ¬P{idx + 1}{filesize(value)}"
- elif value := info.get("audio"):
- msg += f"\nπ§P{idx + 1}{filesize(value)}"
- return msg.strip()
-
-
-async def save_messages(messages: list[Message | None], key: str, metadata: dict | None = None) -> bool:
- """Save the messages to DB.
-
- data format:
- {
- "data": [
- {
- "cid": 111, # chat id
- "type": "text", # photo, video, media_group, etc.
- "mid": 222, # message id
- "text": "html format",
- },
- ...
- ]
- }
- """
- if not metadata:
- metadata = {}
- if not messages:
- logger.error(f"Skip save messages to {DB.ENGINE} due to empty message list")
- return False
- valid_messages = [x for x in messages if isinstance(x, Message)]
- if len(valid_messages) != len(messages):
- logger.warning(f"Skip save messages to {DB.ENGINE} due to invalid message type")
- return False
- time_str = valid_messages[0].date.isoformat()
- metadata["time"] = time_str
- data = []
- media_group_ids = set() # save once
- for msg in valid_messages:
- text = ""
- if msg.text:
- text = msg.text
- if msg.caption:
- text = msg.caption
- if hasattr(text, "html"): # DO NOT use markdown, because this format has some bugs
- text = text.html # type: ignore
- # Caution: this format should be consistent with `handle_social_media` function in `handler.py`
- # text = re.sub(r"^π€\[@.*?\]\(tg://user\?id=\d+\)//", "", text) # remove markdown send_from_user
- text = re.sub(r"^π€\<a.*?tg://user\?id=\d+.*?@.*?</a>//", "", text) # remove markdown send_from_user
- msg_extra = {"text": text} if text else {}
- if msg.media_group_id:
- if msg.media_group_id not in media_group_ids:
- logger.trace(f"Saving media group message {msg.id}")
- media_group_ids.add(msg.media_group_id)
- data.append({"type": "media_group", "cid": msg.chat.id, "mid": msg.id} | msg_extra)
- continue
- if msg.video:
- logger.trace(f"Saving video message {msg.id}")
- data.append({"type": "video", "cid": msg.chat.id, "mid": msg.id} | msg_extra)
- continue
- if msg.photo:
- logger.trace(f"Saving photo message {msg.id}")
- data.append({"type": "photo", "cid": msg.chat.id, "mid": msg.id} | msg_extra)
- continue
- if msg.audio:
- logger.trace(f"Saving audio message {msg.id}")
- data.append({"type": "audio", "cid": msg.chat.id, "mid": msg.id} | msg_extra)
- continue
- if msg.text:
- logger.trace(f"Saving text message {msg.id}")
- data.append({"type": "text", "cid": msg.chat.id, "mid": msg.id} | msg_extra)
- continue
- logger.warning(f"Skip save message {msg.id} to {DB.ENGINE} due to unknown type: {msg}")
- if data:
- return await set_db(key, metadata=metadata, data={"data": data})
- return False
-
-
-async def copy_messages_from_db(client: Client, message: Message, key: str, kv: dict | None = None, **kwargs) -> bool:
- """Copy messages from database.
-
- data format:
- {
- "data": [
- {
- "cid": 111, # chat id
- "type": "text", # photo, video, media_group, etc.
- "mid": 222, # message id
- "text": "html format",
- },
- ...
- ]
- }
- OR:
- { "is_parted": True }
-
- """
- if kwargs:
- logger.debug(f"kwargs: {kwargs}")
- target_chat = kwargs["target_chat"] if kwargs.get("target_chat") else message.chat.id
- reply_parameters = ReplyParameters(message_id=kwargs.get("reply_msg_id", message.id))
-
- if kv is None:
- kv = await get_db(key)
- if kv.get("is_parted"):
- logger.warning(f"Parted messages found for key={key}")
- return await copy_parted_msgs_from_db(client, message, key, **kwargs)
-
- if not kv.get("data"):
- logger.error(f"Wrong {DB.ENGINE} data for key={key}: {kv}")
- return False
- data: list[dict] = kv.get("data", [])
- if isinstance(data, str):
- data = json.loads(data)
- logger.debug(f"Sending {len(data)} messages from {DB.ENGINE}: {data}")
- await modify_progress(text=f"πΎε¨{DB.ENGINE}δΈζ₯ε°ηΌε, ζ£ε¨θ½¬ε{len(data)}ζ‘ζΆζ―...", **kwargs)
- results = []
- try:
- for idx, item in enumerate(sorted(data, key=lambda x: x["mid"])):
- with contextlib.suppress(ValueError):
- cid = int(item["cid"])
- if idx != 0:
- reply_parameters = ReplyParameters() # only send as reply of the first message
- logger.debug(f"Copying {item['type']} message: ({cid}, {item['mid']}) -> target_chat={target_chat}")
- text = item.get("text") # str or None
- if text and kwargs.get("send_from_user"):
- text = f"{sender_markdown_to_html(kwargs['send_from_user'])}{text}"
- if item["type"] == "text":
- if text:
- results.append(await client.send_message(chat_id=target_chat, text=text, reply_parameters=reply_parameters))
- else:
- db_msg: Message = await client.get_messages(chat_id=cid, message_ids=int(item["mid"]), replies=0) # type: ignore
- results.append(await client.send_message(chat_id=target_chat, text=db_msg.text, reply_parameters=reply_parameters))
- elif item["type"] in ["photo", "audio", "video"]:
- results.append(await client.copy_message(chat_id=target_chat, caption=text, from_chat_id=cid, message_id=int(item["mid"]), reply_parameters=reply_parameters)) # type: ignore
- elif item["type"] == "media_group":
- results.extend(await client.copy_media_group(chat_id=target_chat, captions=text, from_chat_id=cid, message_id=int(item["mid"]), reply_parameters=reply_parameters)) # type: ignore
- else:
- logger.warning(f"Unknown message type: {item}")
- except Exception as e:
- logger.error(f"Failed to copy messages for key={key} from {DB.ENGINE}: {e}")
- await del_db(key)
- return False
- if all(isinstance(x, Message) for x in results):
- logger.success(f"Successfully copied {len(results)} messages for key={key} from {DB.ENGINE}")
- await modify_progress(del_status=True, **kwargs)
- return True
- await del_db(key)
- return False
-
-
-async def copy_parted_msgs_from_db(client: Client, message: Message, key: str, suffix: str = "-P", **kwargs) -> bool:
- """Copy parted messages from database.
-
- For some large video files, they can be sent in parts.
- The primary key is the video link, and the parted key is suffixed with "P".
- """
- for idx in range(1, 100):
- logger.trace(f"Checking parted message for key={key}{suffix}{idx}")
- kv = await get_db(f"{key}{suffix}{idx}")
- if not kv:
- logger.debug(f"No more parted messages found for key={key}{suffix}{idx}")
- return True
- if not await copy_messages_from_db(client, message, key=f"{key}{suffix}{idx}", kv=kv, **kwargs):
- logger.error(f"Failed to copy parted messages for key={key}{suffix}{idx}")
- return False
- logger.warning(f"Too many parted messages found for key={key}")
- return False
-
-
-def sender_markdown_to_html(sender: str) -> str:
- """Convert markdown to html.
-
- π€[@username](tg://user?id=123456789)// ->
- π€<a href="tg://user?id=123456789">@username</a>//
- """
- if not sender:
- return ""
- return re.sub(r"^π€\[@(.*?)\]\(tg://user\?id=(\d+)\)", r'π€<a href="tg://user?id=\2">@\1</a>', sender)
-
-
-async def modify_progress(message: Message | None = None, text: str = "", *, detail_progress: bool = False, del_status: bool = False, force_update: bool = False, **kwargs):
- """Modify the progress message.
-
- Args:
- message (Message): The progress message object.
- text (str): The new text to update.
- detail_progress(bool): Whether to show the detail progress.
- del_status (bool): Whether the progress is done.
- force_update (bool): Force update the message.
- """
- if message is None:
- message = kwargs.get("progress")
- if not isinstance(message, Message):
- return
- try:
- if del_status:
- logger.info("Deleting progress message")
- await message.delete()
- return
- if not text:
- return
- if cache.get("modify_progress"): # DO NOT update too frequently
- detail_progress = False
- if force_update:
- detail_progress = True
- if not detail_progress:
- return
- logger.trace(f"Progress: {text!r}")
- await message.edit_text(text[:TEXT_LENGTH])
- cache.set("modify_progress", "1", ttl=2)
- except Flood as e:
- logger.warning(f"modify_progress: {e}")
-
-
-async def telegram_uploading(current: int, total: int, *args):
- """Show video uploading progress."""
- msg = f"δΈδΌ δΈ: {current / 1024 / 1024:.1f} / {total / 1024 / 1024:.1f} MB ({current / total:.2%})"
- if len(args) != 3:
- return
- message = args[0]
- path = args[1]
- detail_progress = args[2]
- if not Path(path).is_file():
- logger.error(f"File not found: {path}")
- return
- _type = "θ§ι’" if Path(path).suffix in [".mp4", ".mkv", ".mov", ".webm", ".avi", ".flv", ".wmv", ".m4v"] else "ι³ι’"
- emoji = "π¬" if _type == "θ§ι’" else "π§"
- msg = f"β«{_type}{msg}\n{emoji}{Path(path).name}"
- await modify_progress(message=message, text=msg, detail_progress=detail_progress)
-
-
-if __name__ == "__main__":
- print(sender_markdown_to_html("π€[@username](tg://user?id=123456789)//"))
- print(preprocess_media([{"video": "~/tests/test.mp4", "thumb": "~/tests/test.jpg"}]))
src/networking.py
@@ -15,7 +15,8 @@ from httpx import AsyncClient, AsyncHTTPTransport, HTTPStatusError, RequestError
from loguru import logger
from config import DOWNLOAD_DIR, PROXY, UA, cache, semaphore
-from message_utils import modify_progress, summay_media
+from messages.progress import modify_progress
+from messages.utils import summay_media
from utils import bare_url, https_url, readable_size
# ruff: noqa: RUF001