main
1#!/usr/bin/env python
2# -*- coding: utf-8 -*-
3# ty:ignore[invalid-argument-type]
4import json
5import re
6
7from loguru import logger
8from pyrogram.client import Client
9from pyrogram.types import Message, ReplyParameters
10
11from database.r2 import del_cf_r2, get_cf_r2, set_cf_r2
12from messages.parser import get_thread_id, parse_msg
13from messages.progress import modify_progress
14from messages.utils import sender_markdown_to_html
15from utils import to_int, true
16
17
async def save_messages(messages: list[Message], key: str, metadata: dict | None = None) -> bool:
    """Save a compact description of the messages to R2 under ``key``.

    Stored data format:
        {
            "data": [
                {
                    "cid": 111,  # chat id
                    "type": "text",  # photo, video, audio, document, media_group
                    "mid": 222,  # message id
                    "tid": 333,  # thread id
                    "text": "html format",  # omitted when empty
                },
                ...
            ]
        }

    Args:
        messages: Messages to record. Every element must be a ``Message``;
            otherwise nothing is saved.
        key: Key passed to ``set_cf_r2``.
        metadata: Optional extra metadata. A ``"time"`` entry (ISO format of the
            first message's date) is added to a copy; the caller's dict is not
            modified.

    Returns:
        The result of ``set_cf_r2`` when at least one message was recorded,
        otherwise ``False``.
    """
    if not messages:
        logger.error("Skip save messages to R2 due to empty message list")
        return False
    if not all(isinstance(x, Message) for x in messages):
        logger.warning("Skip save messages to R2 due to invalid message type")
        return False

    # Build a new dict so the caller's ``metadata`` is never mutated.
    metadata = {**(metadata or {}), "time": messages[0].date.isoformat()}

    # Message types that are stored one entry per message.
    single_types = ("video", "photo", "audio", "text", "document")
    data = []
    seen_media_groups = set()  # each media group is saved once
    for msg in messages:
        info = parse_msg(msg, silent=True)
        # Caution: this format should be consistent with `process_message` function in `messages/main.py`
        # text = re.sub(r"^👤\[@.*?\]\(tg://user\?id=\d+\)//", "", text)  # remove markdown send_from_user
        text = re.sub(r"^👤\<a.*?tg://user\?id=\d+.*?@.*?</a>//", "", info["html"])  # remove html send_from_user
        entry = {"cid": info["cid"], "mid": info["mid"], "tid": info["tid"]}
        if text:
            entry["text"] = text

        if msg.media_group_id:
            # Only the first message seen for a group is recorded; the
            # remaining members of the same group are skipped.
            if msg.media_group_id not in seen_media_groups:
                logger.trace(f"Saving media group message {msg.id}")
                seen_media_groups.add(msg.media_group_id)
                data.append({"type": "media_group"} | entry)
            continue

        mtype = info["mtype"]
        if mtype in single_types:
            logger.trace(f"Saving {mtype} message {msg.id}")
            data.append({"type": mtype} | entry)
            continue

        logger.warning(f"Skip save message {msg.id} to R2 due to unknown type: {msg}")

    if data:
        return await set_cf_r2(key, metadata=metadata, data={"data": data})
    return False
