main
  1#!/usr/bin/env python
  2# -*- coding: utf-8 -*-
  3# ty:ignore[invalid-argument-type]
  4import json
  5import re
  6
  7from loguru import logger
  8from pyrogram.client import Client
  9from pyrogram.types import Message, ReplyParameters
 10
 11from database.r2 import del_cf_r2, get_cf_r2, set_cf_r2
 12from messages.parser import get_thread_id, parse_msg
 13from messages.progress import modify_progress
 14from messages.utils import sender_markdown_to_html
 15from utils import to_int, true
 16
 17
 18async def save_messages(messages: list[Message], key: str, metadata: dict | None = None) -> bool:
 19    """Save the messages to DB.
 20
 21    data format:
 22    {
 23        "data": [
 24            {
 25                "cid": 111,  # chat id
 26                "type": "text", # photo, video, media_group, etc.
 27                "mid": 222,  # message id
 28                "text": "html format",
 29            },
 30            ...
 31        ]
 32    }
 33    """
 34    if not metadata:
 35        metadata = {}
 36    if not messages:
 37        logger.error("Skip save messages to R2 due to empty message list")
 38        return False
 39    valid_messages = [x for x in messages if isinstance(x, Message)]
 40    if len(valid_messages) != len(messages):
 41        logger.warning("Skip save messages to R2 due to invalid message type")
 42        return False
 43    time_str = valid_messages[0].date.isoformat()
 44    metadata["time"] = time_str
 45    data = []
 46    media_group_ids = set()  # save once
 47    for msg in valid_messages:
 48        info = parse_msg(msg, silent=True)
 49        # Caution: this format should be consistent with `process_message` function in `messages/main.py`
 50        # text = re.sub(r"^👤\[@.*?\]\(tg://user\?id=\d+\)//", "", text)  # remove markdown send_from_user
 51        text = re.sub(r"^👤\<a.*?tg://user\?id=\d+.*?@.*?</a>//", "", info["html"])  # remove html send_from_user
 52        msg_extra = {"cid": info["cid"], "mid": info["mid"], "tid": info["tid"]}
 53        if text:
 54            msg_extra["text"] = text
 55        if msg.media_group_id:
 56            if msg.media_group_id not in media_group_ids:
 57                logger.trace(f"Saving media group message {msg.id}")
 58                media_group_ids.add(msg.media_group_id)
 59                data.append({"type": "media_group"} | msg_extra)
 60            continue
 61        if info["mtype"] == "video":
 62            logger.trace(f"Saving video message {msg.id}")
 63            data.append({"type": "video"} | msg_extra)
 64            continue
 65        if info["mtype"] == "photo":
 66            logger.trace(f"Saving photo message {msg.id}")
 67            data.append({"type": "photo"} | msg_extra)
 68            continue
 69        if info["mtype"] == "audio":
 70            logger.trace(f"Saving audio message {msg.id}")
 71            data.append({"type": "audio"} | msg_extra)
 72            continue
 73        if info["mtype"] == "text":
 74            logger.trace(f"Saving text message {msg.id}")
 75            data.append({"type": "text"} | msg_extra)
 76            continue
 77        if info["mtype"] == "document":
 78            logger.trace(f"Saving document message {msg.id}")
 79            data.append({"type": "document"} | msg_extra)
 80            continue
 81        logger.warning(f"Skip save message {msg.id} to R2 due to unknown type: {msg}")
 82    if data:
 83        return await set_cf_r2(key, metadata=metadata, data={"data": data})
 84    return False
 85
 86
 87async def copy_messages_from_db(
 88    client: Client,
 89    message: Message,
 90    key: str,
 91    kv: dict | None = None,
 92    *,
 93    copy_video_msg: bool = True,
 94    copy_photo_msg: bool = True,
 95    copy_audio_msg: bool = True,
 96    copy_document_msg: bool = True,
 97    copy_text_msg: bool = True,
 98    copy_media_group_msg: bool = True,
 99    **kwargs,
100) -> list[Message]:
101    """Copy messages from database.
102
103    data format:
104        {
105            "data": [
106                {
107                    "cid": 111,  # chat id
108                    "type": "text", # photo, video, media_group, etc.
109                    "mid": 222,  # message id
110                    "tid": 333,  # thread id
111                    "text": "html format",
112                },
113                ...
114            ]
115        }
116    """
117
118    def custom_sort(item):
119        """Custom sort function to sort messages.
120
121        1. sort by type
122        2. sort by mid
123        """
124        type_order = {"media_group": 0, "video": 1, "photo": 2, "audio": 3, "document": 4, "text": 5}
125        return (type_order.get(item["type"], 999), item["mid"])
126
127    target_chat = kwargs["target_chat"] if kwargs.get("target_chat") else message.chat.id
128    reply_msg_id = kwargs.get("reply_msg_id", message.id)
129    if to_int(reply_msg_id) == 0:
130        target_mid = message.id
131    elif to_int(reply_msg_id) == -1:
132        target_mid = None
133    else:
134        target_mid = to_int(reply_msg_id)
135    reply_parameters = ReplyParameters(message_id=target_mid)
136    tid = get_thread_id(message)
137    if kv is None:
138        kv = await get_cf_r2(key)
139    if not kv.get("data"):
140        logger.error(f"Wrong R2 data for key={key}: {kv}")
141        return []
142    data: list[dict] = kv.get("data", [])
143    if isinstance(data, str):
144        data = json.loads(data)
145    logger.debug(f"Sending {len(data)} messages from R2: {data}")
146    results: list[Message] = []
147    try:
148        for idx, item in enumerate(sorted(data, key=custom_sort)):
149            cid = to_int(item["cid"])
150            if idx != 0:
151                reply_parameters = None  # only send as reply of the first message
152            logger.debug(f"Copying {item['type']} message: ({cid}, {item['mid']}) -> target_chat={target_chat}")
153            text = item.get("text")  # str or None
154            if text and kwargs.get("send_from_user"):
155                text = f"{sender_markdown_to_html(kwargs['send_from_user'])}{text}"
156            if true(copy_text_msg) and item["type"] == "text":
157                if text:
158                    results.append(await client.send_message(chat_id=target_chat, text=text, message_thread_id=tid, reply_parameters=reply_parameters))
159                else:
160                    db_msg: Message = await client.get_messages(chat_id=cid, message_ids=int(item["mid"]), replies=0)  # type: ignore
161                    results.append(await client.send_message(chat_id=target_chat, text=db_msg.text, message_thread_id=tid, reply_parameters=reply_parameters))
162            elif (
163                (true(copy_video_msg) and item["type"] == "video")
164                or (true(copy_photo_msg) and item["type"] == "photo")
165                or (true(copy_audio_msg) and item["type"] == "audio")
166                or (true(copy_document_msg) and item["type"] == "document")
167            ):
168                results.append(
169                    await client.copy_message(
170                        chat_id=target_chat,
171                        caption=text,
172                        from_chat_id=cid,
173                        message_id=int(item["mid"]),
174                        message_thread_id=tid,
175                        reply_parameters=reply_parameters,
176                    )
177                )
178            elif true(copy_media_group_msg) and item["type"] == "media_group":
179                results.extend(
180                    await client.copy_media_group(
181                        chat_id=target_chat,
182                        captions=text,
183                        from_chat_id=cid,
184                        message_thread_id=tid,
185                        message_id=int(item["mid"]),
186                        reply_parameters=reply_parameters,
187                    )
188                )
189            else:
190                logger.warning(f"Unknown message type: {item}")
191    except Exception as e:
192        logger.error(f"Failed to copy messages for key={key} from R2: {e}")
193        await del_cf_r2(key)
194        return []
195    if all(isinstance(x, Message) for x in results):
196        logger.success(f"Successfully copied {len(results)} messages for key={key} from R2")
197        await modify_progress(del_status=True, **kwargs)
198        return results
199    await del_cf_r2(key)
200    return []