main
  1#!/usr/bin/env python
  2# -*- coding: utf-8 -*-
  3
  4import json
  5import re
  6
  7from loguru import logger
  8from pyrogram.client import Client
  9from pyrogram.types import Message, ReplyParameters
 10
 11from config import DB
 12from database.database import del_db, get_db, set_db
 13from messages.parser import parse_msg
 14from messages.progress import modify_progress
 15from messages.utils import sender_markdown_to_html
 16from utils import to_int, true
 17
 18
 19async def save_messages(messages: list[Message | None], key: str, metadata: dict | None = None) -> bool:
 20    """Save the messages to DB.
 21
 22    data format:
 23    {
 24        "data": [
 25            {
 26                "cid": 111,  # chat id
 27                "type": "text", # photo, video, media_group, etc.
 28                "mid": 222,  # message id
 29                "text": "html format",
 30            },
 31            ...
 32        ]
 33    }
 34    """
 35    if not metadata:
 36        metadata = {}
 37    if not messages:
 38        logger.error(f"Skip save messages to {DB.ENGINE} due to empty message list")
 39        return False
 40    valid_messages = [x for x in messages if isinstance(x, Message)]
 41    if len(valid_messages) != len(messages):
 42        logger.warning(f"Skip save messages to {DB.ENGINE} due to invalid message type")
 43        return False
 44    time_str = valid_messages[0].date.isoformat()
 45    metadata["time"] = time_str
 46    data = []
 47    media_group_ids = set()  # save once
 48    for msg in valid_messages:
 49        info = parse_msg(msg, silent=True)
 50        # Caution: this format should be consistent with `process_message` function in `messages/main.py`
 51        # text = re.sub(r"^👤\[@.*?\]\(tg://user\?id=\d+\)//", "", text)  # remove markdown send_from_user
 52        text = re.sub(r"^👤\<a.*?tg://user\?id=\d+.*?@.*?</a>//", "", info["html"])  # remove html send_from_user
 53        msg_extra = {"text": text} if text else {}
 54        if msg.media_group_id:
 55            if msg.media_group_id not in media_group_ids:
 56                logger.trace(f"Saving media group message {msg.id}")
 57                media_group_ids.add(msg.media_group_id)
 58                data.append({"type": "media_group", "cid": msg.chat.id, "mid": msg.id} | msg_extra)
 59            continue
 60        if info["mtype"] == "video":
 61            logger.trace(f"Saving video message {msg.id}")
 62            data.append({"type": "video", "cid": msg.chat.id, "mid": msg.id} | msg_extra)
 63            continue
 64        if info["mtype"] == "photo":
 65            logger.trace(f"Saving photo message {msg.id}")
 66            data.append({"type": "photo", "cid": msg.chat.id, "mid": msg.id} | msg_extra)
 67            continue
 68        if info["mtype"] == "audio":
 69            logger.trace(f"Saving audio message {msg.id}")
 70            data.append({"type": "audio", "cid": msg.chat.id, "mid": msg.id} | msg_extra)
 71            continue
 72        if info["mtype"] == "text":
 73            logger.trace(f"Saving text message {msg.id}")
 74            data.append({"type": "text", "cid": msg.chat.id, "mid": msg.id} | msg_extra)
 75            continue
 76        if info["mtype"] == "document":
 77            logger.trace(f"Saving document message {msg.id}")
 78            data.append({"type": "document", "cid": msg.chat.id, "mid": msg.id} | msg_extra)
 79            continue
 80        logger.warning(f"Skip save message {msg.id} to {DB.ENGINE} due to unknown type: {msg}")
 81    if data:
 82        return await set_db(key, metadata=metadata, data={"data": data})
 83    return False
 84
 85
 86async def copy_messages_from_db(
 87    client: Client,
 88    message: Message,
 89    key: str,
 90    kv: dict | None = None,
 91    *,
 92    copy_video_msg: bool = True,
 93    copy_photo_msg: bool = True,
 94    copy_audio_msg: bool = True,
 95    copy_document_msg: bool = True,
 96    copy_text_msg: bool = True,
 97    copy_media_group_msg: bool = True,
 98    **kwargs,
 99) -> list[Message]:
