main
1#!/usr/bin/env python
2# -*- coding: utf-8 -*-
3
4import json
5import re
6
7from loguru import logger
8from pyrogram.client import Client
9from pyrogram.types import Message, ReplyParameters
10
11from config import DB
12from database.database import del_db, get_db, set_db
13from messages.parser import parse_msg
14from messages.progress import modify_progress
15from messages.utils import sender_markdown_to_html
16from utils import to_int, true
17
18
async def save_messages(messages: list[Message | None], key: str, metadata: dict | None = None) -> bool:
    """Save the messages to DB.

    data format:
    {
        "data": [
            {
                "cid": 111,  # chat id
                "type": "text",  # photo, video, media_group, etc.
                "mid": 222,  # message id
                "text": "html format",
            },
            ...
        ]
    }

    Args:
        messages: Messages to store. If any entry is not a ``Message`` nothing is saved.
        key: Database key the record is stored under.
        metadata: Extra metadata stored alongside the record. It is copied, so the
            caller's dict is left untouched; the copy gets a ``"time"`` entry holding
            the ISO timestamp of the first message.

    Returns:
        The result of ``set_db`` when at least one message could be stored, else ``False``.
    """
    if not messages:
        logger.error(f"Skip save messages to {DB.ENGINE} due to empty message list")
        return False
    valid_messages = [x for x in messages if isinstance(x, Message)]
    if len(valid_messages) != len(messages):
        logger.warning(f"Skip save messages to {DB.ENGINE} due to invalid message type")
        return False

    # Work on a copy: adding "time" must not leak into the caller's dict.
    metadata = dict(metadata) if metadata else {}
    metadata["time"] = valid_messages[0].date.isoformat()

    savable_types = {"video", "photo", "audio", "text", "document"}
    data: list[dict] = []
    media_group_ids = set()  # each media group is saved only once
    for msg in valid_messages:
        info = parse_msg(msg, silent=True)
        # Caution: this format should be consistent with `process_message` function in `messages/main.py`
        # Strip a leading HTML sender mention ("👤<a ...tg://user?id=...>@...</a>//").
        # `or ""` guards against parse_msg returning no HTML for this message.
        text = re.sub(r"^👤\<a.*?tg://user\?id=\d+.*?@.*?</a>//", "", info["html"] or "")
        msg_extra = {"text": text} if text else {}

        if msg.media_group_id:
            # Only the first message of a group is stored; copy_messages_from_db restores
            # the whole group from it via copy_media_group.
            if msg.media_group_id not in media_group_ids:
                logger.trace(f"Saving media group message {msg.id}")
                media_group_ids.add(msg.media_group_id)
                data.append({"type": "media_group", "cid": msg.chat.id, "mid": msg.id} | msg_extra)
            continue

        mtype = info["mtype"]
        if mtype in savable_types:
            logger.trace(f"Saving {mtype} message {msg.id}")
            data.append({"type": mtype, "cid": msg.chat.id, "mid": msg.id} | msg_extra)
        else:
            logger.warning(f"Skip save message {msg.id} to {DB.ENGINE} due to unknown type: {msg}")

    if not data:
        logger.warning(f"Skip save messages to {DB.ENGINE} for key={key}: no savable message")
        return False
    return await set_db(key, metadata=metadata, data={"data": data})
