Commit fa1eadf
Changed files (2)
src
messages
src/messages/database.py
@@ -1,6 +1,6 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
-
+# ty:ignore[invalid-argument-type]
import json
import re
@@ -10,7 +10,7 @@ from pyrogram.types import Message, ReplyParameters
from config import DB
from database.database import del_db, get_db, set_db
-from messages.parser import parse_msg
+from messages.parser import get_thread_id, parse_msg
from messages.progress import modify_progress
from messages.utils import sender_markdown_to_html
from utils import to_int, true
@@ -47,35 +47,37 @@ async def save_messages(messages: list[Message | None], key: str, metadata: dict
media_group_ids = set() # save once
for msg in valid_messages:
info = parse_msg(msg, silent=True)
- # Caution: this format should be consistent with `handle_social_media` function in `handler.py`
+ # Caution: this format should be consistent with `process_message` function in `messages/main.py`
# text = re.sub(r"^👤\[@.*?\]\(tg://user\?id=\d+\)//", "", text) # remove markdown send_from_user
text = re.sub(r"^👤\<a.*?tg://user\?id=\d+.*?@.*?</a>//", "", info["html"]) # remove html send_from_user
- msg_extra = {"text": text} if text else {}
+ msg_extra = {"cid": info["cid"], "mid": info["mid"], "tid": info["tid"]}
+ if text:
+ msg_extra["text"] = text
if msg.media_group_id:
if msg.media_group_id not in media_group_ids:
logger.trace(f"Saving media group message {msg.id}")
media_group_ids.add(msg.media_group_id)
- data.append({"type": "media_group", "cid": msg.chat.id, "mid": msg.id} | msg_extra)
+ data.append({"type": "media_group"} | msg_extra)
continue
if info["mtype"] == "video":
logger.trace(f"Saving video message {msg.id}")
- data.append({"type": "video", "cid": msg.chat.id, "mid": msg.id} | msg_extra)
+ data.append({"type": "video"} | msg_extra)
continue
if info["mtype"] == "photo":
logger.trace(f"Saving photo message {msg.id}")
- data.append({"type": "photo", "cid": msg.chat.id, "mid": msg.id} | msg_extra)
+ data.append({"type": "photo"} | msg_extra)
continue
if info["mtype"] == "audio":
logger.trace(f"Saving audio message {msg.id}")
- data.append({"type": "audio", "cid": msg.chat.id, "mid": msg.id} | msg_extra)
+ data.append({"type": "audio"} | msg_extra)
continue
if info["mtype"] == "text":
logger.trace(f"Saving text message {msg.id}")
- data.append({"type": "text", "cid": msg.chat.id, "mid": msg.id} | msg_extra)
+ data.append({"type": "text"} | msg_extra)
continue
if info["mtype"] == "document":
logger.trace(f"Saving document message {msg.id}")
- data.append({"type": "document", "cid": msg.chat.id, "mid": msg.id} | msg_extra)
+ data.append({"type": "document"} | msg_extra)
continue
logger.warning(f"Skip save message {msg.id} to {DB.ENGINE} due to unknown type: {msg}")
if data:
@@ -106,6 +108,7 @@ async def copy_messages_from_db(
"cid": 111, # chat id
"type": "text", # photo, video, media_group, etc.
"mid": 222, # message id
+ "tid": 333, # thread id
"text": "html format",
},
...
@@ -130,8 +133,8 @@ async def copy_messages_from_db(
target_mid = None
else:
target_mid = to_int(reply_msg_id)
- reply_parameters = ReplyParameters(message_id=target_mid) # type: ignore
-
+ reply_parameters = ReplyParameters(message_id=target_mid)
+ tid = get_thread_id(message)
if kv is None:
kv = await get_db(key)
if not kv.get("data"):
@@ -146,26 +149,44 @@ async def copy_messages_from_db(
for idx, item in enumerate(sorted(data, key=custom_sort)):
cid = to_int(item["cid"])
if idx != 0:
- reply_parameters = ReplyParameters() # only send as reply of the first message
+ reply_parameters = None # only send as reply of the first message
logger.debug(f"Copying {item['type']} message: ({cid}, {item['mid']}) -> target_chat={target_chat}")
text = item.get("text") # str or None
if text and kwargs.get("send_from_user"):
text = f"{sender_markdown_to_html(kwargs['send_from_user'])}{text}"
if true(copy_text_msg) and item["type"] == "text":
if text:
- results.append(await client.send_message(chat_id=target_chat, text=text, reply_parameters=reply_parameters))
+ results.append(await client.send_message(chat_id=target_chat, text=text, message_thread_id=tid, reply_parameters=reply_parameters))
else:
db_msg: Message = await client.get_messages(chat_id=cid, message_ids=int(item["mid"]), replies=0) # type: ignore
- results.append(await client.send_message(chat_id=target_chat, text=db_msg.text, reply_parameters=reply_parameters))
+ results.append(await client.send_message(chat_id=target_chat, text=db_msg.text, message_thread_id=tid, reply_parameters=reply_parameters))
elif (
(true(copy_video_msg) and item["type"] == "video")
or (true(copy_photo_msg) and item["type"] == "photo")
or (true(copy_audio_msg) and item["type"] == "audio")
or (true(copy_document_msg) and item["type"] == "document")
):
- results.append(await client.copy_message(chat_id=target_chat, caption=text, from_chat_id=cid, message_id=int(item["mid"]), reply_parameters=reply_parameters)) # type: ignore
+ results.append(
+ await client.copy_message(
+ chat_id=target_chat,
+ caption=text,
+ from_chat_id=cid,
+ message_id=int(item["mid"]),
+ message_thread_id=tid,
+ reply_parameters=reply_parameters,
+ )
+ )
elif true(copy_media_group_msg) and item["type"] == "media_group":
- results.extend(await client.copy_media_group(chat_id=target_chat, captions=text, from_chat_id=cid, message_id=int(item["mid"]), reply_parameters=reply_parameters)) # type: ignore
+ results.extend(
+ await client.copy_media_group(
+ chat_id=target_chat,
+ captions=text,
+ from_chat_id=cid,
+ message_thread_id=tid,
+ message_id=int(item["mid"]),
+ reply_parameters=reply_parameters,
+ )
+ )
else:
logger.warning(f"Unknown message type: {item}")
except Exception as e:
src/messages/sender.py
@@ -1,6 +1,6 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
-
+# ty:ignore[invalid-argument-type]
import asyncio
from pathlib import Path
@@ -11,6 +11,7 @@ from pyrogram.parser.markdown import BLOCKQUOTE_DELIM, BLOCKQUOTE_EXPANDABLE_DEL
from pyrogram.types import Message, ReplyParameters
from config import CAPTION_LENGTH, TID
+from messages.parser import get_thread_id
from messages.preprocess import preprocess_media, warp_media_group
from messages.progress import modify_progress, telegram_uploading
from messages.utils import better_blockquote, delete_message, get_reply_to, remove_img_tag, smart_split, summay_media
@@ -22,6 +23,7 @@ async def send2tg(
message: Message,
target_chat: int | str = "",
reply_msg_id: int = 0,
+ thread_id: int = 0,
*,
texts: str = "",
media: list[dict] | None = None,
@@ -45,6 +47,7 @@ async def send2tg(
reply_msg_id (int, optional): If set to integer > 0, the result is sent as a reply message to this message_id.
If set to 0, reply to the trigger message itself.
If set to -1, do not send as a reply message.
+ thread_id (int | None, optional): The thread ID to send the message to. Defaults to None.
texts (str, optional): The texts to send.
media (list[dict], optional): The media files to send.
comments (list[str], optional): The comments to append after texts.
@@ -69,6 +72,7 @@ async def send2tg(
target_chat = kwargs["target_chat"] if kwargs.get("target_chat") else message.chat.id
target_chat = to_int(target_chat)
reply_parameters = get_reply_to(message.id, reply_msg_id)
+ tid = thread_id or get_thread_id(message)
media = media or []
media = await preprocess_media(media)
texts = texts or ""
@@ -83,16 +87,16 @@ async def send2tg(
sent_messages: list[Message | None] = [] # return sent messages
logger.trace(f"Sending {len(media)} media with {len(texts)} texts")
if len(media) == 0:
- return await send_texts(client, target_chat, reply_parameters, texts=texts, cooldown=cooldown)
+ return await send_texts(client, target_chat, reply_parameters, tid, texts=texts, cooldown=cooldown)
if len(media) == 1:
- return await send_single_media(client, target_chat, reply_parameters, media=media[0], texts=texts, cooldown=cooldown, caption_above=caption_above, **kwargs)
+ return await send_single_media(client, target_chat, reply_parameters, tid, media=media[0], texts=texts, cooldown=cooldown, caption_above=caption_above, **kwargs)
caption = (await smart_split(texts, CAPTION_LENGTH))[0]
remaining_texts = texts.removeprefix(caption)
caption = better_blockquote(caption)
if 1 < len(media) <= 10:
group = await warp_media_group(media, caption=caption, caption_above=caption_above)
- sent_messages.extend(await send_media_group(client, target_chat, group, reply_parameters))
+ sent_messages.extend(await send_media_group(client, target_chat, group, reply_parameters, tid))
else: # media > 10
media_chunks = [media[i : i + 10] for i in range(0, len(media), 10)]
num_chunk = len(media_chunks)
@@ -100,15 +104,15 @@ async def send2tg(
for idx, batch in enumerate(media_chunks):
if idx == 0: # first chunk
group = await warp_media_group(batch)
- sent_messages.extend(await send_media_group(client, target_chat, group, reply_parameters))
+ sent_messages.extend(await send_media_group(client, target_chat, group, reply_parameters, tid))
elif idx != num_chunk - 1: # disbale reply if not the last chunk
group = await warp_media_group(batch)
- sent_messages.extend(await send_media_group(client, target_chat, group, ReplyParameters()))
+ sent_messages.extend(await send_media_group(client, target_chat, group, None, tid))
else: # last chunk: media <= 10, add caption here
- sent_messages.extend(await send2tg(client, message, target_chat, reply_msg_id=-1, texts=caption, media=batch, caption_above=caption_above, cooldown=cooldown, **kwargs))
+ sent_messages.extend(await send2tg(client, message, target_chat, reply_msg_id=-1, thread_id=tid, texts=caption, media=batch, caption_above=caption_above, cooldown=cooldown, **kwargs))
await asyncio.sleep(cooldown)
if remaining_texts:
- sent_messages.extend(await send_texts(client, target_chat, ReplyParameters(), texts=remaining_texts, cooldown=cooldown))
+ sent_messages.extend(await send_texts(client, target_chat, None, tid, texts=remaining_texts, cooldown=cooldown))
# clean up
for x in media:
for key in ["path", "media", "thumb", "audio", "photo", "video"]:
@@ -121,7 +125,8 @@ async def send2tg(
async def send_texts(
client: Client,
target_chat: int | str,
- reply_parameters: ReplyParameters,
+ reply_parameters: ReplyParameters | None,
+ thread_id: int = 0,
*,
texts: str = "",
cooldown: float = 0,
@@ -135,13 +140,13 @@ async def send_texts(
if (f"{BLOCKQUOTE_EXPANDABLE_DELIM}💬" in msg or f"{BLOCKQUOTE_DELIM}💬" in msg) and "💬**点此展开评论区**:" not in msg:
continue
if idx != 0:
- reply_parameters = ReplyParameters()
+ reply_parameters = None
try:
- sent_messages.append(await client.send_message(target_chat, better_blockquote(msg), reply_parameters=reply_parameters))
+ sent_messages.append(await client.send_message(target_chat, better_blockquote(msg), message_thread_id=thread_id, reply_parameters=reply_parameters))
except FloodWait as e:
logger.warning(e)
- await asyncio.sleep(e.value) # type: ignore
- sent_messages.append(await client.send_message(target_chat, better_blockquote(msg), reply_parameters=reply_parameters))
+ await asyncio.sleep(e.value)
+ sent_messages.append(await client.send_message(target_chat, better_blockquote(msg), message_thread_id=thread_id, reply_parameters=reply_parameters))
except Exception as e:
logger.warning(f"send_texts: {e}")
await asyncio.sleep(cooldown)
@@ -151,7 +156,8 @@ async def send_texts(
async def send_single_media(
client: Client,
target_chat: int | str,
- reply_parameters: ReplyParameters,
+ reply_parameters: ReplyParameters | None,
+ thread_id: int = 0,
*,
media: dict,
texts: str = "",
@@ -167,11 +173,12 @@ async def send_single_media(
message = None
try:
if media.get("photo"):
- message = await client.send_photo(chat_id=target_chat, **media, caption=caption, show_caption_above_media=caption_above, reply_parameters=reply_parameters)
+ message = await client.send_photo(chat_id=target_chat, **media, caption=caption, show_caption_above_media=caption_above, reply_parameters=reply_parameters, message_thread_id=thread_id)
elif video := media.get("video"):
message = await client.send_video(
chat_id=target_chat,
reply_parameters=reply_parameters,
+ message_thread_id=thread_id,
caption=caption,
show_caption_above_media=caption_above,
progress=telegram_uploading,
@@ -183,6 +190,7 @@ async def send_single_media(
message = await client.send_audio(
chat_id=target_chat,
reply_parameters=reply_parameters,
+ message_thread_id=thread_id,
caption=caption,
progress=telegram_uploading,
progress_args=(kwargs.get("progress", False), audio, kwargs.get("detail_progress", True)),
@@ -193,6 +201,7 @@ async def send_single_media(
message = await client.send_document(
chat_id=target_chat,
reply_parameters=reply_parameters,
+ message_thread_id=thread_id,
caption=caption,
progress=telegram_uploading,
progress_args=(kwargs.get("progress", False), document, kwargs.get("detail_progress", True)),
@@ -200,13 +209,13 @@ async def send_single_media(
)
except FloodWait as e:
logger.warning(e)
- await asyncio.sleep(e.value) # type: ignore
- return await send_single_media(client, target_chat, reply_parameters, media=media, texts=texts, cooldown=cooldown, **kwargs)
+ await asyncio.sleep(e.value)
+ return await send_single_media(client, target_chat, reply_parameters, thread_id, media=media, texts=texts, cooldown=cooldown, **kwargs)
except Exception as e:
logger.warning(f"send_single_media: {e}")
sent_messages.append(message)
if remaining_texts:
- sent_messages.extend(await send_texts(client, target_chat, ReplyParameters(), texts=remaining_texts, cooldown=cooldown))
+ sent_messages.extend(await send_texts(client, target_chat, None, thread_id, texts=remaining_texts, cooldown=cooldown))
for key in ["path", "thumb", "audio", "photo", "video"]:
if media.get(key) and Path(media[key]).is_file():
@@ -220,6 +229,7 @@ async def send_media_group(
target_chat: int | str,
media_group: list,
reply_parameters: ReplyParameters,
+ thread_id: int = 0,
) -> list[Message]:
"""Send a media group to Telegram.
@@ -233,10 +243,10 @@ async def send_media_group(
if retry > 2:
return []
try:
- return await client.send_media_group(chat_id, media=media_group, reply_parameters=reply_parameters) # type: ignore
+ return await client.send_media_group(chat_id, media=media_group, reply_parameters=reply_parameters, message_thread_id=thread_id)
except FloodWait as e:
logger.warning(e)
- await asyncio.sleep(e.value) # type: ignore
+ await asyncio.sleep(e.value)
except Exception as e:
logger.error(f"Failed to send_media_group: {e}")
return await send(chat_id, media_group, reply_parameters, retry=retry + 1)
@@ -245,10 +255,10 @@ async def send_media_group(
if retry > 2:
return []
try:
- return await client.copy_media_group(to_int(target_chat), from_chat_id=to_int(TID.TEMP), message_id=message_id, reply_parameters=reply_parameters)
+ return await client.copy_media_group(to_int(target_chat), from_chat_id=to_int(TID.TEMP), message_id=message_id, reply_parameters=reply_parameters, message_thread_id=thread_id)
except FloodWait as e:
logger.warning(e)
- await asyncio.sleep(e.value) # type: ignore
+ await asyncio.sleep(e.value)
except Exception as e:
logger.error(f"Failed to copy_media_group: {e}")
return await copy_media_group(message_id, retry=retry + 1)