100    """Copy messages from database.
101
102    data format:
103        {
104            "data": [
105                {
106                    "cid": 111,  # chat id
107                    "type": "text", # photo, video, media_group, etc.
108                    "mid": 222,  # message id
109                    "text": "html format",
110                },
111                ...
112            ]
113        }
114    """
115
116    def custom_sort(item):
117        """Custom sort function to sort messages.
118
119        1. sort by type
120        2. sort by mid
121        """
122        type_order = {"media_group": 0, "video": 1, "photo": 2, "audio": 3, "document": 4, "text": 5}
123        return (type_order.get(item["type"], 999), item["mid"])
124
125    target_chat = kwargs["target_chat"] if kwargs.get("target_chat") else message.chat.id
126    reply_msg_id = kwargs.get("reply_msg_id", message.id)
127    if to_int(reply_msg_id) == 0:
128        target_mid = message.id
129    elif to_int(reply_msg_id) == -1:
130        target_mid = None
131    else:
132        target_mid = to_int(reply_msg_id)
133    reply_parameters = ReplyParameters(message_id=target_mid)  # type: ignore
134
135    if kv is None:
136        kv = await get_db(key)
137    if not kv.get("data"):
138        logger.error(f"Wrong {DB.ENGINE} data for key={key}: {kv}")
139        return []
140    data: list[dict] = kv.get("data", [])
141    if isinstance(data, str):
142        data = json.loads(data)
143    logger.debug(f"Sending {len(data)} messages from {DB.ENGINE}: {data}")
144    results: list[Message] = []
145    try:
146        for idx, item in enumerate(sorted(data, key=custom_sort)):
147            cid = to_int(item["cid"])
148            if idx != 0:
149                reply_parameters = ReplyParameters()  # only send as reply of the first message
150            logger.debug(f"Copying {item['type']} message: ({cid}, {item['mid']}) -> target_chat={target_chat}")
151            text = item.get("text")  # str or None
152            if text and kwargs.get("send_from_user"):
153                text = f"{sender_markdown_to_html(kwargs['send_from_user'])}{text}"
154            if true(copy_text_msg) and item["type"] == "text":
155                if text:
156                    results.append(await client.send_message(chat_id=target_chat, text=text, reply_parameters=reply_parameters))
157                else:
158                    db_msg: Message = await client.get_messages(chat_id=cid, message_ids=int(item["mid"]), replies=0)  # type: ignore
159                    results.append(await client.send_message(chat_id=target_chat, text=db_msg.text, reply_parameters=reply_parameters))
160            elif (
161                (true(copy_video_msg) and item["type"] == "video")
162                or (true(copy_photo_msg) and item["type"] == "photo")
163                or (true(copy_audio_msg) and item["type"] == "audio")
164                or (true(copy_document_msg) and item["type"] == "document")
165            ):
166                results.append(await client.copy_message(chat_id=target_chat, caption=text, from_chat_id=cid, message_id=int(item["mid"]), reply_parameters=reply_parameters))  # type: ignore
167            elif true(copy_media_group_msg) and item["type"] == "media_group":
168                results.extend(await client.copy_media_group(chat_id=target_chat, captions=text, from_chat_id=cid, message_id=int(item["mid"]), reply_parameters=reply_parameters))  # type: ignore
169            else:
170                logger.warning(f"Unknown message type: {item}")
171    except Exception as e:
172        logger.error(f"Failed to copy messages for key={key} from {DB.ENGINE}: {e}")
173        await del_db(key)
174        return []
175    if all(isinstance(x, Message) for x in results):
176        logger.success(f"Successfully copied {len(results)} messages for key={key} from {DB.ENGINE}")
177        await modify_progress(del_status=True, **kwargs)
178        return results
179    await del_db(key)
180    return []