84
85
async def copy_messages_from_db(
    client: Client,
    message: Message,
    key: str,
    kv: dict | None = None,
    *,
    copy_video_msg: bool = True,
    copy_photo_msg: bool = True,
    copy_audio_msg: bool = True,
    copy_document_msg: bool = True,
    copy_text_msg: bool = True,
    copy_media_group_msg: bool = True,
    **kwargs,
) -> list[Message]:
    """Copy messages from database.

    data format:
    {
        "data": [
            {
                "cid": 111,  # chat id
                "type": "text",  # photo, video, media_group, etc.
                "mid": 222,  # message id
                "text": "html format",
            },
            ...
        ]
    }

    Args:
        client: Client used to send / copy the messages.
        message: Triggering message; default target chat and reply target.
        key: Database key of the cached record.
        kv: Pre-fetched record; loaded with ``get_db(key)`` when ``None``.
        copy_*_msg: Per-type switches (evaluated with ``utils.true``); disabled
            types are skipped.
        **kwargs: Keys read here: ``target_chat`` (falls back to ``message.chat.id``),
            ``reply_msg_id`` (``0`` -> reply to ``message``, ``-1`` -> no reply,
            otherwise that id; defaults to ``message.id``) and ``send_from_user``
            (prepended to stored text). All kwargs are forwarded to
            ``modify_progress`` on success.

    Returns:
        The sent messages, or ``[]`` on failure. On most failures the key is
        deleted from the database.
    """
    # Copy order: media groups first, text last; ties broken by original message id.
    type_order = {"media_group": 0, "video": 1, "photo": 2, "audio": 3, "document": 4, "text": 5}
    enabled = {
        "media_group": true(copy_media_group_msg),
        "video": true(copy_video_msg),
        "photo": true(copy_photo_msg),
        "audio": true(copy_audio_msg),
        "document": true(copy_document_msg),
        "text": true(copy_text_msg),
    }

    def sort_key(item: dict) -> tuple[int, int]:
        """Sort by type, then by message id (as int, matching how it is sent)."""
        return (type_order.get(item["type"], 999), int(item["mid"]))

    target_chat = kwargs["target_chat"] if kwargs.get("target_chat") else message.chat.id
    reply_to = to_int(kwargs.get("reply_msg_id", message.id))
    if reply_to == 0:
        target_mid = message.id
    elif reply_to == -1:
        target_mid = None
    else:
        target_mid = reply_to
    reply_parameters = ReplyParameters(message_id=target_mid)  # type: ignore

    if kv is None:
        kv = await get_db(key)
    # get_db may yield nothing for a missing key; guard before calling .get on it.
    if not kv or not kv.get("data"):
        logger.error(f"Wrong {DB.ENGINE} data for key={key}: {kv}")
        return []
    data = kv["data"]
    if isinstance(data, str):
        try:
            data = json.loads(data)
        except json.JSONDecodeError as e:
            logger.error(f"Failed to decode {DB.ENGINE} data for key={key}: {e}")
            await del_db(key)
            return []
    if not isinstance(data, list):
        logger.error(f"Wrong {DB.ENGINE} data for key={key}: {kv}")
        await del_db(key)
        return []

    logger.debug(f"Sending {len(data)} messages from {DB.ENGINE}: {data}")
    send_from_user = kwargs.get("send_from_user")
    results: list[Message] = []
    try:
        for item in sorted(data, key=sort_key):
            mtype = item["type"]
            cid = to_int(item["cid"])
            mid = int(item["mid"])
            logger.debug(f"Copying {mtype} message: ({cid}, {item['mid']}) -> target_chat={target_chat}")
            if mtype not in enabled:
                logger.warning(f"Unknown message type: {item}")
                continue
            if not enabled[mtype]:
                logger.debug(f"Skip copying disabled {mtype} message: ({cid}, {mid})")
                continue

            text = item.get("text")  # stored HTML, or None when nothing was saved
            if text and send_from_user:
                text = f"{sender_markdown_to_html(send_from_user)}{text}"

            if mtype == "text":
                if not text:
                    # No cached text: fetch the source message and resend its text.
                    db_msg: Message = await client.get_messages(chat_id=cid, message_ids=mid, replies=0)  # type: ignore
                    text = db_msg.text
                results.append(await client.send_message(chat_id=target_chat, text=text, reply_parameters=reply_parameters))
            elif mtype == "media_group":
                results.extend(await client.copy_media_group(chat_id=target_chat, captions=text, from_chat_id=cid, message_id=mid, reply_parameters=reply_parameters))  # type: ignore
            else:  # video / photo / audio / document
                results.append(await client.copy_message(chat_id=target_chat, caption=text, from_chat_id=cid, message_id=mid, reply_parameters=reply_parameters))  # type: ignore

            # Only the first message actually sent replies to the target; later ones don't.
            reply_parameters = ReplyParameters()
    except Exception as e:
        # Any failure while replaying makes the cached entry unusable: drop it.
        logger.error(f"Failed to copy messages for key={key} from {DB.ENGINE}: {e}")
        await del_db(key)
        return []

    if all(isinstance(x, Message) for x in results):
        logger.success(f"Successfully copied {len(results)} messages for key={key} from {DB.ENGINE}")
        await modify_progress(del_status=True, **kwargs)
        return results
    await del_db(key)
    return []