85
86
async def copy_messages_from_db(
    client: Client,
    message: Message,
    key: str,
    kv: dict | None = None,
    *,
    copy_video_msg: bool = True,
    copy_photo_msg: bool = True,
    copy_audio_msg: bool = True,
    copy_document_msg: bool = True,
    copy_text_msg: bool = True,
    copy_media_group_msg: bool = True,
    **kwargs,
) -> list[Message]:
    """Copy messages described by a stored record into a chat.

    data format:
        {
            "data": [
                {
                    "cid": 111,  # chat id
                    "type": "text",  # photo, video, media_group, etc.
                    "mid": 222,  # message id
                    "tid": 333,  # thread id
                    "text": "html format",
                },
                ...
            ]
        }

    Items are sent ordered by type (media_group, video, photo, audio,
    document, text) and then by message id.

    Args:
        client: Client used to send / copy the messages.
        message: The triggering message; supplies the default target chat,
            the default reply target and the thread id.
        key: Key of the stored record; fetched with ``get_cf_r2`` when ``kv``
            is ``None``.
        kv: Already-loaded record, to skip the fetch.
        copy_*_msg: Per-type switches; items of a disabled type are skipped.
        **kwargs: Recognised keys are ``target_chat`` (destination chat),
            ``reply_msg_id`` (0 -> reply to ``message``, -1 -> no reply
            target, otherwise the id to reply to) and ``send_from_user``
            (prefix prepended to item text). All kwargs are also forwarded
            to ``modify_progress``.

    Returns:
        The sent messages, or an empty list when the record is missing or
        copying fails. On a copy failure ``del_cf_r2(key)`` is called.
    """
    type_order = {"media_group": 0, "video": 1, "photo": 2, "audio": 3, "document": 4, "text": 5}

    def custom_sort(item: dict) -> tuple[int, int]:
        """Sort by type first, then numerically by message id."""
        # ``mid`` is converted with int() below when sending, so compare it
        # numerically here as well (string ids would otherwise sort "10" < "9").
        return (type_order.get(item["type"], 999), int(item["mid"]))

    target_chat = kwargs.get("target_chat") or message.chat.id
    reply_mid = to_int(kwargs.get("reply_msg_id", message.id))
    if reply_mid == 0:
        target_mid = message.id
    elif reply_mid == -1:
        target_mid = None
    else:
        target_mid = reply_mid
    # Used for the first message actually sent, then cleared.
    reply_parameters = ReplyParameters(message_id=target_mid)
    tid = get_thread_id(message)

    if kv is None:
        kv = await get_cf_r2(key)
    # Guard against a missing record as well as a record without data.
    if not kv or not kv.get("data"):
        logger.error(f"Wrong R2 data for key={key}: {kv}")
        return []
    data: list[dict] = kv.get("data", [])
    if isinstance(data, str):
        data = json.loads(data)
    logger.debug(f"Sending {len(data)} messages from R2: {data}")

    results: list[Message] = []
    try:
        for item in sorted(data, key=custom_sort):
            cid = to_int(item["cid"])
            kind = item["type"]
            logger.debug(f"Copying {kind} message: ({cid}, {item['mid']}) -> target_chat={target_chat}")
            text = item.get("text")  # str or None
            if text and kwargs.get("send_from_user"):
                text = f"{sender_markdown_to_html(kwargs['send_from_user'])}{text}"

            if true(copy_text_msg) and kind == "text":
                if not text:
                    # No stored text: fetch the source message and reuse its text.
                    db_msg: Message = await client.get_messages(chat_id=cid, message_ids=int(item["mid"]), replies=0)  # type: ignore
                    text = db_msg.text
                sent = [await client.send_message(chat_id=target_chat, text=text, message_thread_id=tid, reply_parameters=reply_parameters)]
            elif (
                (true(copy_video_msg) and kind == "video")
                or (true(copy_photo_msg) and kind == "photo")
                or (true(copy_audio_msg) and kind == "audio")
                or (true(copy_document_msg) and kind == "document")
            ):
                sent = [
                    await client.copy_message(
                        chat_id=target_chat,
                        caption=text,
                        from_chat_id=cid,
                        message_id=int(item["mid"]),
                        message_thread_id=tid,
                        reply_parameters=reply_parameters,
                    )
                ]
            elif true(copy_media_group_msg) and kind == "media_group":
                sent = await client.copy_media_group(
                    chat_id=target_chat,
                    captions=text,
                    from_chat_id=cid,
                    message_thread_id=tid,
                    message_id=int(item["mid"]),
                    reply_parameters=reply_parameters,
                )
            else:
                # Reached for unrecognised types and for types whose copy_* flag is off.
                logger.warning(f"Unknown message type: {item}")
                continue

            results.extend(sent)
            # Only the first message actually sent is a reply; skipped items
            # must not consume the reply target.
            reply_parameters = None
    except Exception as e:
        logger.error(f"Failed to copy messages for key={key} from R2: {e}")
        await del_cf_r2(key)
        return []

    if all(isinstance(x, Message) for x in results):
        logger.success(f"Successfully copied {len(results)} messages for key={key} from R2")
        await modify_progress(del_status=True, **kwargs)
        return results
    await del_cf_r2(key)
    return []
200 return []